self.configure({
    "execution": [{
        "data-file": RESOURCES_DIR + "/jmeter/jtl/simple.kpi.jtl"
    }]
})
self.obj.prepare()
self.assertIsInstance(self.obj.reader, JTLReader)
self.obj.startup()
self.obj.check()
self.obj.shutdown()
self.obj.post_process()
self.obj.engine.aggregator.post_process()
results = self.results_listener.results
self.assertGreater(len(results), 0)
last_dp = results[-1]
cumulative_kpis = last_dp[DataPoint.CUMULATIVE]['']
self.assertIn('200', cumulative_kpis[KPISet.RESP_CODES])
obj = BlazeMeterUploader()
obj.parameters['project'] = 'Proj name'
obj.settings['token'] = '' # public reporting
obj.settings['browser-open'] = 'none'
obj.engine = EngineEmul()
mock.apply(obj._user)
obj.prepare()
obj._session = Session(obj._user, {'id': 1, 'testId': 1, 'userId': 1})
obj._master = Master(obj._user, {'id': 1})
obj.engine.stopping_reason = ValueError('wrong value')
obj.aggregated_second(random_datapoint(10))
obj.kpi_buffer[-1][DataPoint.CUMULATIVE][''][KPISet.ERRORS] = [
    {'msg': 'Forbidden', 'cnt': 10, 'type': KPISet.ERRTYPE_ASSERT, 'urls': [], KPISet.RESP_CODES: '111',
     'tag': ""},
    {'msg': 'Allowed', 'cnt': 20, 'type': KPISet.ERRTYPE_ERROR, 'urls': [], KPISet.RESP_CODES: '222'}]
obj.send_monitoring = False
obj.post_process()
# TODO: looks like this whole block of checks is useless
# check for note appending in _postproc_phase3()
reqs = [{'url': '', 'data': ''} for _ in range(4)] # add template for minimal size
reqs = (reqs + mock.requests)[-4:]
self.assertNotIn('api/v4/sessions/1', reqs[0]['url'])
self.assertNotIn('api/v4/sessions/1', reqs[1]['url'])
self.assertNotIn('api/v4/masters/1', reqs[2]['url'])
self.assertNotIn('api/v4/masters/1', reqs[3]['url'])
if reqs[1]['data']:
    self.assertNotIn('ValueError: wrong value', reqs[1]['data'])
def __add_err_diff(self, point, err_diff):
    for label in err_diff:
        point_label = '' if label == 'ALL' else label
        if point_label not in point[DataPoint.CURRENT]:
            self.log.warning("Got inconsistent kpi/error data for label: %s", point_label)
            kpiset = KPISet()
            point[DataPoint.CURRENT][point_label] = kpiset
            kpiset[KPISet.SAMPLE_COUNT] = sum([item['count'] for item in err_diff[label].values()])
        else:
            kpiset = point[DataPoint.CURRENT][point_label]

        kpiset[KPISet.ERRORS] = self.__get_kpi_errors(err_diff[label])
        kpiset[KPISet.FAILURES] = sum([x['cnt'] for x in kpiset[KPISet.ERRORS]])
        kpiset[KPISet.SAMPLE_COUNT] = kpiset[KPISet.SUCCESSES] + kpiset[KPISet.FAILURES]
        assert kpiset[KPISet.SAMPLE_COUNT] > 0, point_label
def point_from_locust(timestamp, sid, data):
    """
    :type timestamp: str
    :type sid: str
    :type data: dict
    :rtype: DataPoint
    """
    point = DataPoint(int(timestamp))
    point[DataPoint.SOURCE_ID] = sid
    overall = KPISet()
    for item in data['stats']:
        if timestamp not in item['num_reqs_per_sec']:
            continue
        kpiset = KPISet()
        kpiset[KPISet.SAMPLE_COUNT] = item['num_reqs_per_sec'][timestamp]
        kpiset[KPISet.CONCURRENCY] = data['user_count']
        kpiset[KPISet.BYTE_COUNT] = item['total_content_length']
        if item['num_requests']:
            avg_rt = (item['total_response_time'] / 1000.0) / item['num_requests']
            kpiset.sum_rt = item['num_reqs_per_sec'][timestamp] * avg_rt
        for err in data['errors'].values():
            if err['name'] == item['name']:
                new_err = KPISet.error_item_skel(err['error'], None, err['occurences'], KPISet.ERRTYPE_ERROR,
                                                 Counter(), None)
                KPISet.inc_list(kpiset[KPISet.ERRORS], ("msg", err['error']), new_err)
                kpiset[KPISet.FAILURES] += err['occurences']
        kpiset[KPISet.SUCCESSES] = kpiset[KPISet.SAMPLE_COUNT] - kpiset[KPISet.FAILURES]
        point[DataPoint.CURRENT][item['name']] = kpiset
        overall.merge_kpis(kpiset, sid)
    point[DataPoint.CURRENT][''] = overall
    point.recalculate()
    return point
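# A minimal, illustrative sketch of the payload shape point_from_locust() reads, with
# made-up numbers and a hypothetical worker id. It assumes the function is callable as
# defined above and that DataPoint/KPISet come from bzt.modules.aggregator.
from bzt.modules.aggregator import DataPoint, KPISet  # assumed import path

sample_data = {
    'user_count': 5,
    'stats': [{
        'name': '/index',
        'num_reqs_per_sec': {'1500000000': 12},  # keyed by the second, as a string
        'num_requests': 120,
        'total_response_time': 36000,            # milliseconds, summed over requests
        'total_content_length': 20480,
    }],
    'errors': {
        'err-1': {'name': '/index', 'error': 'HTTPError 500', 'occurences': 3},
    },
}

point = point_from_locust('1500000000', 'worker-1', sample_data)
print(point[DataPoint.CURRENT]['/index'][KPISet.FAILURES])  # 3 failed samples out of 12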
def __deepcopy__(self, memo):
    new = DataPoint(self[self.TIMESTAMP], self.perc_levels)
    for key in self.keys():
        new[key] = copy.deepcopy(self[key], memo)
    return new
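# Illustrative only: the custom __deepcopy__ above re-creates the DataPoint with its
# original percentile levels before copying keys, so the copy stays independent of the
# source point. The constructor arguments below are assumptions for this sketch.
import copy

original = DataPoint(1500000000, (50.0, 90.0, 99.0))
clone = copy.deepcopy(original)
assert clone is not original
assert clone.perc_levels == original.perc_levels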
def merge_datapoints(self, max_full_ts):
    reader_id = self.file.name + "@" + str(id(self))
    for key in sorted(self.join_buffer.keys(), key=int):
        if int(key) <= max_full_ts:
            sec_data = self.join_buffer.pop(key)
            self.log.debug("Processing complete second: %s", key)
            point = DataPoint(int(key))
            point[DataPoint.SOURCE_ID] = reader_id
            for sid, item in iteritems(sec_data):
                point.merge_point(self.point_from_locust(key, sid, item))
            point.recalculate()
            yield point
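# For reference, the join_buffer consumed above maps the epoch second (kept as a string)
# to per-worker reports keyed by source id; each inner dict is what point_from_locust()
# reads. The worker ids and values below are hypothetical.
join_buffer = {
    '1500000000': {
        'worker-1': {'stats': [], 'errors': {}, 'user_count': 3},
        'worker-2': {'stats': [], 'errors': {}, 'user_count': 2},
    },
}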
if len(points_to_consolidate) == 1:  # a single underling reported this second, reuse its point
    point = points_to_consolidate[0]
    point[DataPoint.SUBRESULTS] = [points_to_consolidate[0]]
else:
    point = DataPoint(tstamp, self.track_percentiles)
    for subresult in points_to_consolidate:
        self.log.debug("Merging %s", subresult[DataPoint.TIMESTAMP])
        point.merge_point(subresult, do_recalculate=False)
    point.recalculate()

current_sids = [x[DataPoint.SOURCE_ID] for x in point[DataPoint.SUBRESULTS]]
for sid in self._sticky_concurrencies:
    if sid not in current_sids:
        self.log.debug("Adding sticky concurrency for %s", sid)
        self._add_sticky_concurrency(point, sid)

point[DataPoint.SOURCE_ID] = self.__class__.__name__ + '@' + str(id(self))
yield point
def check(self):
    """
    Check whether the next aggregate data is present
    :rtype: bool
    """
    for point in self.datapoints():
        self.log.debug("Processed datapoint: %s/%s with %d labels",
                       point[DataPoint.TIMESTAMP], point[DataPoint.SOURCE_ID], len(point[DataPoint.CUMULATIVE]))
    return super(ConsolidatingAggregator, self).check()
def __init__(self, config, owner):
    super(DataCriterion, self).__init__(config, owner)
    self.label = config.get('label', '')
    self.selector = DataPoint.CURRENT if self.window > 0 else DataPoint.CUMULATIVE
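# Illustration (assumed Taurus passfail-style criterion configs, not taken from the code
# above): a criterion with a "timeframe" ends up with window > 0 and is evaluated against
# DataPoint.CURRENT, while one without a timeframe is evaluated against the run-wide
# DataPoint.CUMULATIVE totals.
windowed_criterion = {
    'subject': 'avg-rt', 'condition': '>', 'threshold': '150ms',
    'timeframe': '10s', 'label': 'index-page',
}
overall_criterion = {
    'subject': 'failures', 'condition': '>', 'threshold': '5%',
}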