Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# geventhttpclient's CompatRequest is patched here so that Cookiejar works with
# Python >= 3.3.
# Background: https://github.com/requests/requests/pull/871
CompatRequest.unverifiable = False

# Cookiejar otherwise fails with
# "AttributeError: 'CompatRequest' object has no attribute 'type'"
# (see https://github.com/locustio/locust/issues/1138).
# Hard-coding "https" might allow secure cookies over non-secure connections, which is
# a minor concern in a load testing tool.
CompatRequest.type = "https"

# Matches URLs that start with an explicit http:// or https:// scheme (case-insensitive),
# i.e. URLs that were specified as absolute
absolute_http_url_regexp = re.compile(r"^https?://", re.IGNORECASE)

# Exceptions that geventhttpclient can raise while sending an HTTP request and that
# should result in a Locust failure
FAILURE_EXCEPTIONS = (
    ConnectionError,
    ConnectionRefusedError,
    socket.error,
    SSLError,
    Timeout,
    HTTPConnectionClosed,
)
def _construct_basic_auth_str(username, password):
"""Construct Authorization header value to be used in HTTP Basic Auth"""
if isinstance(username, str):
username = username.encode('latin1')
if isinstance(password, str):
password = password.encode('latin1')
return 'Basic ' + b64encode(b':'.join((username, password))).strip().decode("ascii")
class FastHttpLocust(Locust):
"""
Represents an HTTP "user" which is to be hatched and attack the system that is to be load tested.
The behaviour of this user is defined by the task_set attribute, which should point to a
if not chunk:
break
sock.sendall(chunk)
else:
sock.sendall(_request)
except gevent.socket.error as e:
self._connection_pool.release_socket(sock)
if (e.errno == errno.ECONNRESET or e.errno == errno.EPIPE) and attempts_left > 0:
attempts_left -= 1
continue
raise e
try:
response = HTTPSocketPoolResponse(sock, self._connection_pool,
block_size=self.block_size, method=method.upper(), headers_type=self.headers_type)
except HTTPConnectionClosed as e:
# connection is released by the response itself
if attempts_left > 0:
attempts_left -= 1
continue
raise e
else:
response._sent_request = request
return response