Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, dir_object):
watchdog.events.FileSystemEventHandler.__init__(self)
self.dir_object = dir_object
def watchdir():
try:
logger.debug('Initializing watchdog')
observer = Observer()
watchdog_handler = MyHandler()
observer.schedule(watchdog_handler, path=cfg.watchdir_location, recursive=True)
observer.start()
logger.info('Initializing watchdog complete')
for dirpath, dirs, files in os.walk(cfg.watchdir_location):
for file in files:
filepath = os.path.join(dirpath, file)
watchdog_handler.on_created(events.FileCreatedEvent(filepath))
except:
raise
def __init__(self, paths=None):
if not paths:
paths = ["src", "plugins", config.data_dir + "/__plugins__"]
self.log = logging.getLogger("DebugReloader")
self.last_chaged = 0
self.callbacks = []
if enabled:
self.observer = watchdog.observers.Observer()
event_handler = watchdog.events.FileSystemEventHandler()
event_handler.on_modified = event_handler.on_deleted = self.onChanged
event_handler.on_created = event_handler.on_moved = self.onChanged
for path in paths:
if not os.path.isdir(path):
continue
self.log.debug("Adding autoreload: %s" % path)
self.observer.schedule(event_handler, path, recursive=True)
self.observer.start()
def on_modified(self,event):
pdb.set_trace()
if type(event) == watchdog.events.FileModifiedEvent:
print "file modified";
tmp = self.files.get(event.src_path, None)
if tmp: tmp.onModuleChanged()
def monitor(path, func):
"""Monitor each source file and re-generate documentation on change."""
# The watchdog modules are imported in `main()` but we need to re-import\
# here to bring them into the local namespace.
import watchdog.events
import watchdog.observers
class RegenerateHandler(watchdog.events.FileSystemEventHandler):
"""A handler for recompiling files which triggered watchdog events"""
def on_any_event(self, event):
"""Regenerate documentation for a file which triggered an event"""
# Re-generate documentation from a source file if it was listed on\
# the command line. Watchdog monitors whole directories, so other\
# files may cause notifications as well.
func()
# Set up an observer which monitors all directories for files given on\
# the command line and notifies the handler defined above.
event_handler = RegenerateHandler()
observer = watchdog.observers.Observer()
observer.schedule(event_handler, path=path, recursive=True)
# Run the file change monitoring loop until the user hits Ctrl-C.
observer.start()
raise
return patched
@task("GET")
def get(url, **kwargs):
task.info("GET %s" % url)
try:
response = requests.get(str(url), **kwargs)
response.json = json_patch(response, response.json)
return response
except requests.RequestException, e:
raise TaskError(e)
import watchdog, watchdog.events
class _Wrapper(watchdog.events.FileSystemEventHandler):
def __init__(self, action):
for attr in "on_any_event", "on_created", "on_deleted", "on_modified", "on_moved":
meth = getattr(action, attr, None)
if meth:
setattr(self, attr, meth)
@task()
def dispatch(self, event):
watchdog.events.FileSystemEventHandler.dispatch(self, event)
@task()
def watch(paths, action):
handler = _Wrapper(action)
obs = watchdog.observers.Observer()
for path in paths:
return value
def push(self, config):
new_config = self.new_child()
new_config.update(config)
return new_config
def replace(self, config):
self.maps[0] = config
def __getattr__(self, attr):
return self.__getitem__(attr)
class ProjectWatcher(events.FileSystemEventHandler):
# TODO(dejw): should expose some stats (like how many times it was
# notified) or how many times it succeeed in testing etc.
def __init__(self, config, working_dir, scheduler, builder, observer):
super(ProjectWatcher, self).__init__()
self._event = None
self._build = 0
self.name = get_project_name(working_dir)
self.working_dir = path.path(working_dir)
self.set_config(config)
self._last_status = (None, None)
self._notification = None
from datetime import datetime
import json
import string
import watchdog
class MessangerEventHandler(watchdog.events.FileSystemEventHandler):
def __init__(self, wsMessanger, reactor, base_dir):
self.wsMessanger = wsMessanger
self.reactor = reactor
self.base_dir = base_dir
super(watchdog.events.FileSystemEventHandler, self).__init__()
def on_any_event(self, event):
print event.src_path, event.event_type
self.reactor.callFromThread(self.wsMessanger.notify_clients,
(self._jsonize_event(event)))
def _jsonize_event(self, event):
rel_path = string.replace(event.src_path, self.base_dir, '')
return json.dumps(
{
def watch(self, handler):
path = self.file_path
class Watcher(watchdog.events.FileSystemEventHandler):
def on_modified(self, event):
if not event.is_directory and os.path.realpath(event.src_path) == os.path.realpath(
path
):
handler()
self.observer = watchdog.observers.Observer()
self.observer.schedule(Watcher(), os.path.dirname(path))
self.observer.start()
import yang
import os
import pdb
from types import *
import common
import entity
def DataTypeSortOrder(a,b):
if not type(a[1]) is DictType:
return 1
if not type(b[1]) is DictType:
return -1
return cmp(a[1].get("order",10000),b[1].get("order",10001))
class ModuleChangedHandler(watchdog.events.FileSystemEventHandler):
def __init__(self):
self.files = {}
def on_modified(self,event):
pdb.set_trace()
if type(event) == watchdog.events.FileModifiedEvent:
print "file modified";
tmp = self.files.get(event.src_path, None)
if tmp: tmp.onModuleChanged()
class Module:
def __init__(self, filename):
global observer
global handler
self.filename = common.fileResolver(filename)
realfile = os.path.realpath(self.filename) # Chase thru symbolic links
handler.files[realfile] = self