Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, timeout, tries):
"""Create a new empty requests set."""
if timeout is not None and timeout < 0:
raise Broker.Error("timeout must be non-negative or None")
if tries is not None and tries <= 0:
raise Broker.Error("tries must be positive or None")
self.waiting = collections.deque()
self.processing = collections.deque()
self.bypath = {}
self.timeout = timeout
self.tries = tries
def __init__(self, timeout, tries):
"""Create a new empty requests set."""
if timeout is not None and timeout < 0:
raise Broker.Error("timeout must be non-negative or None")
if tries is not None and tries <= 0:
raise Broker.Error("tries must be positive or None")
self.waiting = collections.deque()
self.processing = collections.deque()
self.bypath = {}
self.timeout = timeout
self.tries = tries
def get_request(self):
"""Get the first request in the queue."""
if self.request_waiting():
request = self.waiting.popleft()
return self.process_request(request)
raise Broker.Error("internal error")