def test(self):
    # Run the workflow locally on a thread pool and time it.
    lw = LocalWorkflow(wf,
                       activity_workers=16,
                       workflow_workers=2,
                       executor=ThreadPoolExecutor)
    lw.conf_activity('a', examples.activity)
    start = time.time()
    result = lw.run(TIME_SCALE)
    duration = time.time() - start
    # The expected result and duration are encoded in the workflow docstring.
    lines = [l.strip() for l in wf.__doc__.split("\n")]
    expected = None
    for line in lines:
        if line.startswith('R '):
            expected = int(line.split()[-1].split("-")[-1])
            break
    self.assertEqual(expected, result)
    for line in lines:
        if line.startswith('Duration:'):
            expected_duration = int(line.split()[-1]) * TIME_SCALE * 0.1
            break
    print(expected_duration, duration)
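The test above leans on the basic executor contract: submit() returns a Future and result() blocks until the value is ready. A minimal, self-contained sketch of that contract (slow_square and the values here are illustrative, not from the snippet):

from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(n):
    # Stand-in for any blocking task.
    time.sleep(0.01)
    return n * n

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(slow_square, 7)
    assert future.result() == 49  # result() blocks until the task finishes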
def download_pages():
    page = 1
    with futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        # We submit NUM_THREADS tasks at a time since we don't know how many
        # pages we will need to download in advance.
        while True:
            batch = []
            for i in range(NUM_THREADS):
                f = executor.submit(download_and_save_page, page)
                batch.append(f)
                page += 1
            # Block on the whole batch and stop once any page fails to
            # download, i.e. we have run out of pages.
            if not all(f.result() for f in batch):
                break
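The batch above is drained in submission order; when completion order matters instead, futures.as_completed() yields each future as it finishes. A hedged sketch of that variant (fetch_page is a hypothetical stand-in for download_and_save_page):

from concurrent import futures

def fetch_page(page):
    # Pretend only three pages exist.
    return page <= 3

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    fs = [executor.submit(fetch_page, page) for page in range(1, 5)]
    for f in futures.as_completed(fs):
        # Futures arrive here as they complete, not as they were submitted.
        print(f.result())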
def handle(self, *args, **options):
    env = load_env()
    # XXX: In Django 1.8 this changes to:
    # if 'PORT' in env and not options.get('addrport'):
    #     options['addrport'] = env['PORT']
    if 'PORT' in env and not args:
        args = (env['PORT'],)
    # We're subclassing runserver, which spawns threads for its
    # autoreloader with RUN_MAIN set to true; we have to check for
    # this to avoid running gulp twice.
    if not os.getenv('RUN_MAIN', False):
        pool = futures.ThreadPoolExecutor(max_workers=1)
        gulp_thread = pool.submit(self.start_gulp)
        gulp_thread.add_done_callback(self.gulp_exited_cb)
    return super(Command, self).handle(*args, **options)
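The add_done_callback() used above is how a crash in the background task surfaces: the callback receives the finished Future, and calling result() on it re-raises any exception from the worker. A small sketch with illustrative names (task and exited_cb stand in for start_gulp and gulp_exited_cb):

from concurrent import futures

def task():
    raise RuntimeError('gulp exited')

def exited_cb(future):
    # result() re-raises the exception captured in the worker thread.
    try:
        future.result()
    except RuntimeError as err:
        print('background task failed:', err)

pool = futures.ThreadPoolExecutor(max_workers=1)
pool.submit(task).add_done_callback(exited_cb)
pool.shutdown(wait=True)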
def find_all_devices(self, device_cfg):
    devices_mp = dict()
    discovery_tp_size = self.config.get('discovery_tp_size', 5)
    devices = utils.get_device_list(device_cfg)
    # Probe every candidate device in parallel, then collect the results.
    with futures.ThreadPoolExecutor(discovery_tp_size) as tp_executor:
        results = {
            tp_executor.submit(self.find_device, device): device
            for device in devices}
        devices = [fut.result() for fut in futures.as_completed(results)]
    # De-duplicate discovered devices by IP address.
    for device in devices:
        if device is not None and device.get('ip_address') not in devices_mp:
            devices_mp[device.get('ip_address')] = device
    LOGGER.info('get_all_devices, device_count [%d]', len(devices_mp))
    self.active_devices = devices_mp.values()
    return self.active_devices
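The {future: item} dict built above is the standard idiom for tying results back to their inputs under as_completed(). A generic sketch of the same idiom (probe and the host list are hypothetical):

from concurrent import futures

def probe(host):
    # Stand-in for a per-device discovery call like find_device.
    return {'ip_address': host}

hosts = ['10.0.0.1', '10.0.0.2']
with futures.ThreadPoolExecutor(5) as tp:
    fs = {tp.submit(probe, host): host for host in hosts}
    for fut in futures.as_completed(fs):
        # fs[fut] recovers which input produced this result.
        print(fs[fut], '->', fut.result())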
def collect(self):
    try:
        LOGGER.debug('collect, start')
        enabled = self.config.get('enable', True)
        if not enabled:
            LOGGER.warning('collect, data collection is disabled')
            return
        tp_size = self.config.get('tp_size', 5)
        # Fan out one collection task per device and wait for all of them.
        with futures.ThreadPoolExecutor(tp_size) as tp_executor:
            for device in self.device_mgr.get_devices():
                future = tp_executor.submit(self.collect_data, device)
                self.pending_request.append(future)
                future.add_done_callback(self.collect_datacb)
            futures.wait(self.pending_request)
        if LOGGER.isEnabledFor(logging.DEBUG):
            self.dump()
        for listener in self.listeners:
            listener.notify(self.stats, self.poll_time())
        self.clear()
    except Exception as e:
        LOGGER.exception('collect, failed %s', e)
    finally:
        LOGGER.debug('collect, end')
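futures.wait() as used above blocks until every pending future settles; it also accepts a timeout and returns (done, not_done) sets, which is useful when stragglers should not stall a collection cycle. A small sketch under those assumptions:

import time
from concurrent import futures

with futures.ThreadPoolExecutor(2) as tp:
    pending = [tp.submit(time.sleep, d) for d in (0.01, 0.5)]
    done, not_done = futures.wait(pending, timeout=0.1)
    # The fast task lands in done; the slow one is still in not_done.
    print(len(done), 'finished,', len(not_done), 'still running')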
config['debug'] = False
settings = {
    'debug': config['debug'],
}
server_settings = {
    "xheaders": True,
}
application = web.Application([
    (r'/runs/([a-z]+)', api.AddRunHandler),
], **settings)
application.config = config
application.thread_pool = futures.ThreadPoolExecutor(max_workers=3)

if __name__ == '__main__':
    define('port', default=11001, help='TCP port to listen on')
    parse_command_line()
    setproctitle.setproctitle('orl.api')
    mongoengine.connect(
        config['db_name'],
        host=config['db_uri'])
    application.tf = tornadotinyfeedback.Client('openrunlog')
    application.redis = tornadoredis.Client()
    application.redis.connect()
    application.redis_sync = redis.StrictRedis()
    application.q = rq.Queue(connection=application.redis_sync)
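One way the thread_pool attached above can be consumed is the run_on_executor pattern that also appears in the CaptureServer snippet below: the decorated method runs on self.executor and returns a Future the IOLoop can resolve. A sketch only; Handler and do_blocking_work are illustrative names, not from this app:

from tornado import gen, web
from tornado.concurrent import run_on_executor

class Handler(web.RequestHandler):
    @property
    def executor(self):
        # run_on_executor looks for self.executor by default.
        return self.application.thread_pool

    @run_on_executor
    def do_blocking_work(self):
        return 'done'

    @gen.coroutine
    def get(self):
        result = yield self.do_blocking_work()
        self.write(result)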
# a notional default viewport...
VIEWPORT = (1200, 900)

# the version of the notebook format to use... some autodetect would be nice
IPYNB_VERSION = 4

class CaptureServer(HTTPServer):
    """ A tornado server that handles serving up static HTTP assets. When the
        assets are ready, `capture` is called.

        This should be subclassed to provide specific behavior: see
        nbpresent.exporters.pdf_capture (from which this was refactored)
    """
    executor = futures.ThreadPoolExecutor(max_workers=1)
    pdf_name = "notebook.pdf"
    ipynb_name = "notebook.ipynb"
    embed_ipynb = True

    @run_on_executor
    def capture(self):
        """ The main control flow for the capture process.
        """
        self.ghost = self.init_ghost()
        self.session = self.init_session()
        self.session.open("http://localhost:{}/index.html".format(PORT))
        try:
            self.page_ready()
        except Exception as err:
    (r'/u/([A-Za-z0-9_]+)/data/sevendaymiles.json', profiledata.SevenDayMileage),
    (r'/g', groups.GroupDashboardHandler),
    (r'/g/([a-zA-Z0-9_]+)', groups.GroupHandler),
    (r'/add', runs.AddRunHandler),
    (r'/remove', runs.RemoveRunHandler),
    (r'/data/([A-Za-z0-9]{24})/this_week', data.ThisWeekHandler),
    (r'/data/([A-Za-z0-9]{24})/recent', data.RecentRunsHandler),
    (r'/data/([A-Za-z0-9]{24})/recent_avgs', data.RecentSevenDayAveragesHandler),
    (r'/data/([A-Za-z0-9]{24})/mileage/weekly', data.WeeklyMileageHandler),
    (r'/data/([A-Za-z0-9]{24})/runs/weekday', data.WeekdayRunsHandler),
    (r'/data/([A-Za-z0-9]{24})/runs/year', data.DailyRunsHandler),
    (r'/data/([A-Za-z0-9]{24})/runs/month', data.MonthRunsHandler),
], **settings)
application.config = config
application.thread_pool = futures.ThreadPoolExecutor(max_workers=3)

if __name__ == '__main__':
    define('port', default=11000, help='TCP port to listen on')
    parse_command_line()
    setproctitle.setproctitle('orl.app')
    mongoengine.connect(
        config['db_name'],
        host=config['db_uri'])
    application.tf = tornadotinyfeedback.Client('openrunlog')
    application.redis = tornadoredis.Client()
    application.redis.connect()
    application.redis_sync = redis.StrictRedis()
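redis.StrictRedis above is a blocking client, and the three-worker thread pool attached to the application is a natural place to run its calls so the IOLoop is not stalled. A hedged sketch of that hand-off (cache_run is an illustrative helper, not from this app):

def cache_run(app, key, value):
    # Blocking redis call, safe to run on a worker thread.
    app.redis_sync.set(key, value)

# Fire-and-forget from request-handling code; errors surface via a callback
# rather than by blocking on result().
f = application.thread_pool.submit(cache_run, application, 'run:1', 'ok')
f.add_done_callback(lambda fut: fut.exception())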