import luigi
from docker.models.containers import Container
from exaslct_src.lib.base.dependency_logger_base_task import DependencyLoggerBaseTask
from exaslct_src.lib.base.docker_base_task import DockerBaseTask
from exaslct_src.lib.base.json_pickle_parameter import JsonPickleParameter
from exaslct_src.lib.data.container_info import ContainerInfo
from exaslct_src.lib.data.database_info import DatabaseInfo
from exaslct_src.lib.test_runner.container_log_thread import ContainerLogThread
from exaslct_src.lib.test_runner.database_credentials import DatabaseCredentialsParameter
from exaslct_src.lib.test_runner.is_database_ready_thread import IsDatabaseReadyThread
class WaitForTestDockerDatabase(DockerBaseTask, DatabaseCredentialsParameter):
environment_name = luigi.Parameter()
test_container_info = JsonPickleParameter(ContainerInfo, significant=False) # type: ContainerInfo
database_info = JsonPickleParameter(DatabaseInfo, significant=False) # type: DatabaseInfo
db_startup_timeout_in_seconds = luigi.IntParameter(10 * 60, significant=False)
attempt = luigi.IntParameter(1)
def run_task(self):
test_container = self._client.containers.get(self.test_container_info.container_name)
db_container_name = self.database_info.container_info.container_name
db_container = self._client.containers.get(db_container_name)
is_database_ready = \
self.wait_for_database_startup(test_container, db_container)
after_startup_db_log_file = self.get_log_path().joinpath("after_startup_db_log.tar.gz")
self.save_db_log_files_as_gzip_tar(after_startup_db_log_file, db_container)
self.return_object(is_database_ready)
def wait_for_database_startup(self,
test_container: Container,
db_container: Container):
container_log_thread, is_database_ready_thread = \
self.start_wait_threads(db_container, test_container)
#
class SimpleStitchAssignmentsBase(luigi.Task):
""" SimpleStitchAssignments base class
"""
task_name = 'simple_stitch_assignments'
src_file = os.path.abspath(__file__)
allow_retry = False
problem_path = luigi.Parameter()
features_key = luigi.Parameter()
graph_key = luigi.Parameter()
assignments_path = luigi.Parameter()
assignments_key = luigi.Parameter()
edge_size_threshold = luigi.IntParameter(default=0)
serialize_edges = luigi.BoolParameter(default=False)
# task that is required before running this task
dependency = luigi.TaskParameter()
def requires(self):
return self.dependency
def run_impl(self):
shebang, block_shape, roi_begin, roi_end = self.global_config_values()
self.init(shebang)
with vu.file_reader(self.problem_path, 'r') as f:
shape = f[self.graph_key].attrs['shape']
block_list = vu.blocks_in_volume(shape, block_shape,
roi_begin, roi_end)
n_jobs = min(len(block_list), self.max_jobs)
# # Copy input files if requested
# if self.save_input_files:
# input_file_save_dir = self.output_directory + "_input_files"
# for dataset in tile.datasets:
# os.link(dataset.path, input_file_save_dir)
yield tile
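# The `dependency = luigi.TaskParameter()` / `requires()` pattern in the task
# above lets the caller inject the upstream task when the task is built instead
# of hard-coding it. A hypothetical sketch of that wiring (ProduceProblem,
# Downstream and the paths are made-up names; an upstream task *instance* is
# passed, mirroring how requires() returns self.dependency directly):
import luigi


class ProduceProblem(luigi.Task):
    problem_path = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.problem_path)

    def run(self):
        with self.output().open('w') as f:
            f.write('placeholder problem data')


class Downstream(luigi.Task):
    dependency = luigi.TaskParameter()

    def requires(self):
        # the upstream task is whatever instance the caller passed in
        return self.dependency


# wiring is chosen by the caller, not by the class definition:
task = Downstream(dependency=ProduceProblem(problem_path='/tmp/problem.txt'))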
class CsvTileListTask(luigi.Task):
x = luigi.IntParameter()
y = luigi.IntParameter()
year_min = luigi.IntParameter()
year_max = luigi.IntParameter()
satellites = luigi.Parameter(is_list=True)
output_directory = luigi.Parameter()
def output(self):
return luigi.LocalTarget(os.path.join(self.output_directory,
"TILES_{x:03d}_{y:04d}_{year_min:04d}_{year_max:04d}.csv"
.format(x=self.x,
y=self.y,
year_min=self.year_min,
year_max=self.year_max)))
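# Illustration (made-up values): x=5, y=17, year_min=1987, year_max=2015
# yields a target named TILES_005_0017_1987_2015.csv inside output_directory.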
def run(self):
config = Config(os.path.expanduser("~/.datacube/config"))
_log.debug(config.to_str())
import nifty
# TODO more clean up (job config files)
# TODO computation with rois
class WriteAssignmentTask(luigi.Task):
"""
Write node assignments for all blocks
"""
path = luigi.Parameter()
in_key = luigi.Parameter()
out_key = luigi.Parameter()
config_path = luigi.Parameter()
# maximal number of jobs that will be run in parallel
max_jobs = luigi.IntParameter()
tmp_folder = luigi.Parameter()
# identifier to separate the output files of different write
# assignment tasks
identifier = luigi.Parameter()
# TODO would be more elegant to express both as tasks,
# but for this we would need an empty default task
# for the offsets
dependency = luigi.TaskParameter()
offset_path = luigi.Parameter(default='')
# FIXME default does not work; this still needs to be specified
time_estimate = luigi.IntParameter(default=10)
run_local = luigi.BoolParameter(default=False)
def requires(self):
return self.dependency
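# The TODO above notes it would be more elegant to express the offsets as a
# task as well, which needs an "empty" default task. A hypothetical sketch
# (not part of this code base) of such a placeholder whose complete() always
# reports done, so it can serve as a no-op default dependency:
import luigi


class NoOffsets(luigi.Task):
    """Stand-in dependency for runs that do not need an offset input."""

    def complete(self):
        # nothing to produce, so this dependency is always satisfied
        return True

    def run(self):
        pass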
the file system operations, please do and contribute back.
"""
from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs import abstract_client as hdfs_abstract_client
import luigi.contrib.target
import logging
import os
import warnings
logger = logging.getLogger('luigi-interface')
class webhdfs(luigi.Config):
port = luigi.IntParameter(default=50070,
description='Port for webhdfs')
user = luigi.Parameter(default='', description='Defaults to $USER envvar',
config_path=dict(section='hdfs', name='user'))
client_type = luigi.ChoiceParameter(var_type=str, choices=['insecure', 'kerberos'],
default='insecure', description='Type of hdfs client to use.')
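# luigi.Config subclasses read their values from the configuration section
# named after the class, here [webhdfs]; the `user` parameter is additionally
# looked up under the legacy [hdfs] section via config_path. A hedged
# illustration, assuming a luigi.cfg along these lines:
#
#   [webhdfs]
#   port = 50070
#   client_type = insecure
#
# the resolved values are then available as webhdfs().port,
# webhdfs().user and webhdfs().client_type.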
class WebHdfsClient(hdfs_abstract_client.HdfsFileSystem):
"""
A webhdfs client that tries to conform to luigi's interface for file existence.
The library is using `this api
`__.
"""
def __init__(self, host=None, port=None, user=None, client_type=None):
import sys
import logging
from luigi import IntParameter
from luigi.setup_logging import InterfaceLogging
class retcode(luigi.Config):
"""
See the :ref:`return codes configuration section `.
"""
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
unhandled_exception = IntParameter(default=4,
description='For internal luigi errors.',
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
missing_data = IntParameter(default=0,
description="For when there are incomplete ExternalTask dependencies.",
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
task_failed = IntParameter(default=0,
description='''For when a task's run() method fails.''',
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
already_running = IntParameter(default=0,
description='For both local --lock and luigid "lock"',
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
scheduling_error = IntParameter(default=0,
description='''For when a task's complete() or requires() fails,
or task-limit reached'''
)
# default value inconsistent with doc/configuration.rst for backwards compatibility reasons
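# Hedged illustration (the values below are made up, not recommendations):
# each retcode parameter above maps one failure mode to a process exit code
# and can be overridden in the [retcode] section of luigi.cfg, e.g.
#
#   [retcode]
#   task_failed = 30
#   missing_data = 20
#   unhandled_exception = 40
#
# so wrapper scripts can tell the failure modes apart from the exit status
# of a `luigi` invocation.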
"""
# path to the n5 file and keys
path = luigi.Parameter()
aff_key = luigi.Parameter()
ws_key = luigi.Parameter()
out_key = luigi.Parameter()
# maximal number of jobs that will be run in parallel
max_jobs = luigi.IntParameter()
# path to the configuration
# TODO allow individual paths for individual blocks
config_path = luigi.Parameter()
tmp_folder = luigi.Parameter()
dependency = luigi.TaskParameter()
# FIXME default does not work; this still needs to be specified
time_estimate = luigi.IntParameter(default=10)
run_local = luigi.BoolParameter(default=False)
# TODO optional parameter to just run a subset of blocks
def requires(self):
return self.dependency
def _prepare_jobs(self, n_jobs, block_list, config, prefix):
for job_id in range(n_jobs):
block_jobs = block_list[job_id::n_jobs]
job_config = {'config': config,
'block_list': block_jobs}
config_path = os.path.join(self.tmp_folder, 'merge_votes_config_%s_job%i.json' % (prefix, job_id))
with open(config_path, 'w') as f:
json.dump(job_config, f)
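# Illustration of the round-robin split above: block_list[job_id::n_jobs]
# hands every n_jobs-th block to a job, e.g. with 10 blocks and 3 jobs
# job 0 gets blocks [0, 3, 6, 9], job 1 gets [1, 4, 7] and job 2 gets
# [2, 5, 8], so the per-job config files stay roughly the same size.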
def _submit_job(self, job_id, prefix):
#
# Graph Tasks
#
class MergeSubGraphsBase(luigi.Task):
""" MergeSubGraph base class
"""
task_name = 'merge_sub_graphs'
src_file = os.path.abspath(__file__)
# input volumes and graph
graph_path = luigi.Parameter()
scale = luigi.IntParameter()
output_key = luigi.Parameter(default='')
merge_complete_graph = luigi.BoolParameter(default=False)
# dependency
dependency = luigi.TaskParameter()
def requires(self):
return self.dependency
def clean_up_for_retry(self, block_list):
super().clean_up_for_retry(block_list)
# TODO remove any output of failed blocks because it might be corrupted
def _run_scale(self, config, block_shape, roi_begin, roi_end):
# make graph file and write shape as attribute
with vu.file_reader(self.graph_path) as f:
shape = f.attrs['shape']
launch_name = luigi.Parameter()
portfolio = luigi.Parameter()
product = luigi.Parameter()
version = luigi.Parameter()
account_id = luigi.Parameter()
region = luigi.Parameter()
puppet_account_id = luigi.Parameter()
parameters = luigi.ListParameter(default=[])
ssm_param_inputs = luigi.ListParameter(default=[])
dependencies = luigi.ListParameter(default=[])
retry_count = luigi.IntParameter(default=1)
worker_timeout = luigi.IntParameter(default=0, significant=False)
ssm_param_outputs = luigi.ListParameter(default=[])
try_count = 1
def requires(self):
version_id = GetVersionIdByVersionName(
self.portfolio,
self.product,
self.version,
self.account_id,
self.region,
)
product_id = GetProductIdByProductName(
self.portfolio,
self.product,
"""
A task for killing any open Redshift sessions
in a given database. This is necessary to prevent open user sessions
with transactions against the table from blocking drop or truncate
table commands.
Usage:
Subclass and override the required `host`, `database`,
`user`, and `password` attributes; a minimal subclass sketch
follows the abstract properties below.
"""
# time in seconds to wait before
# reconnecting to Redshift if our session is killed too.
# 30 seconds is usually fine; 60 is conservative
connection_reset_wait_seconds = luigi.IntParameter(default=60)
@abc.abstractproperty
def host(self):
return None
@abc.abstractproperty
def database(self):
return None
@abc.abstractproperty
def user(self):
return None
@abc.abstractproperty
def password(self):
return None
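# Minimal subclass sketch of the usage described in the docstring above. The
# parent class name (KillOpenRedshiftSessions from luigi.contrib.redshift) and
# all connection values are assumptions for illustration only.
import os

from luigi.contrib.redshift import KillOpenRedshiftSessions


class KillStagingSessions(KillOpenRedshiftSessions):

    @property
    def host(self):
        return 'examplecluster.abc123.us-east-1.redshift.amazonaws.com'

    @property
    def database(self):
        return 'staging'

    @property
    def user(self):
        return 'etl_user'

    @property
    def password(self):
        # never hard-code credentials; read them from the environment instead
        return os.environ['REDSHIFT_PASSWORD']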