Commit c8c7b722 by L

接口测试_v1.0

parent 5331ec5b
# 默认忽略的文件
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N806" />
<option value="N801" />
<option value="N803" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="list.*" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.8 (pythonProject)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (pythonProject)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/ApiTest.iml" filepath="$PROJECT_DIR$/.idea/ApiTest.iml" />
</modules>
</component>
</project>
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: db.py
# Author: liu
# Datetime: 2024/9/7 下午3:49
# Description: 连接数据库
# ---------------------------------------------------------------------------
import pymysql
from common.read_basic_ini import Read_basic_ini
from report import log
class DB:
def __init__(self):
read_ini = Read_basic_ini()
host = read_ini.get_sql('host')
port = read_ini.get_sql('port')
user = read_ini.get_sql('user')
password = read_ini.get_sql('password')
database = read_ini.get_sql('database')
# 连接数据库
try:
self.con = pymysql.connect(
host=host, user=user, password=password, database=database, port=int(port), charset='utf8'
)
except Exception as e:
log.error(f'连接错误,数据:host={host}, port={port}, user={user}, password={password}, database={database}')
raise e
else:
# 游标数据
self.cursor = self.con.cursor()
def close(self):
# 关闭连接
self.cursor.close()
self.con.close()
def delete(self, sql):
# 判断是否为str
if isinstance(sql, str):
# 判断是否以delete开头
if sql.lower().strip().startswith('delete'):
try:
self.cursor.execute(sql)
self.con.commit()
log.info(f'delete方法执行{sql}语句成功!')
except Exception as e:
log.error(f'执行delete方法时,发生错误,sql语句为:{sql},错误为:{e}')
raise e
else:
log.error(f'执行delete方法时,发生错误,sql语句为:{sql}')
raise ValueError(f'sql语句错误')
else:
log.error(f'执行delete方法时,发生错误,sql语句为:{sql}')
raise ValueError(f'sql语句错误')
def select(self, sql, n=0):
# 判断是否为str
if isinstance(sql, str):
# 判断是否以delete开头
if sql.lower().strip().startswith('select'):
try:
self.cursor.execute(sql)
log.info(f'select方法执行{sql}语句成功!')
except Exception as e:
log.error(f'执行select方法时,发生错误,sql语句为:{sql},错误为:{e}')
raise e
else:
select_result = self.cursor.fetchall()
# 获取的结果格式为:((key,),),因此需要res[0][0]进行取值,默认取第一个的第一个,其他取值更改n来实现
if isinstance(n, int) and select_result:
try:
return select_result[n][0]
except Exception as e:
log.error(f"执行select方法时,获取查询结果失败,传入的n为:{n}")
raise e
else:
log.error(f'执行select方法时,发生错误,sql语句为:{sql}')
raise ValueError(f'sql语句错误')
else:
log.error(f'执行select方法时,发生错误,sql语句为:{sql}')
raise ValueError(f'sql语句错误')
if __name__ == '__main__':
db = DB()
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: read_basic_ini.py
# Author: liu
# Datetime: 2024/9/7 上午9:30
# Description: 读取基础ini文件
# ---------------------------------------------------------------------------
import os
from configparser import ConfigParser
from report import log
class Read_basic_ini:
def __init__(self):
# config_data的路径
self.main_path = os.path.dirname(os.path.dirname(__file__))
data_path = os.path.join(self.main_path, 'config_data')
# print(data_path)
# ini 路径
ini_path = os.path.join(data_path, 'basic_config.ini')
# print(ini_path)
# 初始化
self.config = ConfigParser()
# 读取文件
self.config.read(ini_path, encoding='utf-8')
def get_sql(self, key):
try:
# log.info(f"执行方法get_sql为:根据key获取sql节点下key对应文件的路径,形参key的传参为{key},")
key_value = self.config.get('sql', key)
return key_value
except Exception as e:
log.error('值错误,没有查询到值,请检查代码')
raise e
def get_host(self, key):
try:
key_value = self.config.get('host', key)
return key_value
except Exception as e:
log.error('值错误,没有查询到值,请检查代码')
raise e
def get_report(self, key):
try:
# 返回路径
report_path = os.path.join(self.main_path, 'report')
key_path = os.path.join(report_path, self.config.get('report', key))
# 判断是否存在,不存在则创建一个文件夹,无法创建则报错说路径不对
if os.path.exists(key_path):
pass
else:
os.makedirs(key_path)
except Exception as e:
# # log.error(f'{key_path}错误,请检查路径是否正确且为文件夹,错误为{e}')
log.error(f"方法get_file_path执行失败,形参key传参为:{key},错误为:{e}")
raise e
if __name__ == '__main__':
read_ini = Read_basic_ini()
print(read_ini.get_sql('host'))
print(read_ini.get_host('bpm_host'))
print(read_ini.get_report('log1'))
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: read_excel.py
# Author: liu
# Datetime: 2024/9/7 上午11:32
# Description: 读取excel文件
# ---------------------------------------------------------------------------
import os
import openpyxl
from common.read_basic_ini import Read_basic_ini
from common.read_json import read_json
from common.read_user_ini import Read_user_ini
from report import log
from config_data.settings import *
class Read_excel:
def __init__(self, username):
# 获取excel文件
read_user_ini = Read_user_ini(username=username)
self.read_basic_ini = Read_basic_ini()
excel_path = read_user_ini.get_file(FILE_EXCEL)
self.username = username
table = read_user_ini.get_table(TABLE)
# 读取excel
self.wb = openpyxl.load_workbook(excel_path)
self.ws = self.wb[table]
def __get_cell(self, col, row):
try:
# 获取单元格数据
cell_value = self.ws[col + str(row)].value
except Exception as e:
log.error(f'获取指定单元格错误,错误为:{e},传入的列号:{col}, 传入的行号:{row}')
raise e
else:
# 判断是否取空或者出去两边后为空
if cell_value is None:
return None
elif cell_value.strip():
return cell_value.strip()
def case_id(self, row):
# 获取用例编号
return self.__get_cell(ID, row=row)
def module_name(self, row):
# 获取模块名称
return self.__get_cell(MODULE, row=row)
def api_name(self, row):
# 获取接口名称
return self.__get_cell(API, row=row)
def title(self, row):
# 获取标题
return self.__get_cell(TITLE, row=row)
def level(self, row):
# 获取用例等级
return self.__get_cell(LEVEL, row=row)
def req_method(self, row):
# 获取请求方式(post\get\delete\put),必有
return self.__get_cell(REQ_METHOD, row=row)
def url(self, row):
# 获取url
host = self.read_basic_ini.get_host(HOST)
# print(host)
url = self.__get_cell(URL, row=row)
return host+url
def req_type(self, row):
# 媒体类型
req_type = self.__get_cell(REQ_TYPE, row=row)
if req_type:
return str(req_type).lower()
def case_data(self, row):
# 用例数据
module_name = self.module_name(row=row)
api_name = self.api_name(row=row)
case_key = self.__get_cell(CASE_DATA, row=row)
case_data = read_json(username=self.username, json_name=FILE_CASE)
return case_data[module_name][api_name][case_key]
def except_data(self, row):
# 期望数据
module_name = self.module_name(row=row)
api_name = self.api_name(row=row)
except_key = self.__get_cell(EXCEPT_DATA, row=row)
except_data = read_json(username=self.username, json_name=FILE_EXCEPT)
return except_data[module_name][api_name][except_key]
def sql_type(self, row):
# sql语句类型
sql_type = self.__get_cell(SQL_TYPE, row=row)
if sql_type:
return str(sql_type).lower()
def sql_data(self, row):
# sql数据
sql_key = self.__get_cell(SQL_DATA, row=row)
if sql_key:
module_name = self.module_name(row=row)
api_name = self.api_name(row=row)
sql_data = read_json(username=self.username, json_name=FILE_SQL)
return sql_data[module_name][api_name][sql_key]
def update_key(self, row):
# 需要更新的值(指在case_data.json中对应数据需要更新,且写在excel中的key)
return self.__get_cell(UPDATE_KAY, row=row)
def get_data(self):
list_data = []
for i in range(2, self.ws.max_row + 1):
case_id = self.case_id(i)
module_name = self.module_name(i)
api_name = self.api_name(i)
title = self.title(i)
level = self.level(i)
req_method = self.req_method(i)
url = self.url(i)
req_type = self.req_type(i)
case_data = self.case_data(i)
except_data = self.except_data(i)
sql_type = self.sql_type(i)
sql_data = self.sql_data(i)
update_key = self.update_key(i)
if url is not None and except_data is not None and req_method is not None:
list_data.append([case_id, module_name, api_name, title, level, req_method, url, req_type, case_data, except_data,
sql_type, sql_data, update_key])
return list_data
if __name__ == '__main__':
excel = Read_excel('test_demo')
# print(excel.module_name(2))
print(excel.get_data())
# print(excel.url(2))
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: read_json.py
# Author: liu
# Datetime: 2024/9/7 上午11:37
# Description: 读取json数据
# ---------------------------------------------------------------------------
import json
from common.read_user_ini import Read_user_ini
from report import log
def read_json(username, json_name):
# 获取json文件
read_ini = Read_user_ini(username=username)
json_path = read_ini.get_file(json_name)
# print(json_path)
# 获取json文件对象
try:
with open(json_path, mode='r', encoding='utf-8') as f:
log.info(f'加载json文件{json_path}成功!')
return json.load(f)
except Exception as e:
log.error(f'请检查{json_path}')
raise e
if __name__ == '__main__':
print(read_json(username='test_demo', json_name='case'))
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: read_user_ini.py
# Author: liu
# Datetime: 2024/9/7 上午11:00
# Description: 读取用户ini文件
# ---------------------------------------------------------------------------
import os
from configparser import ConfigParser
from report import log
class Read_user_ini:
def __init__(self, username):
# 获取用户的文件,判断用户目录是否存在
config_data = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config_data')
self.user_dir = os.path.join(config_data, username)
if os.path.isdir(self.user_dir):
pass
else:
log.error(f'用户{username}的目录不存在于{config_data}中')
raise KeyError(f"用户{username}的目录不存在于{config_data}中")
user_ini = os.path.join(self.user_dir, 'config.ini')
# 初始化
self.config = ConfigParser()
# 读取ini文件
self.config.read(user_ini, encoding='utf-8')
def get_file(self, key):
try:
file_path = os.path.join(self.user_dir, self.config.get('file', key))
log.info(f'使用get_file方法读取{key}的值,得到{file_path}的文件路径')
return file_path
except Exception as e:
log.error(f'key:{key}传值错误,请确定key存在且没有填写错误')
raise e
def get_table(self, key):
try:
table = self.config.get('table', key)
log.info(f'使用get_table方法读取{key}的值,得到用户excel文件中的{table}表名')
return table
except Exception as e:
log.error(f'key:{key}传值错误,请确定key存在且没有填写错误')
raise e
if __name__ == '__main__':
read_ini = Read_user_ini(username='test_demo')
print(read_ini.get_file('excel'))
print(read_ini.get_table('sheet1'))
\ No newline at end of file
[report]
# 只存放文件夹,否则记得更改read_basic_config
log=log
[host]
# 配置被测系统的域名
bpm_host= http://36.139.193.99:8088
[sql]
host=36.139.193.99
port=3306
user=root
password=Rhrc@2024
database=eip8
\ No newline at end of file
{
"登录": {
"认证接口": {
"LoginSuccess":{
"username": "admin",
"password": "bF6N3L93cX8pV6x2yEqxNjwIIPaEOYw9bNI5GuIY4g9MeoFyPFPL5WteHaV0LcxQqmDJWlhuCRMXzAPvrFcxJrA8BgGjJpmB1WMrQrazIJPbWbCfmDit2s2jzn+DRerVlYIFojDM96y24drEniwWzHtaJKiWoc7LGL1csNmokvQ="
}
}
}
}
\ No newline at end of file
[file]
excel=case.xlsx
case=case_data.json
except=except_data.json
sql=sql_data.json
[table]
sheet1=bpm
\ No newline at end of file
{
"登录": {
"认证接口": {
"LoginSuccess": {"username":"超级管理员","account":"admin","userId":"1","expiration":86400,"loginStatus":true}
}
}
}
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: settings.py
# Author: liu
# Datetime: 2024/9/7 下午3:07
# Description: 数据代称设置
# ---------------------------------------------------------------------------
"""excel的列号"""
ID = 'a'
MODULE = 'b'
API = 'c'
TITLE = 'd'
LEVEL = 'e'
REQ_METHOD = 'f'
URL = 'g'
REQ_TYPE = 'h'
CASE_DATA = 'i'
EXCEPT_DATA = 'j'
SQL_TYPE = 'k'
SQL_DATA = 'l'
UPDATE_KAY = 'm'
"""basic_ini的host对应的key"""
HOST = 'bpm_host'
"""user_ini的table对应的key"""
TABLE = 'sheet1'
"""user_ini的file对应的key"""
FILE_EXCEL = 'excel'
FILE_CASE = 'case'
FILE_EXCEPT = 'except'
FILE_SQL = 'sql'
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: __init__.py
# Author: liu
# Datetime: 2024/9/7 上午9:22
# Description: 创建log方法,运行时就使用
# ---------------------------------------------------------------------------
import logging
import os
import time
def log():
logger = logging.getLogger()
# read_basic_ini = Read_basic_ini()
# 日志名字:时间日期+.log
log_name = time.strftime("%Y-%m-%d %H-%M-%S", time.localtime()) + '.log'
# 日志的路径
log_dir = os.path.join(os.path.join(os.path.dirname(os.path.dirname(__file__)), "report"), "log")
# 判断是否存在,不存在则创建一个文件夹
if os.path.exists(log_dir):
pass
else:
os.makedirs(log_dir)
log_path = os.path.join(log_dir, log_name)
# 设置日志的级别
logger.level = logging.INFO
# 日志的输出格式
handler = logging.FileHandler(log_path, 'a', encoding='utf-8')
# formatter = logging.Formatter('%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s --%(
# name)s')
formatter = logging.Formatter('%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
log = log()
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: request_method.py
# Author: liu
# Datetime: 2024/9/7 下午4:40
# Description: 根据媒体请求类型的不同,返回不同的请求结果
# ---------------------------------------------------------------------------
import requests
class RequestsMethod:
def __init__(self):
# 获取认证
login_host = 'http://36.139.193.99:8088/auth'
login_data = {
"username": "admin",
"password": "bF6N3L93cX8pV6x2yEqxNjwIIPaEOYw9bNI5GuIY4g9MeoFyPFPL5WteHaV0LcxQqmDJWlhuCRMXzAPvrFcxJrA8BgGjJpmB1WMrQrazIJPbWbCfmDit2s2jzn+DRerVlYIFojDM96y24drEniwWzHtaJKiWoc7LGL1csNmokvQ="
}
self.req_session = requests.Session()
res = self.req_session.request(url=login_host, json=login_data, method='post')
token = res.json().get('token')
self.req_session.headers["Authorization"] = f"Bearer {token}"
def request_all(self, url, req_method, req_type, case_data):
if req_type == 'application/json' or req_type == 'json':
return self.req_session.request(url=url, json=case_data, method=req_method)
elif req_type == "application/x-www-form-urlencoded" or req_type == "form":
return self.req_session.request(url=url, data=case_data, method=req_method)
elif req_type == "multipart/form-data" or req_type == "form-data":
return self.req_session.request(url=url, files=case_data, method=req_method)
elif req_type == 'query' or req_type == 'params':
return self.req_session.request(url=url, params=case_data, method=req_method)
elif req_type is None:
return self.req_session.request(url=url, method=req_method)
# 判断媒体类型是否为query|json或者为json|query, 表示请求体和地址栏同时传参
elif req_type == "query|json" or req_type == "json|query":
# requests是支持地址栏和请求体同时传参的。地址栏使用params传参,请求体根据不同的媒体类型选择不同的关键字传参。
return self.req_session.request(method=req_method, url=url, params=case_data["query"],
json=case_data["body"])
else:
return self.req_session.request(url=url, data=case_data, method=req_method)
if __name__ == '__main__':
req = RequestsMethod()
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: __init__.py
# Author: liu
# Datetime: 2024/9/7 下午5:23
# Description:
# ---------------------------------------------------------------------------
username = 'demo'
\ No newline at end of file
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: conftest.py
# Author: liu
# Datetime: 2024/9/7 下午4:57
# Description: 自定义固件 session级
# ---------------------------------------------------------------------------
import pytest
from common.db import DB
from requests_method.request_method import RequestsMethod
@pytest.fixture(scope='session')
def fix_db():
db = DB()
yield db
db.close()
@pytest.fixture(scope='session')
def fix_req():
req = RequestsMethod()
yield req
# -*-coding:utf-8 -*- #
# ---------------------------------------------------------------------------
# ProjectName: ApiTest
# FileName: test_bpm.py
# Author: liu
# Datetime: 2024/9/7 下午5:05
# Description: 测试用例
# ---------------------------------------------------------------------------
import pytest
from common.read_excel import Read_excel
from report import log
from test_case.test_demo import username
excel = Read_excel(username)
class TestBPM:
@pytest.mark.parametrize("case_id, module_name, api_name, title, level, req_method, url, req_type, case_data, except_data, sql_type, sql_data, update_key", excel.get_data())
def test_bpm(self, fix_db, fix_req, case_id, module_name, api_name, title, level, req_method, url, req_type,
case_data, except_data, sql_type, sql_data, update_key):
if sql_type == 'delete':
fix_db.delete(sql_data)
elif sql_type == 'select':
select_result = fix_db.select(sql_data)
# 更新数据
case_data[update_key] = select_result
# 请求
res = fix_req.request_all(url=url, req_method=req_method, req_type=req_type, case_data=case_data)
# 断言
try:
for key in except_data.keys():
assert except_data[key] == res.json()[key]
except AssertionError:
log.error(f'断言失败,接口url:{url},用例数据:{case_data},期望数据:{except_data},服务器返回数据:{res.text}')
raise AssertionError('断言失败')
else:
log.info(f'断言成功,接口url:{url},用例数据:{case_data},期望数据:{except_data},服务器返回数据:{res.text}')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment