Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@contextmanager
def _middleware(self, spider, settings, **kwargs):
    """Yield a ``RotatedProxyMiddleware`` that is opened for ``spider``.

    A ``Crawler`` is built from ``spider`` and ``settings`` and handed to the
    middleware. If ``auth_encoding`` is present in ``kwargs`` it is forwarded
    to the middleware constructor; all other keyword arguments are ignored.
    ``spider_opened`` is called before yielding and ``spider_closed`` is
    always called when the ``with`` block exits, even on error.
    """
    crawler = Crawler(spider, settings)

    # Forward only the single optional argument the middleware accepts here.
    optional_args = {}
    if 'auth_encoding' in kwargs:
        optional_args['auth_encoding'] = kwargs['auth_encoding']

    middleware = RotatedProxyMiddleware(crawler, **optional_args)
    middleware.spider_opened(spider)
    try:
        yield middleware
    finally:
        middleware.spider_closed(spider)
@contextmanager
def real_docker():
    """Reconfigure ``inject`` to build a ``DockerTwistedClient`` on demand.

    ``IDockerClient`` is bound to a constructor callable, so the client is
    created by the callable rather than up front. Nothing is yielded and no
    configuration is restored when the ``with`` block exits.
    """
    def _new_client():
        return DockerTwistedClient()

    def _bind_real_client(binder):
        binder.bind_to_constructor(IDockerClient, _new_client)

    inject.clear_and_configure(_bind_real_client)
    yield
@contextmanager
def mock_docker():
    """Reconfigure ``inject`` with a flexmock-wrapped ``DockerTwistedClient``.

    The wrapped client is bound to ``IDockerClient`` and yielded so the caller
    can set expectations on it. No configuration is restored when the ``with``
    block exits.
    """
    docker_mock = flexmock(DockerTwistedClient())

    def _bind_mock(binder):
        return binder.bind(IDockerClient, docker_mock)

    inject.clear_and_configure(_bind_mock)
    yield docker_mock
@contextmanager
def take_at_least_seconds(amount_in_seconds):
    """Make the ``with`` block last at least ``amount_in_seconds`` seconds.

    The clock starts on entry. After the body finishes normally, this sleeps
    in one-second steps until the elapsed wall-clock time is no longer below
    ``amount_in_seconds``. If the body raises, no waiting happens.
    """
    entered_at = datetime.now()

    def _too_early():
        elapsed = datetime.now() - entered_at
        return elapsed.total_seconds() < amount_in_seconds

    yield

    while _too_early():
        sleep(1)
@contextmanager
def _logging(self):
    """Time the ``with`` block and report it through ``self.connection.log``.

    After the body finishes normally, ``self.connection.log`` is called with
    ``self.query``, the end timestamp, the elapsed time (both taken from
    ``time.time()``) and ``self``. If the body raises, nothing is logged.
    """
    began = time.time()
    yield
    finished = time.time()
    self.connection.log(self.query, finished, finished - began, self)
@contextmanager
def disabled_triggers():
    """Run the ``with`` block between ``disable_triggers()`` and ``enable_triggers()``.

    ``enable_triggers()`` is called in a ``finally`` clause so that an
    exception raised inside the ``with`` block cannot leave the triggers
    disabled.
    """
    disable_triggers()
    try:
        yield
    finally:
        # Always re-enable, even when the body raised.
        enable_triggers()
@contextmanager
def take_at_least_seconds(amount_in_seconds):
    """Pad the ``with`` block so it takes at least ``amount_in_seconds`` seconds.

    Timing starts when the block is entered. Once the body completes without
    raising, the manager sleeps one second at a time for as long as the
    elapsed wall-clock time is still below ``amount_in_seconds``.
    """
    started = datetime.now()

    def _seconds_so_far():
        return (datetime.now() - started).total_seconds()

    yield

    # One-second polling: the total wait may overshoot by up to a second.
    while _seconds_so_far() < amount_in_seconds:
        sleep(1)
@contextmanager
def seamless_modification(
node, deployment, force_seamless=True, force_operational=False):
"""
Rotates the ``node`` in the ``deployment`` in and out of operation if
possible to avoid service interruption. If ``force_seamless`` is True
(default) then the user will be prompted if it's not possible to rotate in
and out seamlessly because the required redundancy isn't met.
Understands that only active, operational nodes need to be rotated out and
that only healthy nodes should be rotated back in.
"""
# should we make this node operational as the last step
make_operational = force_operational
if node and force_seamless:
if not deployment.has_required_redundancy(node):
@contextmanager
def logs_duration(deploy_timers, timer_name='Total', output_result=False):
    """
    A context manager that records how long the ``with`` block took.

    The elapsed time, in whole seconds, is stored in
    ``deploy_timers[timer_name]`` when the block exits, whether it finished
    normally or raised. If ``output_result`` is true the duration is also
    written to ``time_logger`` at INFO level.
    """
    start_time = datetime.now()
    try:
        yield
    finally:
        duration = datetime.now() - start_time
        # ``timedelta.seconds`` is only the seconds component (days are
        # dropped), so use the full span to stay correct past 24 hours.
        elapsed_seconds = int(duration.total_seconds())
        deploy_timers[timer_name] = elapsed_seconds
        if output_result:
            time_logger.info("%02ds- %s", elapsed_seconds, timer_name)
@contextmanager
def poolcontext(*args, **kwargs):
    """poolcontext makes it easier to run a function with a process Pool.

    All arguments are passed straight to ``multiprocessing.Pool``. The pool
    is terminated when the ``with`` block exits, including when the block
    raises, so worker processes are not left behind.

    Example:
        with poolcontext(processes=n_jobs) as pool:
            bs_scores = pool.map(compute_bias, sentences)
            avg_bias = sum(bs_scores)
    """
    pool = multiprocessing.Pool(*args, **kwargs)
    try:
        yield pool
    finally:
        # Always stop the workers, even if the body failed.
        pool.terminate()