Commit 6dc724e6 by ht

描述信息

parents
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: conftest.py
# Author: xxxxxxx
# Datetime: 2023-08-08 6:33
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from learn.example0807.common.read_db import DB
from learn.example0807.requests_method.requests_method import Requests
@pytest.fixture(scope="session")
def req_fix():
req = Requests()
yield req
@pytest.fixture(scope="session")
def db_fix():
db = DB()
yield db
db.close()
def pytest_collection_modifyitems(items):
# item表示每个测试用例,解决用例名称中文显示问题
for item in items:
item.name = item.name.encode("utf-8").decode("unicode-escape")
item._nodeid = item._nodeid.encode("utf-8").decode("unicode-escape")
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: test_bpm.py
# Author: xxxxxxx
# Datetime: 2023-08-08 6:01
# Description:
#
# ---------------------------------------------------------------------------
import logging
import allure
import pytest
from learn.example0807.common.read_excel import ReadExcel
class TestBPM:
@allure.epic("BPM系统")
@pytest.mark.parametrize("module,interface,title,level,request_method,case_url,mime,case_data,except_data,sql_type,sql_sentence,updata_key",ReadExcel().get_data())
def test_bpm(self,req_fix,db_fix,module,interface,title,level,request_method,case_url,mime,case_data,except_data,sql_type,sql_sentence,updata_key):
allure.dynamic.feature(module)
allure.dynamic.story(interface)
allure.dynamic.title(title)
allure.dynamic.severity(level)
if sql_type == "delete":
db_fix.delete_sql(sql_sentence)
elif sql_type == "select":
result = db_fix.select_sql(sql_sentence)
case_data[updata_key] = result
elif sql_type == "delete|select" or sql_type == "select|delete":
db_fix.delete(sql_sentence["delete"])
select_result = db_fix.select(sql_sentence["select"])
case_data[updata_key] = select_result
result = req_fix.request_all(req_method=request_method, req_url=case_url, req_mime=mime, case_data=case_data)
print(result.text)
try:
for key in except_data:
assert except_data[key] == result.json().get(key)
except:
print("断言失败")
raise AssertionError("断言失败")
else:
print("断言成功")
if __name__ == '__main__':
pytest.main()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: __init__.py
# Author: xxxxxxx
# Datetime: 2023-08-08 6:01
# Description:
#
# ---------------------------------------------------------------------------
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: read_db.py
# Author: xxxxxxx
# Datetime: 2023-08-07 19:09
# Description:
#
# ---------------------------------------------------------------------------
import pymysql
from learn.example0807.common.read_ini import ReadIni
class DB:
def __init__(self):
read_ini = ReadIni()
self.coon = pymysql.connect(
host=read_ini.get_sqlepath("host"),
port=int(read_ini.get_sqlepath("port")),
password=read_ini.get_sqlepath("pwd"),
user=read_ini.get_sqlepath("user"),
database=read_ini.get_sqlepath("database"),
charset="utf8"
)
self.cursor = self.coon.cursor()
def close(self):
self.cursor.close()
self.coon.close()
def delete_data(self, sql_sentence):
self.cursor.execute(sql_sentence)
self.coon.commit()
def select_data(self,sql_sentence):
self.cursor.execute(sql_sentence)
select_result = self.cursor.fetchall()
if select_result:
return select_result[0][0]
if __name__ == '__main__':
db = DB()
sql = """SELECT ID_ FROM uc_demension WHERE `CODE_`="test56_dem";"""
print(db.select_data(sql))
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: read_excel.py
# Author: xxxxxxx
# Datetime: 2023-08-07 19:38
# Description:
#
# ---------------------------------------------------------------------------
import openpyxl
from learn.example0807.common.read_ini import ReadIni
from learn.example0807.common.read_json import read_json
class ReadExcel:
def __init__(self):
self.read_ini = ReadIni()
case_data_path = self.read_ini.get_filepath("case")
expect_data_path = self.read_ini.get_filepath("except")
sql_data_path = self.read_ini.get_filepath("sql")
self.case_data_dict = read_json(case_data_path)
self.expect_data_dict = read_json(expect_data_path)
self.sql_data_dict = read_json(sql_data_path)
self.excel_name = self.read_ini.get_filepath("excel")
self.table_name = self.read_ini.get_tablepath("table_name")
wb =openpyxl.load_workbook(self.excel_name)
self.ws = wb[self.table_name]
def module_name(self,row):
return self.ws["B"+str(row)].value.strip()
def interface_name(self,row):
return self.ws["C"+str(row)].value.strip()
def case_title(self,row):
return self.ws["D"+str(row)].value.strip()
def case_level(self,row):
return self.ws["E"+str(row)].value.strip()
def request_method(self,row):
return self.ws["F"+str(row)].value.strip().lower()
def case_url(self,row):
host =self.read_ini.get_hostpath("test_bpm")
result = self.ws["G"+str(row)].value.strip().lower()
return host+result
def mime(self,row):
result = self.ws["H" + str(row)].value
if result is None:
return None
elif result.strip():
return result.strip().lower()
def case_data(self,row):
result = self.ws["I" + str(row)].value
moudle_name = self.module_name(row)
interface_name = self.interface_name(row)
if result is None:
return None
elif result.strip():
return self.case_data_dict[moudle_name][interface_name][result]
def except_data(self,row):
result = self.ws["J" + str(row)].value
moudle_name = self.module_name(row)
interface_name = self.interface_name(row)
if result is None:
return None
elif result.strip():
return self.expect_data_dict[moudle_name][interface_name][result]
def sql_type(self,row):
result = self.ws["K" + str(row)].value
if result:
return result.strip().lower()
def sql_sentence(self,row):
result = self.ws["L"+str(row)].value
moudle_name = self.module_name(row)
interface_name = self.interface_name(row)
if result is None:
return None
elif result.strip():
return self.sql_data_dict[moudle_name][interface_name][result]
def updata_key(self,row):
return self.ws["M"+str(row)].value
def get_data(self):
list_data = []
for row in range(2, self.ws.max_row + 1):
module = self.module_name(row)
interface = self.interface_name(row)
title = self.case_title(row)
level = self.case_level(row)
request_method = self.request_method(row)
case_url = self.case_url(row)
mime = self.mime(row)
case_data = self.case_data(row)
except_data = self.except_data(row)
sql_type = self.sql_type(row)
sql_sentence = self.sql_sentence(row)
updata_key = self.updata_key(row)
list_data.append([module,interface,title,level,request_method,case_url,mime,case_data,except_data,sql_type,sql_sentence,updata_key])
else:
return list_data
if __name__ == '__main__':
read_excel =ReadExcel()
print(read_excel.get_data())
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: read_ini.py
# Author: xxxxxxx
# Datetime: 2023-08-07 18:43
# Description:
#
# ---------------------------------------------------------------------------
import configparser
import os
class ReadIni:
def __init__(self):
self.common_path = os.path.join(os.path.dirname(os.path.dirname(__file__)),"config_data")
ini_path = os.path.join(self.common_path,"config.ini")
self.conf = configparser.ConfigParser()
self.conf.read(ini_path, encoding="utf-8")
def get_filepath(self,key):
file_path = self.conf.get("file",key).strip()
return os.path.join(self.common_path,file_path)
def get_hostpath(self,key):
return self.conf.get("host",key).strip()
def get_tablepath(self,key):
return self.conf.get("table",key).strip()
def get_sqlepath(self,key):
return self.conf.get("sql",key).strip()
if __name__ == '__main__':
read_ini = ReadIni()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: read_json.py
# Author: xxxxxxx
# Datetime: 2023-08-07 19:06
# Description:
#
# ---------------------------------------------------------------------------
import json
def read_json(filename):
with open(filename,mode="r",encoding="utf-8")as fp:
return json.load(fp)
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"password": "MTIzNDU2","username": "admin"},
"LoginFailUsernameIsNone":{"password": "MTIzNDU2","username": ""},
"LoginFailUsernameIsLong":{"password": "MTIzNDU2","username": "adminadminadminadminadminadminadminadminadmin"},
"LoginFailUsernameIsShort":{"password": "MTIzNDU2","username": "a"},
"LoginFailUsernameIsSpecialChar":{"password": "MTIzNDU2","username": "▬♦◊◦☼♠♣"},
"LoginFailUsernameIsError":{"password": "MTIzNDU2","username": "tomcat"},
"LoginFailPWDIsNone":{"password": "","username": "admin"},
"LoginFailPWDIsLong":{"password": "MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2","username": "admin"},
"LoginFailPWDIsShort":{"password": "1","username": "admin"},
"LoginFailPWDIsSpecial":{"password": "▬♦◊◦☼♠♣","username": "admin"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {
"code": "test56_dem",
"description": "测试56期的维度",
"isDefault": 0,
"name": "测试56期的维度"
}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"ids": "不知道"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"code": "test56_org",
"demId": "不知道",
"exceedLimitNum": 0,
"grade": "",
"limitNum": 0,
"name": "测试的组织",
"nowNum": 0,
"orderNo": 0,
"parentId": "0"
}
},
"删除组织": {
"DeleteOrgSuccess": "test56_org"
},
"保存组织参数": {
"SaveOrgParamSuccess": {
"query": {"orgCode": "test56_org"},
"body": [
{"alias":"sz","value":0},
{"alias":"zy","value":"math"},
{"alias":"rq","value":"2023-08-04T16:00:00.000Z"},
{"alias":"ah","value":"lq"},
{"alias":"yyyy","value":"越高越好"},
{"alias":"xb","value":"2"}]
}
}
}
}
[file]
case=case_data.json
except=except_data.json
excel=APIAutoTest.xlsx
sql=sql.json
[host]
test_bpm=http://120.46.172.186:8080
[table]
table_name=BPM接口
[sql]
host=120.46.172.186
port=3306
user=root
pwd=root@2023
database=eipsaas
\ No newline at end of file
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"username": "超级管理员","account": "admin"},
"LoginFailUsernameIsNone":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsLong":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsShort":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsSpecialChar":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsError":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsNone":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsLong":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsShort":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsSpecial":{"state": false,"message": "账号或密码错误"}
},
"刷新token": {
"RefreshSuccess": {"message": "刷新token成功"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {"state": true, "message": "添加维度成功!"}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"state": true, "message": "删除维度成功"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {"state": true, "message": "添加组织成功!"}
},
"删除组织": {
"DeleteOrgSuccess": {"state": true, "message": "删除组织成功!"}
},
"保存组织参数": {
"SaveOrgParamSuccess": {"state": true, "message": "保存组织参数成功!"}
}
}
}
{
"维度管理": {
"添加维度": {
"AddDemSuccess": "DELETE FROM uc_demension WHERE `CODE_`=\"test56_dem\";"
},
"根据维度编码删除维度": {
"DeleteDemSuccess": "SELECT ID_ FROM uc_demension WHERE `CODE_`=\"test56_dem\";"
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"select": "SELECT ID_ FROM uc_demension WHERE `CODE_`=\"test56_dem\";",
"delete": "DELETE FROM uc_org WHERE `CODE_`=\"test56_org\";"
}
}
}
}
\ No newline at end of file
++ "a/example0807/report/log/\346\227\245\345\277\227.log"
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: Python_learn
# FileName: requests_method.py
# Author: xxxxxxx
# Datetime: 2023-08-08 6:04
# Description:
#
# ---------------------------------------------------------------------------
import base64
import requests
from learn.example0807.common.read_excel import ReadExcel
from learn.example0807.common.read_ini import ReadIni
class Requests:
def __init__(self):
read_excel = ReadExcel()
read_ini = ReadIni()
self.host = read_ini.get_hostpath("test_bpm")+"/auth"
data = {"username":"admin","password":"123456"}
data["password"] = base64.b64encode("123456".encode("utf-8")).decode("utf-8")
self.sessions = requests.sessions.Session()
token = self.sessions.post(url=self.host,json=data).json().get("token")
self.sessions.headers["Authorization"] = "Bearer "+token
def request_all(self, req_method, req_url, req_mime, case_data):
if req_mime == "json" or req_mime == "application/json":
return self.sessions.request(method=req_method, url=req_url, json=case_data)
elif req_mime == "x-www-form-urlencoded" or req_mime == "application/x-www-form-urlencoded":
return self.sessions.request(method=req_method, url=req_url, data=case_data)
elif req_mime == "form-data" or req_mime == "multipart/form-data":
return self.sessions.request(method=req_method, url=req_url, files=case_data)
elif req_mime == "query" or req_mime == "params":
return self.sessions.request(method=req_method, url=req_url, params=case_data)
elif req_mime == "application/json|query" or req_mime == "json|query" or req_mime == "query|json":
return self.sessions.request(method=req_method, url=req_url, params=case_data["query"], json=case_data["body"])
elif req_mime == "text/plain" or req_mime == "text":
return self.sessions.request(method=req_method, url=req_url, data=case_data)
elif req_mime is None:
return self.sessions.request(method=req_method, url=req_url)
else:
raise NameError("传入的媒体类型的名称错误")
if __name__ == '__main__':
req = Requests()
print(req)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment