将上篇中提到的python顺序执行并分析Jmeter结果的脚本放这里。
parse_data处理Jmeter的jtl文件(csv格式的),处理结果经过测试,和Jmeter本身聚合报告监听器处理逻辑一致。
import subprocess
import os
import pandas as pd
def run_all(dir_name, postfix, run_func, delta_time):
"""
顺序执行当前目录下所有jmx文件的压测
:param dir_name: 需要执行jmx文件放置目录
:param postfix: jmx文件后缀-一般就是jmx
:param run_func: 执行方法,即下面的run方法
:param delta_time: 有两个元素的数组,第一个元素是“删去前x秒的数据”,第二个元素是“删去后x秒的数据”,
以去掉Jmeter创建或销毁线程时不稳定的部分。
:return:
"""
os.chdir(dir_name)
for each in os.listdir('./'):
if each.endswith(postfix):
run_func(each, delta_time)
print(each + ' : parse OK')
return True
def run(item, delta_time):
"""
调用系统命令执行jmeter测试
"""
jtl_name = item[:-4]+'.jtl'
subprocess.run('jmeter -n -t {jmx} -l {jtl}'.format(jmx=item, jtl=jtl_name),
shell=True, stdout=subprocess.PIPE)
p = ParseJtl(jtl_name, delta_time)
p.write_to_csv()
return True
class ParseJtl:
def __init__(self, jtl_name, delta_time):
self.jtl_name = jtl_name
self.csv_data = pd.read_csv(self.jtl_name, low_memory=False)
self.excel_name = self.jtl_name[:-4] + '.csv'
self.front_cut = delta_time[0]*1000 # 前后删去部分数据,即jmeter的线程创建与销毁过程, 秒数*1000
self.end_cut = delta_time[1]*1000
def data(self):
return self.csv_data
def parse_data(self, item):
"""
按照label区分数据整理
"""
c = self.csv_data
_data = c[c['label'] == item] # 按照label分数据
if _data.__len__() > 3:
first_timestamp = c.iloc[0].timeStamp + self.front_cut
last_timestamp = c.iloc[-1].timeStamp - self.end_cut
c = c[(c['timeStamp'] > first_timestamp) & (c['timeStamp'] 1:
throughput = '%.2f' % (_data.__len__()*1000 / (_data.iloc[-1].timeStamp-_data.iloc[0].timeStamp))
receive_kb_per_s = '%.2f' % (_data['bytes'].sum() /
(_data.iloc[-1].timeStamp-_data.iloc[0].timeStamp) / 1.024)
sent_kb_per_s = '%.2f' % (_data['sentBytes'].sum() /
(_data.iloc[-1].timeStamp-_data.iloc[0].timeStamp) / 1.024)
else:
throughput = '%.2f' % (_data.__len__()*1000 / _data.iloc[0].elapsed)
receive_kb_per_s = '%.2f' % (_data['bytes'].sum() / _data.iloc[0].elapsed / 1.024)
sent_kb_per_s = '%.2f' % (_data['sentBytes'].sum() / _data.iloc[0].elapsed / 1.024)
return {'label': item,
'data': {'sampler_number': sampler_number, 'average': average,
'median': median, 'line_90': line_90, 'line_95': line_95, 'line_99': line_99,
'max_time': max_time, 'min_time': min_time, 'error_rate': error_rate,
'throughput': throughput, 'receive_KB_per_s': receive_kb_per_s, 'sent_KB_per_s': sent_kb_per_s
}}
def write_to_csv(self):
data = self.final_parse()
# print(data)
_data = {'name': []}
for each in data:
_data['name'].append(each['label'])
for every in each['data']:
if _data.get(every):
_data[every].append(each['data'][every])
else:
_data[every] = [each['data'][every]]
data_frame = pd.DataFrame(_data)
data_frame.to_csv(self.excel_name, index=False, sep=',', encoding='utf-8')
return True
def final_parse(self):
"""
执行jtl文件解析,应当在run 后调用
"""
label_list = self.csv_data['label'].unique() # get the unique label name
_data = [self.parse_data(each) for each in label_list]
return _data
if __name__ == '__main__':
run_all(dir_name='./', postfix='.jmx', run_func=run, delta_time=[0, 0])
执行run方法即可
作者:ak131313