def __init__(self, filename):
"""
Init object from filename. filenamebase *emonitor/modules/*
path: [module]/templates/[filename]
:param filename: print.[layout].html
"""
self.module = filename.split('.')[0]
self.filename = '.'.join(filename.split('.')[1:])
self.parameters = []
env = Environment(loader=PackageLoader('emonitor.modules.{}'.format(self.module), 'templates'))
if not current_app:
from emonitor import app
env.filters.update(app.jinja_env.filters)
else:
env.filters.update(current_app.jinja_env.filters)
parsed_content = env.parse(env.loader.get_source(env, self.filename)[0])
parameters = meta.find_undeclared_variables(parsed_content)
for p in filter(lambda x: x.startswith('param_'), parameters):
self.parameters.append(LayoutParameter(p))
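# Minimal, self-contained sketch of the jinja2 meta API the snippet above
# relies on (the template string here is illustrative only):
from jinja2 import Environment, meta

env = Environment()
ast = env.parse('{{ param_header }} / {{ title }} / {{ param_footer }}')
variables = meta.find_undeclared_variables(ast)
# find_undeclared_variables returns the set of names the template uses but
# never defines; the __init__ above keeps only those prefixed with 'param_'.
print(sorted(v for v in variables if v.startswith('param_')))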
def info_handler():
keys_str = ",".join(list(current_app.text_models.keys()))
return Response(
response=json.dumps({
'author': "astoliarov",
"info": "Бот, генератор предложений, на основе твиттер аккаунтов. Есть корпусы для генерации твитов от {}".
format(keys_str)
}),
status=200,
mimetype="application/json",
)
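# Hedged wiring sketch: the handler above reads a 'text_models' attribute
# off the Flask app. All names below are illustrative stand-ins, not the
# real project setup.
import json
from flask import Flask, Response, current_app

demo_app = Flask(__name__)
demo_app.text_models = {'some_account': None}  # stand-in corpora registry

demo_app.add_url_rule('/info', 'info', info_handler, methods=['GET'])

with demo_app.test_client() as client:
    print(client.get('/info').get_json())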
variation['link'] = generate_link(backend, variation['file_path'],
project_id)
# Construct the new expiry datetime.
validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend]
response['link_expires'] = now + datetime.timedelta(seconds=validity_secs)
patch_info = remove_private_keys(response)
# The project could have been soft-deleted, in which case it's fine to
# update the links to the file. However, Eve/Cerberus doesn't allow this;
# removing the 'project' key from the PATCH works around this.
patch_info.pop('project', None)
file_id = ObjectId(response['_id'])
(patch_resp, _, _, _) = current_app.patch_internal('files', patch_info,
_id=file_id)
if patch_resp.get('_status') == 'ERR':
log.warning('Unable to save new links for file %s: %r',
response['_id'], patch_resp)
# TODO: raise a snag.
response['_updated'] = now
else:
response['_updated'] = patch_resp['_updated']
# Be silly and re-fetch the etag ourselves. TODO: handle this better.
etag_doc = current_app.data.driver.db['files'].find_one({'_id': file_id},
{'_etag': 1})
response['_etag'] = etag_doc['_etag']
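# Hedged sketch of the helper used above (the real Pillar helper may strip
# additional fields): Eve rejects PATCH documents that include the meta
# fields it manages itself, so keys starting with an underscore are dropped.
def remove_private_keys(document):
    # '_id', '_etag', '_updated', etc. are managed by Eve, not the client.
    return {k: v for k, v in document.items() if not k.startswith('_')}

print(remove_private_keys({'_id': 'abc123', '_etag': 'deadbeef', 'filename': 'img.png'}))
# -> {'filename': 'img.png'}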
def api_user(request):
api_key = request.headers.get('X-JETO-KEY')
if api_key:
key = APIKey.query.filter_by(name=api_key).first()
if key:
identity_changed.send(
current_app._get_current_object(),
identity=Identity(key.user.id)
)
return key.user
    # Finally, return None if no valid API key identified a user.
return None
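# Hedged sketch of the model shape the lookup above implies (column names
# are inferred from the query; the real jeto models may differ):
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)

class APIKey(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, index=True)  # value sent in X-JETO-KEY
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship('User')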
def tc_purgecache(body):
"""Purge cache on taskcluster
"""
credentials = []
client_id = current_app.config.get('TASKCLUSTER_CLIENT_ID')
access_token = current_app.config.get('TASKCLUSTER_ACCESS_TOKEN')
if client_id and access_token:
credentials = [dict(
credentials=dict(
clientId=client_id,
accessToken=access_token,
))]
purge_cache = taskcluster.PurgeCache(*credentials)
for item in body:
purge_cache.purgeCache(item.provisionerId,
item.workerType,
dict(cacheName=item.cacheName))
return None
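# Illustrative invocation (identifiers are made up; the function expects
# items exposing provisionerId, workerType, and cacheName attributes):
from collections import namedtuple

PurgeRequest = namedtuple('PurgeRequest', 'provisionerId workerType cacheName')
requests = [PurgeRequest('aws-provisioner-v1', 'gecko-3-b-linux', 'level-3-checkouts')]
# tc_purgecache(requests)  # needs an app context, credentials, and network access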
import logging
from logging.config import fileConfig

from alembic import context
from flask import current_app
from sqlalchemy import engine_from_config, pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This sets up the loggers.
fileConfig(config.config_file_name)
logger = logging.getLogger("alembic.env")
config.set_main_option(
"sqlalchemy.url", current_app.config.get("SQLALCHEMY_DATABASE_URI")
)
target_metadata = current_app.extensions["migrate"].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
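# For completeness, a condensed sketch of the online-mode counterpart from
# the stock Alembic template (this is what the engine_from_config and pool
# imports above are for; a real env.py may add autogenerate hooks):
def run_migrations_online():
    """Run migrations in 'online' mode: create an Engine and bind a
    connection to the Alembic context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()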
def export_event_json(event_id, settings):
"""
    Export the event as a zip archive on the server and return its path.
"""
# make directory
exports_dir = app.config['BASE_DIR'] + '/static/uploads/exports/'
if not os.path.isdir(exports_dir):
os.mkdir(exports_dir)
dir_path = exports_dir + 'event%d' % event_id
if os.path.isdir(dir_path):
shutil.rmtree(dir_path, ignore_errors=True)
os.mkdir(dir_path)
# save to directory
for e in EXPORTS:
if e[0] == 'event':
data = _order_json(marshal(e[1].get(event_id), e[2]), e)
_download_media(data, 'event', dir_path, settings)
else:
data = marshal(e[1].list(event_id), e[2])
            for count, item in enumerate(data):
                data[count] = _order_json(item, e)
                _download_media(data[count], e[0], dir_path, settings)
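# The directory setup above works, but builds paths by string concatenation.
# An equivalent sketch using os.path.join and makedirs (base_dir is a
# temporary stand-in for app.config['BASE_DIR']):
import os
import shutil
import tempfile

base_dir = tempfile.mkdtemp()
exports_dir = os.path.join(base_dir, 'static', 'uploads', 'exports')
os.makedirs(exports_dir, exist_ok=True)  # no need to check isdir first

dir_path = os.path.join(exports_dir, 'event{}'.format(42))
shutil.rmtree(dir_path, ignore_errors=True)  # drop any stale export
os.makedirs(dir_path)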
def _update_name(file_id, file_props):
files_collection = current_app.data.driver.db['files']
file_doc = files_collection.find_one({'_id': ObjectId(file_id)})
if file_doc is None or file_doc['backend'] != 'gcs':
return
# For textures -- the map type should be part of the name.
map_type = file_props.get('map_type', u'')
    # Note: 'node' is not defined in this helper; it is supplied by an
    # enclosing scope in the original code.
    storage = GoogleCloudStorageBucket(str(node['project']))
blob = storage.Get(file_doc['file_path'], to_dict=False)
# Pick file extension from original filename
_, ext = os.path.splitext(file_doc['filename'])
name = _format_name(node['name'], ext, map_type=map_type)
storage.update_name(blob, name)
# Assign the same name to variations
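# Hedged sketch of the _format_name helper the call above assumes: derive a
# download name from the node name, the optional map type, and the original
# file extension (the real implementation may differ in details):
import os

def _format_name(name, ext, map_type=u''):
    root, _ = os.path.splitext(name)
    suffix = u'-{}'.format(map_type) if map_type else u''
    return u'{}{}{}'.format(root, suffix, ext)

# e.g. _format_name(u'Wood planks', '.png', map_type=u'normal')
# -> u'Wood planks-normal.png'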
def get_methods():
if methods is not None:
return methods
options_resp = current_app.make_default_options_response()
return options_resp.headers['allow']
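# Context sketch: get_methods typically lives as a closure inside a
# crossdomain-style CORS decorator, where 'methods' is the decorator
# argument. A condensed, hedged reconstruction of that pattern:
from functools import wraps
from flask import current_app, make_response

def crossdomain(origin, methods=None):
    def decorator(f):
        def get_methods():
            if methods is not None:
                return ', '.join(sorted(methods))
            # Fall back to whatever Flask would advertise for OPTIONS.
            options_resp = current_app.make_default_options_response()
            return options_resp.headers['allow']

        @wraps(f)
        def wrapped(*args, **kwargs):
            resp = make_response(f(*args, **kwargs))
            resp.headers['Access-Control-Allow-Origin'] = origin
            resp.headers['Access-Control-Allow-Methods'] = get_methods()
            return resp
        return wrapped
    return decorator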
def load_user(user_login_id):
try:
user = db_user.get_by_login_id(user_login_id)
except Exception as e:
current_app.logger.error("Error while getting user by login ID: %s", str(e), exc_info=True)
return None
if user:
return User.from_dbrow(user)
else:
return None
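# Typical wiring for the loader above with Flask-Login (a hedged sketch;
# names other than the flask_login API are assumptions):
from flask_login import LoginManager

login_manager = LoginManager()
login_manager.user_loader(load_user)  # equivalent to @login_manager.user_loader

def create_app():
    from flask import Flask
    app = Flask(__name__)
    login_manager.init_app(app)
    return app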