Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_detect_modify_for_moved_files(p):
touch(p('a'))
ref = DirectorySnapshot(p(''))
wait()
touch(p('a'))
mv(p('a'), p('b'))
diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
assert diff.files_moved == [(p('a'), p('b'))]
assert diff.files_modified == [p('a')]
def test_dir_modify_on_create(p):
ref = DirectorySnapshot(p(''))
wait()
touch(p('a'))
diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
assert diff.dirs_modified == [p('')]
def test_replace_same_folder(self):
mkdir(p('dir1'))
touch(p('dir1', 'a'))
touch(p('dir1', 'b'))
snapBefore = DirectorySnapshot(temp_dir)
sleep(1)
mv(p('dir1', 'a'), p('dir1', 'b'))
snapAfter = DirectorySnapshot(temp_dir)
changes = snapAfter - snapBefore
expected = {
'files_moved': [(p('dir1', 'a'), p('dir1', 'b')), ],
'dirs_modified': [p('dir1'), ]
}
self.verify(expected, changes)
def test_move_replace(p):
mkdir(p('dir1'))
mkdir(p('dir2'))
touch(p('dir1', 'a'))
touch(p('dir2', 'b'))
ref = DirectorySnapshot(p(''))
mv(p('dir1', 'a'), p('dir2', 'b'))
diff = DirectorySnapshotDiff(ref, DirectorySnapshot(p('')))
assert diff.files_moved == [(p('dir1', 'a'), p('dir2', 'b'))]
assert diff.files_deleted == [p('dir2', 'b')]
assert diff.files_created == []
def queue_events(self, timeout):
with self._lock:
if (not self.watch.is_recursive
and self.watch.path not in self.pathnames):
return
new_snapshot = DirectorySnapshot(self.watch.path,
self.watch.is_recursive)
events = new_snapshot - self.snapshot
self.snapshot = new_snapshot
# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
def _get_directory_snapshot_diff(self, path, recursive):
"""Obtains a diff of two directory snapshots."""
# The path will be reset to the watched directory path
# and a snapshot will be stored for the correct key.
with self._lock:
(path, snapshot) = self._get_snapshot_for_path(path)
new_snapshot = DirectorySnapshot(path, recursive=recursive)
self.snapshot_for_path[path] = new_snapshot
return new_snapshot - snapshot
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._kq = select.kqueue()
self._lock = threading.RLock()
# A collection of KeventDescriptor.
self._descriptors = KeventDescriptorSet()
def custom_stat(path, self=self):
stat_info = stat(path)
self._register_kevent(path, stat.S_ISDIR(stat_info.st_mode))
return stat_info
self._snapshot = DirectorySnapshot(watch.path,
recursive=watch.is_recursive,
stat=custom_stat)
def _queue_events(self):
"""Blocking call to kqueue.control that enlists events and then
processes them classifying them into various events defined in watchdog.events."""
with self._lock:
event_list = self._kq.control(list(self._kevent_list), MAX_EVENTS)
files_renamed, dirs_renamed, dirs_modified = \
self._kevents_except_movement(event_list)
# Take a fresh snapshot of the directory and update saved snapshot.
new_dir_snapshot = DirectorySnapshot(self._path, self._is_recursive)
ref_dir_snapshot = self._dir_snapshot
self._dir_snapshot = new_dir_snapshot
# Process events for renames and directories modified.
if files_renamed or dirs_renamed or dirs_modified:
self._kevent_file_renames(ref_dir_snapshot,
new_dir_snapshot,
files_renamed)
self._kevent_dir_renames(ref_dir_snapshot,
new_dir_snapshot,
dirs_renamed)
self._kevent_dir_modifications(ref_dir_snapshot,
new_dir_snapshot,
dirs_modified)
:meth:`select.kqueue.control()` method.
:param timeout:
Blocking timeout for reading events.
:type timeout:
``float`` (seconds)
"""
with self._lock:
try:
event_list = self._read_events(timeout)
# TODO: investigate why order appears to be reversed
event_list.reverse()
# Take a fresh snapshot of the directory and update the
# saved snapshot.
new_snapshot = DirectorySnapshot(self.watch.path,
self.watch.is_recursive)
ref_snapshot = self._snapshot
self._snapshot = new_snapshot
diff_events = new_snapshot - ref_snapshot
# Process events
for directory_created in diff_events.dirs_created:
self.queue_event(DirCreatedEvent(directory_created))
for file_created in diff_events.files_created:
self.queue_event(FileCreatedEvent(file_created))
for file_modified in diff_events.files_modified:
self.queue_event(FileModifiedEvent(file_modified))
for kev in event_list:
for event in self._gen_kqueue_events(kev,
ref_snapshot,