Commit 1df893b5 by 云天羽

apiautotest-v1

parent 64348aed
import pymysql
from Homework_files.APITesting_LZJ.common.read_ini import ReadIni
class DB:
def __init__(self):
ini = ReadIni()
try:
self.conn = pymysql.connect(
host=ini.connect_database_msg("host"),
port=int(ini.connect_database_msg("port")),
user=ini.connect_database_msg("user"),
password=ini.connect_database_msg("pwd"),
database=ini.connect_database_msg("database"),
charset="utf8"
)
self.cursor = self.conn.cursor()
except Exception as e:
print("链接数据库出错,错误为:", e)
raise e
def close(self):
self.cursor.close()
self.conn.close()
def delete(self, sql_sentence):
try:
self.cursor.execute(sql_sentence)
except Exception as e:
print("执行删除的sql语句时报错,错误为:", e)
raise e
else:
self.conn.commit()
def select(self, sql_sentence):
try:
self.cursor.execute(sql_sentence)
except Exception as e:
print("执行查询的sql语句时报错,错误为:", e)
raise e
else:
select_result = self.cursor.fetchall()
if select_result:
return select_result[0][0]
if __name__ == '__main__':
db = DB()
sql = """SELECT ID_ FROM uc_demension WHERE `CODE_`="test_dem_xyz_123";"""
print(db.select(sql))
\ No newline at end of file
import logging
import os
import time
from Homework_files.APITesting_LZJ.common.read_basic_ini import ReadBasicIni
def get_log():
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)
log_name = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime(time.time())) + ".log"
log_dir = ReadBasicIni().get_log_dir("log")
log_path = os.path.join(log_dir, log_name)
file_handler = logging.FileHandler(log_path, mode="a", encoding="utf-8")
log_format = logging.Formatter('%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
file_handler.setFormatter(log_format)
logger.addHandler(file_handler)
return logger
import os, configparser
class ReadBasicIni:
def __init__(self):
self.data_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data_config")
ini_path = os.path.join(self.data_config_path, "basic_config.ini")
self.conf = configparser.ConfigParser()
self.conf.read(ini_path, encoding="utf-8")
def get_url(self, key):
try:
return self.conf.get("host", key)
except Exception as e:
raise e
def sql_connect_msg(self, key):
try:
return self.conf.get("sql", key)
except Exception as e:
raise e
def get_log_dir(self, key):
try:
dir_name = self.conf.get("report", key)
except Exception as e:
raise e
else:
report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report")
return os.path.join(report_path, dir_name)
\ No newline at end of file
import openpyxl
from Homework_files.APITesting_LZJ.common.read_ini import ReadIni
from Homework_files.APITesting_LZJ.common.read_json import read_json
class ReadExcel:
def __init__(self):
self.ini = ReadIni()
case_data_path = self.ini.get_file_path("case")
expect_data_path = self.ini.get_file_path("expect")
sql_data_path = self.ini.get_file_path("sql")
excel_path = self.ini.get_file_path("excel")
self.case_data_dict = read_json(case_data_path)
self.expect_data_dict = read_json(expect_data_path)
self.sql_data_dict = read_json(sql_data_path)
table_name = self.ini.get_table_name("table")
try:
wb = openpyxl.load_workbook(excel_path)
self.ws = wb[table_name]
except Exception as e:
print("加载工作簿和获取工作表时报错,错误为:", e)
raise e
def __get_cell_value(self, column: str, row: int) -> str:
try:
cell_value = self.ws[column + str(row)].value
except Exception as e:
print("获取指定单元格数据报错,错误为:", e)
raise e
else:
if cell_value is None:
return None
elif cell_value.strip():
return cell_value.strip()
def module_name(self, row):
return self.__get_cell_value("B", row)
def api_name(self, row):
return self.__get_cell_value("C", row)
def req_method(self, row):
return self.__get_cell_value("F", row)
def req_url(self, row):
host = self.ini.get_url("test_host")
path = self.__get_cell_value("G", row)
if path is not None:
return host+path
else:
return None
def case_mime(self, row):
mime = self.__get_cell_value("H", row)
if mime is not None:
return mime.lower()
else:
return None
def case_data(self, row):
case_data_key = self.__get_cell_value("i", row)
if case_data_key is not None:
module_name = self.module_name(row)
api_name = self.api_name(row)
return self.case_data_dict[module_name][api_name][case_data_key]
else:
return None
def expect_data(self, row):
expect_data_key = self.__get_cell_value("j", row)
if expect_data_key is not None:
module_name = self.module_name(row)
api_name = self.api_name(row)
return self.expect_data_dict[module_name][api_name][expect_data_key]
else:
return None
def sql_type(self, row):
sql_sentence_type = self.__get_cell_value("k", row)
if sql_sentence_type is not None:
return sql_sentence_type.lower()
else:
return None
def sql_data(self, row):
sql_data_key = self.__get_cell_value("l", row)
if sql_data_key is not None:
module_name = self.module_name(row)
api_name = self.api_name(row)
return self.sql_data_dict[module_name][api_name][sql_data_key]
else:
return None
def update_key(self, row):
return self.__get_cell_value("m", row)
def get_data(self):
data_list = []
for row in range(2, self.ws.max_row+1):
req_method = self.req_method(row)
req_url = self.req_url(row)
case_mime = self.case_mime(row)
case_data = self.case_data(row)
expect_data = self.expect_data(row)
sql_type = self.sql_type(row)
sql_data = self.sql_data(row)
update_key = self.update_key(row)
if req_method is not None and req_url is not None:
row_list = [req_method, req_url, case_mime, case_data, expect_data, sql_type, sql_data, update_key]
data_list.append(row_list)
else:
return data_list
if __name__ == '__main__':
excel = ReadExcel()
print(excel.get_data())
\ No newline at end of file
import configparser
import os
class ReadIni:
def __init__(self):
self.data_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data_config")
ini_path = os.path.join(self.data_config_path, "config.ini")
self.conf = configparser.ConfigParser()
self.conf.read(ini_path, encoding="utf-8")
def get_file_path(self, key):
try:
file_name = self.conf.get("file", key)
except Exception as e:
print("获取文件的名称时,传入的key错误,错误为:", e)
raise e
else:
return os.path.join(self.data_config_path, file_name)
def get_url(self, key):
try:
return self.conf.get("host", key)
except Exception as e:
print("获取被测系统的域名时,传入的key错误,错误为:", e)
raise e
def get_table_name(self, key):
try:
return self.conf.get("table_name", key)
except Exception as e:
print("获取工作表名称时,传入的key错误,错误为:", e)
raise e
def connect_database_msg(self, key):
try:
return self.conf.get("sql", key)
except Exception as e:
print("获取数据库链接的配置信息时发生错误,错误为:", e)
raise e
if __name__ == '__main__':
ini = ReadIni()
print(ini.get_file_path('excel'))
\ No newline at end of file
import json
import os
def read_json(file_path):
if os.path.isfile(file_path) and file_path.endswith(".json"):
try:
with open(file_path, mode="r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print("读取json文件时发生错误,请查看json文件的内容是否正常,错误为:", e)
raise e
else:
raise FileNotFoundError("传入的json文件路径错误")
if __name__ == '__main__':
path = r'D:\Project\PythonDoc\test61\test61\APIAutoTest_v2\data_config\case_data1.json'
print(read_json(path))
\ No newline at end of file
++ "b/APITesting_LZJ/data_config/\345\274\240\344\270\211/APIAutoTest.xlsx"
++ "b/APITesting_LZJ/data_config/\345\274\240\344\270\211/case_data.json"
++ "b/APITesting_LZJ/data_config/\345\274\240\344\270\211/config.ini"
++ "b/APITesting_LZJ/data_config/\345\274\240\344\270\211/expect.json"
++ "b/APITesting_LZJ/data_config/\345\274\240\344\270\211/sql_data.json"
++ "b/APITesting_LZJ/data_config/\346\235\216\345\233\233/case_data.json"
++ "b/APITesting_LZJ/data_config/\346\235\216\345\233\233/config.ini"
++ "b/APITesting_LZJ/data_config/\346\235\216\345\233\233/expect.json"
++ "b/APITesting_LZJ/data_config/\346\235\216\345\233\233/sql_data.json"
++ "b/APITesting_LZJ/data_config/\346\235\216\345\233\233/\346\235\216\345\233\233.xlsx"
This source diff could not be displayed because it is too large. You can view the blob instead.
import requests
from Homework_files.APITesting_LZJ.common.read_ini import ReadIni
class RequestMethod:
def __init__(self):
login_url = ReadIni().get_url("test_host") + "/auth"
login_data = {"username": "admin", "password": "FMgcBcmzaLp5J8AFyR8XF0MhHRpBhHNFQyDn7HI2nBO0B9RgCEnhVjvXjLMNjwoRKp01dwHueQGZbZcsEdAQQwY1QXvJrr8Z4Jv2Wn5UCC4IKuH9cIeevsc2zSEMmKULN9FJV4e9DBbnTMoxHqyawMlxVN/pidjeA0kkXjOsBE8="}
self.bpm_session = requests.sessions.Session()
self.bpm_session.headers["Authorization"] = "Bearer "+self.bpm_session.post(url=login_url, json=login_data).json()["token"]
def request_all(self, req_method, req_url, req_mime, case_data):
if req_mime == "application/json" or req_mime == "json":
return self.bpm_session.request(method=req_method, url=req_url, json=case_data)
elif req_mime == "application/x-www-form-urlencoded" or req_mime == "x-www-form-urlencoded" or req_mime == "form":
return self.bpm_session.request(method=req_method, url=req_url, data=case_data)
elif req_mime == "multipart/form-data" or req_mime == "form-data":
return self.bpm_session.request(method=req_method, url=req_url, files=case_data)
elif req_mime == "query" or req_mime == "params":
return self.bpm_session.request(method=req_method, url=req_url, params=case_data)
elif req_mime is None:
return self.bpm_session.request(method=req_method, url=req_url)
else:
raise ValueError("传入媒体类型的值错误,请自行封装")
\ No newline at end of file
import functools
import logging
import os
import time
def get_logs():
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)
log_name = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime(time.time())) + ".log"
log_dir = os.path.join(os.path.join(os.path.dirname(__file__), "report"), "result_log")
log_path = os.path.join(log_dir, log_name)
file_handler = logging.FileHandler(log_path, mode="a", encoding="utf-8")
log_format = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
file_handler.setFormatter(log_format)
logger.addHandler(file_handler)
return logger
log = get_logs()
def log_decorator(func_name):
@functools.wraps(func_name)
def inner(*arg, **kwargs):
try:
log.info(f"执行的功能名称为:{func_name.__name__}, 功能所在的文件为:{func_name.__code__.co_filename}, 所在的行为:{func_name.__code__.co_firstlineno}")
value = func_name(*arg, **kwargs)
except Exception as e:
log.error(f"执行的功能名称为:{func_name.__name__}, 功能所在的文件为:{func_name.__code__.co_filename}, 所在的行为:{func_name.__code__.co_firstlineno}报错,错误为:{e}")
raise e
else:
return value
return inner
\ No newline at end of file
import pytest
data_dict = {}
def set_value(key, value):
data_dict[key] = value
def get_value(key):
if key in data_dict.keys():
return data_dict[key]
else:
return "not fount"
@pytest.fixture(scope="session")
def set_depends_value():
yield set_value
@pytest.fixture(scope="session")
def get_depends_value():
yield get_value
\ No newline at end of file
import random
import string
import pytest
import requests
case_datas = [
[{"code": "需要更新"}, {"isDelete":"0"}],
[{"code": ""}, {"message":"必填:code维度编码必填!"}],
[{"": "abctest"}, {"message":"Required String parameter 'code' is not present"}]
]
class TestBPM:
host = "http://36.139.193.99:8088"
@pytest.mark.dependency()
def test_login(self, set_depends_value):
login_url = self.host + "/auth"
login_data = {"username": "admin",
"password": "FMgcBcmzaLp5J8AFyR8XF0MhHRpBhHNFQyDn7HI2nBO0B9RgCEnhVjvXjLMNjwoRKp01dwHueQGZbZcsEdAQQwY1QXvJrr8Z4Jv2Wn5UCC4IKuH9cIeevsc2zSEMmKULN9FJV4e9DBbnTMoxHqyawMlxVN/pidjeA0kkXjOsBE8="}
res = requests.request(method="post", url=login_url, json=login_data)
try:
assert "超级管理员" in res.text
except AssertionError:
raise AssertionError("断言失败")
else:
token = res.json().get("token")
set_depends_value("TOKEN", token)
@pytest.mark.dependency(depends=["TestBPM::test_login"])
def test_add_dem(self, get_depends_value, set_depends_value):
add_dem_url = self.host + "/api/demension/v1/dem/addDem"
dem_code = "".join(random.sample(string.ascii_letters, 12))
add_dem_data = {
"code": dem_code,
"description": "",
"isDefault": 0,
"name": "测试"+"".join(random.sample(string.ascii_letters, 8))
}
token = get_depends_value("TOKEN")
res = requests.post(url=add_dem_url, json=add_dem_data, headers={"Authorization": "Bearer "+token})
try:
assert "成功" in res.text
except AssertionError:
raise AssertionError("断言失败")
else:
set_depends_value("demCode", dem_code)
@pytest.mark.parametrize("case_data, expect_data", case_datas)
@pytest.mark.dependency(depends=["TestBPM::test_add_dem"])
def test_get_dem_msg(self, get_depends_value, set_depends_value, case_data, expect_data):
get_dem_msg_url = self.host + "/api/demension/v1/dem/getDem"
token = get_depends_value("TOKEN")
if "code" in case_data.keys():
if case_data["code"] == "需要更新":
case_data["code"] = get_depends_value("demCode")
res = requests.get(url=get_dem_msg_url, params=case_data, headers={"Authorization": "Bearer "+token})
try:
for key in expect_data:
assert expect_data[key] == res.json().get(key)
except AssertionError:
raise AssertionError("断言失败")
else:
if "id" in res.json():
dem_id = res.json()["id"]
set_depends_value("demId", dem_id)
print("断言成功")
@pytest.mark.dependency(depends=["TestBPM::test_get_dem_msg"])
def test_org(self, get_depends_value):
print(get_depends_value("TOKEN"))
print(get_depends_value("demId"))
print(get_depends_value("demCode"))
\ No newline at end of file
USERNAME = "张三"
USERNAME = "张三"
\ No newline at end of file
import pytest
import pytest
from Homework_files.APITesting_LZJ.common.db import DB
from Homework_files.APITesting_LZJ.request_method.request_method import RequestMethod
@pytest.fixture(scope="session")
def req_fix():
req = RequestMethod()
yield req
@pytest.fixture(scope="session")
def db_fix():
db = DB()
yield db
db.close()
\ No newline at end of file
import pytest
import pytest
from Homework_files.APITesting_LZJ.common.read_excel import ReadExcel
from Homework_files.APITesting_LZJ.test_case import log_decorator
from Homework_files.APITesting_LZJ.test_case.test_张三 import USERNAME
class TestBPM:
@log_decorator
@pytest.mark.parametrize("case_method, case_url, case_mime, case_data, expect_data, sql_type, sql_data, update_key", ReadExcel(USERNAME).get_data())
def test_bpm(self, req_fix, db_fix, case_method, case_url, case_mime, case_data, expect_data, sql_type, sql_data, update_key):
print(case_method, case_url, case_mime, case_data, expect_data, sql_type, sql_data, update_key)
if sql_type == "delete":
db_fix.delete(sql_data)
elif sql_type == "select":
select_result = db_fix.select(sql_data)
case_data[update_key] = select_result
elif sql_type == "select|delete" or sql_type == "delete|select":
db_fix.delete(sql_data["delete"])
select_result = db_fix.select(sql_data["select"])
case_data[update_key] = select_result
result = req_fix.request_all(req_method=case_method, req_url=case_url, req_mime=case_mime, case_data=case_data)
print(result.text)
try:
for key in expect_data:
assert expect_data[key] == result.json().get(key)
except AssertionError:
raise AssertionError(f"断言失败, 用例数据为:{case_data}, 期望数据为:{expect_data}, 服务器返回的数据为:{result.text}")
else:
print("断言成功")
\ No newline at end of file
# -*-coding:utf-8 -*- #
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test61
# FileName: __init__.py
# Author: lao_zhao
# Datetime: 2024/7/12 16:53
# Description:
#
# ---------------------------------------------------------------------------
USERNAME = "李四"
\ No newline at end of file
# -*-coding:utf-8 -*- #
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test61
# FileName: conftest.py
# Author: lao_zhao
# Datetime: 2024/7/12 14:40
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from APIAutoTest_v3_1.common.db import DB
from APIAutoTest_v3_1.request_method.request_method import RequestMethod
@pytest.fixture(scope="session")
def req_fix():
req = RequestMethod()
yield req
@pytest.fixture(scope="session")
def db_fix():
db = DB()
yield db
db.close()
\ No newline at end of file
# -*-coding:utf-8 -*- #
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: test61
# FileName: test_case.py
# Author: lao_zhao
# Datetime: 2024/7/12 14:38
# Description:
#
# ---------------------------------------------------------------------------
import pytest
from APIAutoTest_v3_1.common.read_excel import ReadExcel
from APIAutoTest_v3_1.test_case.test_李四 import USERNAME
class TestBPM:
@pytest.mark.parametrize("case_method, case_url, case_mime, case_data, expect_data, sql_type, sql_data, update_key", ReadExcel(USERNAME).get_data())
def test_bpm(self, req_fix, db_fix, case_method, case_url, case_mime, case_data, expect_data, sql_type, sql_data, update_key):
print(case_method, case_url, case_mime, case_data, expect_data, sql_type, sql_data, update_key)
# 判断sql语句类型是否为delete,如果是,使用DB类对象调用delete方法执行删除的sql语句
if sql_type == "delete":
# 使用DB类对象调用delete方法执行删除的sql语句, DB类对象就是自定义固件db_fix
db_fix.delete(sql_data)
# 判断sql语句的类型是否为select,如果是,使用DB类对象调用select方法执行查询语句,并接收查询的结果
elif sql_type == "select":
# 使用DB类对象调用select方法执行查询语句,并接收查询的结果
select_result = db_fix.select(sql_data)
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
# 判断sql语句的类型是否为select|delete 或者 delete|select ,如果是,使用DB类对象执行sql语句
elif sql_type == "select|delete" or sql_type == "delete|select":
# 使用DB类对象调用delete方法执行删除的sql语句。
db_fix.delete(sql_data["delete"])
# 使用DB类对象调用select方法执行查询的sql语句, 并接收查询的结果
select_result = db_fix.select(sql_data["select"])
# 将查询结果更新到用例数据中
case_data[update_key] = select_result
result = req_fix.request_all(req_method=case_method, req_url=case_url, req_mime=case_mime, case_data=case_data)
print(result.text)
try:
for key in expect_data:
assert expect_data[key] == result.json().get(key)
except AssertionError:
raise AssertionError("断言失败,描述失败的原因")
else:
print("断言成功")
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment