Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class RegenerateHandler(watchdog.events.FileSystemEventHandler):
    """Watchdog handler that recompiles a source file when it changes."""

    def on_modified(self, event):
        """Regenerate documentation for the file behind *event*.

        Watchdog monitors whole directories, so notifications can arrive
        for files that were never requested; only the sources listed on
        the command line (tracked in ``absolute_sources``) are rebuilt.
        """
        # Guard clause: ignore events for files we were not asked to watch.
        if event.src_path not in absolute_sources:
            return
        process([absolute_sources[event.src_path]],
                outdir=opts.outdir,
                preserve_paths=opts.paths)
# Wire the handler defined above to an observer that watches every
# directory containing a source file named on the command line.
event_handler = RegenerateHandler()
observer = watchdog.observers.Observer()
for directory in {os.path.split(source)[0] for source in sources}:
    observer.schedule(event_handler, path=directory)
# Sleep-poll until the user presses Ctrl-C, then shut down cleanly.
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
# NOTE(review): orphan fragment -- this looks like the tail of a method
# from another handler class (`self`, `event` and `rule_content` are not
# defined anywhere in this chunk). It appears to enqueue the changed
# filename together with some rule content -- confirm against the file
# this snippet was taken from.
self.queue.write_one({
'filename': event.src_path,
'rules': rule_content,
})
def on_modified(self, event):
    """Forward file-modification events to the shared process() routine."""
    self.process(event)
def on_created(self, event):
    """Forward file-creation events to the shared process() routine."""
    self.process(event)
if __name__ == '__main__':
    # Build the watcher from the config file named on the command line
    # and monitor its configured path until the user interrupts.
    worker = FSWatcher()
    worker.read_config(sys.argv[1])
    observer = watchdog.observers.Observer()
    observer.schedule(worker, worker.config['watch_path'])
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
# NOTE(review): orphan fragment -- a single loop-body line from an
# entity-type loader; `self`, `i` and `entity` are defined outside this
# chunk. Confirm against the original file before relying on it.
self.entityTypes[i[0]] = entity.EntityType(i[0],i[1]) # create a new entity type, with the member fields (located in i[1])
def delete(self):
    """Stop using this module: drop it from the global handler's registry."""
    global handler
    # Resolve symbolic links so the key matches what was registered.
    target = os.path.realpath(self.filename)
    del handler.files[target]
    self.entityTypes = None
def onModuleChanged(self):
    """React to a change of the module file.

    NOTE(review): the original comment says the module "needs to be
    reloaded", but this body only logs -- presumably the reload happens
    elsewhere; confirm against the full file.
    """
    # Fixed: Python-2-only `print` statement replaced with the print()
    # function; a single parenthesized argument prints identically on
    # Python 2, so this is backward-compatible.
    print("module changed")
# Module-level watchdog setup: one shared change handler plus an observer.
# NOTE(review): the observer is started without any schedule() call in
# this chunk -- paths are presumably registered elsewhere (e.g. when a
# Module is constructed); confirm against the full file.
handler = ModuleChangedHandler()
observer = watchdog.observers.Observer()
observer.start()
def Test():
    """Manual smoke test: load the sample YANG module, then idle."""
    import time
    module = Module("SAFplusAmf.yang")
    # Keep the process alive so file-change events can be observed.
    time.sleep(1000)
import watchdog.events
import watchdog.observers
class RegenerateHandler(watchdog.events.FileSystemEventHandler):
    """Watchdog handler that triggers a rebuild on any filesystem event."""

    def on_any_event(self, event):
        """Invoke the module-level rebuild callback for every event.

        Watchdog monitors whole directories, so notifications may come
        from files other than the ones requested; filtering, if any, is
        left to the callback.
        """
        func()
# Attach the handler above to an observer watching `path` recursively.
event_handler = RegenerateHandler()
observer = watchdog.observers.Observer()
observer.schedule(event_handler, path=path, recursive=True)
observer.start()
# Sleep-poll until Ctrl-C, then stop the observer and wait for it to exit.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
@task()
def watch(paths, action):
    """Watch each of *paths* recursively, dispatching events to *action*."""
    wrapper = _Wrapper(action)
    watcher = watchdog.observers.Observer()
    for p in paths:
        watcher.schedule(wrapper, p, recursive=True)
    watcher.start()
# NOTE(review): fragment -- the interior of a loop from a method whose
# header lies outside this view (`continue` and `self` have no enclosing
# scope here). It appears to skip non-.yaml/.yml files when no config
# subdirectory is set, replay a synthetic FileCreatedEvent per file,
# then start and return an observer on self.file_path -- confirm
# against the full file.
continue
if (
not self.target_class.config_subdirectory and
not (
file_name.endswith(".yaml") or file_name.endswith(".yml")
)
):
continue
handler.on_created(
events.FileCreatedEvent(
os.path.join(self.file_path, file_name)
)
)
observer = observers.Observer()
observer.schedule(handler, self.file_path)
observer.start()
return observer
def monitor(root):
    """Rebuild *root* whenever anything under its sources/ tree changes."""

    class Handler(watchdog.events.FileSystemEventHandler):
        def on_any_event(*args):
            # Any event at all triggers a full rebuild.
            build(root)

    watcher = watchdog.observers.Observer()
    watcher.schedule(Handler(), os.path.join(root, 'sources/'))
    watcher.start()
    # Idle until Ctrl-C, then shut the observer down and wait for it.
    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()
def watchDirectory(self, start):
    """Start (start=True) or stop (start=False) the directory watchdog."""
    if not start:
        # Tear down the currently running observer.
        self.watcher.stop()
        return
    # Spin up a recursive observer over self.directory.
    self.watcher = watchdog.observers.Observer()
    self.watcher.schedule(FileSystemWatcher(self), self.directory,
                          recursive=True)
    self.watcher.start()
def run(src_folder, dest_folder):
    """Mirror changes from src_folder into dest_folder until Ctrl-C."""
    watcher = watchdog.observers.Observer()
    watcher.schedule(EventHandler(dest_folder), src_folder, recursive=True)
    watcher.start()
    # Sleep-poll until interrupted, then stop and wait for shutdown.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()
# NOTE(review): this chunk begins mid-statement -- the next line is the
# tail of a parser.add_argument(...) call whose opening lines are outside
# this view.
help='do not ignore any files')
parser.add_argument('-p', '--port', type=int, default=22, help='the SSH port to use')
parser.add_argument('-k', '--keep-extra', action='store_true',
help='keep files on the remote that do not exist locally')
parser.add_argument('-o', '--ssh-options', default=[], action='append',
help='options to pass on to SSH with the -o flag. This argument may be specified multiple times.')
parser.add_argument('--version', action='version', version='%(prog)s 1.3')
parser.add_argument('user', metavar='user@hostname', help='the remote machine (and optional user name) to login to')
# The user argument is passed on to rsync and ssh, so actually the 'user@'
# part is optional, but using metavar='[user@]hostname' causes an error
# because of a bug in argparse - see http://bugs.python.org/issue11874
parser.add_argument('dest', help='the path to the remote directory to push changes to')
args = parser.parse_args()
# Watch the current directory recursively and hand changes to the push
# handler; sleep-poll until Ctrl-C, then stop the observer and wait.
observer = watchdog.observers.Observer()
observer.schedule(PypushHandler(args), path='.', recursive=True)
observer.start()
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
observer.stop()
observer.join()