Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self):
super(ProgramsUser, self).__init__()
if not self.host:
raise LocustError(
'You must specify a base host, either in the host attribute in the Locust class, '
'or on the command line using the --host option.'
)
self.client = LocustEdxRestApiClient(
PROGRAMS_API_URL,
session=HttpSession(base_url=self.host),
jwt=self._get_token()
)
def __init__(self):
super(FastHttpLocust, self).__init__()
if self.host is None:
raise LocustError("You must specify the base host. Either in the host attribute in the Locust class, or on the command line using the --host option.")
if not re.match(r"^https?://[^/]+$", self.host, re.I):
raise LocustError("Invalid host (`%s`). The specified host string must be a base URL without a trailing slash. E.g. http://example.org" % self.host)
self.client = FastHttpSession(base_url=self.host)
class LocustError(Exception):
pass
class ResponseError(Exception):
pass
class CatchResponseError(Exception):
pass
class MissingWaitTimeError(LocustError):
pass
class InterruptTaskSet(Exception):
"""
Exception that will interrupt a Locust when thrown inside a task
"""
def __init__(self, reschedule=True):
"""
If *reschedule* is True and the InterruptTaskSet is raised inside a nested TaskSet,
the parent TaskSet whould immediately reschedule another task.
"""
self.reschedule = reschedule
class StopLocust(Exception):
pass
def _send_obj(sock, msg):
data = msg.serialize()
packed = struct.pack('!i', len(data)) + data
try:
sock.sendall(packed)
except Exception:
try:
sock.close()
except:
pass
finally:
raise LocustError("Slave has disconnected")
def run(self, runner=None):
task_set_instance = self.task_set(self)
try:
task_set_instance.run()
except StopLocust:
pass
except (RescheduleTask, RescheduleTaskImmediately) as e:
six.reraise(LocustError, LocustError("A task inside a Locust class' main TaskSet (`%s.task_set` of type `%s`) seems to have called interrupt() or raised an InterruptTaskSet exception. The interrupt() function is used to hand over execution to a parent TaskSet, and should never be called in the main TaskSet which a Locust class' task_set attribute points to." % (type(self).__name__, self.task_set.__name__)), sys.exc_info()[2])
except GreenletExit as e:
if runner:
runner.state = STATE_CLEANUP
# Run the task_set on_stop method, if it has one
if hasattr(task_set_instance, "on_stop"):
task_set_instance.on_stop()
raise # Maybe something relies on this except being raised?
def __init__(self):
super(RealBrowserLocust, self).__init__()
if self.screen_width is None:
raise LocustError("You must specify a screen_width "
"for the browser")
if self.screen_height is None:
raise LocustError("You must specify a screen_height "
"for the browser")
self.proxy_server = os_getenv("LOCUST_BROWSER_PROXY", None)
def __init__(self):
super(RealBrowserLocust, self).__init__()
if self.screen_width is None:
raise LocustError("You must specify a screen_width "
"for the browser")
if self.screen_height is None:
raise LocustError("You must specify a screen_height "
"for the browser")
self.proxy_server = os_getenv("LOCUST_BROWSER_PROXY", None)
def __init__(self):
super(HttpLocust, self).__init__()
if self.host is None:
raise LocustError("You must specify the base host. Either in the host attribute in the Locust class, or on the command line using the --host option.")
session = HttpSession(base_url=self.host)
session.trust_env = self.trust_env
self.client = session
def __getattr__(self, _):
raise LocustError("No client instantiated. Did you intend to inherit from HttpLocust?")