2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > 接口测试方法2之编写自动化测试框架实现

接口测试方法2之编写自动化测试框架实现

时间:2018-07-05 18:22:42

相关推荐

接口测试方法2之编写自动化测试框架实现

/weixin_45912307/article/details/108905646

1. 所需技术分析

excel 存放用例测试数据unittest + ddt 数据驱动,测试数据与测试代码分离自定义日志模块配置文件pymysql 与金额相关数据库查询操作参数化、正则匹配接口依赖,动态创建类属性的方式处理生成报告:http-report、allureJenkins持续集成

2. 框架目录

2.1 公共层

1.file_dir.py文件路径

import os# 基本路径base_path = os.path.split(os.path.split(os.path.abspath(__file__))[0])[0]# 配置文件路径case_config_path = os.path.join(base_path,'config','case.config')# 测试数据路径test_data_path = os.path.join(base_path,'testdata','test_data.xlsx')# 日志路径log_path = os.path.join(base_path,'testresult','log','apiTest.log')# 测试报告路径report_path = os.path.join(base_path,'testresult','html_report','api_test.html')

2.handle_baseexcel.py读取excel文件数据

rom openpyxl import load_workbookfrom common.file_dir import test_data_pathclass HandleExcel:def __init__(self,file_name,sheet_name):self.file_name = file_nameself.sheet_name = sheet_namedef get_data(self):wb = load_workbook(self.file_name)sheet = wb[self.sheet_name]test_data = []for i in range(2,sheet.max_row+1):sub_data = {}sub_data['test_id'] = sheet.cell(i,1).valuesub_data['url'] = sheet.cell(i,2).valuesub_data['title'] = sheet.cell(i,3).valuesub_data['http_method'] =sheet.cell(i,4).valuesub_data['expected'] = sheet.cell(i,5).valuesub_data['result'] = sheet.cell(i,6).valuesub_data['TestResult'] = sheet.cell(i,7).valuetest_data.append(sub_data)return test_data # 返回获取到的数据@staticmethoddef write_back(file_name,sheet_name,row,rol,result):wb = load_workbook(file_name)sheet = wb[sheet_name]sheet.chell(row,rol).value = resultwb.save(file_name) # 保存结果@staticmethoddef update_tel(file_name,sheet_name,tel): # 更新手机号码wb = load_workbook(file_name)sheet = wb[sheet_name]sheet.chell(2,1).value = telwb.save(file_name)if __name__ == '__main__':print(HandleExcel(test_data_path,'login').get_data())

3.handle_mysql.pymysql数据库基本操作

import pymysqlfrom common.file_dir import case_config_pathfrom common.read_config import ReadConfigclass HandleMysql:@staticmethoddef handle_mysql(sql,state="all"):#1. 连接数据库db_config = eval(ReadConfig().read_config(case_config_path,'DB','db_config'))#利用这个类从配置文件中读取infoconn = pymysql.connect(**db_config)#2. 获取游标cursor = conn.cursor()#3. 定义要执行的SQL语句#4. 执行SQL语句cursor.execute(sql)#5. 获取结果if state=="1":cursor.fetchone()else:cursor.fetchall()#6. 关闭游标cursor.close()#7. 关闭数据库连接conn.close()return db_configif __name__ == '__main__':sql = 'select * from member where MobielPhone like "150%"'res = HandleMysql().handle_mysql(sql)print(res[0][0])

4.handle_regx.py手机号码正则处理

from common.get_data import GetData# s = {"mobilephone":"${normal_tel}","pwd":"${pwd}"}import reclass Regx:@staticmethoddef handle_regx(s):while re.search('\$\{(.*?)]\}',s):key = re.search('\$\{(.*?)]\}',s).group(0)value = re.search('\$\{(.*?)]\}',s).group(1)s = s.replace(key,str(getattr(GetData,value)))return sif __name__ == '__main__':s = '{"mobilephone": "${admin_tel}", "pwd": "${pwd}"}'res = Regx().handle_regx(s)print(res)

5.http_request.pyrequest发送请求

import requestsfrom common.my_logger import MyLoggerlogger = MyLogger()class HttpRequest:def http_request(self,url,data,method,cookies=None):global restry:if method.upper() =="GET":res = requests.get(url,data,cookies=cookies)elif method.upper()=="POST":res = requests.post(url,data,cookies=cookies)else:logger.info("输入请求方式不对")except Exception as e:logger.error("请求方式报错:{}".format(e))raise ereturn res

6.logger.py自定义日志配置文件

import loggingclass Logger:def logger(self, level, msg):# 1.定义一个日志收集器logger = logging.getLogger()# 2. 设置级别logger.setLevel("DEBUG")# 3. 设置输出格式formatter = logging.Formatter('%(asctime)s-%(levelname)s-%(filename)s-%(name)s-日志信息:%(message)s')# 4. 创建一个输出渠道ch = logging.StreamHandler() # 输出到控制台ch.setLevel("DEBUG")ch.setFormatter(formatter)fh = logging.FileHandler('test.txt', encoding='UTF-8') # 输出到文件fh.setLevel("ERROR")fh.setFormatter(formatter)# 5. 两者对接---指定输出渠道logger.addHandler(ch)logger.addHandler(fh)# 6. 收集日志if level == "DEBUG":logger.debug(msg)elif level == "INFO":logger.info(msg)elif level == "WARNING":logger.warning(msg)elif level == "ERROR":logger.error(msg)elif level == "CRITICAL":logger.critical(msg)# 7. 关闭渠道logger.removeHandler(ch)logger.removeHandler(fh)def debug(self, msg):self.logger("DEBUG",msg)def info(self, msg):self.logger("INFO",msg)def warning(self, msg):self.logger("WARNING",msg)def error(self, msg):self.logger("ERROR",msg)def critical(self, msg):self.logger("CRITICAL",msg)if __name__ == '__main__':logger = Logger()logger.debug('debug级别信息!')logger.info('info级别的信息')logger.warning('warning级别信息')logger.error('error级别信息!')logger.critical('critical级别信息!')

7.read_config.py读取用例配置文件

import configparser # 配置解析器class ReadConfig:@staticmethoddef read_config(file_path,section,option):cf = configparser.ConfigParser()cf.read(file_path,encoding='utf-8')return cf[section][option]if __name__ == '__main__':from common.file_dir import case_config_pathprint(ReadConfig().read_config(case_config_path,'MODE','mode'))

8.send_email.py发送邮件模块

# email:***@ password:***"""使用一个邮箱向另一个邮箱发送测试报告的html文件,这里需要对发送邮件的邮箱进行设置,获取邮箱授权码。username=“发送邮件的邮箱”, password=“邮箱授权码”这里要特别注意password不是邮箱密码而是邮箱授权码。mail_server = "发送邮箱的服务器地址"这里常用的有 qq邮箱——"", 163邮箱——""其他邮箱服务器地址可自行百度"""import osimport smtplibfrom email.mime.text import MIMETextfrom email.header import Headerimport time# 自动发送邮件class SendEmail():def send_email(self, new_report):# 读取测试报告中的内容作为邮件的内容with open(new_report, 'r', encoding='utf8') as f:mail_body = f.read()# 发件人地址send_addr = '88888888@' # 收件人地址reciver_addr = 'zhangsan@'# 发送邮箱的服务器地址 qq邮箱是'', 163邮箱是''mail_server = ''now = time.strftime("%Y-%m-%d %H:%M:%S")# 邮件标题subject = '接口自动化测试报告' + now# 发件人的邮箱及邮箱授权码username = '88888888@'password = 'moasmyxgojcgbbch' # 注意这里是邮箱的授权码而不是邮箱密码# 邮箱的内容和标题message = MIMEText(mail_body, 'html', 'utf8') # MIMEText(邮箱主体内容,内容类型,字符集)message['Subject'] = Header(subject, charset='utf8')# 发送邮件,使用的使smtp协议smtp = smtplib.SMTP()smtp.connect(mail_server)smtp.login(username, password)smtp.sendmail(send_addr, reciver_addr.split(','), message.as_string())smtp.quit()# 获取最新的测试报告地址def acquire_report_address(self, reports_address):# 测试报告文件夹中的所有文件加入到列表test_reports_list = os.listdir(reports_address)# 按照升序排序生成新的列表new_test_reports_list = sorted(test_reports_list)# 获取最新的测试报告the_last_report = new_test_reports_list[-1]# 最新的测试报告地址the_last_report_address = os.path.join(reports_address, the_last_report)return the_last_report_addressif __name__ == '__main__':# 测试报告存放位置test_reports_address = '../test_result/html_report'# 查找最新生成的测试报告地址new_report_addr = SendEmail().acquire_report_address(test_reports_address)# 自动发送邮件SendEmail().send_email(new_report_addr)

