一、框架架构
二、项目目录结构
三、框架功能说明
四、核心逻辑说明
配置文件
输出目录
请求工具类
代码编写case
程序主入口
执行记录
一、框架架构 二、项目目录结构 三、框架功能说明解决痛点:
通过session会话方式,解决了登录之后cookie关联处理
框架天然支持接口动态传参、关联灵活处理
支持Excel、Yaml文件格式编写接口用例,通过简单配置框架自动读取并执行
执行环境一键切换,解决多环境相互影响问题
支持http/https协议各种请求、传参类型接口
响应数据格式支持json、str类型的提取操作
断言方式支持等于、包含、大于、小于、不等于等方
框架可以直接交给不懂代码的功能测试人员使用,只需要安装规范编写接口用例就行
框架使用说明:
安装依赖包:pip install -r requirements.txt
框架主入口为 run.py
文件
编写用例可以在Excel
或者Yaml
文件里面,按照示例编写即可,也可以在test_case
目录下通过python脚本编写case
断言或者提取参数都是通过jsonpath
、正则表达式
提取数据
用例执行时默认读取Excel
和test_case
目录下用例
工具类封装
assert_util.py 断言工具类封装
def assert_result(response: Response, expected: str) -> None:
""" 断言方法
:param response: 实际响应对象
:param expected: 预期响应内容,从excel中或者yaml读取、或者手动传入
return None
"""
if expected is None:
logging.info("当前用例无断言!")
return
if isinstance(expected, str):
expect_dict = eval(expected)
else:
expect_dict = expected
index = 0
for k, v in expect_dict.items():
# 获取需要断言的实际结果部分
for _k, _v in v.items():
if _k == "http_code":
actual = response.status_code
else:
if response_type(response) == "json":
actual = json_extractor(response.json(), _k)
else:
actual = re_extract(response.text, _k)
index += 1
logging.info(f'第{index}个断言数据,实际结果:{actual} | 预期结果:{_v} 断言方式:{k}')
allure_step(f'第{index}个断言数据', f'实际结果:{actual} = 预期结果:{v}')
try:
if k == "eq": # 相等
assert actual == _v
elif k == "in": # 包含关系
assert _v in actual
elif k == "gt": # 判断大于,值应该为数值型
assert actual > _v
elif k == "lt": # 判断小于,值应该为数值型
assert actual < _v
elif k == "not": # 不等于,非
assert actual != _v
else:
logging.exception(f"判断关键字: {k} 错误!")
except AssertionError:
raise AssertionError(f'第{index}个断言失败 -|- 断言方式:{k} 实际结果:{actual} || 预期结果: {_v}')
case_handle.py Case数据读取工具类
def get_case_data():
case_type = ReadYaml(config_path + "config.yaml").read_yaml["case"]
if case_type == CaseType.EXCEL.value:
cases = []
for file in [excel for excel in os.listdir(data_path) if os.path.splitext(excel)[1] == ".xlsx"]:
data = ReadExcel(data_path + file).read_excel()
name = os.path.splitext(file)[0]
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
cases.extend(data)
return cases
elif case_type == CaseType.YAML.value:
cases = []
for yaml_file in [yaml for yaml in os.listdir(data_path) if
os.path.splitext(yaml)[1] in [".yaml", "yml"]]:
data = ReadYaml(data_path + yaml_file).read_yaml
name = os.path.splitext(yaml_file)[0]
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
cases.extend(data)
return cases
else:
cases = []
for file in [excel for excel in os.listdir(data_path) if
os.path.splitext(excel)[1] in [".yaml", "yml", ".xlsx"]]:
if os.path.splitext(file)[1] == ".xlsx":
data = ReadExcel(data_path + file).read_excel()
name = os.path.splitext(file)[0]
cases.extend(data)
else:
data = ReadYaml(data_path + file).read_yaml
name = os.path.splitext(file)[0]
cases.extend(data)
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
return cases
excel_handle.py 读取Excel工具类
def get_case_data():
case_type = ReadYaml(config_path + "config.yaml").read_yaml["case"]
if case_type == CaseType.EXCEL.value:
cases = []
for file in [excel for excel in os.listdir(data_path) if os.path.splitext(excel)[1] == ".xlsx"]:
data = ReadExcel(data_path + file).read_excel()
name = os.path.splitext(file)[0]
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
cases.extend(data)
return cases
elif case_type == CaseType.YAML.value:
cases = []
for yaml_file in [yaml for yaml in os.listdir(data_path) if
os.path.splitext(yaml)[1] in [".yaml", "yml"]]:
data = ReadYaml(data_path + yaml_file).read_yaml
name = os.path.splitext(yaml_file)[0]
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
cases.extend(data)
return cases
else:
cases = []
for file in [excel for excel in os.listdir(data_path) if
os.path.splitext(excel)[1] in [".yaml", "yml", ".xlsx"]]:
if os.path.splitext(file)[1] == ".xlsx":
data = ReadExcel(data_path + file).read_excel()
name = os.path.splitext(file)[0]
cases.extend(data)
else:
data = ReadYaml(data_path + file).read_yaml
name = os.path.splitext(file)[0]
cases.extend(data)
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
return cases
yaml_handle.py 读取Yaml文件的工具类
class ReadYaml:
def __init__(self, filename):
self.filename = filename
@property
def read_yaml(self) -> object:
with open(file=self.filename, mode="r", encoding="utf-8") as fp:
case_data = yaml.safe_load(fp.read())
return case_data
配置文件
config.yaml 配置信息
# 服务器器地址
host: http://localhost:8091/
case: 1 # 0代表执行Excel和yaml两种格式的用例, 1 代表Excel用例,2 代表 yaml文件用例
输出目录
日志输出目录
import logging
import time
import os
def get_log(logger_name):
"""
:param logger_name: 日志名称
:return: 返回logger handle
"""
# 创建一个logger
logger = logging.getLogger(logger_name)
logger.setLevel(logging.INFO)
# 获取本地时间,转换为设置的格式
rq = time.strftime('%Y%m%d', time.localtime(time.time()))
# 设置所有日志和错误日志的存放路径
path = os.path.dirname(os.path.abspath(__file__))
all_log_path = os.path.join(path, 'interface_logs\\All_Logs\\')
if not os.path.exists(all_log_path):
os.makedirs(all_log_path)
error_log_path = os.path.join(path, 'interface_logs\\Error_Logs\\')
if not os.path.exists(error_log_path):
os.makedirs(error_log_path)
# 设置日志文件名
all_log_name = all_log_path + rq + '.log'
error_log_name = error_log_path + rq + '.log'
if not logger.handlers:
# 创建一个handler写入所有日志
fh = logging.FileHandler(all_log_name, encoding='utf-8')
fh.setLevel(logging.INFO)
# 创建一个handler写入错误日志
eh = logging.FileHandler(error_log_name, encoding='utf-8')
eh.setLevel(logging.ERROR)
# 创建一个handler输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# 以时间-日志器名称-日志级别-文件名-函数行号-错误内容
all_log_formatter = logging.Formatter(
'[%(asctime)s] %(filename)s - %(levelname)s - %(lineno)s - %(message)s')
# 以时间-日志器名称-日志级别-文件名-函数行号-错误内容
error_log_formatter = logging.Formatter(
'[%(asctime)s] %(filename)s - %(levelname)s - %(lineno)s - %(message)s')
# 将定义好的输出形式添加到handler
fh.setFormatter(all_log_formatter)
ch.setFormatter(all_log_formatter)
eh.setFormatter(error_log_formatter)
# 给logger添加handler
logger.addHandler(fh)
logger.addHandler(eh)
logger.addHandler(ch)
return logger
报告目录
执行case后自动生成,执行之前自动删除
allure 数据目录
执行case后自动生成,执行之前自动删除
请求工具类base_request.py 请求封装工具类
class BaseRequest:
session = None
@classmethod
def get_session(cls):
if cls.session is None:
cls.session = requests.Session()
return cls.session
@classmethod
def send_request(cls, case: dict) -> Response:
"""
处理case数据,转换成可用数据发送请求
:param case: 读取出来的每一行用例内容
return: 响应对象
"""
log.info("开始执行用例: {}".format(case.get("title")))
req_data = RequestPreDataHandle(case).to_request_data
res = cls.send_api(
url=req_data["url"],
method=req_data["method"],
pk=req_data["pk"],
header=req_data.get("header", None),
data=req_data.get("data", None),
file=req_data.get("file", None)
)
allure_step('请求响应数据', res.text)
after_extract(res, req_data.get("extract", None))
return res
@classmethod
def send_api(cls, url, method, pk, header=None, data=None, file=None) -> Response:
"""
:param method: 请求方法
:param url: 请求url
:param pk: 入参关键字, params(查询参数类型,明文传输,一般在url?参数名=参数值), data(一般用于form表单类型参数)
json(一般用于json类型请求参数)
:param data: 参数数据,默认等于None
:param file: 文件对象
:param header: 请求头
:return: 返回res对象
"""
session = cls.get_session()
pk = pk.lower()
if pk == 'params':
res = session.request(method=method, url=url, params=data, headers=header)
elif pk == 'data':
res = session.request(method=method, url=url, data=data, files=file, headers=header)
elif pk == 'json':
res = session.request(method=method, url=url, json=data, files=file, headers=header)
else:
raise ValueError('pk可选关键字为params, json, data')
return res
pre_handle_utils.py 请求前置处理工具类
def pre_expr_handle(content) -> object:
"""
:param content: 原始的字符串内容
return content: 替换表达式后的字符串
"""
if content is None:
return None
if len(content) != 0:
log.info(f"开始进行字符串替换: 替换字符串为:{content}")
content = Template(str(content)).safe_substitute(GLOBAL_VARS)
for func in re.findall('\\${(.*?)}', content):
try:
content = content.replace('${%s}' % func, exec_func(func))
except Exception as e:
log.exception(e)
log.info(f"字符串替换完成: 替换字符串后为:{content}")
return content
after_handle_utils.py 后置操作处理工具类
def after_extract(response: Response, exp: str) -> None:
"""
:param response: request 响应对象
:param exp: 需要提取的参数字典 '{"k1": "$.data"}' 或 '{"k1": "data:(.*?)$"}'
:return:
"""
if exp:
if response_type(response) == "json":
res = response.json()
for k, v in exp.items():
GLOBAL_VARS[k] = json_extractor(res, v)
else:
res = response.text
for k, v in exp.items():
GLOBAL_VARS[k] = re_extract(res, v)
代码编写case
test_demo.py 用例文件示例
@allure.feature("登录")
class TestLogin:
@allure.story("正常登录成功")
@allure.severity(allure.severity_level.BLOCKER)
def test_login(self):
allure_title("正常登录")
data = {
"url": "api/login",
"method": "post",
"pk": "data",
"data": {"userName": "king", "pwd": 123456}
}
expected = {
"$.msg": "登录成功!"
}
# 发送请求
response = BaseRequest.send_request(data)
# 断言操作
assert_result(response, expected)
程序主入口
run.py 主入口执行文件
def run():
# 生成case在执行
if os.path.exists(auto_gen_case_path):
shutil.rmtree(auto_gen_case_path)
get_case_data()
if os.path.exists('outputs/reports/'):
shutil.rmtree(path='outputs/reports/')
# 本地调式执行
pytest.main(args=['-s', '--alluredir=outputs/reports'])
# 自动以服务形式打开报告
# os.system('allure serve outputs/reports')
# 本地生成报告
os.system('allure generate outputs/reports -o outputs/html --clean')
shutil.rmtree(auto_gen_case_path)
if __name__ == '__main__':
run()
执行记录
allure 报告
日志记录
[2022-01-11 22:36:04,164] base_request.py - INFO - 42 - 开始执行用例: 正常登录
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 37 - 开始进行字符串替换: 替换字符串为:bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 44 - 字符串替换完成: 替换字符串后为:bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 68 - 处理请求前url:bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 78 - 处理请求后 url:http://localhost:8091/bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 90 - 处理请求前Data: {'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 37 - 开始进行字符串替换: 替换字符串为:{'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,166] pre_handle_utils.py - INFO - 44 - 字符串替换完成: 替换字符串后为:{'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,166] pre_handle_utils.py - INFO - 92 - 处理请求后Data: {'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,166] pre_handle_utils.py - INFO - 100 - 处理请求前files: None
[2022-01-11 22:36:04,175] base_request.py - INFO - 53 - 请求响应数据{"code":"0","message":"success","data":null}
[2022-01-11 22:36:04,176] data_handle.py - INFO - 29 - 提取响应内容成功,提取表达式为: $.code 提取值为 0
[2022-01-11 22:36:04,176] assert_util.py - INFO - 49 - 第1个断言数据,实际结果:0 | 预期结果:0 断言方式:eq
[2022-01-11 22:36:04,176] data_handle.py - INFO - 29 - 提取响应内容成功,提取表达式为: $.message 提取值为 success
[2022-01-11 22:36:04,176] assert_util.py - INFO - 49 - 第2个断言数据,实际结果:success | 预期结果:success 断言方式:eq
到此这篇关于Pytest+Yaml+Excel 接口自动化测试框架的实现示例的文章就介绍到这了,更多相关Pytest Yaml Excel 接口自动化 内容请搜索软件开发网以前的文章或继续浏览下面的相关文章希望大家以后多多支持软件开发网!