Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
file_cre_event_match,
file_cre_event_not_match,
file_cre_event_ignored,
file_mov_event_match,
file_mov_event_not_match,
file_mov_event_ignored,
]
all_events = all_file_events + all_dir_events
def assert_check_directory(handler, event):
self.assertFalse(handler.ignore_directories and event.is_directory)
def assert_equal(a, b):
self.assertEqual(a, b)
class TestableEventHandler(RegexMatchingEventHandler):
def on_any_event(self, event):
assert_check_directory(self, event)
def on_modified(self, event):
assert_check_directory(self, event)
assert_equal(event.event_type, EVENT_TYPE_MODIFIED)
assert_regexes(self, event)
def on_deleted(self, event):
assert_check_directory(self, event)
assert_equal(event.event_type, EVENT_TYPE_DELETED)
assert_regexes(self, event)
def on_moved(self, event):
assert_check_directory(self, event)
def test_ignore_directories(self):
handler1 = RegexMatchingEventHandler(g_allowed_regexes,
g_ignore_regexes, True)
handler2 = RegexMatchingEventHandler(g_allowed_regexes,
g_ignore_regexes, False)
self.assertTrue(handler1.ignore_directories)
self.assertFalse(handler2.ignore_directories)
def make_handler(ctx, task_, regexes, ignore_regexes, *args, **kwargs):
args = [ctx] + list(args)
try:
from watchdog.events import RegexMatchingEventHandler
except ImportError:
sys.exit("If you want to use this, 'pip install watchdog' first.")
class Handler(RegexMatchingEventHandler):
def on_any_event(self, event):
try:
task_(*args, **kwargs)
except BaseException:
pass
return Handler(regexes=regexes, ignore_regexes=ignore_regexes)
info += '------------------------------------------------------\n'
return info
# 执行同步
def qiniu_syn(path):
if PY2:
filename = str(hashlib.md5(path).hexdigest()) + '.json'
else:
filename = str(hashlib.md5(path.encode("utf8")).hexdigest()) + '.json'
cmd = current_dir + '\qrsync.exe ' + current_dir + '\\' + filename
os.system(cmd)
# 监听文件变动,正则过滤文件
class QiniuSyncEventHandler(RegexMatchingEventHandler):
def __init__(self, path, reg_ignore = []):
super(QiniuSyncEventHandler, self).__init__(ignore_regexes=reg_ignore,ignore_directories=True)
self.path = path
def on_any_event(self, event):
qiniu_syn(self.path)
# 根据路径获取文件夹名称
dir_str = get_foldername(self.path)
# 生成链接
src_path = event.src_path
if PY2:
url = config.QINIU_DOMIAN + (dir_str + src_path.replace(self.path, '')).replace('\\', urllib2.quote('/'))
else:
url = config.QINIU_DOMIAN + (dir_str + src_path.replace(self.path, '')).replace('\\', urllib.request.quote('/'))
copy_to_clipboard(url)
"""Block until a filesize remains constant."""
# watchdog cannot tell us when a newly created file has finished being
# written. Let's just wait until the filesize stops changing
size = None
while True:
try:
newsize = os.path.getsize(fname)
except:
newsize = None
else:
if newsize is not None and size == newsize:
break
print(size, newsize, fname)
size = newsize
class AIORegexMatchingEventHandler(RegexMatchingEventHandler):
def __init__(self, callback, loop=None, **kwargs):
"""asyncio compatible minimal regex matching event
handler for watchdog.
:param callback: function to apply to filenames.
:param loop: ayncio-like event loop.
"""
RegexMatchingEventHandler.__init__(self, **kwargs)
self._loop = loop or asyncio.get_event_loop()
self.callback = callback
@asyncio.coroutine
def _process_file(self, event):
"""Process an event when a file is created (or moved).
elif self.endtime is not None and time > self.endtime:
return
# the event matched, including time if applicable, dispatch
self.on_any_event(event)
_method_map = {
"modified": self.on_modified,
"moved": self.on_moved,
"created": self.on_created,
"deleted": self.on_deleted,
}
event_type = event.event_type
_method_map[event_type](event)
class DirWatcher(BaseObserver, RegexMatchingEventHandler):
"""Watchdog observer for monitoring a particular directory.
This observer has a sub-observer and handler for noticing when the
specified directory is created or deleted so that observations of that
directory can be preserved. In this way observations can be scheduled
for a directory that doesn't yet exist, and they will be enacted once it is
created.
"""
def __init__(self, path, force_polling=False, **kwargs):
"""Create observer for the directory at `path`."""
if force_polling:
observer_class = PollingObserver
else:
observer_class = Observer
self.current_config = copy.deepcopy(new_config)
def _call_spec_handlers(self, new_config):
flattened_config = yapconf.flatten(new_config, self.spec._separator)
flattened_current = yapconf.flatten(
self.current_config, self.spec._separator
)
for key, value in flattened_config.items():
if value != flattened_current.get(key):
item = self.spec.find_item(key)
if item and item.watch_target:
item.watch_target(flattened_current[key], value)
class FileHandler(RegexMatchingEventHandler):
"""Watchdog handler that only watches a specific file."""
def __init__(self, filename, handler, file_type='json'):
super(FileHandler, self).__init__(regexes=[r'^%s$' % filename])
self._handler = handler
self._filename = filename
self._file_type = file_type
def on_deleted(self, event):
raise YapconfSourceError(
'While watching file %s the file was deleted. Aborting watch.'
% self._filename
)
def on_modified(self, event):
new_config = self._load_config(self._filename)
import os
import sys
from copy import copy
from watchdog.events import RegexMatchingEventHandler
if sys.platform == "darwin":
from watchdog.observers.polling import PollingObserver as Observer
else:
from watchdog.observers import Observer
from microproxy.log import ProxyLogger
logger = ProxyLogger.get_logger(__name__)
class PluginEventHandler(RegexMatchingEventHandler):
def __init__(self, filename, callback):
super(PluginEventHandler, self).__init__(ignore_directories=True,
regexes=['.*' + filename])
self.callback = callback
def on_modified(self, event):
self.callback()
class Plugin(object):
PLUGIN_METHODS = ["on_request", "on_response"]
def __init__(self, plugin_path):
self.plugin_path = os.path.abspath(plugin_path)
self.plugin_name = os.path.basename(self.plugin_path)
self.plugin_dir = os.path.dirname(self.plugin_path)
raise sched_err
else:
root = newroot
else:
# watch was set, break out of while loop
break
# add regex for root and next subdirectory
regexes = []
regexes.append("^" + re.escape(root) + "$")
nextdir = self._get_next_dir_in_path(root)
if nextdir != root:
regexes.append("^" + re.escape(nextdir) + "$")
# update handler (self) with regexes
RegexMatchingEventHandler.__init__(
self, regexes=regexes, ignore_directories=False
)
# unschedule old root watch
if self.root_watch is not None:
try:
self.root_observer.unschedule(self.root_watch)
except KeyError:
# emitter already stopped
pass
self.root_watch = watch
def __init__(self, regexes=[r".*"], ignore_regexes=None,
ignore_directories=False, case_sensitive=False):
super(RegexMatchingEventHandler, self).__init__()
if case_sensitive:
self._regexes = [re.compile(r) for r in regexes]
self._ignore_regexes = [re.compile(r) for r in ignore_regexes]
else:
self._regexes = [re.compile(r, re.I) for r in regexes]
self._ignore_regexes = [re.compile(r, re.I) for r in ignore_regexes]
self._ignore_directories = ignore_directories
self._case_sensitive = case_sensitive