Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
filtered_paths = filter_paths(paths,
included_patterns=patterns,
excluded_patterns=ignore_patterns,
case_sensitive=False)
self.assertTrue(filtered_paths)
dir_del_event_match = DirDeletedEvent('/path/blah.py')
dir_del_event_not_match = DirDeletedEvent('/path/foobar')
dir_del_event_ignored = DirDeletedEvent('/path/foobar.pyc')
file_del_event_match = FileDeletedEvent('/path/blah.txt')
file_del_event_not_match = FileDeletedEvent('/path/foobar')
file_del_event_ignored = FileDeletedEvent('/path/blah.pyc')
dir_cre_event_match = DirCreatedEvent('/path/blah.py')
dir_cre_event_not_match = DirCreatedEvent('/path/foobar')
dir_cre_event_ignored = DirCreatedEvent('/path/foobar.pyc')
file_cre_event_match = FileCreatedEvent('/path/blah.txt')
file_cre_event_not_match = FileCreatedEvent('/path/foobar')
file_cre_event_ignored = FileCreatedEvent('/path/blah.pyc')
dir_mod_event_match = DirModifiedEvent('/path/blah.py')
dir_mod_event_not_match = DirModifiedEvent('/path/foobar')
dir_mod_event_ignored = DirModifiedEvent('/path/foobar.pyc')
file_mod_event_match = FileModifiedEvent('/path/blah.txt')
file_mod_event_not_match = FileModifiedEvent('/path/foobar')
file_mod_event_ignored = FileModifiedEvent('/path/blah.pyc')
dir_mov_event_match = DirMovedEvent('/path/blah.py', '/path/blah')
dir_mov_event_not_match = DirMovedEvent('/path/foobar', '/path/blah')
dir_mov_event_ignored = DirMovedEvent('/path/foobar.pyc', '/path/blah')
file_mov_event_match = FileMovedEvent('/path/blah.txt', '/path/blah')
file_mov_event_not_match = FileMovedEvent('/path/foobar', '/path/blah')
def test___init__(self):
event = DirCreatedEvent(path_1)
self.assertEqual(path_1, event.src_path)
self.assertEqual(EVENT_TYPE_CREATED, event.event_type)
self.assertTrue(event.is_directory)
def test_fast_subdirectory_creation_deletion():
root_dir = p('dir1')
sub_dir = p('dir1', 'subdir1')
times = 30
mkdir(root_dir)
start_watching(root_dir)
for _ in range(times):
mkdir(sub_dir)
rm(sub_dir, True)
count = {DirCreatedEvent: 0,
DirModifiedEvent: 0,
DirDeletedEvent: 0}
etype_for_dir = {DirCreatedEvent: sub_dir,
DirModifiedEvent: root_dir,
DirDeletedEvent: sub_dir}
for _ in range(times * 4):
event = event_queue.get(timeout=5)[0]
logger.debug(event)
etype = type(event)
count[etype] += 1
assert event.src_path == etype_for_dir[etype]
assert count[DirCreatedEvent] >= count[DirDeletedEvent]
assert count[DirCreatedEvent] + count[DirDeletedEvent] >= count[DirModifiedEvent]
assert count == {DirCreatedEvent: times,
DirModifiedEvent: times * 2,
DirDeletedEvent: times}
def on_created(self, event):
abs_src_path = os.path.abspath(event.src_path)
isdir = isinstance(event, DirCreatedEvent)
if self.is_ignore(abs_src_path, isdir):
return
relative_path = self.get_relative_src_path(event.src_path)
dst_path = os.path.join(self.dst_path, relative_path)
if isinstance(event, DirCreatedEvent):
logger.info("Create {} remotely".format(dst_path))
remote_create_folder(dst_ssh=self.dst_ssh, dst_path=dst_path)
else:
dst_path = os.path.join(self.dst_path, relative_path)
# Create folder of file
dst_folder_path = dst_path[:-len(dst_path.split("/")[-1])]
remote_create_folder(dst_ssh=self.dst_ssh,
dst_path=dst_folder_path)
logger.info("Rsync {} remotely".format(dst_path))
rsync(dst_ssh=self.dst_ssh, src_path=abs_src_path,
dst_path=dst_path)
if diff:
for path in diff.files_deleted:
handler.dispatch(FileDeletedEvent(path, handler))
for path in diff.files_modified:
handler.dispatch(FileModifiedEvent(path, handler))
for path in diff.files_created:
handler.dispatch(FileCreatedEvent(path, handler))
for path, dest_path in diff.files_moved.items():
handler.dispatch(FileMovedEvent(path, dest_path, handler))
for path in diff.dirs_modified:
handler.dispatch(DirModifiedEvent(path, handler))
for path in diff.dirs_deleted:
handler.dispatch(DirDeletedEvent(path, handler))
for path in diff.dirs_created:
handler.dispatch(DirCreatedEvent(path, handler))
for path, dest_path in diff.dirs_moved.items():
handler.dispatch(DirMovedEvent(path, dest_path, handler))
DirModifiedEvent(dir_modified,
handlers=self.handlers))
# Don't need to register here. It's already registered.
# self._register_path(dir_modified, is_directory=True)
diff = new_dir_snapshot - ref_dir_snapshot
for file_created in diff.files_created:
self.event_queue.put(
FileCreatedEvent(file_created,
handlers=self.handlers))
self._register_path(file_created, is_directory=False)
for dir_created in diff.dirs_created:
self.event_queue.put(
DirCreatedEvent(dir_created,
handlers=self.handlers))
self._register_path(dir_created, is_directory=True)
ref_snapshot,
new_snapshot):
"""
Compares information from two directory snapshots (one taken before
the rename operation and another taken right after) to determine the
destination path of the file system object renamed, and adds
appropriate events to the event queue.
"""
try:
ref_stat_info = ref_snapshot.stat_info(src_path)
except KeyError:
# Probably caught a temporary file/directory that was renamed
# and deleted. Fires a sequence of created and deleted events
# for the path.
if is_directory:
self.queue_event(DirCreatedEvent(src_path))
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileCreatedEvent(src_path))
self.queue_event(FileDeletedEvent(src_path))
# We don't process any further and bail out assuming
# the event represents deletion/creation instead of movement.
return
try:
dest_path = absolute_path(
new_snapshot.path(ref_stat_info.st_ino))
if is_directory:
event = DirMovedEvent(src_path, dest_path)
# TODO: Do we need to fire moved events for the items
# inside the directory tree? Does kqueue does this
# all by itself? Check this and then enable this code
)
# generate any events for files/dirs in self.path that were
# created before the watch started (since dispatching is stopped,
# duplicate events will be caught and ignored)
with self._lock:
for emitter in self.emitters:
dirs = [emitter.watch.path]
for root in dirs:
names = os.listdir(root)
paths = [os.path.join(root, name) for name in names]
paths.sort(key=os.path.getmtime)
for path in paths:
if os.path.isdir(path):
if emitter.watch.is_recursive:
dirs.append(path)
event = DirCreatedEvent(path)
else:
event = FileCreatedEvent(path)
emitter.queue_event(event)
# start dispatching events
self._start_dispatching()
def _kevent_dir_renames(self, ref_dir_snapshot, new_dir_snapshot, dirs_renamed):
"""Process kevent-hinted directory renames. These may be deletes
too relative to the watched directory."""
for path_renamed in dirs_renamed:
# These are kqueue-hinted renames. We classify them into
# either moved if the new path is found or deleted.
try:
ref_stat_info = ref_dir_snapshot.stat_info(path_renamed)
except KeyError:
# Caught a temporary directory most probably.
# So fire a created+deleted event sequence.
self.event_queue.put(
DirCreatedEvent(path_renamed,
handlers=self.handlers))
self.event_queue.put(
DirDeletedEvent(path_renamed,
handlers=self.handlers))
continue
try:
path = new_dir_snapshot.path_for_inode(ref_stat_info.st_ino)
path = absolute_path(path)
# If we're in recursive mode, we fire move events for
# the entire contents of the moved directory.
if self._is_recursive:
dir_path_renamed = absolute_path(path_renamed)
for root, directories, filenames in os.walk(path):
for directory_path in directories:
# get modified or added items
for path in snapshot.paths:
stats = snapshot.stat_info(path)
last_sync = CONF.get("internal", "lastsync")
# check if item was created or modified since last sync
dbx_path = self.to_dbx_path(path).lower()
is_new = (self.get_local_rev(dbx_path) is None and
not self.is_excluded(dbx_path))
is_modified = (self.get_local_rev(dbx_path) and
now > max(stats.st_ctime, stats.st_mtime) > last_sync)
if is_new:
if snapshot.isdir(path):
event = DirCreatedEvent(path)
else:
event = FileCreatedEvent(path)
changes.append(event)
elif is_modified:
if snapshot.isdir(path):
event = DirModifiedEvent(path)
else:
event = FileModifiedEvent(path)
changes.append(event)
# get deleted items
rev_dict_copy = self.get_rev_dict()
for path in rev_dict_copy:
if self.to_local_path(path).lower() not in lowercase_snapshot_paths:
if rev_dict_copy[path] == "folder":
event = DirDeletedEvent(self.to_local_path(path))