Commit e42000bd by Qwantsun

API test v-1.0

parent f29694fc
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: __init__.py
# Author: lao_zhao
# Datetime: 2024/5/20 15:40
# Description:
#
# ---------------------------------------------------------------------------
import functools
import logging
import os
import time
log_dir_path = os.path.join(os.path.join(os.path.dirname(__file__), "report"), "log")
if not os.path.exists(log_dir_path):
# makedirs(c:\c\d\a) os.mkdir
os.makedirs(log_dir_path)
log_file_name = os.path.join(log_dir_path, time.strftime("%Y_%m_%d_%H_%M_%S") + ".log")
def log():
# 创建logger对象
logger = logging.getLogger()
# 设置日志的级别
logger.level = logging.ERROR
# 设置日志文件
handler = logging.FileHandler(log_file_name, mode="a", encoding="utf-8")
# 设置日志的格式
formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
# 将格式添加到handler中
handler.setFormatter(formatter)
# 将handler添加到logger对象中
logger.addHandler(handler)
return logger
log = log()
def log_decorator(func_name):
@functools.wraps(func_name)
def inner(*args, **kwargs):
try:
# 写入info级别的日志
"""
__init__(): 构造方法,初始化对象
__new__(): 为对象分配内存空间
__del__(): 析构方法,回收对象
__enter__()/__exit__(): 实现上下文管理器
__mro__: 查看继承顺序
__file__: 获取文件的路径
__name__: 获取功能/类/方法/模块的名称
__iter__(): 创建迭代器
__next__(): 迭代器和生成器取值
__code__: 获取功能的code属性
func.__code__.co_filename: 获取func所在的py文件名称
func.__code__.co_firstlineno: 获取func所在的py文件中行号
__doc__:获取功能的描述
"""
log.info(f"执行的功能为:{func_name.__name__}, 功能的描述为:{func_name.__doc__}, 所在的文件为:"
f"{func_name.__code__.co_filename}, 所在行为:{func_name.__code__.co_firstlineno}")
# 执行功能
return func_name(*args, **kwargs)
except Exception as e:
# 如果发生错误,写入一个错误的日志
log.error(f"执行的功能为:{func_name.__name__}, 功能的描述为:{func_name.__doc__}, 所在的文件为:"
f"{func_name.__code__.co_filename}, 所在行为:{func_name.__code__.co_firstlineno}, 错误为:{e}")
raise e
return inner
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: db.py
# Author: lao_zhao
# Datetime: 2024/5/20 14:08
# Description:
#
# ---------------------------------------------------------------------------
import pymysql
from InterfaceAutoTest import log_decorator
from InterfaceAutoTest.common.read_ini import ReadIni
class DB:
@log_decorator
def __init__(self):
"""链接数据库,获取链接对象和游标对象"""
ini = ReadIni()
try:
self.conn = pymysql.connect(
host=ini.get_db_connect_message("host"),
port=int(ini.get_db_connect_message("port")),
user=ini.get_db_connect_message("username"),
password=ini.get_db_connect_message("password"),
database=ini.get_db_connect_message("database"),
charset="utf8"
)
self.cursor = self.conn.cursor()
except Exception as e:
raise e
@log_decorator
def close(self):
"""关闭吧数据库链接"""
self.cursor.close()
self.conn.close()
@log_decorator
def delete_func(self, sql):
"""执行删除的sql语句"""
try:
self.cursor.execute(sql)
self.conn.commit()
except Exception as e:
raise e
@log_decorator
def select_func(self, sql):
"""执行查询的sql语句,并返回查询结果"""
try:
self.cursor.execute(sql)
except Exception as e:
raise e
else:
result_select = self.cursor.fetchall()
if result_select:
return result_select[0][0]
if __name__ == '__main__':
db = DB()
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: read_excel.py
# Author: lao_zhao
# Datetime: 2024/5/20 14:15
# Description:
#
# ---------------------------------------------------------------------------
import openpyxl
from InterfaceAutoTest import log_decorator
from InterfaceAutoTest.common.read_ini import ReadIni
from InterfaceAutoTest.common.read_json import read_json
class ReadExcel:
@log_decorator
def __init__(self, username="demo", table_name="BPM"):
"""获取所有json文件的路径,再读取json文件,再获取excel的路径和工作表名称,加载工作簿,获取工作表"""
self.ini = ReadIni()
case_data_path = self.ini.get_file_path("case", username)
expect_data_path = self.ini.get_file_path("expect", username)
sql_data_path = self.ini.get_file_path("sql", username)
self.case_data_dict = read_json(case_data_path)
self.expect_data_dict = read_json(expect_data_path)
self.sql_data_dict = read_json(sql_data_path)
excel_path = self.ini.get_file_path("excel", username)
# table_name = self.ini.get_table_name("table_name")
try:
wb = openpyxl.load_workbook(excel_path)
self.ws = wb[table_name]
except Exception as e:
raise e
@log_decorator
def __get_cell_value(self, column: str, row: int) -> str:
"""获取指定单元格数据"""
try:
value = self.ws[column+str(row)].value
except Exception as e:
raise e
else:
if value is None:
return None
elif value.strip():
return value.strip()
@log_decorator
def module_name(self, row):
"""获取模块名称"""
return self.__get_cell_value("b", row)
@log_decorator
def api_name(self, row):
"""获取接口名称"""
return self.__get_cell_value("c", row)
@log_decorator
def case_req_method(self, row):
"""获取请求方法"""
return self.__get_cell_value("f", row)
@log_decorator
def case_req_url(self, row):
"""获取请求url"""
value = self.__get_cell_value("g", row)
if value:
return self.ini.get_host("bpm") + value
@log_decorator
def case_req_mime(self, row):
"""获取请求的媒体类型"""
value = self.__get_cell_value("h", row)
if value:
return value.lower()
@log_decorator
def case_data(self, row):
"""获取用例数据"""
case_data_key = self.__get_cell_value("i", row)
if case_data_key:
module_name = self.module_name(row)
api_name = self.api_name(row)
return self.case_data_dict[module_name][api_name][case_data_key]
@log_decorator
def expect_data(self, row):
"""获取用例数据"""
expect_data_key = self.__get_cell_value("j", row)
if expect_data_key:
module_name = self.module_name(row)
api_name = self.api_name(row)
return self.expect_data_dict[module_name][api_name][expect_data_key]
@log_decorator
def sql_data(self, row):
"""获取用例数据"""
sql_data_key = self.__get_cell_value("k", row)
if sql_data_key:
module_name = self.module_name(row)
api_name = self.api_name(row)
return self.sql_data_dict[module_name][api_name][sql_data_key]
@log_decorator
def sql_type(self, row):
"""获取sql语句类型"""
value = self.__get_cell_value("l", row)
if value:
return value.lower()
@log_decorator
def update_key(self, row):
"""获取更新的key"""
return self.__get_cell_value("m", row)
@log_decorator
def get_data(self):
"""获取excel中的数据存放在二维列表中"""
list_data = []
for row in range(2, self.ws.max_row+1):
method = self.case_req_method(row)
url = self.case_req_url(row)
mime = self.case_req_mime(row)
case_data = self.case_data(row)
expect_data = self.expect_data(row)
sql_data = self.sql_data(row)
sql_type = self.sql_type(row)
update_key = self.update_key(row)
if method and url:
list_data.append([method, url, mime, case_data, expect_data, sql_data, sql_type, update_key])
else:
return list_data
if __name__ == '__main__':
excel = ReadExcel()
print(excel.get_data())
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: read_ini.py
# Author: lao_zhao
# Datetime: 2024/5/20 14:02
# Description:
#
# ---------------------------------------------------------------------------
import configparser
import os
from InterfaceAutoTest import log_decorator
class ReadIni:
@log_decorator
def __init__(self):
"""获取ini文件的路径,再使用Configparser对象读取ini文件"""
self.data_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data_config')
ini_path = os.path.join(self.data_config_path, 'config.ini')
self.conf = configparser.ConfigParser()
self.conf.read(ini_path, encoding="utf-8")
@log_decorator
def get_file_path(self, key, username="demo"):
"""根据key获取file节点下文件的路径"""
try:
value = self.conf.get("file", key)
except Exception as e:
raise e
else:
username_dir = os.path.join(self.data_config_path, username)
return os.path.join(username_dir, value)
@log_decorator
def get_table_name(self, key):
"""根据key获取工作表名称"""
try:
value = self.conf.get("worksheet", key)
except Exception as e:
raise e
else:
return value
@log_decorator
def get_host(self, key):
"""根据key获取域名"""
try:
value = self.conf.get("host", key)
except Exception as e:
raise e
else:
return value
@log_decorator
def get_db_connect_message(self, key):
"""根据key获取数据库的链接信息"""
try:
value = self.conf.get("db", key)
except Exception as e:
raise e
else:
return value
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: read_json.py
# Author: lao_zhao
# Datetime: 2024/5/20 14:13
# Description:
#
# ---------------------------------------------------------------------------
import json
import os.path
from InterfaceAutoTest import log_decorator
@log_decorator
def read_json(filename):
"""读取json文件,将json文件的内容序列化为python对象"""
if os.path.isfile(filename) and filename.endswith(".json"):
try:
with open(filename, mode="r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
raise e
else:
raise FileNotFoundError("json文件的路径错误")
\ No newline at end of file
# 配置数据配置层中文件的名称
[file]
# 配置用例管理文件的名称
excel=APIAutoTest.xlsx
# 配置用例数据文件的名称
case=case_data.json
# 配置期望数据的文件
expect=expect_data.json
# sql语句的文件
sql=sql_data.json
[worksheet]
table_name=BPM
[host]
# 配置测试系统的域名
bpm=http://120.46.172.186:8080
[db]
# 配置测试系统的数据库的链接项
host=120.46.172.186
port=3306
username=root
password=root@2023
database=eipsaas
\ No newline at end of file
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"username":"admin","password":"MTIzNDU2"},
"LoginErrorPasswordIsNone": {"username":"admin","password":""},
"LoginErrorPasswordIsLong":{"username":"admin","password":"MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2"},
"LoginErrorPasswordIsShort":{"username":"admin","password":"m"},
"LoginErrorPasswordIsSpecial":{"username":"admin","password":"☯㍿卍卐"},
"LoginErrorUsernameIsNone":{"username":"","password":"MTIzNDU2"},
"LoginErrorUsernameIsLong":{"username":"adminadminadminadmin","password":"MTIzNDU2"},
"LoginErrorUsernameIsShort":{"username":"a","password":"MTIzNDU2"},
"LoginErrorUsernameIsSpecial":{"username":"☯㍿卍卐","password":"MTIzNDU2"},
"LoginErrorUsernameIsErr":{"username":"admin123","password":"MTIzNDU2"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {
"code": "api_auto_test_add_dem",
"description": "api_auto_test_add_dem",
"isDefault": 0,
"name": "api_auto_test_add_dem"
}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"ids": "需要更新"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"code": "test_add_org",
"demId": "需要更新",
"exceedLimitNum": 0,
"grade": "",
"limitNum": 0,
"name": "测试添加的组织",
"nowNum": 0,
"orderNo": 0,
"parentId": "0"
}
},
"组织加入用户": {
"UserAddOrgSuccess": {"orgCode": "test_add_org", "accounts": "admin"}
},
"保存组织参数": {
"SaveParamSuccess": {
"query": {"orgCode": "test_add_org"},
"body": [{"alias":"sz","value":100},{"alias":"kc","value":"语文课"}]
}
},
"删除组织": {
"DeleteOrgSuccess": "test_add_org"
}
}
}
\ No newline at end of file
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"username":"超级管理员","loginStatus":true, "account":"admin"},
"LoginErrorPasswordIsNone": {"message":"账号或密码错误"},
"LoginErrorPasswordIsLong":{"message":"账号或密码错误"},
"LoginErrorPasswordIsShort":{"message":"账号或密码错误"},
"LoginErrorPasswordIsSpecial":{"message":"账号或密码错误"},
"LoginErrorUsernameIsNone":{"message":"账号或密码错误"},
"LoginErrorUsernameIsLong":{"message":"账号或密码错误"},
"LoginErrorUsernameIsShort":{"message":"账号或密码错误"},
"LoginErrorUsernameIsSpecial":{"message":"账号或密码错误"},
"LoginErrorUsernameIsErr":{"message":"账号或密码错误"}
},
"刷新token": {
"RefreshTokenSuccess": {"message": "刷新token成功"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess":{"message": "添加维度成功!"}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"message": "删除维度成功"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {"message": "添加组织成功"}
},
"组织加入用户": {
"UserAddOrgSuccess": {"state":true,"message":"加入成功","value":""}
},
"保存组织参数": {
"SaveParamSuccess": {"state":true,"message":"保存组织参数成功!","value":""}
},
"删除组织": {
"DeleteOrgSuccess": {"state":true,"message":"删除组织成功!","value":""}
}
}
}
\ No newline at end of file
{
"维度管理": {
"添加维度": {
"AddDemSuccess": "delete FROM uc_demension WHERE `CODE_`=\"api_auto_test_add_dem\";"
},
"根据维度编码删除维度": {
"DeleteDemSuccess": "select ID_ FROM uc_demension WHERE `CODE_`=\"api_auto_test_add_dem\";"
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"select": "select ID_ FROM uc_demension WHERE `CODE_`=\"api_auto_test_add_dem\";",
"delete": "DELETE FROM uc_org WHERE `CODE_`=\"test_add_org\";"
}
}
}
}
\ No newline at end of file
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"username":"admin","password":"MTIzNDU2"},
"LoginErrorPasswordIsNone": {"username":"admin","password":""},
"LoginErrorPasswordIsLong":{"username":"admin","password":"MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2"},
"LoginErrorPasswordIsShort":{"username":"admin","password":"m"},
"LoginErrorPasswordIsSpecial":{"username":"admin","password":"☯㍿卍卐"},
"LoginErrorUsernameIsNone":{"username":"","password":"MTIzNDU2"},
"LoginErrorUsernameIsLong":{"username":"adminadminadminadmin","password":"MTIzNDU2"},
"LoginErrorUsernameIsShort":{"username":"a","password":"MTIzNDU2"},
"LoginErrorUsernameIsSpecial":{"username":"☯㍿卍卐","password":"MTIzNDU2"},
"LoginErrorUsernameIsErr":{"username":"admin123","password":"MTIzNDU2"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {
"code": "api_auto_test_add_dem",
"description": "api_auto_test_add_dem",
"isDefault": 0,
"name": "api_auto_test_add_dem"
}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"ids": "需要更新"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"code": "test_add_org",
"demId": "需要更新",
"exceedLimitNum": 0,
"grade": "",
"limitNum": 0,
"name": "测试添加的组织",
"nowNum": 0,
"orderNo": 0,
"parentId": "0"
}
},
"组织加入用户": {
"UserAddOrgSuccess": {"orgCode": "test_add_org", "accounts": "admin"}
},
"保存组织参数": {
"SaveParamSuccess": {
"query": {"orgCode": "test_add_org"},
"body": [{"alias":"sz","value":100},{"alias":"kc","value":"语文课"}]
}
},
"删除组织": {
"DeleteOrgSuccess": "test_add_org"
}
}
}
\ No newline at end of file
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"username":"超级管理员","loginStatus":true, "account":"admin"},
"LoginErrorPasswordIsNone": {"message":"账号或密码错误"},
"LoginErrorPasswordIsLong":{"message":"账号或密码错误"},
"LoginErrorPasswordIsShort":{"message":"账号或密码错误"},
"LoginErrorPasswordIsSpecial":{"message":"账号或密码错误"},
"LoginErrorUsernameIsNone":{"message":"账号或密码错误"},
"LoginErrorUsernameIsLong":{"message":"账号或密码错误"},
"LoginErrorUsernameIsShort":{"message":"账号或密码错误"},
"LoginErrorUsernameIsSpecial":{"message":"账号或密码错误"},
"LoginErrorUsernameIsErr":{"message":"账号或密码错误"}
},
"刷新token": {
"RefreshTokenSuccess": {"message": "刷新token成功"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess":{"message": "添加维度成功!"}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"message": "删除维度成功"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {"message": "添加组织成功"}
},
"组织加入用户": {
"UserAddOrgSuccess": {"state":true,"message":"加入成功","value":""}
},
"保存组织参数": {
"SaveParamSuccess": {"state":true,"message":"保存组织参数成功!","value":""}
},
"删除组织": {
"DeleteOrgSuccess": {"state":true,"message":"删除组织成功!","value":""}
}
}
}
\ No newline at end of file
{
"维度管理": {
"添加维度": {
"AddDemSuccess": "delete FROM uc_demension WHERE `CODE_`=\"api_auto_test_add_dem\";"
},
"根据维度编码删除维度": {
"DeleteDemSuccess": "select ID_ FROM uc_demension WHERE `CODE_`=\"api_auto_test_add_dem\";"
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"select": "select ID_ FROM uc_demension WHERE `CODE_`=\"api_auto_test_add_dem\";",
"delete": "DELETE FROM uc_org WHERE `CODE_`=\"test_add_org\";"
}
}
}
}
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: requsts_method.py
# Author: lao_zhao
# Datetime: 2024/5/20 14:34
# Description:
#
# ---------------------------------------------------------------------------
import base64
import requests
from InterfaceAutoTest import log_decorator
from InterfaceAutoTest.common.read_ini import ReadIni
class RequestsMethod:
@log_decorator
def __init__(self):
"""关联token"""
login_url = ReadIni().get_host("bpm")+"/auth"
login_data = {"username": "admin", "password": base64.b64encode("123456".encode()).decode()}
self.bpm_session = requests.sessions.Session()
self.bpm_session.headers["Authorization"] = "Bearer "+self.bpm_session.post(url=login_url, json=login_data).json().get("token")
@log_decorator
def request_all(self, req_method, req_url, req_mime, case_data):
"""封装公共的请求方法"""
if req_mime == "json" or req_mime == "application/json":
return self.bpm_session.request(method=req_method, url=req_url, json=case_data)
elif req_mime == "form" or req_mime == "application/x-www-form-urlencoded":
return self.bpm_session.request(method=req_method, url=req_url, data=case_data)
elif req_mime == "text" or req_mime == "text/plain":
# 请求体中除了json类型传参和上传文件以外的数据如果需要在请求体中传参全部使用data传参
return self.bpm_session.request(method=req_method, url=req_url, data=case_data)
elif req_mime == "form-data" or req_mime == "multipart/form-data":
return self.bpm_session.request(method=req_method, url=req_url, files=case_data)
elif req_mime == "query" or req_mime == "params":
# 任何请求如果在地址栏传参使用params关键字传参
return self.bpm_session.request(method=req_method, url=req_url, params=case_data)
elif req_mime is None:
return self.bpm_session.request(method=req_method, url=req_url)
# 判断请求的媒体类型是为query|json或者为json|query
elif req_mime == "query|json" or req_mime == "json|query":
query_data = case_data["query"]
body_data = case_data["body"]
# params和json同时传参时表示请求体和地址栏同时传参
return self.bpm_session.request(method=req_method, url=req_url, params=query_data, json=body_data)
else:
raise ValueError("请求的媒体类型没封装")
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: __init__.py
# Author: lao_zhao
# Datetime: 2024/5/20 17:48
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: __init__.py
# Author: lao_zhao
# Datetime: 2024/5/20 17:48
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: conftest.py
# Author: lao_zhao
# Datetime: 2024/5/20 16:10
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from InterfaceAutoTest import log_decorator
from InterfaceAutoTest.common.db import DB
from InterfaceAutoTest.requests_method.requsts_method import RequestsMethod
@log_decorator
@pytest.fixture(scope="session")
def req_fix():
"""创建RequestsMethod对象,并返回"""
req = RequestsMethod()
yield req
@log_decorator
@pytest.fixture(scope="session")
def db_fix():
"""创建DB对象,并返回"""
db = DB()
yield db
db.close()
[pytest]
; ;开启日志
; log_cli=true
; ;设置日志的级别,如果不设置级别的话,可以设置为NOTSET,如果要设置级别,级别可以有debug,info,warning,error,致命
; log_level=NOTSET
; ;设置日志显示的信息格式
; log_format=%(levelname)s--%(asctime)s--%(message)s
; ;设置日志中时间显示的格式
; log_date_format=%Y-%m-%d %H:%M:%S
; ;每个py文件运行的时候追加的命令
; ;addopts=-vs
;设置日志保存的文件
; log_file=../report/log/bpm_test.log
;设置日志保存在文件中的级别
log_file_level=error
;设置日志在文件中的信息格式
log_file_format=%(levelname)s--%(asctime)s--%(message)s
;设置文件日志中时间显示的格式
log_file_date_format=%Y-%m-%d %H:%M:%S
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: test_bpm.py
# Author: lao_zhao
# Datetime: 2024/5/20 16:08
# Description:
#
# ---------------------------------------------------------------------------
import pytest
import logging
from InterfaceAutoTest import log_decorator
from InterfaceAutoTest.common.read_excel import ReadExcel
excel = ReadExcel("demo")
class TestBPM:
@pytest.mark.parametrize("method, url, mime, case_data, expect_data, sql_data, sql_type, update_key", excel.get_data())
def test_bpm(self, req_fix, db_fix, method, url, mime, case_data, expect_data, sql_data, sql_type, update_key):
# 判断sql语句类型是否为select
if sql_type == "select":
# 使用DB类对象调用select_func方法执行查询的sql语句,并接收查询结果
select_result = db_fix.select_func(sql_data)
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 将更新之后的用例数据发送给服务器。
# 判断sql语句的类型是否为delete
elif sql_type == "delete":
# 使用DB类对象调用delete_func方法执行删除的sql语句
db_fix.delete_func(sql_data)
# 将数据库中的数据删除完之后,再发送请求
# 判断sql语句类型是否为select|delete或者为delete|select
elif sql_type == "select|delete" or sql_type == "delete|select":
# 使用DB类对象调用delete_func方法执行删除的sql语句
db_fix.delete_func(sql_data["delete"])
# 使用DB类对象调用select_func方法执行查询的sql语句,并接收查询结果
select_result = db_fix.select_func(sql_data["select"])
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 将更新之后的用例数据发送给服务器。
# 使用RequestsMethod对象发送请求
res = req_fix.request_all(req_method=method, req_url=url, req_mime=mime, case_data=case_data)
# 断言
try:
for key in expect_data:
assert expect_data[key] == res.json().get(key)
except AssertionError as e:
logging.error(f"断言失败, 用例数据为{case_data}, 期望数据为:{expect_data}, 服务器返回的数据为:{res.text}")
raise e
else:
logging.error(f"断言成功, 用例数据为{case_data}, 期望数据为:{expect_data}, 服务器返回的数据为:{res.text}")
print("断言")
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: __init__.py
# Author: lao_zhao
# Datetime: 2024/5/20 17:48
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: test_dependency.py
# Author: lao_zhao
# Datetime: 2024/5/20 17:47
# Description:
#
# ---------------------------------------------------------------------------
# 标记此用例会被依赖
import random
import string
import pytest, requests, base64
from faker import Faker
bpm_session = requests.sessions.Session()
@pytest.mark.dependency()
def test_login():
login_url = "http://120.46.172.186:8080/auth"
login_data = {"username": "admin", "password": base64.b64encode("123456".encode()).decode()}
res = bpm_session.request(method="post", url=login_url, json=login_data)
try:
assert "loginStatus" in res.text
except AssertionError as e:
print(e)
raise e
else:
bpm_session.headers["Authorization"] = "Bearer " + res.json().get("token")
print("断言成功")
# 标记当前用例会依赖前面的哪一条用例, 标记此用例会被依赖
@pytest.mark.dependency(depends=["test_login"])
def test_add_dem():
add_dem_url = "http://120.46.172.186:8080/api/demension/v1/dem/addDem"
data = Faker(locale="zh_cn")
add_dem_data = {
"code": "".join(random.sample(string.ascii_letters + string.digits, 12)),
"description": data.address(),
"isDefault": 0,
"name": data.name()
}
res = bpm_session.request(method="post", url=add_dem_url, json=add_dem_data)
try:
assert "成功" in res.text
except Exception as e:
print(e)
raise e
else:
print("断言成功")
@pytest.mark.dependency(depends=["test_add_dem", "test_login"])
def test1():
print("用例函数1")
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: __init__.py
# Author: lao_zhao
# Datetime: 2024/5/20 17:48
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: conftest.py
# Author: lao_zhao
# Datetime: 2024/5/20 16:10
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from InterfaceAutoTest import log_decorator
from InterfaceAutoTest.common.db import DB
from InterfaceAutoTest.requests_method.requsts_method import RequestsMethod
@log_decorator
@pytest.fixture(scope="session")
def req_fix():
"""创建RequestsMethod对象,并返回"""
req = RequestsMethod()
yield req
@log_decorator
@pytest.fixture(scope="session")
def db_fix():
"""创建DB对象,并返回"""
db = DB()
yield db
db.close()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test60
# FileName: test_bpm.py
# Author: lao_zhao
# Datetime: 2024/5/20 16:08
# Description:
#
# ---------------------------------------------------------------------------
import pytest
import logging
from InterfaceAutoTest import log_decorator
from InterfaceAutoTest.common.read_excel import ReadExcel
excel = ReadExcel("mirZhang", "BPM-组织管理")
class TestBPM:
@pytest.mark.parametrize("method, url, mime, case_data, expect_data, sql_data, sql_type, update_key", excel.get_data())
def test_bpm(self, req_fix, db_fix, method, url, mime, case_data, expect_data, sql_data, sql_type, update_key):
# 判断sql语句类型是否为select
if sql_type == "select":
# 使用DB类对象调用select_func方法执行查询的sql语句,并接收查询结果
select_result = db_fix.select_func(sql_data)
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 将更新之后的用例数据发送给服务器。
# 判断sql语句的类型是否为delete
elif sql_type == "delete":
# 使用DB类对象调用delete_func方法执行删除的sql语句
db_fix.delete_func(sql_data)
# 将数据库中的数据删除完之后,再发送请求
# 判断sql语句类型是否为select|delete或者为delete|select
elif sql_type == "select|delete" or sql_type == "delete|select":
# 使用DB类对象调用delete_func方法执行删除的sql语句
db_fix.delete_func(sql_data["delete"])
# 使用DB类对象调用select_func方法执行查询的sql语句,并接收查询结果
select_result = db_fix.select_func(sql_data["select"])
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 将更新之后的用例数据发送给服务器。
# 使用RequestsMethod对象发送请求
res = req_fix.request_all(req_method=method, req_url=url, req_mime=mime, case_data=case_data)
# 断言
try:
for key in expect_data:
assert expect_data[key] == res.json().get(key)
except AssertionError as e:
logging.error(f"断言失败, 用例数据为{case_data}, 期望数据为:{expect_data}, 服务器返回的数据为:{res.text}")
raise e
else:
logging.error(f"断言成功, 用例数据为{case_data}, 期望数据为:{expect_data}, 服务器返回的数据为:{res.text}")
print("断言")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment