Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _schedule_and_set_callback(self, stream):
    """Snapshot every path of ``stream`` and schedule it with ``_fsevents``.

    For each entry of ``stream.paths`` a ``DirectorySnapshot`` is stored in
    ``self.snapshot_for_path`` under the value returned by
    ``absolute_path``. The stream is then scheduled together with a callback
    that calls ``self._dispatch_events_for_path`` once per reported path.

    Raises:
        ValueError: if ``stream.paths`` is empty.
    """
    if not stream.paths:
        raise ValueError("No paths to observe.")

    # Normalise each path before using it as a dictionary key so that the
    # keys of self.snapshot_for_path stay consistent.
    for raw_path in stream.paths:
        snapshot_key = absolute_path(raw_path)
        self.snapshot_for_path[snapshot_key] = DirectorySnapshot(
            snapshot_key, recursive=stream.is_recursive)

    def on_paths_changed(paths, masks):
        # ``masks`` is part of the callback signature but is not used here.
        for changed_path in paths:
            self._dispatch_events_for_path(
                stream.event_handler, stream.is_recursive, changed_path)

    _fsevents.schedule(self, stream, on_paths_changed, stream.paths)
def process_IN_CLOSE_WRITE(self, event):
    """Queue a "modified" event for the path carried by ``event``.

    ``event.pathname`` is passed through ``absolute_path``; a
    ``DirModifiedEvent`` is queued when ``event.dir`` is true, otherwise a
    ``FileModifiedEvent``.
    """
    modified_path = absolute_path(event.pathname)
    event_class = DirModifiedEvent if event.dir else FileModifiedEvent
    self._event_queue.put(event_class(modified_path))
# NOTE(review): this is an excerpt from a larger routine whose header is not
# shown. The `continue` below implies an enclosing loop, and the second `try`
# has no visible `except`/`finally`; neither appears in this excerpt.
#
# Look up the stat info recorded for `path_renamed` in the reference snapshot.
try:
ref_stat_info = ref_dir_snapshot.stat_info(path_renamed)
except KeyError:
# Caught a temporary directory most probably.
# So fire a created+deleted event sequence.
self.event_queue.put(
DirCreatedEvent(path_renamed,
handlers=self.handlers))
self.event_queue.put(
DirDeletedEvent(path_renamed,
handlers=self.handlers))
continue
try:
# Ask the new snapshot which path now has the inode that `path_renamed`
# had in the reference snapshot.
path = new_dir_snapshot.path_for_inode(ref_stat_info.st_ino)
path = absolute_path(path)
# If we're in recursive mode, we fire move events for
# the entire contents of the moved directory.
if self._is_recursive:
dir_path_renamed = absolute_path(path_renamed)
# Only the `directories` entries yielded by os.walk are used in the lines
# visible here; `filenames` is not used in this excerpt.
for root, directories, filenames in os.walk(path):
for directory_path in directories:
full_path = os.path.join(root, directory_path)
# `renamed_path` becomes the move source and `full_path` the destination.
# NOTE(review): str.replace substitutes every occurrence of `path` inside
# `full_path`, not only the leading prefix -- confirm this is intended.
renamed_path = full_path.replace(path, dir_path_renamed)
self.event_queue.put(
DirMovedEvent(src_path=renamed_path,
dest_path=full_path,
handlers=self.handlers))
self._unregister_path(renamed_path)
def _register_path(self, path, is_directory=False):
    """Bookkeeping method that registers watching a given path.

    The path is passed through ``absolute_path``. If it is not already a key
    of ``self._watch_table``, a kevent and file descriptor are created for
    it, the kevent is appended to ``self._kevent_list``, the descriptor is
    added to ``self._descriptors`` and a ``_Watch`` is stored in
    ``self._watch_table`` under both the descriptor and the path.

    Args:
        path: Path to start watching.
        is_directory: Passed on to ``_Watch``; also consulted when the path
            turns out not to exist.
    """
    # NOTE(review): no lock is taken here; the locking statement that once
    # wrapped this body was commented out.
    path = absolute_path(path)
    if path not in self._watch_table:
        try:
            # If we haven't registered a kevent for this path already,
            # add a new kevent for the path.
            kev, fd = create_kevent_for_path(path)
            self._kevent_list.append(kev)
            watch = _Watch(fd, kev, path, is_directory)
            self._descriptors.add(fd)
            self._watch_table[fd] = watch
            self._watch_table[path] = watch
        except OSError as e:
            # ``except ... as`` is accepted by Python 2.6+ and Python 3,
            # unlike the comma form.
            if e.errno == errno.ENOENT:
                # No such file or directory.
                # Possibly a temporary file we can ignore.
                # NOTE(review): only this assignment is visible in the
                # handler; the value is not used afterwards and OSErrors
                # other than ENOENT are silently dropped -- confirm against
                # the complete implementation.
                if is_directory:
                    event_created_class = DirCreatedEvent
def _unregister_path(self, path):
    """Bookkeeping method that unregisters watching a given path.

    The path is passed through ``absolute_path``. If it is a key of
    ``self._watch_table``, its kevent is removed from ``self._kevent_list``,
    both table entries (by path and by descriptor) are deleted, the
    descriptor is closed and then removed from ``self._descriptors``.
    Paths that are not registered are ignored.

    Args:
        path: Path to stop watching.
    """
    # NOTE(review): no lock is taken here; the locking statement that once
    # wrapped this body was commented out.
    path = absolute_path(path)
    if path not in self._watch_table:
        return

    watch = self._watch_table[path]
    self._kevent_list.remove(watch.kev)
    del self._watch_table[path]
    del self._watch_table[watch.fd]
    try:
        os.close(watch.fd)
    except OSError as e:
        # Closing is best-effort: a failure is logged and the bookkeeping
        # below still runs. ``logging.warning`` replaces the deprecated
        # ``logging.warn`` alias.
        logging.warning(e)
    self._descriptors.remove(watch.fd)
def process_IN_CREATE(self, event):
    """Queue a "created" event for the path carried by ``event``.

    ``event.pathname`` is passed through ``absolute_path``; a
    ``DirCreatedEvent`` is queued when ``event.dir`` is true, otherwise a
    ``FileCreatedEvent``.
    """
    created_path = absolute_path(event.pathname)
    if not event.dir:
        self._event_queue.put(FileCreatedEvent(created_path))
        return
    self._event_queue.put(DirCreatedEvent(created_path))
def process_IN_MOVED_TO(self, event):
    """Queue "moved" events for the move described by ``event``.

    The source is ``event.src_pathname`` and the destination is
    ``event.pathname``, both passed through ``absolute_path``. For a file a
    single ``FileMovedEvent`` is queued. For a directory a ``DirMovedEvent``
    is queued; when ``self._is_recursive`` is true, every event from its
    ``sub_moved_events()`` is queued before the directory event itself.
    """
    moved_from = absolute_path(event.src_pathname)
    moved_to = absolute_path(event.pathname)

    if not event.dir:
        self._event_queue.put(FileMovedEvent(moved_from, moved_to))
        return

    whole_dir_move = DirMovedEvent(moved_from, moved_to)
    if self._is_recursive:
        # Nested moves go onto the queue ahead of the directory's own event.
        for nested_move in whole_dir_move.sub_moved_events():
            self._event_queue.put(nested_move)
    self._event_queue.put(whole_dir_move)