Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, fs_environ, fs_environ_info, test=False, use_es=True):
# FOURSIGHT information
self.fs_env = fs_environ
es = ESConnection(index=fs_environ_info.get('bucket')) if use_es else None
self.connections = {
's3': S3Connection(fs_environ_info.get('bucket')),
'es': es
}
# FOURFRONT information
self.ff_server = fs_environ_info['fourfront']
self.ff_env = fs_environ_info['ff_env']
self.ff_es = fs_environ_info['es']
if not test:
self.ff_s3 = s3Utils(env=self.ff_env)
try:
self.ff_keys = self.ff_s3.get_access_keys('access_key_foursight')
except Exception as e:
raise Exception('Could not initiate connection to Fourfront; it is probably a bad ff_env. '
'You gave: %s. Error message: %s' % (self.ff_env, str(e)))
# ensure ff_keys has server, and make sure it does not end with '/'
if 'server' not in self.ff_keys:
server = self.ff_server[:-1] if self.ff_server.endswith('/') else self.ff_server
self.ff_keys['server'] = server
else:
self.ff_s3 = None
self.ff_keys = None
def __init__(
self,
ff_access_keys = None,
google_api_key = None,
s3UtilsInstance = None,
extra_config = DEFAULT_GOOGLE_API_CONFIG
):
'''Authenticate with Google APIs and initialize sub-class instances.'''
if s3UtilsInstance is None:
self._s3Utils = s3_utils.s3Utils(env='data') # Google API Keys are stored on production bucket only ATM.
else:
self._s3Utils = s3UtilsInstance
if google_api_key is None:
self._api_key = self._s3Utils.get_google_key()
if not self._api_key:
raise Exception("Failed to get Google API key from S3.")
else:
self._api_key = google_api_key
if not GoogleAPISyncer.validate_api_key_format(self._api_key):
raise Exception("Google API Key is in invalid format.")
self.extra_config = extra_config
self.credentials = Credentials.from_service_account_info(
self._api_key,
def get_s3_utils_obj(connection):
"""
Returns a dcicutils.s3_utils.s3Utils object
"""
# custom handling of data and staging
if connection.fs_environment in ['data', 'staging']:
use_env = 'fourfront-webprod'
else:
use_env = connection.ff_env
return s3_utils.s3Utils(env=use_env)