Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def watch(path: Union[Path, str], **kwargs):
"""
Watch a directory and yield a set of changes whenever files change in that directory or its subdirectories.
"""
loop = asyncio.new_event_loop()
try:
_awatch = awatch(path, loop=loop, **kwargs)
while True:
try:
yield loop.run_until_complete(_awatch.__anext__())
except StopAsyncIteration:
break
except KeyboardInterrupt:
logger.debug('KeyboardInterrupt, exiting')
finally:
loop.close()
async def arun_process(path: Union[Path, str], target: Callable, *,
args: Tuple[Any] = (),
kwargs: Dict[str, Any] = None,
callback: Callable[[Set[Tuple[Change, str]]], Awaitable] = None,
watcher_cls: Type[AllWatcher] = PythonWatcher,
debounce=400,
min_sleep=100):
"""
Run a function in a subprocess using multiprocessing.Process, restart it whenever files change in path.
"""
watcher = awatch(path, watcher_cls=watcher_cls, debounce=debounce, min_sleep=min_sleep)
start_process = partial(_start_process, target=target, args=args, kwargs=kwargs)
process = await watcher.run_in_executor(start_process)
reloads = 0
async for changes in watcher:
callback and await callback(changes)
await watcher.run_in_executor(_stop_process, process)
process = await watcher.run_in_executor(start_process)
reloads += 1
return reloads