Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_on_delete_ignore(self, _os):
    """Delete event for ``/a/test.py``: path is resolved once, never joined.

    ``_os`` is the mocked ``os`` module handed to the test. The handler is
    expected to call ``os.path.abspath`` exactly once with the event's
    source path and to make no ``os.path.join`` call at all.
    """
    deleted_path = "/a/test.py"
    _os.path.abspath.return_value = deleted_path

    self.handler.on_deleted(FileDeletedEvent(src_path=deleted_path))

    _os.path.abspath.assert_called_once_with(deleted_path)
    # No join call means the handler never built a destination path.
    self.assertEqual(_os.path.join.call_count, 0)
dest_path="/foo/bar/newservice.yaml"
)
target_class = Mock()
on_add = Mock()
on_update = Mock()
on_delete = Mock()
handler = ConfigFileChangeHandler(
target_class, on_add, on_update, on_delete
)
handler.on_moved(moved_event)
on_deleted.assert_called_once_with(
events.FileDeletedEvent("/foo/bar/service.yaml")
)
on_created.assert_called_once_with(
events.FileCreatedEvent("/foo/bar/newservice.yaml")
)
def test_on_deleted(self, _remote_rm, _os):
    """Delete event for ``/a/.gitignore``: remote removal and gitignore cleanup.

    ``_remote_rm`` and ``_os`` are mocks handed to the test, and
    ``del_gitignore`` is patched on the handler for the duration of the test.
    After the event is dispatched the test checks that:

    * ``os.path.abspath`` was called once with the event's source path,
    * ``os.path.join`` was called once with ``"/b/a/"`` and ``".gitignore"``,
    * the remote-remove mock received the handler's ``dst_ssh`` and the
      joined destination path,
    * ``del_gitignore`` was called once with the local path.
    """
    local_path = "/a/.gitignore"
    remote_path = "/b/a/.gitignore"

    with mock.patch.object(self.handler, "del_gitignore") as gitignore_mock:
        # Canned return values for everything the handler may consult.
        gitignore_mock.return_value = True
        _remote_rm.return_value = True
        _os.path.abspath.return_value = local_path
        _os.path.join.return_value = remote_path

        self.handler.on_deleted(FileDeletedEvent(src_path=local_path))

        _os.path.abspath.assert_called_once_with(local_path)
        _os.path.join.assert_called_once_with("/b/a/", ".gitignore")
        _remote_rm.assert_called_once_with(
            dst_ssh=self.handler.dst_ssh,
            dst_path=remote_path,
        )
        gitignore_mock.assert_called_once_with(local_path)
if local:
# TODO: find better way of ignoring files we want to ignore
if local.name.startswith('.') or local.name.endswith('~'):
return event
if local and db:
if self._get_proper_path(local) != self._get_proper_path(db):
raise IncorrectLocalDBMatch
if isinstance(db, File) and db.is_file and self._make_hash(local) != db.hash:
event = FileModifiedEvent(self._get_proper_path(local).full_path) # create changed event
# folder modified event cannot happen. It will be a create and delete event.
elif local is None:
db_path = self._get_proper_path(db).full_path
if isinstance(db, File) and db.is_file:
event = FileDeletedEvent(db_path) # delete event for file
else:
event = DirDeletedEvent(db_path) # delete event for folder
elif db is None:
local_path = self._get_proper_path(local)
if local_path.is_dir:
event = DirCreatedEvent(local_path.full_path)
else:
event = FileCreatedEvent(local_path.full_path)
return event
unique_paths = set(e.src_path for e in events)
histories = [[e for e in events if e.src_path == path] for path in unique_paths]
new_events.clear()
for h in histories:
if len(h) == 1:
new_events.append(h[0])
else:
path = h[0].src_path
if isinstance(h[-1], FILE_EVENTS): # final item is a file
CreatedEvent = FileCreatedEvent
ModifiedEvent = FileModifiedEvent
DeletedEvent = FileDeletedEvent
else: # final item is a directory
CreatedEvent = DirCreatedEvent
ModifiedEvent = DirModifiedEvent
DeletedEvent = DirDeletedEvent
n_created = len([e for e in h if e.event_type == EVENT_TYPE_CREATED])
n_deleted = len([e for e in h if e.event_type == EVENT_TYPE_DELETED])
if n_created > n_deleted: # file created
new_events.append(CreatedEvent(path))
if n_created < n_deleted: # file was only temporary
new_events.append(DeletedEvent(path))
elif n_deleted == n_created:
if n_created == 0: # file was modified
new_events.append(ModifiedEvent(path))
else:
directory rename, directory creation, etc. are
determined by comparing directory snapshots.
"""
files_renamed = set()
dirs_renamed = set()
dirs_modified = set()
for kev in event_list:
descriptor = self._descriptors.get_for_fd(kev.ident)
src_path = descriptor.path
if is_deleted(kev):
if descriptor.is_directory:
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileDeletedEvent(src_path))
elif is_attrib_modified(kev):
if descriptor.is_directory:
self.queue_event(DirModifiedEvent(src_path))
else:
self.queue_event(FileModifiedEvent(src_path))
elif is_modified(kev):
if descriptor.is_directory:
# When a directory is modified, it may be due to
# sub-file/directory renames or new file/directory
# creation. We determine all this by comparing
# snapshots later.
dirs_modified.add(src_path)
else:
self.queue_event(FileModifiedEvent(src_path))
elif is_renamed(kev):
# Kqueue does not specify the destination names for renames
directory rename, directory creation, etc. are
determined by comparing directory snapshots.
"""
files_renamed = set()
dirs_renamed = set()
dirs_modified = set()
for kev in event_list:
descriptor = self._descriptors.get_for_fd(kev.ident)
src_path = descriptor.path
if is_deleted(kev):
if descriptor.is_directory:
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileDeletedEvent(src_path))
elif is_attrib_modified(kev):
if descriptor.is_directory:
self.queue_event(DirModifiedEvent(src_path))
else:
self.queue_event(FileModifiedEvent(src_path))
elif is_modified(kev):
if descriptor.is_directory:
# When a directory is modified, it may be due to
# sub-file/directory renames or new file/directory
# creation. We determine all this by comparing
# snapshots later.
dirs_modified.add(src_path)
else:
self.queue_event(FileModifiedEvent(src_path))
elif is_renamed(kev):
# Kqueue does not specify the destination names for renames
with self._lock:
if not self._snapshot:
return
# Get event diff between fresh snapshot and previous snapshot.
# Update snapshot.
new_snapshot = dirsnapshot.DirectorySnapshot(
self.watch.path, self.watch.is_recursive)
events = SimplerDiff(self._snapshot, new_snapshot)
self._snapshot = new_snapshot
# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
def on_deleted(self, event):
    """Forward a matching file deletion to ``self.target.remove_file``.

    The event is acted on only when it is a ``FileDeletedEvent`` and its
    ``src_path`` ends with the handler's configured file ending; any other
    event is ignored. In the matching case ``self.target.remove_file`` is
    passed through ``self._get_func`` and the resulting callable is invoked
    with the source path wrapped in a ``Path``.

    Always returns ``None``; the result of the forwarded call is discarded.
    """
    # Short-circuit keeps ``src_path`` untouched for non-file-deletion events.
    matches = isinstance(event, FileDeletedEvent) and event.src_path.endswith(
        self.__file_ending
    )
    if matches:
        remove = self._get_func(self.target.remove_file)
        remove(Path(event.src_path))