2.2测试配置层

case.config用例配置文件

[MODE]mode={"login":[],"register":[],"recharge":[],"business":"all","invest":"all"}[DB]db_config = (host="169.254.160.239",port='3306',user="python",password="pymysql_test",database="login",charset="utf8")[CHECKLEAVEAMOUNT]check_list=['recharge','invest','withdraw']

2.3 测试用例层

1.test_login.py测试登录用例

import unittestfrom common.handle_excel import HandleExcelfrom common.get_data import GetDatafrom common.http_requests import HttpRequestfrom common.file_dir import *from ddt import ddt, data # 列表嵌套列表:索引方式读取;列表嵌套字典:关键字方式读取test_data = HandleExcel.get_data(test_data_path,'login') # 执行登录用例@ddtclass TestLogin(unittest.TestCase):def setUp(self):passdef tearDown(self):pass@data(*test_data)def test_Login(self, item):# 登录global TestResultlogin_url = 'http:path'login_data = '{"mobilephone": "xxxxxx", "pwd": "123456"}'res = HttpRequest.http_request(item['url'], eval(item['data']), item['http_method'],getattr(GetData, 'Cookie'))try:self.assertEqual(item['excepted'], res.json()['code'])TestResult = 'PASS' # 成功的except Exception as e:TestResult = 'failed' # 失败的print("执行用例出错:{0}".format(e))finally:HandleExcel.write_back(test_data_path,item['sheet_name'], item['test_id']+1, str(res.json()),TestResult)print("获取到的结果为:{0}".format(res.json()))

2.test_recharge.py测试充值用例

import unittestfrom common.handle_excel import HandleExcelfrom common.get_data import GetDatafrom common.http_requests import HttpRequestfrom common.file_dir import *from ddt import ddt, data # 列表嵌套列表:索引方式读取;列表嵌套字典:关键字方式读取test_data = HandleExcel.get_data(test_data_path,'recharge') # 执行充值用例@ddtclass TestRecharge(unittest.TestCase):def setUp(self):passdef tearDown(self):pass@data(*test_data)def test_recharge(self, item):# 登录global TestResult# 充值recharge_url = 'http://path'recharge_data = '{"mobilephone": "xxxxxx", "amount": "1000"}'res = HttpRequest.http_request(item['url'], eval(item['data']), item['http_method'],getattr(GetData, 'Cookie'))try:self.assertEqual(item['excepted'], res.json()['code'])TestResult = 'PASS' # 成功的except Exception as e:TestResult = 'failed' # 失败的print("执行用例出错:{0}".format(e))finally:HandleExcel.write_back(test_data_path, item['sheet'], item['test_id'] + 1, str(res.json()),TestResult)print("获取到的结果为:{0}".format(res.json()))

3. 其他测试用例根据接口自行封装

2.4 测试数据层

以登录数据为例

2.5 输出层

1.html_report输出html报告目录

2.log输出日志目录

3.sreenshot错误截屏存放目录

2.6 run.py运行文件

import unittestfrom testcase.test_invest import TestInvestsuite = unittest.TestSuite() # 创建类套接字对象# suite.addTest(TestHttpRequests('test_api')) # 测试类实例loader = unittest.TestLoader()# 方法一:并行多个用例# suite.addTest(loader.loadTestsFromModule(test_login))# suite.addTest(loader.loadTestsFromModule(test_recharge))suite.addTest(loader.loadTestsFromTestCase(TestInvest))with open('test_result/html_report/test_api.html','wb') as file:# 执行用例runner = unittest.TextTestRunner(stream=file,title = '单元测试报告',descriptions='单元测试报告',tester='jsonLiu')runner.run(suite)

3. 可与Jenkins实现集成

Jenkins下载官网:https://www.jenkins.io/download/

关于搭建windows版Python与Jenkins集成可参考此篇:/qq_39247153/article/details/81003244

关于linux版Python与Jenkins集成完整搭建流程可参考此篇:/nunchakushuang/article/details/77118621?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-4.add_param_isCf&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-4.add_param_isCf

4. 总结

此框架用到的技术是Python+requests+unittest+ddt+openpyxl +config+log+email+Jenkins

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。