Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fixture events: for each event class there are three instances, named
# `*_match`, `*_not_match` and `*_ignored`.
# Directory fixtures use a `.py` path (match), an extension-less path
# (not_match) and a `.pyc` path (ignored); file fixtures use `.txt`,
# extension-less and `.pyc` paths respectively.
# NOTE(review): the patterns that decide match/ignore are not visible in this
# span -- presumably a pattern-matching handler defined elsewhere; confirm.
# Deleted events.
dir_del_event_match = DirDeletedEvent('/path/blah.py')
dir_del_event_not_match = DirDeletedEvent('/path/foobar')
dir_del_event_ignored = DirDeletedEvent('/path/foobar.pyc')
file_del_event_match = FileDeletedEvent('/path/blah.txt')
file_del_event_not_match = FileDeletedEvent('/path/foobar')
file_del_event_ignored = FileDeletedEvent('/path/blah.pyc')
# Created events.
dir_cre_event_match = DirCreatedEvent('/path/blah.py')
dir_cre_event_not_match = DirCreatedEvent('/path/foobar')
dir_cre_event_ignored = DirCreatedEvent('/path/foobar.pyc')
file_cre_event_match = FileCreatedEvent('/path/blah.txt')
file_cre_event_not_match = FileCreatedEvent('/path/foobar')
file_cre_event_ignored = FileCreatedEvent('/path/blah.pyc')
# Modified events.
dir_mod_event_match = DirModifiedEvent('/path/blah.py')
dir_mod_event_not_match = DirModifiedEvent('/path/foobar')
dir_mod_event_ignored = DirModifiedEvent('/path/foobar.pyc')
file_mod_event_match = FileModifiedEvent('/path/blah.txt')
file_mod_event_not_match = FileModifiedEvent('/path/foobar')
file_mod_event_ignored = FileModifiedEvent('/path/blah.pyc')
# Moved events: the first argument varies per fixture, the second is always
# '/path/blah'.
dir_mov_event_match = DirMovedEvent('/path/blah.py', '/path/blah')
dir_mov_event_not_match = DirMovedEvent('/path/foobar', '/path/blah')
dir_mov_event_ignored = DirMovedEvent('/path/foobar.pyc', '/path/blah')
file_mov_event_match = FileMovedEvent('/path/blah.txt', '/path/blah')
file_mov_event_not_match = FileMovedEvent('/path/foobar', '/path/blah')
file_mov_event_ignored = FileMovedEvent('/path/blah.pyc', '/path/blah')
all_dir_events = [
dir_mod_event_match,
dir_mod_event_not_match,
def test_dir_modified_event():
    """Check the attributes of a ``DirModifiedEvent`` built from ``path_1``."""
    modified = DirModifiedEvent(path_1)

    # The constructor argument must be reported back as the source path.
    assert path_1 == modified.src_path
    assert EVENT_TYPE_MODIFIED == modified.event_type

    # Flags: it is a directory event and it is not synthetic.
    assert modified.is_directory
    assert not modified.is_synthetic
def test___init__(self):
    """Check source path, event type and directory flag of a new ``DirModifiedEvent``."""
    dir_event = DirModifiedEvent(path_1)

    self.assertEqual(path_1, dir_event.src_path)
    self.assertEqual(EVENT_TYPE_MODIFIED, dir_event.event_type)
    self.assertTrue(dir_event.is_directory)
def on_modified(self, event):
# Handle a modification event: return early when self.is_ignore() accepts the
# absolute source path; for FileModifiedEvent, derive the destination path and
# its folder, refresh the ignore rules when the file is named `.gitignore`,
# and call remote_create_folder() for the destination folder.
# NOTE(review): indentation was lost in this copy and the snippet may be
# truncated, so which statements sit inside the FileModifiedEvent branch
# cannot be confirmed from here.
abs_src_path = os.path.abspath(event.src_path)
# True only when the event is a DirModifiedEvent; forwarded to is_ignore().
isdir = isinstance(event, DirModifiedEvent)
if self.is_ignore(abs_src_path, isdir):
return
if isinstance(event, FileModifiedEvent):
relative_path = self.get_relative_src_path(event.src_path)
dst_path = os.path.join(self.dst_path, relative_path)
# Create folder of file
# The slice drops the last "/"-separated component and keeps the trailing "/".
# NOTE(review): if the last component is empty (dst_path ends with "/"), the
# slice becomes [:-0] and yields "" -- confirm that cannot happen here.
dst_folder_path = dst_path[:-len(dst_path.split("/")[-1])]
# If the file is `.gitignore`, update gitignore dict and list
if dst_path.split("/")[-1] == ".gitignore":
logger.info("Update ignore pattern, because changed "
"file({}) named `.gitignore` locally".format(
abs_src_path
))
self.update_gitignore(abs_src_path)
remote_create_folder(dst_ssh=self.dst_ssh,
dst_path=dst_folder_path)
def on_any_event(self, event):
    """Forward every event except ``DirModifiedEvent`` as a 3-tuple.

    The tuple is ``(time.time(), event.event_type, path)``, where ``path`` is
    ``event.dest_path`` for a ``FileMovedEvent`` and ``event.src_path`` for
    anything else. It is put on ``self.queue`` when that is not ``None``;
    otherwise ``self.callback`` is called with the tuple unpacked.
    """
    # Directory-modified events are dropped entirely.
    if isinstance(event, DirModifiedEvent):
        return

    if isinstance(event, FileMovedEvent):
        reported_path = event.dest_path
    else:
        reported_path = event.src_path

    record = (time.time(), event.event_type, reported_path)

    if self.queue is None:
        self.callback(*record)
    else:
        self.queue.put(record)
def _dispatch_events_for_path(self, handler, recursive, path):
    """Dispatch one event to ``handler`` per entry of the snapshot diff for ``path``.

    The diff is obtained from
    ``self._get_directory_snapshot_diff(path, recursive)`` while holding
    ``self._lock``. Nothing is dispatched when the diff is falsy. File events
    go first (deleted, modified, created, moved), then directory events
    (modified, deleted, created, moved).
    """
    with self._lock:
        diff = self._get_directory_snapshot_diff(path, recursive)
        if not diff:
            return

        # File events.
        for deleted_file in diff.files_deleted:
            handler.dispatch(FileDeletedEvent(deleted_file, handler))
        for modified_file in diff.files_modified:
            handler.dispatch(FileModifiedEvent(modified_file, handler))
        for created_file in diff.files_created:
            handler.dispatch(FileCreatedEvent(created_file, handler))
        for old_file, new_file in diff.files_moved.items():
            handler.dispatch(FileMovedEvent(old_file, new_file, handler))

        # Directory events.
        for modified_dir in diff.dirs_modified:
            handler.dispatch(DirModifiedEvent(modified_dir, handler))
        for deleted_dir in diff.dirs_deleted:
            handler.dispatch(DirDeletedEvent(deleted_dir, handler))
        for created_dir in diff.dirs_created:
            handler.dispatch(DirCreatedEvent(created_dir, handler))
        for old_dir, new_dir in diff.dirs_moved.items():
            handler.dispatch(DirMovedEvent(old_dir, new_dir, handler))
def process_IN_CLOSE_WRITE(self, event):
    """Queue a modified event for ``event.pathname``.

    A ``DirModifiedEvent`` is queued when ``event.dir`` is truthy, a
    ``FileModifiedEvent`` otherwise; both carry the path returned by
    ``absolute_path(event.pathname)``.
    """
    src_path = absolute_path(event.pathname)
    modified_cls = DirModifiedEvent if event.dir else FileModifiedEvent
    self._event_queue.put(modified_cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
# TODO: generate events for tree
elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod :
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(event.path))
elif event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
elif event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
i += 1
ref_snapshot,
new_snapshot):
yield event
elif is_attrib_modified(kev):
if descriptor.is_directory:
yield DirModifiedEvent(src_path)
else:
yield FileModifiedEvent(src_path)
elif is_modified(kev):
if descriptor.is_directory:
if self.watch.is_recursive or self.watch.path == src_path:
# When a directory is modified, it may be due to
# sub-file/directory renames or new file/directory
# creation. We determine all this by comparing
# snapshots later.
yield DirModifiedEvent(src_path)
else:
yield FileModifiedEvent(src_path)
elif is_deleted(kev):
if descriptor.is_directory:
yield DirDeletedEvent(src_path)
else:
yield FileDeletedEvent(src_path)
# Files.
# Queue one event per path in each file category of `events`, in the order
# deleted, modified, created, moved.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
# Moved entries are unpacked directly as (src_path, dest_path) pairs.
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
# Same scheme for directories, in the order deleted, modified, created, moved.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
# Moved entries are unpacked directly as (src_path, dest_path) pairs.
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))