Commit 3f217834 by ying

APIAutoTest02 commit

parents
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_56
# FileName: db.py
# Author: xxxxxxx
# Datetime: 2023/8/4 14:21
# Description:
#
# ---------------------------------------------------------------------------
import pymysql
from APIAutoTest02.common.read_ini import ReadIni
class DB:
def __init__(self):
"""链接数据库,获取链接对象,再获取游标对象"""
read_ini = ReadIni()
# 使用pymysql里面的connect类链接数据库,获取链接对象
self.conn = pymysql.connect(
host=read_ini.get_sql_connect_message("host"),
port=int(read_ini.get_sql_connect_message("port")),
user=read_ini.get_sql_connect_message('user'),
password=read_ini.get_sql_connect_message("pwd"),
database=read_ini.get_sql_connect_message("database"),
charset="utf8")
# 使用链接对象,创建游标对象
self.cursor = self.conn.cursor()
def close(self):
# 先关闭游标对象
self.cursor.close()
# 再关闭链接对象
self.conn.close()
def delete(self, sql_sentence):
"""
执行删除的sql语句
:param sql_sentence: sql语句
:return: None
"""
# 使用游标对象的execute方法执行sql语句
self.cursor.execute(sql_sentence)
# 使用链接对象提交
self.conn.commit()
def select(self, sql_sentence):
"""
执行查询的sql语句
:param sql_sentence: sql语句
:return: 查询的结果
"""
# 使用游标对象的execute方法执行sql语句
self.cursor.execute(sql_sentence)
# 使用游标对象的fetchall方法获取查询的结果
select_result = self.cursor.fetchall()
# 判断查询的结果是否为空的元组,如果是空的元组返回None,如果不是空的元组返回查询结果的值
if select_result:
return select_result[0][0]
if __name__ == '__main__':
db = DB()
sql = """SELECT ID_ FROM uc_demension WHERE `CODE_`="test56_dem";"""
print(db.select(sql))
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_56
# FileName: read_excel.py
# Author: xxxxxxx
# Datetime: 2023/8/4 14:36
# Description:
#
# ---------------------------------------------------------------------------
import openpyxl
from APIAutoTest02.common.read_json import read_json
from APIAutoTest02.common.read_ini import ReadIni
class ReadExcel:
def __init__(self):
"""获取所有json和excel文件的路径,并读取所有的json和获取excel的工作表"""
# 创建ReadIni对象
self.read_ini = ReadIni()
# 获取所有的json文件的路径, 使用ReadIni对象调用get_file_path获取文件的路径
case_data_path = self.read_ini.get_file_path("case")
expect_data_path = self.read_ini.get_file_path("expect")
sql_data_path = self.read_ini.get_file_path("sql")
# 调用read_json函数读取所有的json文件
self.case_data_dict = read_json(case_data_path)
self.expect_data_dict = read_json(expect_data_path)
self.sql_data_dict = read_json(sql_data_path)
# 获取excel的路径, 使用ReadIni对象调用get_file_path获取文件的路径
excel_path = self.read_ini.get_file_path("excel")
# 获取工作簿
wb = openpyxl.load_workbook(excel_path)
table_name = self.read_ini.get_table_name("table")
self.ws = wb[table_name]
def __get_cell_value(self, column: str, row: int) -> str:
"""
根据列号和行号获取指定单元格数据
:param column:列号
:param row:行号
:return:单元格数据
"""
# 根据 工作表["列号行号"].value 获取单元格数据
value = self.ws[str(column)+str(row)].value
# 判断单元格数据如果为None返回None
if value is None:
return None
# 判断单元格数据去掉空格之后是否为空的字符串,如果是返回None,如果不是空的字符串,返回去掉前后空的字符串
elif value.strip():
return value.strip()
def module_name(self, row):
"""
根据行号获取模块名称
:param row: 行号
:return: 模块名称
"""
# 调用私有方法__get_cell_value获取单元格数据
return self.__get_cell_value("B", row)
def interface_name(self, row):
"""
根据行号,获取接口名称
:param row: 行号
:return: 接口名称
"""
# 调用私有方法__get_cell_value获取单元格数据
return self.__get_cell_value("c", row)
def case_method(self, row):
"""
根据行号,获取请求方法
:param row: 行号
:return: 请求方法
"""
# 调用私有方法__get_cell_value获取单元格数据
return self.__get_cell_value("F", row)
def case_url(self, row):
"""
根据行号,获取请求的url
:param row: 行号
:return: url
"""
# 获取域名,使用ReadIni对象,调用get_host方法获取域名
host = self.read_ini.get_host("test_bpm")
path = self.__get_cell_value("g", row)
return host+path
def case_mime(self, row):
"""
根据行号,获取用例的媒体类型
:param row: 行号
:return: 媒体类型
"""
value = self.__get_cell_value("h", row)
if value:
return value.lower()
def case_data(self, row):
"""
根据行号,获取用例数据
:param row: 行号
:return: 用例数据
"""
case_data_key = self.__get_cell_value("i", row)
if case_data_key:
# 获取用例json文件的路径
# 使用read_json函数读取用例的json文件
module_name = self.module_name(row)
interface_name = self.interface_name(row)
return self.case_data_dict[module_name][interface_name][case_data_key]
def expect_data(self, row):
"""
根据行号,获取期望数据
:param row: 行号
:return: 期望数据
"""
# 获取期望数据的key,调用私有方法__get_cell_value获取单元格数据
expect_data_key = self.__get_cell_value("j", row)
if expect_data_key:
module_name = self.module_name(row)
interface_name = self.interface_name(row)
return self.expect_data_dict[module_name][interface_name][expect_data_key]
def get_sql_type(self, row):
"""
根据行号,获取sql语句的类型
:param row: 行号
:return: sql语句的类型
"""
# 调用私有方法__get_cell_value获取单元格数据
value = self.__get_cell_value("k", row)
if value:
return value.lower()
def sql_sentence(self, row):
"""
根据行号,获取sql语句
:param row:
:return:
"""
# 获取sql语句的key,调用私有方法__get_cell_value获取单元格数据
sql_data_key = self.__get_cell_value("l", row)
if sql_data_key:
module_name = self.module_name(row)
interface_name = self.interface_name(row)
return self.sql_data_dict[module_name][interface_name][sql_data_key]
def update_key(self, row):
"""
根据行号,获取更新的key
:param row: 行号
:return: 更新的key
"""
# 调用私有方法__get_cell_value获取单元格数据
value = self.__get_cell_value("m", row)
if value:
return value
def get_data(self):
"""
获取数据的数据,并存放到一个二维列表中,每行就是一个子列表
:return: 二维列表
"""
# 创建一个空列表,用于存放数据
list_data = []
# 循环取出每行的数据,并追加到空列表中
for row in range(2, self.ws.max_row+1):
req_method = self.case_method(row)
url = self.case_url(row)
mime = self.case_mime(row)
case_data = self.case_data(row)
expect_data = self.expect_data(row)
sql_type = self.get_sql_type(row)
sql_sentence = self.sql_sentence(row)
update_key = self.update_key(row)
list_data.append([req_method, url, mime, case_data, expect_data, sql_type, sql_sentence, update_key])
else:
return list_data
if __name__ == '__main__':
read_excel = ReadExcel()
read_excel.get_data()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_56
# FileName: read_ini.py
# Author: xxxxxxx
# Datetime: 2023/8/4 14:12
# Description:
#
# ---------------------------------------------------------------------------
import configparser
import os
class ReadIni:
def __init__(self):
"""获取ini文件的路径,在读取ini文件"""
self.data_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data_config")
ini_path = os.path.join(self.data_config_path, "config.ini")
self.conf = configparser.ConfigParser()
self.conf.read(ini_path, encoding="utf-8")
def get_file_path(self, key):
"""
根据key,获取file节点下key对应的文件的路径
:param key: 键
:return:文件的路径
"""
# 使用Configparser对象调用get方法,获取file节点下key对应的文件名称
file_name = self.conf.get("file", key).strip()
# 返回拼接的文件路径
return os.path.join(self.data_config_path, file_name)
def get_host(self, key):
"""
根据key,获取被测系统的域名
:param key: 键
:return: 域名
"""
# 使用Configparser对象调用get方法,获取host节点下key对应的域名
return self.conf.get("host", key).strip()
def get_table_name(self, key):
"""
根据key,获取excel工作表的名称
:param key: 键
:return: 表名
"""
# 使用Configparser对象调用get方法,获取table_name节点下key对应的excel工作表的名称
return self.conf.get("table_name", key).strip()
def get_sql_connect_message(self, key):
"""
根据key,获取数据库的链接信息
:param key: 键
:return: 数据库的链接信息
"""
# 使用Configparser对象调用get方法,获取sql节点下key对应的值
return self.conf.get("sql", key).strip()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_56
# FileName: read_json.py
# Author: xxxxxxx
# Datetime: 2023/8/4 14:20
# Description:
#
# ---------------------------------------------------------------------------
import json
def read_json(filename):
with open(filename, mode="r", encoding="utf-8") as fp:
return json.load(fp)
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"password": "MTIzNDU2","username": "admin"},
"LoginFailUsernameIsNone":{"password": "MTIzNDU2","username": ""},
"LoginFailUsernameIsLong":{"password": "MTIzNDU2","username": "adminadminadminadminadminadminadminadminadmin"},
"LoginFailUsernameIsShort":{"password": "MTIzNDU2","username": "a"},
"LoginFailUsernameIsSpecialChar":{"password": "MTIzNDU2","username": "▬♦◊◦☼♠♣"},
"LoginFailUsernameIsError":{"password": "MTIzNDU2","username": "tomcat"},
"LoginFailPWDIsNone":{"password": "","username": "admin"},
"LoginFailPWDIsLong":{"password": "MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2MTIzNDU2","username": "admin"},
"LoginFailPWDIsShort":{"password": "1","username": "admin"},
"LoginFailPWDIsSpecial":{"password": "▬♦◊◦☼♠♣","username": "admin"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {
"code": "test56_dem",
"description": "测试56期的维度",
"isDefault": 0,
"name": "测试56期的维度"
}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"ids": "不知道"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"code": "test56_org",
"demId": "不知道",
"exceedLimitNum": 0,
"grade": "",
"limitNum": 0,
"name": "测试的组织",
"nowNum": 0,
"orderNo": 0,
"parentId": "0"
}
},
"删除组织": {
"DeleteOrgSuccess": "test56_org"
},
"保存组织参数": {
"SaveOrgParamSuccess": {
"query": {"orgCode": "test56_org"},
"body": [
{"alias":"sz","value":0},
{"alias":"zy","value":"math"},
{"alias":"rq","value":"2023-08-04T16:00:00.000Z"},
{"alias":"ah","value":"lq"},
{"alias":"yyyy","value":"越高越好"},
{"alias":"xb","value":"2"}]
}
}
}
}
[file]
excel=APIAutoTest.xlsx
case=case_data.json
expect=expect_data.json
sql=sql.json
[host]
test_bpm=http://120.46.172.186:8080
[table_name]
table=BPM接口
[sql]
host=120.46.172.186
port=3306
user=root
pwd=root@2023
database=eipsaas
\ No newline at end of file
{
"认证接口":{
"登录系统":{
"LoginSuccess": {"username": "超级管理员","account": "admin"},
"LoginFailUsernameIsNone":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsLong":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsShort":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsSpecialChar":{"state": false,"message": "账号或密码错误"},
"LoginFailUsernameIsError":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsNone":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsLong":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsShort":{"state": false,"message": "账号或密码错误"},
"LoginFailPWDIsSpecial":{"state": false,"message": "账号或密码错误"}
},
"刷新token": {
"RefreshSuccess": {"message": "刷新token成功"}
}
},
"维度管理": {
"添加维度": {
"AddDemSuccess": {"state": true, "message": "添加维度成功!"}
},
"根据维度编码删除维度": {
"DeleteDemSuccess": {"state": true, "message": "删除维度成功"}
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {"state": true, "message": "添加组织成功!"}
},
"删除组织": {
"DeleteOrgSuccess": {"state": true, "message": "删除组织成功!"}
},
"保存组织参数": {
"SaveOrgParamSuccess": {"state": true, "message": "保存组织参数成功!"}
}
}
}
{
"维度管理": {
"添加维度": {
"AddDemSuccess": "DELETE FROM uc_demension WHERE `CODE_`=\"test56_dem\";"
},
"根据维度编码删除维度": {
"DeleteDemSuccess": "SELECT ID_ FROM uc_demension WHERE `CODE_`=\"test56_dem\";"
}
},
"组织管理": {
"添加组织": {
"AddOrgSuccess": {
"select": "SELECT ID_ FROM uc_demension WHERE `CODE_`=\"test56_dem\";",
"delete": "DELETE FROM uc_org WHERE `CODE_`=\"test56_org\";"
}
}
}
}
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_56
# FileName: requests_method.py
# Author: xxxxxxx
# Datetime: 2023/8/4 15:42
# Description:
#
# ---------------------------------------------------------------------------
import base64
import requests
from APIAutoTest02.common.read_ini import ReadIni
class RequestsMethod:
def __init__(self):
"""
关联登录成功的状态,关联token
"""
url = ReadIni().get_host("test_bpm") + "/auth"
data = {"username": "admin", "password": base64.b64encode("123456".encode("utf-8")).decode("utf-8")}
self.bpm_session = requests.sessions.Session()
# 发送请求,获取token,将token更新到Session对象的headers
self.bpm_session.headers["Authorization"] = "Bearer " + self.bpm_session.post(url=url, json=data).json().get("token")
def request_all(self, method, url, mime, case_data):
"""
根据不同的媒体类型,使用不同的关键字传参
:param method: 请求方法
:param url: url
:param mime: 媒体类型
:param case_data: 用例数据
:return: Response type
"""
if mime == "application/x-www-form-urlencoded" or mime == "x-www-form-urlencoded":
return self.bpm_session.request(method=method, url=url, data=case_data)
elif mime == "application/json" or mime == "json":
return self.bpm_session.request(method=method, url=url, json=case_data)
elif mime == "multipart/form-data" or mime == "form-data":
return self.bpm_session.request(method=method, url=url, files=case_data)
elif mime == "query" or mime == "params":
return self.bpm_session.request(method=method, url=url, params=case_data)
elif mime == "text/plain" or mime == "text":
return self.bpm_session.request(method=method, url=url, data=case_data)
elif mime == "application/Json|query" or mime == "json|query" or "json|params":
return self.bpm_session.request(method=method, url=url, params=case_data["query"], json=case_data["body"])
elif mime is None:
return self.bpm_session.request(method=method, url=url)
else:
raise NameError("媒体类型的值错误")
if __name__ == '__main__':
req = RequestsMethod()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_56
# FileName: conftest.py
# Author: xxxxxxx
# Datetime: 2023/8/4 15:58
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from APIAutoTest02.common.db import DB
from APIAutoTest02.requests_method.requests_method import RequestsMethod
@pytest.fixture(scope="session")
def req_fix():
req = RequestsMethod()
yield req
@pytest.fixture(scope="session")
def db_fix():
db = DB()
yield db
db.close()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test_56
# FileName: test_bpm.py
# Author: xxxxxxx
# Datetime: 2023/8/4 15:54
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from APIAutoTest02.common.read_excel import ReadExcel
class TestBPM:
@pytest.mark.parametrize("req_method, url, mime, case_data, expect_data, sql_type, sql_sentence, update_key", ReadExcel().get_data())
def test_bpm(self, db_fix, req_fix, req_method, url, mime, case_data, expect_data, sql_type, sql_sentence, update_key):
print(req_method, url, mime, case_data, expect_data, sql_type, sql_sentence, update_key)
# 请求之前的操作
# 1:判断sql语句的类型是否为delete,如果为delete,先执行删除的sql语句,再发送请求
if sql_type == "delete":
# 执行删除的sql语句,使用DB类对象,调用delete方法执行删除语句.DB类对象就是自定义固件db_fix
db_fix.delete(sql_sentence)
# 再发送请求
result = req_fix.request_all(method=req_method, url=url, mime=mime, case_data=case_data)
# 2:判断sql语句的类型是否为select,如果为select,执行查询sql语句,并获取查询的结果,将查询的结果更新到用例数据中
elif sql_type == "select":
# 执行查询sql语句,使用DB类对象,调用select方法执行查询的sql语句,并接收查询的结果
select_result = db_fix.select(sql_sentence)
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
print(case_data)
# 再发送请求
result = req_fix.request_all(method=req_method, url=url, mime=mime, case_data=case_data)
# 3: 判断sql语句类型是否为select|delete或为delete|select,如果是就需要执行查询和删除。
elif sql_type == "select|delete" or sql_type == "delete|select":
# 先执行删除的sql语句。# 这里的sql语句存放在一个字典中, 字典的格式为:{"delete":"删除sql语句", "select":"查询sql语句"}
# 执行删除的sql语句,使用DB类对象,调用delete方法执行删除语句.DB类对象就是自定义固件db_fix
db_fix.delete(sql_sentence["delete"])
# 再执行查询的sql语句
# 执行查询sql语句,使用DB类对象,调用select方法执行查询的sql语句,并接收查询的结果
select_result = db_fix.select(sql_sentence["select"])
# 将查询的结果更新到用例数据中。
case_data[update_key] = select_result
# 再发送请求
result = req_fix.request_all(method=req_method, url=url, mime=mime, case_data=case_data)
else:
result = req_fix.request_all(method=req_method, url=url, mime=mime, case_data=case_data)
# 请求之后的操作
print("="*100)
print("服务器返回数据", result.json())
# 断言
try:
# 取出期望数据的key,对比 期望数据key对应的value和服务器返回数据key对应的value
for key in expect_data.keys():
assert expect_data[key] == result.json().get(key)
except:
raise AssertionError("断言失败")
else:
print("断言成功")
if __name__ == '__main__':
pytest.main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment