@classmethod
def setUpClass(cls):
    cls.log_file = conf.log.handlers.requests.filename
    cls.columns = conf.log.handlers.requests['keys']
    cls.dirpath = os.path.dirname(cls.log_file)
    cls.dbpath = os.path.join(cls.dirpath, 'logviewer.db')
    cls.queryconf = cls.get_keywith(conf.url, 'apps/logviewer/query-')
    schd = cls.get_keywith(info.schedule, 'apps/logviewer-')
    # Check that the logviewer schedule is present in the config
    ok_(schd is not None)
    # Remove logviewer.db before running the scheduler
    os.remove(cls.dbpath)
    # Run the logviewer scheduler once
    schd.run()
    # Build a DataFrame with the raw request logs
    df = pd.concat([
        gramex.cache.open(f, 'csv', names=cls.columns).fillna('-')
        for f in glob(cls.log_file + '*')
    ], ignore_index=True)
    cls.df = logviewer.prepare_logs(df)
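
# Illustrative sketch only (an assumption, not the test suite's actual helper):
# get_keywith() is used above as "return the value of the first config key that
# starts with the given prefix", e.g. 'apps/logviewer/query-' or 'apps/logviewer-'.
@staticmethod
def get_keywith(config, prefix):
    for key, value in config.items():
        if key.startswith(prefix):
            return value
    return None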
# test patch
temp = {key: val}
self.json('put', '/json/path/', temp, data=temp)
self.json('patch', '/json/path/', temp, data=temp)
self.match_jsonfile(temp)
self.json('patch', '/json/path/', {key: val2}, data={key: val2})
temp.update({key: val2})
self.match_jsonfile(temp)
self.json('patch', '/json/path/', {key2: val}, data={key2: val})
temp.update({key2: val})
self.match_jsonfile(temp)
# Test store contents
self.assertEqual(store['json/get'], conf.url['json/get'].kwargs.data)
# Ensure that the JSON file at the path is stored in jsonhandler.store
path = conf.url['json/path'].kwargs.path
with io.open(path, 'r') as handle:  # noqa
    data = json.load(handle)
self.assertEqual(store[path], data)
# cleanup
self.json('delete', '/json/path/', None)
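
# Illustrative sketch only (an assumption, not the test suite's actual helper):
# match_jsonfile() is used above as "load the JSON file configured at
# conf.url['json/path'].kwargs.path and assert it equals the expected dict".
def match_jsonfile(self, expected):
    path = conf.url['json/path'].kwargs.path
    with io.open(path, 'r') as handle:
        self.assertEqual(json.load(handle), expected)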
@classmethod
def setUpClass(cls):
    super(DBAuthBase, cls).setUpClass()
    config = gramex.conf.url['auth/dbexcel'].kwargs
    cls.create_database(config.url)
    cls.url = server.base_url + '/auth/dbexcel'
def test_schedule(self):
    # Run this test as soon as Gramex starts to check that the schedule has run.
    self.assertIn('schedule-key', info)
    self.check('/', code=OK)
    # This tests that long-running threads run in parallel. We run
    # utils.slow_count every 10ms for kwargs.count seconds. If this test
    # fails, increase schedule.schedule-startup-slow.kwargs.count. It may be
    # due to slow startup.
    max_count = conf.schedule['schedule-startup-slow'].kwargs.count - 1
    ok_(0 < info['schedule-count'] < max_count, 'Schedule still running')
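
# Illustrative sketch only (an assumption based on the comment above, not the
# real utils.slow_count): a scheduled function that keeps updating
# info['schedule-count'] every 10ms for roughly `count` seconds, so the test
# can verify that the schedule thread is still running after startup.
def slow_count(count):
    import time
    for index in range(count * 100):
        info['schedule-count'] = index
        time.sleep(0.01)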
def get_schedule(service_type):
    import json
    import pandas as pd
    from cron_descriptor import get_description
    data = []
    for key, info in gramex.conf.get(service_type, {}).items():
        entry = dict(info)
        entry['name'] = key
        entry['args'] = json.dumps(entry.get('args', []))
        entry['kwargs'] = json.dumps(entry.get('kwargs', {}))
        entry['schedule'] = ''
        if key not in gramex.service[service_type]:
            entry['schedule'] = 'NA'
            data.append(entry)
            continue
        schedule = gramex.service[service_type][key]
        entry['next'] = schedule.next * 1000 if schedule.next else None
        if hasattr(schedule, 'cron_str'):
            cron = schedule.cron_str
            # cron_descriptor requires the year field to start with a number
            if cron.endswith(' *'):
                cron = cron[:-2]
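
# Illustrative usage sketch (an assumption, not the original continuation):
# once the trailing ' *' year field is stripped, cron_descriptor's
# get_description() can render the cron string as readable text for the
# 'schedule' column, e.g. something like 'Every 5 minutes' for '*/5 * * * *'.
from cron_descriptor import get_description

def describe_cron(cron_str):
    # Strip a trailing '*' year field; cron_descriptor expects the year,
    # when present, to start with a number.
    if cron_str.endswith(' *'):
        cron_str = cron_str[:-2]
    return get_description(cron_str)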
def config(handler):
    '''Dump the final resolved config'''
    return yaml.dump(gramex.conf, default_flow_style=False)
@classmethod
def setup(cls, **kwargs):
    super(FormHandler, cls).setup(**kwargs)
    conf_kwargs = merge(AttrDict(cls.conf.kwargs),
                        objectpath(gramex_conf, 'handlers.FormHandler', {}),
                        'setdefault')
    cls.headers = conf_kwargs.pop('headers', {})
    # The top-level formats: key is special. Don't treat it as data
    cls.formats = conf_kwargs.pop('formats', {})
    default_config = conf_kwargs.pop('default', None)
    # Remove other known special keys from the dataset configuration
    cls.clear_special_keys(conf_kwargs)
    # If the top level has url:, the data spec is at the top level.
    # Otherwise, it's a set of named sub-keys, one per dataset
    if 'url' in conf_kwargs:
        cls.datasets = {'data': conf_kwargs}
        cls.single = True
    else:
        if 'modify' in conf_kwargs:
            cls.modify_all = staticmethod(build_transform(
                conf={'function': conf_kwargs.pop('modify', None)},
                vars=cls.function_vars['modify'],
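
# Illustrative sketch of the two kwargs shapes that setup() above distinguishes
# (hypothetical values, not from the original configuration): a top-level url:
# means a single dataset named 'data'; otherwise each sub-key is a named dataset.
single_dataset_kwargs = {
    'url': 'sqlite:///data.db',     # url: at the top level -> cls.single = True
    'table': 'sales',
}
multi_dataset_kwargs = {
    'sales': {'url': 'sqlite:///data.db', 'table': 'sales'},
    'users': {'url': 'data/users.csv'},
}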
# Get this month's log files if the db has already been created
if table_exists(table(levels[-1]), conn):
    max_date = pd.read_sql(
        'SELECT MAX(time) FROM {}'.format(table(levels[-1])), conn).iloc[0, 0]
    app_log.info('logviewer: last processed till %s', max_date)
    this_month = max_date[:8] + '01'
    log_files = [f for f in log_files if filesince(f, this_month)]
    max_date = pd.to_datetime(max_date)
if not log_files:
    app_log.info('logviewer: no log files to process')
    return
# Create a DataFrame from the log files
columns = conf.log.handlers.requests['keys']
# TODO: avoid concat?
app_log.info('logviewer: files to process %s', log_files)
data = pd.concat([
    pd.read_csv(f, names=columns, encoding='utf-8').fillna('-')
    for f in log_files
], ignore_index=True)
app_log.info(
    'logviewer: prepare_logs {} rows with {} min session_threshold'.format(
        len(data.index), session_threshold))
data = prepare_logs(df=data,
                    session_threshold=session_threshold,
                    cutoff_buffer=cutoff_buffer)
app_log.info('logviewer: processed and returned {} rows'.format(len(data.index)))
# Apply transforms on the raw data
app_log.info('logviewer: applying transforms')
for spec in transforms:
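
# Illustrative sketch only (an assumption, not the app's actual helper):
# filesince() above is used to keep only rotated log files whose date suffix
# (e.g. 'requests.csv.2023-04-01') falls on or after the given date; the
# current, un-suffixed log file is always kept.
def filesince(filename, since):
    suffix = filename.rsplit('.', 1)[-1]
    if not suffix[:1].isdigit():
        return True         # no date suffix: this is the live log file
    return suffix >= since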