Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_python(tmpdir):
mktree(tmpdir, tree)
watcher = PythonWatcher(str(tmpdir))
sleep(0.01)
tmpdir.join('foo/spam.py').write('xxx')
tmpdir.join('foo/bar.txt').write('xxx')
assert watcher.check() == {(Change.modified, str(tmpdir.join('foo/spam.py')))}
async def thread_proc(self):
from pycrunch.pipeline.file_modification_task import FileModifiedNotificationTask
logger.debug('thread_proc')
logger.debug(f'files {self.files}')
logger.debug(f'files {self.files}')
path = Path('.').absolute()
print('watching this:...')
print(path)
async for changes in awatch(path, watcher_cls=PythonWatcher):
for c in changes:
change_type = c[0]
force = False
if change_type == Change.added:
force = True
file = c[1]
logger.info(f'File watcher alarm: file: `{file}` type `{change_type}` ')
if force or self.should_watch(file):
if change_type == Change.deleted:
execution_pipeline.add_task(FileRemovedTask(file=file))
logger.info('Added file removal for pipeline ' + file)
else:
execution_pipeline.add_task(FileModifiedNotificationTask(file=file))