Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
setattr(request, key, value)
# Set SPNEGO token in the metadata
metadata = self.get_metadata(session_token)
# Call the gRPC endpoint
endpoint_func = getattr(stub, endpoint)
response = endpoint_func(request, metadata=metadata)
except Exception as e:
# Submit metrics to Locust in case of errors
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type='broker_rpc', name=endpoint, response_time=total_time, exception=e)
else:
# Submit metrics to Locust in case of success
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type='broker_rpc', name=endpoint, response_time=total_time, response_length=0)
return response
"""
Call this function from a locustfile to enable event markers in logging.
"""
# "import locust" within this scope so that this module is importable by
# code running in environments which do not have locust installed.
import locust
# install simple event markers
locust.events.locust_start_hatching += EventMarker('locust_start_hatching')
locust.events.master_start_hatching += EventMarker('master_start_hatching')
locust.events.quitting += EventMarker('quitting')
locust.events.hatch_complete += EventMarker('hatch_complete')
# install heartbeat markers which are rate limited
heartbeat_handler = HeartbeatEventMarker()
locust.events.request_success += heartbeat_handler
locust.events.request_failure += heartbeat_handler
:param transaction: protobuf Transaction
:return: None
"""
start_time = time.time()
try:
tx_future = self._command_service_stub.Torii.future(transaction)
tx_status = 'NOT_RECEIVED'
while tx_status not in ['COMMITTED', 'REJECTED']:
for status in self.tx_status_stream(transaction):
tx_status = status[0]
except grpc.RpcError as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="grpc", name='send_tx_await', response_time=total_time, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="grpc", name='send_tx_await', response_time=total_time, response_length=0)
# In this example, I've hardcoded response_length=0. If we would want the response length to be
name=f"locust_{func.__name__}",
response_time=total,
exception=e,
response_length=0,
)
# Uncomment with fix of __on_failure() function from Taurus. Expected Taurus version with the fix is 1.14.3
# events.request_failure.fire(request_type="Action",
# name=f"locust_{func.__name__}",
# response_time=total,
# response_length=0,
# exception=e)
logger.error(f'{func.__name__} action failed. Reason: {e}')
else:
total = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="Action",
name=f"locust_{func.__name__}",
response_time=total,
response_length=0)
logger.info(f'{func.__name__} is finished successfully')
return result
# save the old last_request_timestamp, to see if we should store a new copy
# of the response times in the response times cache
old_last_request_timestamp = global_stats.total.last_request_timestamp
# update the total StatsEntry
global_stats.total.extend(StatsEntry.unserialize(data["stats_total"]))
if global_stats.total.last_request_timestamp and global_stats.total.last_request_timestamp > (old_last_request_timestamp or 0):
# If we've entered a new second, we'll cache the response times. Note that there
# might still be reports from other slave nodes - that contain requests for the same
# time periods - that haven't been received/accounted for yet. This will cause the cache to
# lag behind a second or two, but since StatsEntry.current_response_time_percentile()
# (which is what the response times cache is used for) uses an approximation of the
# last 10 seconds anyway, it should be fine to ignore this.
global_stats.total._cache_response_times(int(global_stats.total.last_request_timestamp))
# Register the stats handlers on their corresponding Locust events.
# (The handler functions themselves are defined elsewhere in this module.)
events.request_success += on_request_success
events.request_failure += on_request_failure
events.report_to_master += on_report_to_master
events.slave_report += on_slave_report
def print_stats(stats, current=True):
    """Log a table of request statistics through ``console_logger``.

    Emits a header row, a separator, one row per entry of ``stats.entries``
    in sorted key order, another separator, the ``stats.total`` row and a
    trailing empty line.

    :param stats: object exposing an ``entries`` mapping and a ``total``
        entry; every entry must provide ``to_string(current=...)``.
    :param current: forwarded unchanged to every ``to_string`` call.
    """
    header_format = " %-" + str(STATS_NAME_WIDTH) + "s %7s %12s %7s %7s %7s | %7s %7s %7s"
    column_titles = ('Name', '# reqs', '# fails', 'Avg', 'Min', 'Max', 'Median', 'req/s', 'failures/s')
    # The same horizontal rule is printed above and below the per-entry rows.
    rule = "-" * (80 + STATS_NAME_WIDTH)

    console_logger.info(header_format % column_titles)
    console_logger.info(rule)
    for entry_key in sorted(six.iterkeys(stats.entries)):
        entry = stats.entries[entry_key]
        console_logger.info(entry.to_string(current=current))
    console_logger.info(rule)
    console_logger.info(stats.total.to_string(current=current))
    console_logger.info("")
:param pwd: InfluxDB password.
:param database: InfluxDB database name. Will be created if it does not exist.
:param interval_ms: Interval to save the data points to influxdb.
"""
influxdb_client = InfluxDBClient(influx_host, influx_port, user, pwd, database)
influxdb_client.create_database(database)
node_id = 'local'
if '--master' in sys.argv:
node_id = 'master'
if '--slave' in sys.argv:
# TODO: Get real ID of slaves from locust somehow
node_id = 'slave'
# Start a greenlet that will save the data to influx according to the interval informed
flush_worker = gevent.spawn(__flush_cached_points_worker, influxdb_client, interval_ms)
# Request events
events.request_success += __listen_for_requests_events(node_id, success=True)
events.request_failure += __listen_for_requests_events(node_id, success=False)
# Locust events
events.hatch_complete += __listen_for_locust_events(node_id, event='hatch_complete')
events.quitting += __listen_for_locust_events(node_id, event='quitting')
events.master_start_hatching += __listen_for_locust_events(node_id, event='master_start_hatching')
events.master_stop_hatching += __listen_for_locust_events(node_id, event='master_stop_hatching')
events.locust_start_hatching += __listen_for_locust_events(node_id, event='locust_start_hatching')
events.locust_stop_hatching += __listen_for_locust_events(node_id, event='locust_stop_hatching')
# Locust exceptions
events.locust_error += __listen_for_locust_errors(node_id)
def last_flush_on_quitting():
    # Final flush on shutdown: raise the module-level stop flag (presumably
    # polled by the flush worker - confirm), wait for the worker greenlet to
    # finish, then flush once more via __flush_points.
    global stop_flag
    stop_flag = True
    flush_worker.join()
    __flush_points(influxdb_client)
fname = os.getenv("JTL")
is_csv = True
else:
raise ValueError("Please specify JTL or SLAVES_LDJSON environment variable")
with open(fname, 'wt') as self.fhd:
if is_csv:
fieldnames = list(self.__getrec(None, None, None, None).keys())
dialect = guess_csv_dialect(",".join(fieldnames))
self.writer = csv.DictWriter(self.fhd, fieldnames=fieldnames, dialect=dialect)
self.writer.writeheader()
self.fhd.flush()
else:
self.writer = None # FIXME: bad code design, have zero object for it
events.request_success += self.__on_request_success
events.request_failure += self.__on_request_failure
events.locust_error += self.__on_exception
events.slave_report += self.__on_slave_report
events.quitting += self.__on_quit
main.main()
self.fhd.flush()
def __fire_success(self, **kwargs):
    """Fire Locust's ``request_success`` event with the given keyword arguments."""
    success_event = events.request_success
    success_event.fire(**kwargs)
def send(self, service_name, method, *args):
    """Perform a JSON-RPC call and report its outcome and timing to Locust.

    The call is sent with ``openerplib.json_rpc`` to ``self.url``. On success
    a ``request_success`` event is fired; on any exception a
    ``request_failure`` event is fired and the exception is re-raised.

    :param service_name: value sent as ``"service"`` in the RPC payload.
    :param method: value sent as ``"method"`` in the RPC payload.
    :param args: positional arguments sent as ``"args"`` in the RPC payload.
    :return: whatever ``openerplib.json_rpc`` returns.
    :raises Exception: any exception raised by the RPC call is propagated
        after the failure event has been fired.
    """
    # For "object"/"execute_kw" calls the request is labelled with the 4th and
    # 5th positional arguments (presumably the model and method names - confirm).
    # If they are not both present, fall back to the generic label instead of
    # failing with a format-string TypeError before the request is even sent.
    target = args[3:5]
    if service_name == "object" and method == "execute_kw" and len(target) == 2:
        call_name = "%s : %s" % target
    else:
        call_name = '%s : %s' % (service_name, method)
    start_time = time.time()
    try:
        res = openerplib.json_rpc(self.url, "call", {"service": service_name, "method": method, "args": args})
    except Exception as e:
        total_time = int((time.time() - start_time) * 1000)
        events.request_failure.fire(request_type="Odoo JsonRPC", name=call_name, response_time=total_time, exception=e)
        # Bare raise re-raises the active exception without rebinding it.
        raise
    else:
        total_time = int((time.time() - start_time) * 1000)
        # NOTE: sys.getsizeof is a shallow object size, not the size of the
        # response on the wire; kept as-is so reported metrics do not change.
        events.request_success.fire(request_type="Odoo JsonRPC", name=call_name, response_time=total_time, response_length=sys.getsizeof(res))
    return res
if catch_response:
response.locust_request_meta = request_meta
return ResponseContextManager(response)
else:
try:
response.raise_for_status()
except FAILURE_EXCEPTIONS as e:
events.request_failure.fire(
request_type=request_meta["method"],
name=request_meta["name"],
response_time=request_meta["response_time"],
response_length=request_meta["content_size"],
exception=e,
)
else:
events.request_success.fire(
request_type=request_meta["method"],
name=request_meta["name"],
response_time=request_meta["response_time"],
response_length=request_meta["content_size"],
)
return response