基于Jmeter与python,建立半自动化压测流程 (二)

Esta ·
更新时间:2024-11-10
· 996 次阅读

将上篇中提到的python顺序执行并分析Jmeter结果的脚本放这里。

parse_data处理Jmeter的jtl文件(csv格式的),处理结果经过测试,和Jmeter本身聚合报告监听器处理逻辑一致。

import subprocess import os import pandas as pd def run_all(dir_name, postfix, run_func, delta_time): """ 顺序执行当前目录下所有jmx文件的压测 :param dir_name: 需要执行jmx文件放置目录 :param postfix: jmx文件后缀-一般就是jmx :param run_func: 执行方法,即下面的run方法 :param delta_time: 有两个元素的数组,第一个元素是“删去前x秒的数据”,第二个元素是“删去后x秒的数据”, 以去掉Jmeter创建或销毁线程时不稳定的部分。 :return: """ os.chdir(dir_name) for each in os.listdir('./'): if each.endswith(postfix): run_func(each, delta_time) print(each + ' : parse OK') return True def run(item, delta_time): """ 调用系统命令执行jmeter测试 """ jtl_name = item[:-4]+'.jtl' subprocess.run('jmeter -n -t {jmx} -l {jtl}'.format(jmx=item, jtl=jtl_name), shell=True, stdout=subprocess.PIPE) p = ParseJtl(jtl_name, delta_time) p.write_to_csv() return True class ParseJtl: def __init__(self, jtl_name, delta_time): self.jtl_name = jtl_name self.csv_data = pd.read_csv(self.jtl_name, low_memory=False) self.excel_name = self.jtl_name[:-4] + '.csv' self.front_cut = delta_time[0]*1000 # 前后删去部分数据,即jmeter的线程创建与销毁过程, 秒数*1000 self.end_cut = delta_time[1]*1000 def data(self): return self.csv_data def parse_data(self, item): """ 按照label区分数据整理 """ c = self.csv_data _data = c[c['label'] == item] # 按照label分数据 if _data.__len__() > 3: first_timestamp = c.iloc[0].timeStamp + self.front_cut last_timestamp = c.iloc[-1].timeStamp - self.end_cut c = c[(c['timeStamp'] > first_timestamp) & (c['timeStamp'] 1: throughput = '%.2f' % (_data.__len__()*1000 / (_data.iloc[-1].timeStamp-_data.iloc[0].timeStamp)) receive_kb_per_s = '%.2f' % (_data['bytes'].sum() / (_data.iloc[-1].timeStamp-_data.iloc[0].timeStamp) / 1.024) sent_kb_per_s = '%.2f' % (_data['sentBytes'].sum() / (_data.iloc[-1].timeStamp-_data.iloc[0].timeStamp) / 1.024) else: throughput = '%.2f' % (_data.__len__()*1000 / _data.iloc[0].elapsed) receive_kb_per_s = '%.2f' % (_data['bytes'].sum() / _data.iloc[0].elapsed / 1.024) sent_kb_per_s = '%.2f' % (_data['sentBytes'].sum() / _data.iloc[0].elapsed / 1.024) return {'label': item, 'data': {'sampler_number': sampler_number, 'average': average, 'median': median, 'line_90': line_90, 'line_95': line_95, 'line_99': line_99, 'max_time': max_time, 'min_time': min_time, 'error_rate': error_rate, 'throughput': throughput, 'receive_KB_per_s': receive_kb_per_s, 'sent_KB_per_s': sent_kb_per_s }} def write_to_csv(self): data = self.final_parse() # print(data) _data = {'name': []} for each in data: _data['name'].append(each['label']) for every in each['data']: if _data.get(every): _data[every].append(each['data'][every]) else: _data[every] = [each['data'][every]] data_frame = pd.DataFrame(_data) data_frame.to_csv(self.excel_name, index=False, sep=',', encoding='utf-8') return True def final_parse(self): """ 执行jtl文件解析,应当在run 后调用 """ label_list = self.csv_data['label'].unique() # get the unique label name _data = [self.parse_data(each) for each in label_list] return _data if __name__ == '__main__': run_all(dir_name='./', postfix='.jmx', run_func=run, delta_time=[0, 0])

执行run方法即可


作者:ak131313



自动 自动化 jmeter Python

需要 登录 后方可回复, 如果你还没有账号请 注册新账号