Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def dispatch(self, event):
if isinstance(event, DirDeletedEvent):
if event.src_path == self.path:
logging.info("Watch directory deleted.")
self.notifier.stop()
else:
# only process events we care about
if event.src_path.endswith(self.log_suffix):
super(LogRotationEventHandler, self).dispatch(event)
else:
logging.log(1, "Event path doesn't end with log_suffix: {}. "
"Discarding event: {}".format(self.log_suffix, event))
# Utilities.
patterns = ['*.py', '*.txt']
ignore_patterns = ["*.pyc"]
def assert_patterns(event):
if has_attribute(event, 'dest_path'):
paths = [event.src_path, event.dest_path]
else:
paths = [event.src_path]
filtered_paths = filter_paths(paths,
included_patterns=patterns,
excluded_patterns=ignore_patterns,
case_sensitive=False)
self.assertTrue(filtered_paths)
dir_del_event_match = DirDeletedEvent('/path/blah.py')
dir_del_event_not_match = DirDeletedEvent('/path/foobar')
dir_del_event_ignored = DirDeletedEvent('/path/foobar.pyc')
file_del_event_match = FileDeletedEvent('/path/blah.txt')
file_del_event_not_match = FileDeletedEvent('/path/foobar')
file_del_event_ignored = FileDeletedEvent('/path/blah.pyc')
dir_cre_event_match = DirCreatedEvent('/path/blah.py')
dir_cre_event_not_match = DirCreatedEvent('/path/foobar')
dir_cre_event_ignored = DirCreatedEvent('/path/foobar.pyc')
file_cre_event_match = FileCreatedEvent('/path/blah.txt')
file_cre_event_not_match = FileCreatedEvent('/path/foobar')
file_cre_event_ignored = FileCreatedEvent('/path/blah.pyc')
dir_mod_event_match = DirModifiedEvent('/path/blah.py')
dir_mod_event_not_match = DirModifiedEvent('/path/foobar')
dir_mod_event_ignored = DirModifiedEvent('/path/foobar.pyc')
self.stopped_event.wait(self.interval)
diff = self._get_directory_snapshot_diff()
for path in diff.files_deleted:
self.event_queue.put(FileDeletedEvent(path))
for path in diff.files_modified:
self.event_queue.put(FileModifiedEvent(path))
for path in diff.files_created:
self.event_queue.put(FileCreatedEvent(path))
for path, dest_path in diff.files_moved.items():
self.event_queue.put(FileMovedEvent(path, dest_path))
for path in diff.dirs_modified:
self.event_queue.put(DirModifiedEvent(path))
for path in diff.dirs_deleted:
self.event_queue.put(DirDeletedEvent(path))
for path in diff.dirs_created:
self.event_queue.put(DirCreatedEvent(path))
for path, dest_path in diff.dirs_moved.items():
self.event_queue.put(DirMovedEvent(path, dest_path))
# inside the directory tree? Does kqueue does this
# all by itself? Check this and then enable this code
# only if it doesn't already.
# A: It doesn't. So I've enabled this block.
if self.watch.is_recursive:
for sub_event in event.sub_moved_events():
self.queue_event(sub_event)
self.queue_event(event)
else:
self.queue_event(FileMovedEvent(src_path, dest_path))
except KeyError:
# If the new snapshot does not have an inode for the
# old path, we haven't found the new name. Therefore,
# we mark it as deleted and remove unregister the path.
if is_directory:
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileDeletedEvent(src_path))
def on_moved(self, sync_event):
if sync_event.is_src_path_syncable:
if sync_event.is_dest_path_syncable:
self._events_buffer.set_moved_event(sync_event)
else:
self.sync_logger.debug(
'Moved outside of the syncable area, removing: %r' % sync_event)
if sync_event.is_directory:
raw_delete_event = DirDeletedEvent(sync_event.src_path)
else:
raw_delete_event = FileDeletedEvent(sync_event.src_path)
delete_event = SyncEvent(
raw_delete_event, sync_event.timestamp, sync_event.relpath)
self.on_deleted(delete_event)
elif sync_event.is_dest_path_syncable:
self.sync_logger.debug(
'Moved into the syncable area, creating: %r' % sync_event)
if sync_event.is_directory:
raw_create_event = DirCreatedEvent(sync_event.dest_path)
else:
raw_create_event = FileCreatedEvent(sync_event.dest_path)
create_event = SyncEvent(
raw_create_event, sync_event.timestamp, sync_event.relpath)
self.on_created(create_event)
new_snapshot):
"""
Compares information from two directory snapshots (one taken before
the rename operation and another taken right after) to determine the
destination path of the file system object renamed, and yields
the appropriate events to be queued.
"""
try:
f_inode = ref_snapshot.inode(src_path)
except KeyError:
# Probably caught a temporary file/directory that was renamed
# and deleted. Fires a sequence of created and deleted events
# for the path.
if is_directory:
yield DirCreatedEvent(src_path)
yield DirDeletedEvent(src_path)
else:
yield FileCreatedEvent(src_path)
yield FileDeletedEvent(src_path)
# We don't process any further and bail out assuming
# the event represents deletion/creation instead of movement.
return
dest_path = new_snapshot.path(f_inode)
if dest_path is not None:
dest_path = absolute_path(dest_path)
if is_directory:
event = DirMovedEvent(src_path, dest_path)
yield event
else:
yield FileMovedEvent(src_path, dest_path)
yield self._parent_dir_modified(src_path)
else:
event = FileCreatedEvent(path)
changes.append(event)
elif is_modified:
if snapshot.isdir(path):
event = DirModifiedEvent(path)
else:
event = FileModifiedEvent(path)
changes.append(event)
# get deleted items
rev_dict_copy = self.get_rev_dict()
for path in rev_dict_copy:
if self.to_local_path(path).lower() not in lowercase_snapshot_paths:
if rev_dict_copy[path] == "folder":
event = DirDeletedEvent(self.to_local_path(path))
else:
event = FileDeletedEvent(self.to_local_path(path))
changes.append(event)
del snapshot
del lowercase_snapshot_paths
return changes
attribute modifications. The other events, namely,
file creation, directory modification, file rename,
directory rename, directory creation, etc. are
determined by comparing directory snapshots.
"""
files_renamed = set()
dirs_renamed = set()
dirs_modified = set()
for kev in event_list:
descriptor = self._descriptors.get_for_fd(kev.ident)
src_path = descriptor.path
if is_deleted(kev):
if descriptor.is_directory:
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileDeletedEvent(src_path))
elif is_attrib_modified(kev):
if descriptor.is_directory:
self.queue_event(DirModifiedEvent(src_path))
else:
self.queue_event(FileModifiedEvent(src_path))
elif is_modified(kev):
if descriptor.is_directory:
# When a directory is modified, it may be due to
# sub-file/directory renames or new file/directory
# creation. We determine all this by comparing
# snapshots later.
dirs_modified.add(src_path)
else:
self.queue_event(FileModifiedEvent(src_path))
events = SimplerDiff(self._snapshot, new_snapshot)
self._snapshot = new_snapshot
# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))