Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def admin_tenant(environ, start_response):
from gengine.app.admin import adminapp
def admin_app(environ, start_response):
#return HTTPSProxied(DebuggedApplication(adminapp.wsgi_app, True))(environ, start_response)
return HTTPSProxied(adminapp.wsgi_app)(environ, start_response)
def request_auth(environ, start_response):
resp = Response()
resp.status_code = 401
resp.www_authenticate = 'Basic realm="%s"' % ("Gamification Engine Admin",)
return resp(environ, start_response)
if not asbool(get_settings().get("enable_user_authentication", False)):
return admin_app(environ, start_response)
req = Request(environ)
def _get_basicauth_credentials(request):
authorization = request.headers.get("authorization","")
try:
authmeth, auth = authorization.split(' ', 1)
except ValueError: # not enough values to unpack
return None
if authmeth.lower() == 'basic':
try:
auth = base64.b64decode(auth.strip()).decode("UTF-8")
except binascii.Error: # can't decode
return None
try:
if asbool(settings.get('testing', False)):
config.include('.tests.testing_views')
config.include(tests_js)
# Load upgrades last so that all views (including testing views) are
# registered.
config.include('.upgrade')
app = config.make_wsgi_app()
if asbool(settings.get('load_sample_data', False)):
load_sample_data(app)
workbook_filename = settings.get('load_workbook', '')
load_test_only = asbool(settings.get('load_test_only', False))
docsdir = settings.get('load_docsdir', None)
if docsdir is not None:
docsdir = [path.strip() for path in docsdir.strip().split('\n')]
if workbook_filename:
load_workbook(app, workbook_filename, docsdir, test=load_test_only)
return app
def _parse_tool_definition(value_by_key, configuration_folder, tool_name):
try:
command_template = value_by_key['command_template'].strip()
except KeyError as e:
raise ToolConfigurationNotValid('command_template required')
if not command_template:
raise ToolConfigurationNotValid('command_template expected')
d = _parse_tool_arguments(value_by_key)
d['configuration_folder'] = configuration_folder
d['tool_name'] = tool_name
d['show_raw_output'] = asbool(value_by_key.get('show_raw_output'))
d['ignored_outputs'] = aslist(value_by_key.get('ignored_outputs', []))
for k, v in make_absolute_paths(d, configuration_folder).items():
if k in ('argument_names', 'show_raw_output', 'ignored_outputs'):
continue
v = v.strip()
if k.endswith('_path') and not v:
raise ToolConfigurationNotValid('file not found (%s=%s)' % (k, v))
d[unicode_safely(k)] = unicode_safely(v)
return d
def access_key_view_raw(context, request):
if asbool(request.params.get('upgrade', True)):
properties = context.upgrade_properties()
else:
properties = context.properties.copy()
del properties['secret_access_key_hash']
return properties
def setup_app(app, root, request, registry, closer, ini_location):
loader = INILoader(celery_app, ini_file=ini_location)
celery_config = loader.read_configuration()
#: TODO: There might be other variables requiring special handling
boolify(
celery_config, 'CELERY_ALWAYS_EAGER', 'CELERY_ENABLE_UTC',
'CELERY_RESULT_PERSISTENT'
)
if asbool(celery_config.get('USE_CELERYCONFIG', False)) is True:
config_path = 'celeryconfig'
celery_app.config_from_object(config_path)
else:
# TODO: Couldn't find a way with celery to do this
hijack_logger = asbool(
celery_config.get('CELERYD_HIJACK_ROOT_LOGGER', False)
)
celery_config['CELERYD_HIJACK_ROOT_LOGGER'] = hijack_logger
if hijack_logger is False:
global ini_file
ini_file = ini_location
signals.setup_logging.connect(configure_logging)
celery_app.config_from_object(celery_config)
celery_app.conf.update({'PYRAMID_APP': app})
celery_app.conf.update({'PYRAMID_ROOT': root})
celery_app.conf.update({'PYRAMID_REQUEST': request})
celery_app.conf.update({'PYRAMID_REGISTRY': registry})
"""Return a verified certificate serial from `request`, if any, else None.
Raises CertVerificationError if a certificate has been sent but was invalid
according to the front-end server that handled the SSL connection.
Currently assumes the use of nginx as the front-end server and SSL
endpoint.
"""
# test amenity
env = request.environ
settings = request.registry.settings
if 'paste.testing' in env and 'tests.auth.cert_serial' in env:
return env['tests.auth.cert_serial']
if not asbool(settings.get('auth.certs.enabled')):
return
# ATM, the cert serial is passed by the front-end server in an HTTP header.
transport = settings.get('auth.certs.transport')
if transport == 'http_headers':
verify = request.headers.get('X-Floof-SSL-Client-Verify')
serial = request.headers.get('X-Floof-SSL-Client-Serial', '')
elif transport == 'wsgi_environ':
verify = request.environ.get('floof.ssl_client_verify')
serial = request.environ.get('floof.ssl_client_serial', '')
else:
log.warning('Certificate auth enabled, but no valid transport set.')
return
serial = serial.lower()
def includeme(config): # pragma: no cover
settings = config.registry.settings
statsd_enabled = asbool(settings.get('substanced.statsd.enabled', False))
statsd_enabled = False
if statsd_enabled:
host = settings.get('substanced.statsd.host', 'localhost')
port = int(settings.get('substanced.statsd.port', 8125))
prefix = settings.get('substanced.statsd.prefix', 'substanced')
client = statsd.StatsClient(host=host, port=port, prefix=prefix)
# use a dict lookup rather than a full-on utility lookup for speed
config.registry['statsd_client'] = client
def indexing_active():
return _asbool(_get('use_elasticsearch'))
def configure_dbsession(config):
from snovault import DBSESSION
settings = config.registry.settings
DBSession = settings.pop(DBSESSION, None)
if DBSession is None:
engine = configure_engine(settings)
if asbool(settings.get('create_tables', False)):
from snovault.storage import Base
Base.metadata.create_all(engine)
import snovault.storage
import zope.sqlalchemy
from sqlalchemy import orm
DBSession = orm.scoped_session(orm.sessionmaker(bind=engine))
zope.sqlalchemy.register(DBSession)
snovault.storage.register(DBSession)
config.registry[DBSESSION] = DBSession
* base_dir
* base_url
which are passed in positionally.
Read the ``WebAssets`` docs for ``Environment`` for more details.
"""
# Make a dictionary of the webassets.* elements...
kwargs = {} # assets settings
cut_prefix = len(prefix) + 1
for k in settings:
if k.startswith(prefix):
val = settings[k]
if isinstance(val, six.string_types):
if val.lower() in auto_booly:
val = asbool(val)
elif val.lower().startswith('json:') and k[cut_prefix:] != 'manifest':
val = json.loads(val[5:])
kwargs[k[cut_prefix:]] = val
if 'base_dir' not in kwargs:
raise Exception("You need to provide webassets.base_dir in your configuration")
if 'base_url' not in kwargs:
raise Exception("You need to provide webassets.base_url in your configuration")
asset_dir = kwargs.pop('base_dir')
asset_url = kwargs.pop('base_url')
if ':' in asset_dir:
try:
resolved_dir = AssetResolver(None).resolve(asset_dir).abspath()
except ImportError: