Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_move_folder(self):
    """Moving a folder with ``shutil.move`` must yield one ``DirMovedEvent``.

    The path named by the first ``files`` entry of the first project in
    ``PROJECT_STRUCTURE`` is moved to a sibling called ``newdir``. Once the
    sync worker signals ``flushed``, its event cache must contain exactly
    one event, and that event must be a ``DirMovedEvent``.
    """
    project = self.PROJECT_STRUCTURE[0]
    dir_path = self.root_dir.join(
        project['files'][0]['rel_path'].lstrip(os.path.sep)
    )
    # Derive the destination from the parent directory. The previous
    # str.replace() approach rewrote *every* occurrence of the basename,
    # so a parent component containing the same text was renamed as well.
    new_dir_path = os.path.join(os.path.dirname(str(dir_path)), 'newdir')
    shutil.move(str(dir_path), new_dir_path)
    # Block until the worker reports that pending events were flushed.
    self.sync_worker.flushed.wait()
    events = self.sync_worker._event_cache.events
    assert len(events) == 1, 'exactly one event captured'
    assert isinstance(events[0], DirMovedEvent), 'the one captured event is a DirMovedEvent'
def test_rename_folder(self):
    """Renaming a folder with ``os.rename`` must yield one ``DirMovedEvent``.

    The path named by the first ``files`` entry of the first project in
    ``PROJECT_STRUCTURE`` is renamed to a sibling called ``newdir``. Once
    the sync worker signals ``flushed``, its event cache must contain
    exactly one event, and that event must be a ``DirMovedEvent``.
    """
    project = self.PROJECT_STRUCTURE[0]
    dir_path = self.root_dir.join(
        project['files'][0]['rel_path'].lstrip(os.path.sep)
    )
    # Derive the destination from the parent directory. The previous
    # str.replace() approach rewrote *every* occurrence of the basename,
    # so a parent component containing the same text was renamed as well.
    new_dir_path = os.path.join(os.path.dirname(str(dir_path)), 'newdir')
    os.rename(str(dir_path), new_dir_path)
    # Block until the worker reports that pending events were flushed.
    self.sync_worker.flushed.wait()
    events = self.sync_worker._event_cache.events
    assert len(events) == 1, 'exactly one event captured'
    assert isinstance(events[0], DirMovedEvent), 'the one captured event is a DirMovedEvent'
# Fixture events: for every (directory | file) x (created | modified | moved)
# combination, three events are built from three different source paths.
# NOTE(review): the match / not_match / ignored roles are taken from the
# variable names; the patterns they are checked against are not visible here.

# Created.
(
    dir_cre_event_match,
    dir_cre_event_not_match,
    dir_cre_event_ignored,
) = (
    DirCreatedEvent(src)
    for src in ('/path/blah.py', '/path/foobar', '/path/foobar.pyc')
)
(
    file_cre_event_match,
    file_cre_event_not_match,
    file_cre_event_ignored,
) = (
    FileCreatedEvent(src)
    for src in ('/path/blah.txt', '/path/foobar', '/path/blah.pyc')
)

# Modified.
(
    dir_mod_event_match,
    dir_mod_event_not_match,
    dir_mod_event_ignored,
) = (
    DirModifiedEvent(src)
    for src in ('/path/blah.py', '/path/foobar', '/path/foobar.pyc')
)
(
    file_mod_event_match,
    file_mod_event_not_match,
    file_mod_event_ignored,
) = (
    FileModifiedEvent(src)
    for src in ('/path/blah.txt', '/path/foobar', '/path/blah.pyc')
)

# Moved: every move shares the same destination path.
(
    dir_mov_event_match,
    dir_mov_event_not_match,
    dir_mov_event_ignored,
) = (
    DirMovedEvent(src, '/path/blah')
    for src in ('/path/blah.py', '/path/foobar', '/path/foobar.pyc')
)
(
    file_mov_event_match,
    file_mov_event_not_match,
    file_mov_event_ignored,
) = (
    FileMovedEvent(src, '/path/blah')
    for src in ('/path/blah.txt', '/path/foobar', '/path/blah.pyc')
)
all_dir_events = [
dir_mod_event_match,
dir_mod_event_not_match,
dir_mod_event_ignored,
dir_del_event_match,
dir_del_event_not_match,
dir_del_event_ignored,
dir_cre_event_match,
dir_cre_event_not_match,
dir_cre_event_ignored,
dir_mov_event_match,
# Fixture events: for every (directory | file) x (created | modified | moved)
# combination, three events are built from three different source paths.
# NOTE(review): the match / not_match / ignored roles are taken from the
# variable names; the patterns they are checked against are not visible here.

# Created.
(
    dir_cre_event_match,
    dir_cre_event_not_match,
    dir_cre_event_ignored,
) = (
    DirCreatedEvent(src)
    for src in ('/path/blah.py', '/path/foobar', '/path/foobar.pyc')
)
(
    file_cre_event_match,
    file_cre_event_not_match,
    file_cre_event_ignored,
) = (
    FileCreatedEvent(src)
    for src in ('/path/blah.txt', '/path/foobar', '/path/blah.pyc')
)

# Modified.
(
    dir_mod_event_match,
    dir_mod_event_not_match,
    dir_mod_event_ignored,
) = (
    DirModifiedEvent(src)
    for src in ('/path/blah.py', '/path/foobar', '/path/foobar.pyc')
)
(
    file_mod_event_match,
    file_mod_event_not_match,
    file_mod_event_ignored,
) = (
    FileModifiedEvent(src)
    for src in ('/path/blah.txt', '/path/foobar', '/path/blah.pyc')
)

# Moved: every move shares the same destination path.
(
    dir_mov_event_match,
    dir_mov_event_not_match,
    dir_mov_event_ignored,
) = (
    DirMovedEvent(src, '/path/blah')
    for src in ('/path/blah.py', '/path/foobar', '/path/foobar.pyc')
)
(
    file_mov_event_match,
    file_mov_event_not_match,
    file_mov_event_ignored,
) = (
    FileMovedEvent(src, '/path/blah')
    for src in ('/path/blah.txt', '/path/foobar', '/path/blah.pyc')
)
all_dir_events = [
dir_mod_event_match,
dir_mod_event_not_match,
dir_mod_event_ignored,
dir_del_event_match,
dir_del_event_not_match,
dir_del_event_ignored,
dir_cre_event_match,
dir_cre_event_not_match,
dir_cre_event_ignored,
def test_dispatch(self):
    """Feed one event of every kind to a ``_TestableEventHandler``.

    Builds a deleted / created / modified / moved event for both a
    directory path and a file path, then passes each one to
    ``dispatch``. The visible portion makes no assertions of its own.
    """
    dir_src = '/path/blah.py'
    file_src = '/path/blah.txt'
    move_dest = '/path/blah'

    # Events are constructed pairwise (directory, then file) per action.
    deleted_dir = DirDeletedEvent(dir_src)
    deleted_file = FileDeletedEvent(file_src)
    created_dir = DirCreatedEvent(dir_src)
    created_file = FileCreatedEvent(file_src)
    modified_dir = DirModifiedEvent(dir_src)
    modified_file = FileModifiedEvent(file_src)
    moved_dir = DirMovedEvent(dir_src, move_dest)
    moved_file = FileMovedEvent(file_src, move_dest)

    # Dispatch order: all directory events first, then all file events,
    # each group as modified, deleted, created, moved.
    directory_events = (modified_dir, deleted_dir, created_dir, moved_dir)
    file_events = (modified_file, deleted_file, created_file, moved_file)

    sink = _TestableEventHandler()
    for pending in directory_events + file_events:
        sink.dispatch(pending)
# Excerpt from a method body: the enclosing `def` is outside this view and the
# original indentation has been lost.
# Stop immediately for a non-recursive watch whose path is not among
# self.pathnames.
if not self.watch.is_recursive and self.watch.path not in self.pathnames:
return
# Take a fresh snapshot of the watched path and diff it against the stored one.
new_snapshot = DirectorySnapshot(self.watch.path, self.watch.is_recursive)
events = new_snapshot - self.snapshot
# The fresh snapshot replaces the one stored on self.
self.snapshot = new_snapshot
# Every value of change_list is iterated as ((src_path, dst_path), e_type)
# entries; the dictionary keys are not used. The e_type code selects the event
# class to queue: a bare code queues the File* event, the same code with a
# "_D" suffix queues the Dir* event. Only the MV / MV_D branches use dst_path.
# An e_type matching none of the codes below queues nothing.
for _, path_tuple in events.change_list.items():
for ((src_path, dst_path), e_type) in path_tuple:
if e_type == 'CR':
self.queue_event(FileCreatedEvent(src_path))
elif e_type == 'CR_D':
self.queue_event(DirCreatedEvent(src_path))
elif e_type == 'MV':
self.queue_event(FileMovedEvent(src_path, dst_path))
elif e_type == 'MV_D':
self.queue_event(DirMovedEvent(src_path, dst_path))
elif e_type == 'DL':
self.queue_event(FileDeletedEvent(src_path))
elif e_type == 'DL_D':
self.queue_event(DirDeletedEvent(src_path))
elif e_type == 'MD':
self.queue_event(FileModifiedEvent(src_path))
elif e_type == 'MD_D':
self.queue_event(DirModifiedEvent(src_path))
# Excerpt from a method body (original indentation lost). The first statement
# below is the body of a loop whose `for` header lies outside the visible
# source, so the origin of `path` cannot be seen here.
self.event_queue.put(FileDeletedEvent(path))
# For each remaining category of `diff`, put one event per entry on
# self.event_queue. File categories are handled before directory categories.
for path in diff.files_modified:
self.event_queue.put(FileModifiedEvent(path))
for path in diff.files_created:
self.event_queue.put(FileCreatedEvent(path))
# files_moved is iterated via .items(): each entry is (path, dest_path).
for path, dest_path in diff.files_moved.items():
self.event_queue.put(FileMovedEvent(path, dest_path))
for path in diff.dirs_modified:
self.event_queue.put(DirModifiedEvent(path))
for path in diff.dirs_deleted:
self.event_queue.put(DirDeletedEvent(path))
for path in diff.dirs_created:
self.event_queue.put(DirCreatedEvent(path))
# dirs_moved is iterated via .items(): each entry is (path, dest_path).
for path, dest_path in diff.dirs_moved.items():
self.event_queue.put(DirMovedEvent(path, dest_path))
# Excerpt from a method body (original indentation lost; the statements that
# precede it, including where `events` is assigned, are not visible).
# Queue one event per entry in each category of `events` via self.queue_event.
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
# files_moved is iterated directly and unpacked as (src_path, dest_path) pairs.
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
# dirs_moved is iterated directly and unpacked as (src_path, dest_path) pairs.
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
# Excerpt from a method body (original indentation lost; the statements that
# precede it, including where `events` is assigned, are not visible).
# Queue one event per entry in each category of `events` via self.queue_event.
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
# files_moved is iterated directly and unpacked as (src_path, dest_path) pairs.
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
# dirs_moved is iterated directly and unpacked as (src_path, dest_path) pairs.
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
# Excerpt from a method body (original indentation lost). Walks `events` by
# index and turns each native event into queued watchdog events.
# NOTE(review): the initialisation of `i`, its per-iteration increment and the
# remaining branches are outside the visible source.
while i < len(events):
event = events[i]
# For some reason the create and remove flags are sometimes also
# set for rename and modify type events, so let those take
# precedence.
if event.is_renamed:
# Internal moves appear to always be consecutive in the same
# buffer and have IDs that differ by exactly one (while others
# don't), making it possible to pair up the two events coming
# from a single move operation. (None of this is documented!)
# Otherwise, guess whether the file was moved in or out.
# TODO: handle id wrapping
if (i + 1 < len(events) and events[i + 1].is_renamed
and events[i + 1].event_id == event.event_id + 1):
# Paired rename: one moved event from this path to the next event's path,
# plus a DirModifiedEvent for the parent directory of each of the two paths.
cls = DirMovedEvent if event.is_directory else FileMovedEvent
self.queue_event(cls(event.path, events[i + 1].path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
self.queue_event(DirModifiedEvent(os.path.dirname(events[i + 1].path)))
# Extra step past the partner event that was consumed above.
i += 1
elif os.path.exists(event.path):
# Unpaired rename whose path exists on disk: queued as a creation.
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
else:
# Unpaired rename whose path no longer exists: queued as a deletion.
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
# TODO: generate events for tree
# Content, inode-metadata and extended-attribute changes all select a
# *ModifiedEvent class; what is done with `cls` is cut off below.
elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod :
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent