# -*-coding:utf-8 -*- # # --------------------------------------------------------------------------- # ProjectName: test63 # FileName: requests_method.py # Author: lao_zhao # Datetime: 2024/11/5 15:22 # Description: # # --------------------------------------------------------------------------- import requests from APIAutoTest_v3_1 import log from APIAutoTest_v3_1.common.db import DB from APIAutoTest_v3_1.common.read_basic_ini import ReadBasicIni class RequestsMethod: def __init__(self): """关联登录的状态""" self.db = DB() self.bpm_session = requests.sessions.Session() login_url = ReadBasicIni().get_host("bpm") + "/auth" login_data = {"username": "admin", "password": "QKpkgIwnyACrUXQrQVRaSUbhPSOsKnj5hoRN5LGKH" "T3NmLCwGIN+aGa981YZtlLoEfpVvGDnILn0QU09VHSkgL+Or6" "oTG1E8OtVTmAEjivAe/a4zLPwd/OfvKYIYbBP5ExZyy+wlVTbLgDfqkuK" "PGOMPPSLlZWLfvG7yzdc+CyE="} token = self.bpm_session.post(url=login_url, json=login_data).json().get("token") self.bpm_session.headers.update({"Authorization": f"Bearer {token}"}) def request_all(self, req_method, req_url, req_mime=None, req_case_data=None, sql_type=None, sql_data=None, update_key=None): """ 封装公共的请求方法 :param req_method: 请求方法 :param req_url: 请求url :param req_mime: 请求的媒体类型 :param req_case_data: 请求数据 :param sql_type: sql语句类型 :param sql_data: sql语句 :param update_key: 更新的key :return: Response type """ # 判断sql语句类型是否为delete,如果是,先执行删除的sql语句,执行完之后再发送请求 if sql_type == "delete": self.db.delete(sql_data) # 判断sql语句类型是否为select,如果是,先执行查询的sql语句,并获取查询的结果,并将查询结果更新到用例数据中,再发送请求 elif sql_type == "select": select_result = self.db.select(sql_data) req_case_data[update_key] = select_result # 判断sql语句类型是否为:select|delete or delete|select,如果是,同时执行查询和删除 elif sql_type == "select|delete" or sql_type == "delete|select": # 要执行删除--删除数据库中现有的数据 self.db.delete(sql_data["delete"]) # 查询语句--将查询结果更新到用例数据中 select_result = self.db.select(sql_data["select"]) req_case_data[update_key] = select_result # 上面的分支执行完之后,该删除的数据已经删除,该更新用例数据的已更新,下面就可以发送请求 # 发送请求时,用例使用使用哪些关键字传参,需要添加判断 # 判断媒体类型的值是否为:application/json 或 json,使用json传参 if req_mime == "json" or req_mime == "application/json": return self.bpm_session.request(method=req_method, url=req_url, json=req_case_data) # 判断媒体类型的值是否为:application/x-www-form-urlencoded 、x-www-form-urlencoded 或 form,使用data传参 # 除了上传文件和json类型传参以外的任何方式在请求体中传参,都使用data传参。 elif req_mime == "application/x-www-form-urlencoded" or req_mime == "x-www-form-urlencoded" or req_mime == "form": return self.bpm_session.request(method=req_method, url=req_url, data=req_case_data) # 判断媒体类型的值是否为:multipart/form-data 或 form-data,使用files传参 elif req_mime == "multipart/form-data" or req_mime == "form-data": return self.bpm_session.request(method=req_method, url=req_url, files=req_case_data) # 判断媒体类型的值是否为:query,使用params传参 elif req_mime == "query": return self.bpm_session.request(method=req_method, url=req_url, params=req_case_data) # 判断媒体类型的值是否为:None,表示没有传参 elif req_mime is None: return self.bpm_session.request(method=req_method, url=req_url) # 判断请求的媒体类型是否为:query|json 或者 json|query 如果是,表示在地址栏和请求体中同时传参,请求体还是以json类型传参。 elif req_mime == "query|json" or req_mime == "json|query": # requests可以在地址栏和请求体同时传参,请求体传参根据不同的媒体类型选择不同关键字传参,地址使用params return self.bpm_session.request(method=req_method, url=req_url, params=req_case_data["query"], json=req_case_data["body"]) else: log.error(f"方法:request_all,传入的媒体类型为:{req_mime}, 没有符合的判断分支,可以自行封装,也可以继承之后覆盖") raise ValueError("没有符合的判断分支,可以自行封装,也可以继承之后覆盖")