Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Excerpt: configure and start a hupper reloader, then idle in the worker.
# NOTE(review): the enclosing function header and the original indentation
# are not part of this excerpt, so the nesting of the statements below
# (e.g. whether the watchdog import is guarded by an option) cannot be
# confirmed from here.
#
# Pick the file-monitor class passed to hupper as `monitor_factory`; the
# watchman assignment comes later, so it wins when opts.watchman is truthy.
from hupper.watchdog import WatchdogFileMonitor
kw['monitor_factory'] = WatchdogFileMonitor
if opts.watchman:
from hupper.watchman import WatchmanFileMonitor
kw['monitor_factory'] = WatchmanFileMonitor
# Forward a reload interval only when a truthy value was supplied.
if opts.reload_interval:
kw['reload_interval'] = opts.reload_interval
# Start the reloader with the dotted path "<this module>.main" and the
# keyword options collected in `kw`.
hupper.start_reloader(__name__ + '.main', **kw)
# When a reloader is active, register extra files to watch: foo.ini under
# `here` and the paths in opts.watch_files.
if hupper.is_active():
hupper.get_reloader().watch_files([os.path.join(here, 'foo.ini')])
hupper.get_reloader().watch_files(opts.watch_files)
# Append the current Unix time (whole seconds, one per line, UTF-8 encoded)
# to the callback file; 'ab' mode keeps earlier entries.
if opts.callback_file:
with open(opts.callback_file, 'ab') as fp:
fp.write('{:d}\n'.format(int(time.time())).encode('utf8'))
# Sleep in one-second steps until interrupted; KeyboardInterrupt is
# swallowed on purpose so the loop ends without a traceback.
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
# Registration callback for an OpenAPI spec file: watch the file under
# hupper, load and validate the spec, expose the raw file through a
# route/view, and record the configuration in the registry settings.
# NOTE(review): `filepath`, `route`, `route_name` and `config` are read from
# an enclosing scope that is not visible in this excerpt, and the original
# indentation has been lost.
def register() -> None:
# With a hupper reloader active, also watch the spec file.
if hupper.is_active(): # pragma: no cover
hupper.get_reloader().watch_files([filepath])
# Read the YAML file, validate the resulting dict, then build the spec
# object from it.
spec_dict = read_yaml_file(filepath)
validate_spec(spec_dict)
spec = create_spec(spec_dict)
# View that returns the spec file itself with a text/yaml content type.
def spec_view(request: Request) -> FileResponse:
return FileResponse(filepath, request=request, content_type="text/yaml")
# Register the route and attach the view to it by route name.
config.add_route(route_name, route)
config.add_view(route_name=route_name, view=spec_view)
# Optional value read from the settings; .get() yields None when the key
# is absent. Its use is outside this excerpt.
custom_formatters = config.registry.settings.get("pyramid_openapi3_formatters")
# Store the configuration under the "pyramid_openapi3" settings key.
# NOTE(review): this dict literal is cut off by the excerpt; any remaining
# keys and the closing brace are not visible.
config.registry.settings["pyramid_openapi3"] = {
"filepath": filepath,
"spec_route_name": route_name,
# Excerpt: reload-monitor startup followed by daemon command handling.
# NOTE(review): the enclosing method header and the original indentation are
# not visible, so the exact nesting of the branches below is inferred from
# statement order only.
#
# Reloading was requested and no hupper reloader is active yet: build a
# Reloader that runs 'gearbox.main.main' as its worker and run it.
if opts.reload and not hupper.is_active():
if self.verbose > 1:
self.out('Running reloading file monitor')
reloader = hupper.reloader.Reloader(
worker_path='gearbox.main.main',
reload_interval=opts.reload_interval,
# The monitor factory lookup and the reloader share the 'gearbox' logger.
monitor_factory=hupper.reloader.find_default_monitor_factory(
logging.getLogger('gearbox')
),
logger=logging.getLogger('gearbox'),
)
reloader.run()
if hupper.is_active():
# Also track config file changes
hupper.get_reloader().watch_files([opts.config_file])
# 'restart' and 'stop' both begin by stopping the daemon; a truthy result
# is treated as a failure, reported via self.out and returned.
if cmd == 'restart' or cmd == 'stop':
result = self.stop_daemon(opts)
if result:
if cmd == 'restart':
self.out("Could not stop daemon; aborting")
else:
self.out("Could not stop daemon")
return result
# 'stop' ends here and returns the stop result.
if cmd == 'stop':
return result
# Set after the 'stop' return above; presumably reached only for
# 'restart' — confirm against the original indentation.
opts.daemon = True
# 'start' sets the daemon flag as well.
if cmd == 'start':
opts.daemon = True