Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def call_later(self):
    '''
    Schedule the next run automatically. Clears any previous scheduled runs.

    The delay is taken from ``self.cron.next(default_utc=self.utc)`` when this
    object has a ``cron`` attribute. Otherwise the delay is ``None``. Either
    way the delay is handed to ``self._call_later`` and the outcome is logged.
    '''
    if hasattr(self, 'cron'):
        delay = self.cron.next(default_utc=self.utc)
    else:
        delay = None
    self._call_later(delay)
    # A delay of None means there is nothing left to schedule
    if delay is None:
        app_log.debug('No further schedule for %s', self.name)
        return
    app_log.debug('Scheduling %s after %.0fs', self.name, delay)
def load_datasets(data, each):
    '''
    Load the datasets into ``data``, apply the condition, and fill ``each``.

    Both arguments are modified in place:

    - ``data`` gets one entry per dataset, then the condition's result
      (a DataFrame is stored as ``data['data']``; a dict is merged in).
    - ``each`` is extended with (key, value) pairs from the ``each:`` dataset,
      or receives a single ``(0, None)`` when the alert has no ``each:``.

    If the condition returns a falsy non-DataFrame, non-dict value, this logs
    the fact and returns early, leaving ``each`` untouched.

    Raises ValueError when the ``each:`` dataset is not a dict, a list, or an
    object with ``iterrows``.
    '''
    for key, val in datasets.items():
        # Lists are raw data and used as-is. Anything else is treated as {url: ...}
        if isinstance(val, list):
            data[key] = val
        else:
            data[key] = gramex.data.filter(**val)

    outcome = condition(**data)
    # Compare the class name rather than isinstance(..., pd.DataFrame) to avoid importing pandas
    if type(outcome).__name__ == 'DataFrame':
        data['data'] = outcome
    elif isinstance(outcome, dict):
        data.update(outcome)
    elif not outcome:
        app_log.debug('alert: %s stopped. condition = %s', name, outcome)
        return

    # Without each:, there is exactly one item to process
    if 'each' not in alert:
        each.append((0, None))
        return

    each_data = data[alert['each']]
    if isinstance(each_data, dict):
        pairs = each_data.items()
    elif isinstance(each_data, list):
        pairs = enumerate(each_data)
    elif hasattr(each_data, 'iterrows'):
        pairs = each_data.iterrows()
    else:
        raise ValueError('alert: %s: each: data.%s must be dict/list/DF, not %s' % (
            name, alert['each'], type(each_data)))
    each += list(pairs)
def allowed(self, path):
    '''
    Return True if ``path`` may be served, False otherwise.

    A path is allowed if it matches any allow:, or matches no ignore:.
    Override this method for a custom implementation.
    '''
    for ignore in self.ignore:
        if not _match(path, ignore):
            continue
        # allow: patterns are consulted only once an ignore: pattern has matched.
        # A single matching allow: is enough to let the path through.
        if any(_match(path, allow) for allow in self.allow):
            return True
        app_log.debug('%s: Disallow "%s". It matches "%s"', self.name, path, ignore)
        return False
    # No ignore: pattern matched
    return True
def close(self):
    '''Close ``self.store``. A ValueError raised while closing is logged and ignored.'''
    try:
        self.store.close()
    except ValueError:
        # h5py.h5f.get_obj_ids often raises a ValueError: Not a file id.
        # This is presumably if the file handle has been closed. Log & ignore.
        app_log.debug('HDF5Store("%s").close() error ignored', self.path)
# NOTE(review): excerpt from the body of a generator method. The enclosing `def`
# (which supplies self, args and kwargs) is not part of the lines shown here.
# Picks what to serve based on the type of self.root: list, dict, or a single path.
# include_body defaults to True when the caller does not pass it
self.include_body = kwargs.pop('include_body', True)
# Use the first positional argument (or '' if there is none), resolved against '/'
# and with leading slashes removed
path = urljoin('/', args[0] if len(args) else '').lstrip('/')
if isinstance(self.root, list):
# Concatenate multiple files and serve them one after another
for path_item in self.root:
yield self._get_path(path_item, multipart=True)
elif isinstance(self.root, dict):
# Render path for the first matching regex
for pattern, filestr in self.root.items():
match = pattern.match(path)
if match:
# Build the values used to format filestr. Later updates override earlier ones:
# self.default, then the first value of each non-empty entry in self.args,
# then the named groups of the regex match.
q = defaultdict(text_type, **self.default)
q.update({k: v[0] for k, v in self.args.items() if len(v) > 0})
q.update(match.groupdict())
# Positional groups fill {0}, {1}, ... and q fills the named fields
p = Path(filestr.format(*match.groups(), **q)).absolute()
app_log.debug('%s: %s renders %s', self.name, self.request.path, p)
yield self._get_path(p)
break
else:
# NOTE(review): presumably the for-loop's else, i.e. no pattern matched - confirm
raise HTTPError(NOT_FOUND, '%s matches no path key', self.request.path)
elif not args:
# No group has been specified in the pattern. So just serve root
yield self._get_path(self.root)
else:
# Eliminate parent directory references like `../` in the URL
path = urljoin('/', path)[1:]
if self.pattern:
# Substitute the requested path for every '*' in the pattern
yield self._get_path(Path(self.pattern.replace('*', path)).absolute())
else:
# Serve root / path when root is a directory, else root itself
yield self._get_path(self.root / path if self.root.is_dir() else self.root)
# NOTE(review): excerpt. `handler` and `user_id` are defined outside the lines shown.
# Go through every key in the session store and remove the 'user' entry from
# sessions other than the current one.
for key in handler._session_store.keys():
# Ignore current session or OTP sessions
if key == handler.session.get('id'):
continue
# Keys may be text or bytes, so the 'otp:' prefix is checked for both types
if isinstance(key, six.text_type) and key.startswith('otp:'):
continue
if isinstance(key, six.binary_type) and key.startswith(b'otp:'):
continue
# Remove user from all other sessions
other_session = handler._session_store.load(key)
if other_session is not None:
other_user = other_session.get('user')
# NOTE(review): this only checks that the other session's user has a truthy id.
# It is not compared with user_id, although the log message below reports user_id.
# Confirm whether only sessions belonging to user_id should be affected.
if other_user is not None and other_user.get('id'):
other_session.pop('user')
# Write the modified session back to the store under the same key
handler._session_store.dump(key, other_session)
app_log.debug('dropped user %s from session %s', user_id, other_session['id'])
def gramex_update(url):
'''
If a newer version of gramex is available, logs a warning.

Posts event log entries to ``url`` as JSON and reads ``version`` from the
JSON response. Only events after the last "update" event are sent; if there
is no such event, all events are sent. Each entry is merged with ``meta``
(the GRAMEXDATA variable and ``platform.uname()``).

Returns early (after logging) if the eventlog service is not running, or if
the last "update" event is less than one day old.

Raises whatever ``r.raise_for_status()`` raises for an error response.

NOTE(review): the lines visible here end right after ``version`` is read; the
version comparison and warning mentioned above are not shown.
'''
import time
import requests
import platform
from . import services
if not services.info.eventlog:
return app_log.error('eventlog: service is not running. So Gramex update is disabled')
query = services.info.eventlog.query
# Fetch the most recent "update" event, if any
update = query('SELECT * FROM events WHERE event="update" ORDER BY time DESC LIMIT 1')
delay = 24 * 60 * 60 # Wait for one day before updates
if update and time.time() < update[0]['time'] + delay:
return app_log.debug('Gramex update ran recently. Deferring check.')
# Extra fields added to every event that is sent
meta = {
'dir': variables.get('GRAMEXDATA'),
'uname': platform.uname(),
}
if update:
# Only send events recorded after the last update event
events = query('SELECT * FROM events WHERE time > ? ORDER BY time',
(update[0]['time'], ))
else:
events = query('SELECT * FROM events')
# meta keys override same-named keys in each event
logs = [dict(log, **meta) for log in events]
r = requests.post(url, data=json.dumps(logs))
r.raise_for_status()
# From here on, `update` holds the parsed JSON response, not the query result
update = r.json()
version = update['version']
# NOTE(review): excerpt. final_config, conf, force_reload, services, callbacks and
# config_layers are defined outside the lines shown.
# Services may return callbacks to be run at the end
for key, val in final_config.items():
# Load a service only if its key is new, its config differs from the stored one,
# or a reload is forced
if key not in conf or conf[key] != val or force_reload:
if hasattr(services, key):
app_log.debug('Loading service: %s', key)
# Keep a deep copy, so the stored config is a separate object from val
conf[key] = deepcopy(val)
# The service is an attribute of `services` named after the key, called with its config
callback = getattr(services, key)(conf[key])
if callable(callback):
callbacks[key] = callback
else:
app_log.error('No service named %s', key)
# Run the callbacks. Specifically, the app service starts the Tornado ioloop
# Callbacks run in the key order of +config_layers
# NOTE(review): unary + presumably merges the config layers - confirm
for key in (+config_layers).keys():
if key in callbacks:
app_log.debug('Running callback: %s', key)
callbacks[key]()