Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Load data from self.path JSON file if it's specified, exists, and newer than last load.
# Otherwise, load the default data provided.
if self.path:
path = self.path
_jsonstores.setdefault(path, None)
self.changed = False
if os.path.exists(path):
if _loaded.get(path, 0) <= os.stat(path).st_mtime:
# Don't use encoding when reading JSON. We're using ensure_ascii=True
# Besides, when handling Py2 & Py3, just ignoring encoding works best
with open(path, mode='r') as handle: # noqa
try:
_jsonstores[path] = json.load(handle)
_loaded[path] = time.time()
except ValueError:
app_log.warning('Invalid JSON in %s', path)
self.changed = True
else:
self.changed = True
else:
path = self.name
_jsonstores.setdefault(path, self.default_data)
# Walk down the path and find the parent, key and data represented by jsonpath
parent, key, data = _jsonstores, path, _jsonstores[path]
if not jsonpath:
return parent, key, data
keys = [path] + jsonpath.split('/')
for index, key in enumerate(keys[1:]):
if hasattr(data, '__contains__') and key in data:
parent, data = data, data[key]
continue
def _copy(source, target, template_data=None):
'''
Copy single directory or file (as binary) from source to target.
Warn if target exists, or source is not file/directory, and exit.
If template_data is specified, treat source as a Tornado template.
'''
if os.path.exists(target):
app_log.warning('Skip existing %s', target)
elif os.path.isdir(source):
_mkdir(target)
elif os.path.isfile(source):
with io.open(source, 'rb') as handle:
result = handle.read()
from mimetypes import guess_type
filetype = guess_type(source)[0]
basetype = 'text' if filetype is None else filetype.split('/')[0]
if template_data is not None and basetype in {'text', 'application'}:
result = Template(result).generate(**template_data)
with io.open(target, 'wb') as handle:
handle.write(result)
else:
app_log.warning('Skip unknown file %s', source)
if not os.path.exists(target):
return True
# TODO: check case insensitive in Windows, but case sensitive on other OS
elif target.lower().startswith(variables['GRAMEXDATA'].lower()):
# Try multiple times to recover from errors, since we have no way of
# auto-resuming rmtree: https://bugs.python.org/issue8523
for count in range(100):
try:
shutil.rmtree(target, onerror=_ensure_remove)
except TryAgainError:
pass
else:
break
return True
else:
app_log.warning('Not removing directory %s (outside $GRAMEXDATA)', target)
return False
contentfields = ['body', 'html', 'bodyfile', 'htmlfile', 'markdown', 'markdownfile']
addr_fields = ['to', 'cc', 'bcc', 'reply_to', 'on_behalf_of', 'from']
for key in ['subject'] + addr_fields + contentfields:
if not isinstance(alert.get(key, ''), string_types + (list, )):
app_log.error('alert: %s.%s: %r must be a list or str', name, key, alert[key])
return
if not isinstance(alert.get('images', {}), dict):
app_log.error('alert: %s.images: %r is not a dict', name, alert['images'])
return
if not isinstance(alert.get('attachments', []), list):
app_log.error('alert: %s.attachments: %r is not a list', name, alert['attachments'])
return
# Warn if subject is missing
if 'subject' not in alert:
app_log.warning('alert: %s: missing subject', name)
# Warn if body, html, bodyfile, htmlfile keys are missing
if not any(key in alert for key in contentfields):
app_log.warning('alert: %s: missing body/html/bodyfile/htmlfile/...', name)
# Pre-compile data.
# - `data: {key: [...]}` -- loads data in-place
# - `data: {key: {url: file}}` -- loads from a file
# - `data: {key: {url: sqlalchemy-url, table: table}}` -- loads from a database
# - `data: file` -- same as `data: {data: {url: file}}`
# - `data: {key: file}` -- same as `data: {key: {url: file}}`
# - `data: [...]` -- same as `data: {data: [...]}`
datasets = {}
if 'data' in alert:
if isinstance(alert['data'], string_types):
datasets = {'data': {'url': alert['data']}}
items = []
for key, val in items:
# Purge already cleared / removed sessions
if val is None:
keys.append(key)
elif isinstance(val, dict):
# If the session has expired, remove it
if val.get('_t', 0) < now:
keys.append(key)
# If the session is inactive, remove it after a week.
# If we remove immediately, then we may lose WIP sessions.
# For example, people who opened a login page where _next_url was set
elif '_i' in val and '_l' in val and val['_i'] + val['_l'] < now - week:
keys.append(key)
else:
app_log.warning('Store key: %s has value type %s (not dict)', key, type(val))
return keys
# pipe write to the RequestHandler
if target == 'pipe':
callbacks.append(self._write)
# false-y values are ignored. (False, 0, etc)
elif not target:
pass
# strings are treated as files
elif isinstance(target, six.string_types):
# cache file handles for re-use between stdout, stderr
if target not in self.handles:
self.handles[target] = io.open(target, mode='wb')
handle = self.handles[target]
callbacks.append(handle.write)
# warn on unknown parameters (e.g. numbers, True, etc)
else:
app_log.warning('ProcessHandler: %s: %s is not implemented' % (name, target))
return callbacks
'''
for module in modules:
name = getattr(module, '__name__', None)
path = getattr(module, '__file__', None)
# sys.__file__ does not exist, but don't raise a warning. You can't reload it
if name in {'sys'}:
continue
if name is None or path is None or not os.path.exists(path):
app_log.warning('Path for module %s is %s: not found', name, path)
continue
# On Python 3, __file__ points to the .py file. In Python 2, it's the .pyc file
# https://www.python.org/dev/peps/pep-3147/#file
if path.lower().endswith('.pyc'):
path = path[:-1]
if not os.path.exists(path):
app_log.warning('Path for module %s is %s: not found', name, path)
continue
# The first time, don't reload it. Thereafter, if it's older or resized, reload it
fstat = stat(path)
if fstat != _MODULE_CACHE.get(name, fstat):
app_log.info('Reloading module %s', name)
six.moves.reload_module(module)
_MODULE_CACHE[name] = fstat