Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_find_config():
files = write_files({
'base.conf': dedent("""\
[datacube]
db_hostname: fakehost.test.lan
"""),
'override.conf': dedent("""\
[datacube]
db_hostname: overridden.test.lan
db_database: overridden_db
""")
})
# One config file
config = LocalConfig.find(paths=[str(files.joinpath('base.conf'))])
assert config['db_hostname'] == 'fakehost.test.lan'
# Not set: uses default
assert config['db_database'] == 'datacube'
# Now two config files, with the latter overriding earlier options.
config = LocalConfig.find(paths=[str(files.joinpath('base.conf')),
str(files.joinpath('override.conf'))])
assert config['db_hostname'] == 'overridden.test.lan'
assert config['db_database'] == 'overridden_db'
def load_config(config_file, env=None):
""" Load configuration from file. """
paths = DEFAULT_CONF_PATHS + (config_file,)
return LocalConfig.find(paths=paths, env=env)
set_env(DATACUBE_DB_URL='postgresql://uu:%20pass%40@host.tld:3344/db')
cfg = LocalConfig.find()
assert '3344' in str(cfg)
assert '3344' in repr(cfg)
assert cfg['db_username'] == 'uu'
assert cfg['db_password'] == ' pass@'
assert cfg['db_hostname'] == 'host.tld'
assert cfg['db_database'] == 'db'
assert cfg['db_port'] == '3344'
set_env(DB_DATABASE='dc2',
DB_HOSTNAME='remote.db',
DB_PORT='4433',
DB_USERNAME='dcu',
DB_PASSWORD='gg')
cfg = LocalConfig.find()
assert cfg['db_username'] == 'dcu'
assert cfg['db_password'] == 'gg'
assert cfg['db_hostname'] == 'remote.db'
assert cfg['db_database'] == 'dc2'
assert cfg['db_port'] == '4433'
# workers need everything, so we can save some time
data = self._file_transfer.fetch_payload(False)
# Sub-job data?
if 'job' in data:
self._job_params = data['job']
# Now fetch function params from S3
params_url = data['params_url'].lstrip('URL:')
file_transfer2 = FileTransfer(url=params_url)
self._input_params = file_transfer2.fetch_payload(False)
else:
self._input_params = data
# Initialise datacube
if 'paths' in self._input_params and 'env' in self._input_params and \
self._input_params['paths'] and self._input_params['env']:
config = LocalConfig.find(self._input_params['paths'], self._input_params['env'])
else:
config = LocalConfig.find()
self._datacube = Datacube(config=config)
self._store = StoreWorkers(**config.redis_config)
self._ee_config = config.execution_engine_config
self._result_bucket = self._ee_config['result_bucket']
# Worker should only produce output in the result_bucket
self._file_transfer.bucket = self._ee_config['result_bucket']
self._id = self._store.add_worker(WorkerMetadata(name, worker_type, time()),
WorkerStatuses.ALIVE)
message = '{}: Initialised'.format(self)
self._store.add_worker_logs(self._id, message)
self.logger.debug(message)
def new_func(*args, **kwargs):
obj = click.get_current_context().obj
paths = obj.get('config_files') or config.DEFAULT_CONF_PATHS
# If the user is overriding the defaults
specific_environment = obj.get('config_environment')
try:
parsed_config = config.LocalConfig.find(paths=paths, env=specific_environment)
except ValueError:
if specific_environment:
raise click.ClickException("No datacube config found for '{}'".format(specific_environment))
else:
raise click.ClickException("No datacube config found")
_LOG.debug("Loaded datacube config: %r", parsed_config)
return f(parsed_config, *args, **kwargs)
def ingestion_work(output_type, source_type, ingestion_definition):
"""Run the ingestion process for a user defined configuration
Args:
output_type, source_type: types produced by ingest.make_output_type
ingestion_definition: dict representing a Data Cube ingestion def produced using the utils func.
"""
conf_path = '/home/' + settings.LOCAL_USER + '/Datacube/data_cube_ui/config/.datacube.conf'
index = index_connect(local_config=LocalConfig.find([conf_path]))
tasks = ingest.create_task_list(index, output_type, None, source_type, ingestion_definition)
# this is a dry run
# paths = [ingest.get_filename(ingestion_definition, task['tile_index'], task['tile'].sources) for task in tasks]
# ingest.check_existing_files(paths)
# this actually ingests stuff
successful, failed = ingest.process_tasks(index, ingestion_definition, source_type, output_type, tasks, 3200,
get_executor(None, None))
index.close()
return 0
index on initialisation. Caution: In the current
implementation all parameters get passed to all potential
indexes.
:param kargs: Optional keyword arguments to be passed to the
index on initialisation. Caution: In the current
implementation all parameters get passed to all potential
indexes.
"""
self.logger = logging.getLogger(self.__class__.__name__)
if index is None:
local_config = kargs['local_config'] if 'local_config' in kargs else None
application_name = kargs['application_name'] if 'application_name' in kargs else None
validate_connection = kargs['validate_connection'] if 'validate_connection' in kargs else True
if local_config is None:
local_config = LocalConfig.find()
db = PostgresDb.from_config(local_config,
application_name=application_name,
validate_connection=validate_connection)
else:
db = index._db # pylint: disable=protected-access
super(Index, self).__init__(db)
def normalise_config(config):
if config is None:
return LocalConfig.find(env=env)
if isinstance(config, str):
return LocalConfig.find([config], env=env)
return config
def celery_app(store_config=None):
try:
if store_config is None:
local_config = LocalConfig.find()
store_config = local_config.redis_celery_config
_app = Celery('ee_task', broker=store_config['url'], backend=store_config['url'])
except ValueError:
_app = Celery('ee_task')
_app.conf.update(
task_serializer='pickle',
result_serializer='pickle',
accept_content=['pickle'],
worker_prefetch_multiplier=1,
broker_pool_limit=100,
broker_connection_retry=True,
broker_connection_timeout=4,
broker_transport_options={'socket_keepalive': True, 'retry_on_timeout': True,
'socket_connect_timeout': 10, 'socket_timeout': 10},
redis_socket_connect_timeout=10,
def db_connect(cfg=None):
""" Create database connection from datacube config.
cfg:
None -- use default datacube config
str -- use config with a given name
LocalConfig -- use loaded config object
"""
from datacube.config import LocalConfig
import psycopg2
if isinstance(cfg, str) or cfg is None:
cfg = LocalConfig.find(env=cfg)
cfg_remap = dict(dbname='db_database',
user='db_username',
password='db_password',
host='db_hostname',
port='db_port')
pg_cfg = {k: cfg.get(cfg_name, None)
for k, cfg_name in cfg_remap.items()}
return psycopg2.connect(**pg_cfg)