Commit 2f98bfeb by MK

第一次提交

parent bb626d4b
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:00
# Description:
#
# ---------------------------------------------------------------------------
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:01
# Description:
#
# ---------------------------------------------------------------------------
from InterfaceAutoTest.common.log import write_log
log = write_log()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: db.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:23
# Description:
#
# ---------------------------------------------------------------------------
import pymysql
from common import log
from common.read_ini import ReadIni
from data_config.settings import *
class DB:
def __init__(self):
"""链接数据库,获取链接对象和游标对象"""
read_ini = ReadIni()
try:
self.conn = pymysql.connect(
host=read_ini.get_sql_message(HOST),
port=int(read_ini.get_sql_message(PORT)),
user=read_ini.get_sql_message(USER),
password=read_ini.get_sql_message(PWD),
database=read_ini.get_sql_message(DATABASE),
charset="utf8"
)
self.cursor = self.conn.cursor()
except:
log.error("链接数据库错误或者获取游标对象失败!!!,请求察看数据库的链接配置.")
raise pymysql.MySQLError("链接数据库错误或者获取游标对象失败!!!,请求察看数据库的链接配置.")
def close(self):
self.cursor.close()
self.conn.close()
def delete(self, sql):
"""执行删除的sql语句"""
try:
self.cursor.execute(sql)
self.conn.commit()
except:
log.error("执行删除的sql语句错误,请察看对应的删除的sql语句")
raise ValueError("执行删除的sql语句错误,请察看对应的删除的sql语句")
def select(self, sql, num=1):
"""执行查询的sql语句"""
try:
self.cursor.execute(sql)
select_result = self.cursor.fetchall()
if select_result and num == 1:
return select_result[0][0]
elif select_result and num == 2:
return select_result[0][0], select_result[0][1]
elif select_result and num > 3:
return select_result
except:
log.error("执行查询的sql语句错误,请察看对应的查询的sql语句")
raise ValueError("执行查询的sql语句错误,请察看对应的查询的sql语句")
if __name__ == '__main__':
db = DB()
sql = """SELECT * FROM uc_demension LIMIT 1"""
print(db.select(sql))
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: log.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:02
# Description:
#
# ---------------------------------------------------------------------------
import logging
import os
def write_log():
"""创建写入日志对象"""
logger = logging.getLogger(name="黄总")
logger.level = logging.NOTSET
log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report/log/日志汇总.log")
handler = logging.FileHandler(log_path, mode="a", encoding="utf-8")
format = logging.Formatter('%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s - %(name)s')
handler.setFormatter(format)
logger.addHandler(handler)
return logger
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: read_excel.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:31
# Description:
#
# ---------------------------------------------------------------------------
import openpyxl
from data_config.settings import *
from common import log
from common.read_ini import ReadIni
from common.read_json import read_json
class ReadExcel:
def __init__(self, table_nme, excel_path=None):
"""获取数据配置层中除了ini文件以外的所有文件的路径,再获取excel的工作表,再读取所有的json文件"""
self.read_ini = ReadIni()
excel_path = self.read_ini.get_file_path(EXCEL)
case_data_path = self.read_ini.get_file_path(CASE)
expect_data_path = self.read_ini.get_file_path(EXPECT)
sql_data_path = self.read_ini.get_file_path(SQL)
# 获取工作表名称
# table_name = self.read_ini.get_table_name(TABLE_NAME)
table_name = table_nme
wb = openpyxl.load_workbook(excel_path)
try:
self.ws = wb[table_name]
except:
log.error("获取工作表失败,请察看工作表名称是否配置正确!!!")
raise KeyError("获取工作表失败,请察看工作表名称是否配置正确!!!")
self.case_data_dict = read_json(case_data_path)
self.expect_data_dict = read_json(expect_data_path)
self.sql_data_dict = read_json(sql_data_path)
def __get_cell_value(self, column: str, row: int) -> str:
"""获取指定单元格数据"""
try:
value = self.ws[column+str(row)].value
if value is None:
return None
elif value.strip():
return value.strip()
except:
log.error("获取指定单元格数据失败,请察看输入的列号和行号是否正确!!")
raise KeyError("获取指定单元格数据失败,请察看输入的列号和行号是否正确!!")
def module_name(self, row):
"""根据行号,获取模块名称"""
return self.__get_cell_value(MODULE, row)
def api_name(self, row):
"""根据行号,获取接口名称"""
return self.__get_cell_value(API, row)
def title(self, row):
"""根据行号,获取用例标题"""
return self.__get_cell_value(TITLE, row)
def level(self, row):
"""根据行号,获取用例等级"""
return self.__get_cell_value(LEVEL, row)
def case_url(self, row):
"""根据行号,获取请求的url"""
return self.read_ini.get_host(URL_HOST_CUSTOM) + self.__get_cell_value(URL, row)
def case_method(self, row):
"""根据行号,获取请求方法"""
return self.__get_cell_value(METHOD, row)
def case_mime(self, row):
"""根据行号,获取媒体类型"""
mime = self.__get_cell_value(MIME, row)
if mime:
return mime.lower()
def case_data(self, row):
"""根据行号,获取用例数据"""
case_data_key = self.__get_cell_value(CASE_DATA, row)
if case_data_key:
module_name = self.module_name(row)
api_name = self.api_name(row)
try:
return self.case_data_dict[module_name][api_name][case_data_key]
except:
log.error("请察看是否配置了用例数据!!!")
raise KeyError("请察看是否配置了用例数据!!!")
def expect_data(self, row):
"""根据行号,获取期望数据"""
expect_data_key = self.__get_cell_value(EXPECT_DATA, row)
module_name = self.module_name(row)
api_name = self.api_name(row)
try:
return self.expect_data_dict[module_name][api_name][expect_data_key]
except:
log.error("请察看是否配置了期望数据!!!")
raise KeyError("请察看是否配置了期望数据!!!")
def sql_type(self, row):
"""根据行号,获取sql语句类型"""
sql_type_value = self.__get_cell_value(SQL_TYPE, row)
if sql_type_value:
return sql_type_value.lower()
def sql_data(self, row):
"""根据行号,获取sql语句"""
sql_data_key = self.__get_cell_value(SQL_DATA, row)
if sql_data_key:
module_name = self.module_name(row)
api_name = self.api_name(row)
try:
return self.sql_data_dict[module_name][api_name][sql_data_key]
except:
log.error("请察看是否配置了sql数据!!!")
raise KeyError("请察看是否配置了sql数据!!!")
def update_key(self, row):
"""根据行号,获取更新的key"""
return self.__get_cell_value(UPDATE_KEY, row)
def get_data(self):
"""将测试数据存放在一个二维列表中"""
list_data = []
try:
for row in range(2, self.ws.max_row + 1):
# module_name
module_name = self.module_name(row)
# api
api_name = self.api_name(row)
# title
title = self.title(row)
# level
level = self.level(row)
# url
case_url = self.case_url(row)
# method
case_method = self.case_method(row)
# mime
case_mime = self.case_mime(row)
# case_data
case_data = self.case_data(row)
# expect_data
expect_data = self.expect_data(row)
# sql_type
sql_type = self.sql_type(row)
# sql_data
sql_data = self.sql_data(row)
# update_key
update_key = self.update_key(row)
list_data.append(
[module_name, api_name, title, level, case_url, case_method, case_mime, case_data, expect_data, sql_type, sql_data, update_key])
else:
return list_data
except:
log.error("请察看excel中是否存在空行,如果存在,请删除空行!!")
raise TypeError("请察看excel中是否存在空行,如果存在,请删除空行!!")
if __name__ == '__main__':
excel = ReadExcel()
print(excel.get_data())
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: read_ini.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:05
# Description:
#
# ---------------------------------------------------------------------------
import configparser
import os
from common import log
from data_config.settings import *
class ReadIni:
def __init__(self):
"""读取ini文件"""
self.data_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data_config")
ini_path = os.path.join(self.data_config_path, "config.ini")
self.conf = configparser.ConfigParser()
self.conf.read(ini_path, encoding="utf-8")
def get_file_path(self, key):
"""根据key,获取file节点下文件的路径"""
try:
file_name = self.conf.get(FILE, key)
except:
log.error("输入file节点下的key,错误,请求察看配置文件~!!!")
raise KeyError("输入file节点下的key,错误,请求察看配置文件~!!!")
file_path = os.path.join(self.data_config_path, file_name)
if os.path.isfile(file_path):
return file_path
else:
log.error("获取file节点下,文件的路径错误,请察看配置文件是否配置正确~!!!")
raise FileExistsError("获取file节点下,文件的路径错误,请察看配置文件是否配置正确~!!!")
def get_host(self, key):
"""根据key,获取域名"""
try:
return self.conf.get(URL_HOST, key)
except:
log.error("输入host节点下的key,错误,请求察看配置文件~!!!")
raise KeyError("输入host节点下的key,错误,请求察看配置文件~!!!")
def get_table_name(self, key):
"""根据key,获取工作表名称"""
try:
return self.conf.get(TABLE, key)
except:
log.error("输入table节点下的key,错误,请求察看配置文件~!!!")
raise KeyError("输入table节点下的key,错误,请求察看配置文件~!!!")
def get_sql_message(self, key):
"""根据key,获取数据库"""
try:
return self.conf.get(CONN_SQL, key)
except:
log.error("输入sql节点下的key,错误,请求察看配置文件~!!!")
raise KeyError("输入sql节点下的key,错误,请求察看配置文件~!!!")
if __name__ == '__main__':
ini = ReadIni()
print(ini.get_sql_message("host"))
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: read_json.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:16
# Description:
#
# ---------------------------------------------------------------------------
import json
import os
from common import log
def read_json(filename):
"""读取json文件,将json文件的内容转成python对象,并返回"""
if os.path.isfile(filename) and filename.endswith(".json"):
try:
with open(filename, mode="r", encoding="utf-8") as f:
return json.loads(f.read())
except:
log.error("开打json文件获取json文件的内容序列化为python对象失败,请察看json文件是否有错误!!!")
raise FileExistsError("开打json文件获取json文件的内容序列化为python对象失败,请察看json文件是否有错误!!!")
else:
log.error("json文件的路径不合法")
raise FileNotFoundError("json文件的路径不合法")
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"password": "MTIzNDU2","username": "admin"},
"LoginErrorUsernameLong":{"password": "MTIzNDU2","username": "adminadminadminadminadminadmin"},
"LoginErrorUsernameShort":{"password": "MTIzNDU2","username": "a"},
"LoginErrorUsernameNone":{"password": "MTIzNDU2","username": ""},
"LoginErrorUsernameSpecial":{"password": "MTIzNDU2","username": "@#!@#!@"},
"LoginErrorPwdLong":{"password": "MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2","username": "admin"},
"LoginErrorPwdShort":{"password": "m","username": "admin"},
"LoginErrorPwdNone":{"password": "","username": "admin"},
"LoginErrorPwdSpecial":{"password": "@#!@#!@","username": "admin"},
"LoginErrorPwdError":{"password": "MTIzNDU222","username": "admin"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {
"code": "dem_test57_abc",
"description": "测试维度57",
"isDefault": 0,
"name": "测试57期的维度"
}
},
"根据维度编码获取维度信息": {
"GetDemMessageSuccess": {"code": "dem_test57_abc"}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"ids": "不知道"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"code": "test_org_57",
"demId": "不知道",
"exceedLimitNum": 0,
"grade": "",
"limitNum": 0,
"name": "测试组织",
"nowNum": 0,
"orderNo": 0,
"parentId": "0"
}
},
"添加组织参数": {
"AddOrgParamsSuccess": {"query": {"orgCode": "test_org_57"}, "body": [{"alias":"sz","value":9000}]}
},
"删除组织": {
"DelOrgSuccess": "test_org_57"
}
}
}
[file]
;case为用例数据文件
case=case_data.json
;expect为期望数据文件
expect=expect_data.json
;excel为用例管理文件
excel=APIAutoTest.xlsx
;sql语句的json文件
sql=sql_data.json
[host]
;host为被测系统的域名
host=http://120.46.172.186:8080
[table]
;table_name为excel的工作表名称
table_name=BPM
[sql]
;链接数据库的配置
host=120.46.172.186
port=3306
user=root
pwd=root@2023
database=eipsaas
\ No newline at end of file
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"username": "超级管理","account": "admin", "loginStatus": true},
"LoginErrorUsernameLong":{"state": false, "message":"账号或密码错误"},
"LoginErrorUsernameShort":{"state": false, "message":"账号或密码错误"},
"LoginErrorUsernameNone":{"state": false, "message":"账号或密码错误"},
"LoginErrorUsernameSpecial":{"state": false, "message":"账号或密码错误"},
"LoginErrorPwdLong":{"state": false, "message":"账号或密码错误"},
"LoginErrorPwdShort":{"state": false, "message":"账号或密码错误"},
"LoginErrorPwdNone":{"state": false, "message":"账号或密码错误"},
"LoginErrorPwdSpecial":{"state": false, "message":"账号或密码错误"},
"LoginErrorPwdError":{"state": false, "message":"账号或密码错误"}
},
"刷新token": {
"RefreshSuccess": {"message": "刷新成功"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {"state": true, "message": "添加维度成功!"}
},
"根据维度编码获取维度信息": {
"GetDemMessageSuccess": {"demCode": "dem_test57_abc", "name": "测试57期的维度"}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"state": true, "message": "删除维度成功"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {"state": true, "message": "添加组织成功!"}
},
"添加组织参数": {
"AddOrgParamsSuccess": {"state":true,"message":"保存组织参数成功!"}
},
"删除组织": {
"DelOrgSuccess": {"state":true,"message":"删除组织成功!"}
}
}
}
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: settings.py
# Author: xxxxxxx
# Datetime: 2023/11/3 17:03
# Description:
#
# ---------------------------------------------------------------------------
# file节点名称====================
FILE = "file"
# file节点名称=key的名称===================
CASE = "case"
EXPECT = "expect"
EXCEL = "excel"
SQL = "sql"
# table节点名称========================
TABLE = "table"
# table节点名称=key的名称=======================
TABLE_NAME = "table_name"
# host节点名称=================
URL_HOST = "host"
# host节点名称=key的名称================
URL_HOST_CUSTOM = "host"
# sql节点名称==========================
CONN_SQL = "sql"
# sql节点名称=key的名称=========================
HOST = "host"
PORT = "port"
USER = "user"
PWD = "pwd"
DATABASE = "database"
# excel文件中的列号
NUMBER = "A"
MODULE = "B"
API = "C"
TITLE = "D"
LEVEL = "E"
URL = "G"
METHOD = "F"
MIME = "H"
CASE_DATA = "I"
EXPECT_DATA = "J"
SQL_TYPE = "K"
SQL_DATA = "L"
UPDATE_KEY = "M"
{
"维度管理": {
"添加维度": {
"AddDemSuccess": "DELETE FROM uc_demension WHERE `CODE_`=\"dem_test57_abc\";"
},
"根据维度编码删除维度": {
"DeleteDemSuccess": "SELECT ID_ from uc_demension WHERE `CODE_`=\"dem_test57_abc\";"
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"select": "SELECT ID_ from uc_demension WHERE `CODE_`=\"dem_test57_abc\";",
"delete": "DELETE FROM uc_org WHERE `CODE_`=\"test_org_57\";"
}
}
}
}
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:01
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: requests_method.py
# Author: xxxxxxx
# Datetime: 2023/11/3 15:24
# Description:
#
# ---------------------------------------------------------------------------
import base64
import logging
import requests
from common import log
from common.read_ini import ReadIni
from data_config.settings import *
class RequestsMethod:
def __init__(self):
"""关联被测系统的登录状态"""
login_url = ReadIni().get_host(URL_HOST_CUSTOM) + "/auth"
login_data = {"username": "admin", "password": base64.b64encode("123456".encode()).decode()}
self.bpm_session = requests.sessions.Session()
self.bpm_session.headers.update({"Authorization": "Bearer "+self.bpm_session.post(url=login_url, json=login_data).json().get("token")})
def request_all(self, req_method, req_url, req_mime=None, case_data=None):
"""封装公共的请求方法"""
if req_mime == "json" or req_mime == "application/json":
return self.bpm_session.request(method=req_method, url=req_url, json=case_data)
elif req_mime == "x-www-form-urlencoded" or req_mime == "application/x-www-form-urlencoded":
return self.bpm_session.request(method=req_method, url=req_url, data=case_data)
elif req_mime == "form-data" or req_mime == "multipart/form-data":
return self.bpm_session.request(method=req_method, url=req_url, files=case_data)
elif req_mime == "query" or req_mime == "params" or req_mime == "param":
return self.bpm_session.request(method=req_method, url=req_url, params=case_data)
elif req_mime is None:
return self.bpm_session.request(method=req_method, url=req_url)
# 判断媒体类型是否为query|body 或者 body|query 、json|query、query|json
elif req_mime == "query|body" or req_mime == "body|query" or req_mime == "json|query" or req_mime == "query|json":
return self.bpm_session.request(method=req_method, url=req_url, params=case_data["query"], json=case_data["body"])
else:
log.error("传入的媒体类型的值错误,请察看excel中是否填入正确")
raise NameError("传入的媒体类型的值错误,请察看excel中是否填入正确")
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:01
# Description:
#
# ---------------------------------------------------------------------------
[pytest]
;开启日志
;log_cli=true
;设置日志的级别,如果不设置级别的话,可以设置为NOTSET,如果要设置级别,级别可以有debug,info,warning,error,致命
;log_level=NOTSET
;设置日志显示的信息格式
;log_format=%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s
;设置日志中时间显示的格式
;log_date_format=%Y-%m-%d %H:%M:%S
;每个py文件运行的时候追加的命令
;addopts=-vs
;设置日志保存的文件
log_file=./report/log/bpm_接口自动化框架最新运行日志.log
;设置日志保存在文件中的级别
log_file_level=error
;设置日志在文件中的信息格式
log_file_format=%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s
;设置文件日志中时间显示的格式
log_file_date_format=%Y-%m-%d %H:%M:%S
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023/11/3 17:37
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: conftest.py
# Author: xxxxxxx
# Datetime: 2023/11/3 15:33
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from common.db import DB
from requests_method.requests_method import RequestsMethod
@pytest.fixture(scope="session")
def req_fix():
req = RequestsMethod()
yield req
@pytest.fixture(scope="session")
def db_fix():
db = DB()
yield db
db.close()
# 黄总定义的自定义固件,请勿乱动=================start=========>
# <====================================================end
def pytest_collection_modifyitems(items):
# item表示每个测试用例,解决用例名称中文显示问题
for item in items:
item.name = item.name.encode("utf-8").decode("unicode-escape")
item._nodeid = item._nodeid.encode("utf-8").decode("unicode-escape")
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: test_bpm.py
# Author: xxxxxxx
# Datetime: 2023/11/3 15:33
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from common import log
from common.read_excel import ReadExcel
import allure
class TestBPM:
@allure.epic("BPM项目-老黄")
# @allure.feature("模块名称")
# @allure.story("接口名称")
# @allure.title("用例标题")
# @allure.severity("用例等级")
@pytest.mark.parametrize("module_name, api_name, title, level, case_url, case_method, case_mime, case_data, expect_data, sql_type, sql_data, update_key", ReadExcel("BPM-laohuang").get_data())
def test_bpm(self, db_fix, req_fix, module_name, api_name, title, level, case_url, case_method, case_mime, case_data, expect_data, sql_type, sql_data, update_key):
# 影响allure报告的数据,不影响代码流程
allure.dynamic.feature(module_name)
allure.dynamic.story(api_name)
allure.dynamic.title(title)
allure.dynamic.severity(level)
# 判断sql语句的类型是否为delete
if sql_type == "delete":
# 使用DB类对象执行删除的sql语句
db_fix.delete(sql_data)
# 使用RequestsMethod类对象发送请求--pass
# 判断sql语句类型是否为select
elif sql_type == "select":
# 使用DB类对象执行查询的sql语句,并接收查询的结果
select_result = db_fix.select(sql_data)
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 使用RequestsMethod类对象发送请求--pass
# 判断sql语句的类型是否为select|delete或者为delete|select
elif sql_type == "select|delete" or sql_type == "delete|select":
# 使用DB类对象执行删除的sql语句
db_fix.delete(sql_data.get("delete"))
# 使用DB类对象执行查询的sql语句,并接收查询的结果
select_result = db_fix.select(sql_data.get("select"))
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 使用RequestsMethod类对象发送请求
res = req_fix.request_all(req_method=case_method, req_url=case_url, req_mime=case_mime, case_data=case_data)
# 断言
try:
for key in expect_data:
assert expect_data[key] == res.json().get(key)
except:
log.error("断言失败"+",用例数据为:"+str(case_data)+",期望数据为:"+str(expect_data)+",服务器返回的数据为:"+res.text)
raise AssertionError("断言失败")
if __name__ == '__main__':
pytest.main()
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023/11/3 14:01
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: test_pytest_func.py
# Author: xxxxxxx
# Datetime: 2023/11/3 9:36
# Description:
#
# ---------------------------------------------------------------------------
import base64
import random
import pytest
import requests
import allure
"""
pytest装饰器:
1:@pytest.fixture(scope="指定自定义固件的级别", autouse=False, params=可迭代类型):实现自定义固件
2:@pytest.mark.parametrize("字符串", 可迭代类型): 实现用例的参数化
3: 下载插件,插件为:pytest-dependency, 下载方式为:pip install pytest-dependency
插件:pytest-dependency的功能为实现用例之间的依赖。
使用流程:
先使用装饰器@pytest.mark.dependency()标记依赖的用例
再使用装饰器@pytest.mark.dependency(depends=["用例的名称"])管理依赖的用例,也可以标记用例被后面的用例依赖
4: @pytest.mark.标签名:功能给用例打上标签,在终端运行的时候,使用的命令为:pytest -vs 用例py文件 -m="标签名"
5: @pytest.mark.run(order=num): num为整形,数字越小越先执行。但是需要下载插件,插件为:pip install pytest-ordering
6:@pytest.mark.usefixtures("自定义固件名称"):实现自定义固件的调用
7: @pytest.mark.skip(reason="无条件跳过"): 让用例无条件跳过
8: @pytest.mark.skipif(bool, reason="有条件跳过"):如果bool表达式为True,用例有条件跳过
9:@pytest.mark.xfail(reason="预期失败")
"""
dict1 = {}
@allure.epic("BPM_流程测试")
@allure.feature("认证接口和维度管理流程")
class Test01:
def setup_class(self):
login_url = "http://120.46.172.186:8080/auth"
# 配置登录数据
login_data = {"username": "admin", "password": base64.b64encode("123456".encode()).decode()}
self.bpm_sess = requests.sessions.Session()
self.token = self.bpm_sess.post(login_url, json=login_data).json().get("token")
self.bpm_sess.headers.update({"Authorization": "Bearer "+self.token})
@allure.story("登录")
@pytest.mark.dependency()
def test1(self):
print("用例1")
# 实现登录,获取服务器返回的token
login_url = "http://120.46.172.186:8080/auth"
# 配置登录数据
login_data = {"username": "admin", "password": base64.b64encode("123456".encode()).decode()}
res = self.bpm_sess.post(url=login_url, json=login_data)
print(res.text)
assert "超级管理员" == res.json().get("username")
@allure.story("添加维度")
@pytest.mark.dependency(depends=["Test01::test1"])
def test2(self):
print("添加维度用例")
add_dem_url = "http://120.46.172.186:8080/api/demension/v1/dem/addDem"
add_dem_data = {
"code": "addDem"+str(random.randrange(1, 100)),
"description": "addDem",
"isDefault": 0,
"name": "addDem"
}
res = self.bpm_sess.request(method="post", url=add_dem_url, json=add_dem_data)
print(res.text)
assert "添加维度成功" in res.text
@pytest.mark.dependency(depends=["test1", "test2"])
def test3(self):
print("用例3")
'''
@pytest.mark.run(order=4)
@pytest.mark.p1
def test1():
print("用例1")
@pytest.mark.dependency(depends=["test4"])
@pytest.mark.run(order=2)
@pytest.mark.p0
def test2():
print("用例2")
@pytest.mark.run(order=3)
@pytest.mark.p0
def test3():
print("用例3")
@pytest.mark.dependency()
@pytest.mark.run(order=1)
@pytest.mark.p1
def test4():
print("用例4")
assert 1 == 2
'''
'''
def test1():
print("用例1")
@pytest.mark.skip(reason="无条件跳过")
def test2():
print("用例2")
@pytest.mark.skipif(1 == 2, reason="无条件跳过")
def test3():
print("用例2")
@pytest.mark.xfail(reason="预期失败")
def test4():
print("预期失败")
assert 1 == 3
'''
if __name__ == '__main__':
pytest.main()
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023/11/3 17:38
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: conftest.py
# Author: xxxxxxx
# Datetime: 2023/11/3 15:33
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from common.db import DB
from requests_method.requests_method import RequestsMethod
@pytest.fixture(scope="session")
def req_fix():
req = RequestsMethod()
yield req
@pytest.fixture(scope="session")
def db_fix():
db = DB()
yield db
db.close()
# 黄总定义的自定义固件,请勿乱动=================start=========>
# <====================================================end
def pytest_collection_modifyitems(items):
# item表示每个测试用例,解决用例名称中文显示问题
for item in items:
item.name = item.name.encode("utf-8").decode("unicode-escape")
item._nodeid = item._nodeid.encode("utf-8").decode("unicode-escape")
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_57
# FileName: test_bpm.py
# Author: xxxxxxx
# Datetime: 2023/11/3 15:33
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from common import log
from common.read_excel import ReadExcel
from common.read_ini import ReadIni
import allure
excel_path = ReadIni().get_file_path("excel")
class TestBPM:
@allure.epic("BPM_张三")
@pytest.mark.parametrize("module_name, api_name, title, level, case_url, case_method, case_mime, case_data, expect_data, sql_type, sql_data, update_key", ReadExcel("BPM", excel_path=excel_path).get_data())
def test_bpm(self, db_fix, req_fix, module_name, api_name, title, level, case_url, case_method, case_mime, case_data, expect_data, sql_type, sql_data, update_key):
allure.dynamic.feature(module_name)
allure.dynamic.story(api_name)
allure.dynamic.title(title)
allure.dynamic.severity(level)
# 判断sql语句的类型是否为delete
if sql_type == "delete":
# 使用DB类对象执行删除的sql语句
db_fix.delete(sql_data)
# 使用RequestsMethod类对象发送请求--pass
# 判断sql语句类型是否为select
elif sql_type == "select":
# 使用DB类对象执行查询的sql语句,并接收查询的结果
select_result = db_fix.select(sql_data)
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 使用RequestsMethod类对象发送请求--pass
# 判断sql语句的类型是否为select|delete或者为delete|select
elif sql_type == "select|delete" or sql_type == "delete|select":
# 使用DB类对象执行删除的sql语句
db_fix.delete(sql_data.get("delete"))
# 使用DB类对象执行查询的sql语句,并接收查询的结果
select_result = db_fix.select(sql_data.get("select"))
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 使用RequestsMethod类对象发送请求
res = req_fix.request_all(req_method=case_method, req_url=case_url, req_mime=case_mime, case_data=case_data)
# 断言
try:
for key in expect_data:
assert expect_data[key] == res.json().get(key)
except:
log.error("断言失败"+",用例数据为:"+str(case_data)+",期望数据为:"+str(expect_data)+",服务器返回的数据为:"+res.text)
raise AssertionError("断言失败")
if __name__ == '__main__':
pytest.main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment