HttpRunner3.x 源码解析(4)-工具类loader.py
这些方法可以灵活的引用在其他项目中。
加载yaml文件
def _load_yaml_file(yaml_file: Text):
""" load yaml file and check file content format
"""
with open(yaml_file, mode="rb") as stream:
try:
yaml_content = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as ex:
err_msg = f"YAMLError:\nfile: {yaml_file}\nerror: {ex}"
logger.error(err_msg)
raise exceptions.FileFormatError
return yaml_content
加载json文件
def _load_json_file(json_file: Text):
""" load json file and check file content format
"""
with open(json_file, mode="rb") as data_file:
try:
json_content = json.load(data_file)
except json.JSONDecodeError as ex:
err_msg = f"JSONDecodeError:\nfile: {json_file}\nerror: {ex}"
raise exceptions.FileFormatError(err_msg)
return json_content
加载测试用例文件
def load_test_file(test_file: Text):
#根据后缀判断是否为json/yaml文件,其他格式不支持
"""load testcase/testsuite file content"""
if not os.path.isfile(test_file):
raise exceptions.FileNotFound(f"test file not exists: {test_file}")
file_suffix = os.path.splitext(test_file)[1].lower()
if file_suffix == ".json":
test_file_content = _load_json_file(test_file)
elif file_suffix in [".yaml", ".yml"]:
test_file_content = _load_yaml_file(test_file)
else:
# '' or other suffix
raise exceptions.FileFormatError(
f"testcase/testsuite file should be YAML/JSON format, invalid format file: {test_file}"
)
return test_file_content
load testcase
def load_testcase(testcase: Dict):
try:
# validate with pydantic TestCase model
testcase_obj = TestCase.parse_obj(testcase)
except ValidationError as ex:
err_msg = f"TestCase ValidationError:\nerror: {ex}\ncontent: {testcase}"
raise exceptions.TestCaseFormatError(err_msg)
return testcase_obj
parse_obj用来根据判断传入的testcase是否符合TestCase模型。
Python - pydantic 入门介绍与 Models 的简单使用 (bbsmax.com)
class TestCase(BaseModel): config: TConfig teststeps: List[TStep]
class TConfig(BaseModel): name: Name verify: Verify = False base_url: BaseUrl = "" # Text: prepare variables in debugtalk.py, ${gen_variables()} variables: Union[VariablesMapping, Text] = {} parameters: Union[VariablesMapping, Text] = {} setup_hooks: Hooks = [] teardown_hooks: Hooks = [] export: Export = [] path: Text = None weight: int = 1
Union Union[int, str] 表示既可以是 int,也可以是 str
变量字典
VariablesMapping = Dict[Text, Any] Hooks = List[Union[Text, Dict[Text, Text]]]
class TStep(BaseModel): name: Name request: Union[TRequest, None] = None testcase: Union[Text, Callable, None] = None variables: VariablesMapping = {} setup_hooks: Hooks = [] teardown_hooks: Hooks = [] # used to extract request's response field extract: VariablesMapping = {} # used to export session variables from referenced testcase export: Export = [] validators: Validators = Field([], alias="validate") validate_script: List[Text] = []
Python - typing 模块 —— Union - 小菠萝测试笔记 - 博客园 (cnblogs.com)
load_dot_env_file
def load_dot_env_file(dot_env_path: Text):
""" load .env file.
Args:
dot_env_path (str): .env file path
Returns:
dict: environment variables mapping
{
"UserName": "debugtalk",
"Password": "123456",
"PROJECT_KEY": "ABCDEFGH"
}
Raises:
exceptions.FileFormatError: If .env file format is invalid.
"""
if not os.path.isfile(dot_env_path):#如果不存在路径,返回空字典
return {}
logger.info(f"Loading environment variables from {dot_env_path}")
env_variables_mapping = {}
with open(dot_env_path, mode="rb") as fp:
for line in fp:
# maxsplit=1
line = line.strip()
if not len(line) or line.startswith(b"#"): #注释
continue
if b"=" in line:
variable, value = line.split(b"=", 1)#支持=
elif b":" in line:
variable, value = line.split(b":", 1) #致辞:
else:
raise exceptions.FileFormatError(".env format error")
env_variables_mapping[
variable.strip().decode("utf-8")
] = value.strip().decode("utf-8")
utils.set_os_environ(env_variables_mapping)
return env_variables_mapping
load_csv_file
def load_csv_file(csv_file: Text):
""" load csv file and check file content format
Args:
csv_file (str): csv file path, csv file content is like below:
Returns:
list: list of parameters, each parameter is in dict format
Examples:
>>> cat csv_file
username,password
test1,111111
test2,222222
test3,333333
>>> load_csv_file(csv_file)
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
if not os.path.isabs(csv_file):
global project_meta
if project_meta is None:
raise exceptions.MyBaseFailure("load_project_meta() has not been called!")
# make compatible with Windows/Linux
csv_file = os.path.join(project_meta.RootDir, *csv_file.split("/"))
if not os.path.isfile(csv_file):
# file path not exist
raise exceptions.CSVNotFound(csv_file)
csv_content_list = []
with open(csv_file, encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
load_folder_files
递归返回目录下yaml/json/_test.py结尾的文件
def load_folder_files(folder_path: Text, recursive: bool = True):
""" load folder path, return all files endswith .yml/.yaml/.json/_test.py in list.
Args:
folder_path (str): specified folder path to load
recursive (bool): load files recursively if True
Returns:
list: files endswith yml/yaml/json
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):#如果目录不存在返回[]
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.lower().endswith((".yml", ".yaml", ".json", "_test.py")):#不属于这几种的不支持
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:#不递归目录
break
return file_list
os.walk() 方法用于通过在目录树中游走输出在目录中的文件名,向上或者向下。
用法:os.path.dirname(path) 参数: path:代表文件系统路径的path-like对象。 返回类型:此方法返回一个字符串值,该字符串值表示指定路径中的目录名称。
load_file
定位文件,返回文件绝对路径
def locate_file(start_path: Text, file_name: Text):
""" locate filename and return absolute file path.
searching will be recursive upward until system root dir.
Args:
file_name (str): target locate file name
start_path (str): start locating path, maybe file path or directory path
Returns:
str: located file path. None if file not found.
Raises:
exceptions.FileNotFound: If failed to locate file.
"""
if os.path.isfile(start_path):
start_dir_path = os.path.dirname(start_path)
elif os.path.isdir(start_path):
start_dir_path = start_path
else:
raise exceptions.FileNotFound(f"invalid path: {start_path}")
file_path = os.path.join(start_dir_path, file_name)
if os.path.isfile(file_path):
# ensure absolute
return os.path.abspath(file_path)
# system root dir
# Windows, e.g. 'E:\\'
# Linux/Darwin, '/'
parent_dir = os.path.dirname(start_dir_path)
if parent_dir == start_dir_path:
raise exceptions.FileNotFound(f"{file_name} not found in {start_path}")
# locate recursive upward
return locate_file(parent_dir, file_name)