Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_watch_stop():
    """Check that watch() yields nothing when its stop_event is already set."""

    class FakeWatcher:
        """Stand-in watcher whose check() replays a fixed series of change sets."""

        def __init__(self, path):
            self._results = iter(({'r1'}, set(), {'r2'}))

        def check(self):
            return next(self._results)

    # The event is set before watch() is ever iterated.
    already_stopped = threading.Event()
    already_stopped.set()

    collected = list(
        watch('xxx', watcher_cls=FakeWatcher, debounce=5, min_sleep=1, stop_event=already_stopped)
    )
    assert collected == []
def test_watch_watcher_kwargs(mocker):
    """Check that watcher_kwargs reach the watcher class constructor.

    The fake watcher reports whatever it received as ``arg1`` and ``arg2``,
    so the yielded change sets reveal whether the keyword arguments arrived.
    """

    class FakeWatcher:
        """Stand-in watcher that echoes its constructor arguments as changes."""

        def __init__(self, path, arg1=None, arg2=None):
            self._results = iter(({arg1}, set(), {arg2}, set()))

        def check(self):
            return next(self._results)

    kwargs = {'arg1': 'foo', 'arg2': 'bar'}
    changes = watch(
        'xxx',
        watcher_cls=FakeWatcher,
        watcher_kwargs=kwargs,
        debounce=5,
        normal_sleep=2,
        min_sleep=1,
    )
    assert next(changes) == {kwargs['arg1']}
    assert next(changes) == {kwargs['arg2']}
def test_watch_keyboard_error():
    """Check that watch() ends without yielding when check() raises KeyboardInterrupt."""

    class FakeWatcher:
        """Stand-in watcher whose check() always raises KeyboardInterrupt."""

        def __init__(self, path):
            pass

        def check(self):
            raise KeyboardInterrupt()

    changes = watch('xxx', watcher_cls=FakeWatcher, debounce=5, min_sleep=1)
    yielded = list(changes)
    assert yielded == []
def test_watch_log(mocker, caplog):
    """Check the debug line logged by watch() once a change set is yielded.

    Digits directly before ``ms`` are replaced with ``X`` before comparing, so
    the assertion does not depend on the measured timings.
    """
    # Force the watchgod logger to report every level as enabled.
    mocker.patch('watchgod.main.logger.isEnabledFor').return_value = True

    class FakeWatcher:
        """Stand-in watcher exposing three files and always reporting one change."""

        def __init__(self, path):
            self.files = [1, 2, 3]

        def check(self):
            return {'r1'}

    changes = watch('xxx', watcher_cls=FakeWatcher, debounce=5, min_sleep=10)
    assert next(changes) == {'r1'}

    normalised_log = re.sub(r'\dms', 'Xms', caplog.text)
    assert 'DEBUG xxx time=Xms debounced=Xms files=3 changes=1 (1)' in normalised_log
def test_watch(mocker):
    """Check that watch() yields the watcher's change sets {'r1'} then {'r2'}.

    The fake watcher interleaves empty sets between the two non-empty ones;
    the test expects the first two yielded values to be the non-empty sets.
    """

    class FakeWatcher:
        """Stand-in watcher whose check() replays a fixed series of change sets."""

        def __init__(self, path):
            self._results = iter(({'r1'}, set(), {'r2'}, set()))

        def check(self):
            return next(self._results)

    changes = watch('xxx', watcher_cls=FakeWatcher, debounce=5, normal_sleep=2, min_sleep=1)
    first = next(changes)
    assert first == {'r1'}
    second = next(changes)
    assert second == {'r2'}
def _watch(directory: str) -> None:  # pragma: no cover
    """Watch ``directory`` and (re)load the module owning each changed file.

    ``watchgod`` is imported lazily; if the import fails an error is logged
    and the function returns without watching. Otherwise, for every change
    reported by ``watch(directory)``, the module name is taken as the
    directory part of the changed file's path relative to ``directory`` and
    handed to ``import_or_reload``.

    Args:
        directory: Path passed to ``watchgod.watch`` and used as the base for
            computing relative paths.
    """
    try:
        from watchgod import watch  # pylint: disable=import-outside-toplevel
    except ImportError:
        logging.error(
            'Could not watch for file changes, is "watchgod" installed?')
        return

    logging.info('Watching for file changes in "%s"', directory)
    for change_set in watch(directory):
        for entry in change_set:
            # Index 1 of each change entry is used as the changed file's path.
            changed_path = entry[1]
            relative_path = os.path.relpath(changed_path, directory)
            module_name, _ = os.path.split(relative_path)
            logging.debug('Changes in file "%s" cause "%s" module (re)load',
                          changed_path, module_name)
            import_or_reload(module_name)
def _watch(directory):
logging.info('Watching for file changes in "%s"' % directory)
for changes in watch(directory):
for change in changes:
name, module_name = _get_module_import_parts(change[1])
module = sys.modules.get(name)
if module:
for intent, func in list(handlers.items()):
if func.__module__ == module_name:
del handlers[intent]
logging.info('Reloading module "%s"' % module_name)
try:
importlib.reload(module)
except:
logging.warning('Reloading failed for "%s"' % module_name)
else: