    def query(self):
        sql = ''
        for view in self.materialised_views:
            sql += 'REFRESH MATERIALIZED VIEW {view};\n'.format(view=view)
        return sql
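    # Rendered SQL for two hypothetical views, e.g. with
    # self.materialised_views = ['country_counts', 'blockpage_counts']:
    #
    #   REFRESH MATERIALIZED VIEW country_counts;
    #   REFRESH MATERIALIZED VIEW blockpage_counts;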
class ListParameter(luigi.Parameter):
    def parse(self, s):
        return s.split(' ')

    def serialize(self, e):
        return ' '.join(e)
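# Round-trip sanity check for the custom parameter above (a minimal sketch;
# the test names are illustrative):
#
# >>> p = ListParameter()
# >>> p.parse('web_connectivity http_requests')
# ['web_connectivity', 'http_requests']
# >>> p.serialize(['web_connectivity', 'http_requests'])
# 'web_connectivity http_requests'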
class ListReportsAndRun(luigi.WrapperTask):
    date_interval = luigi.DateIntervalParameter()
    task = luigi.Parameter(default="InsertMeasurementsIntoPostgres")
    test_names = ListParameter(default=[])
    update_views = luigi.BoolParameter(default=False)

    @staticmethod
    def _list_reports_in_bucket(date):
        ooni_private_dir = config.get("ooni", "raw-reports-dir")
        bucket_path = os.path.join(ooni_private_dir, date.strftime("%Y-%m-%d"))
        return get_luigi_target(bucket_path).fs.listdir(bucket_path)

    def requires(self):
        try:
            task_factory = globals()[self.task]
        except KeyError:  # a missing name in globals() raises KeyError, not AttributeError
    def run(self):
        infiles = shell('ls {input}/*.csv'.format(
            input=self.input().path))
        fhandle = self.output().open('w')
        for infile in infiles.strip().split('\n'):
            topic = os.path.split(infile)[-1].split('.csv')[0]
            # yielding a task from run() registers it as a dynamic dependency
            data = yield CopyDataToTable(resolution=self.resolution,
                                         survey=self.survey, topic=topic)
            fhandle.write('{table}\n'.format(table=data.table))
        fhandle.close()

    def output(self):
        return LocalTarget(os.path.join('tmp', classpath(self), self.task_id))
class ImportAllResolutions(WrapperTask):
    survey = Parameter(default=SURVEY_CEN)

    def requires(self):
        for resolution in GEOGRAPHIES:
            yield ImportData(resolution=resolution, survey=self.survey)


class ImportAllSurveys(WrapperTask):
    resolution = Parameter(default=GEO_PR)

    def requires(self):
        for survey in SURVEYS:
            yield ImportData(resolution=self.resolution, survey=survey)
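# Either wrapper can also be scheduled programmatically; luigi.build() is the
# standard entry point (a minimal sketch, assuming this module's imports):
#
# >>> import luigi
# >>> luigi.build([ImportAllResolutions()], local_scheduler=True)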
class ImportAll(WrapperTask):
        # pick up any left over in the batches
        if article_institute_batcher:
            article_institute_batcher.write()
        if match_attempted_batcher:
            match_attempted_batcher.write()
        logging.info("All articles processed")
        logging.info(f"Total successful fuzzy matches for institute names: {len(fuzzer.successful_fuzzy_matches)}")
        logging.info(f"Total failed fuzzy matches for institute names: {len(fuzzer.failed_fuzzy_matches)}")

        # mark as done
        logging.info("Task complete")
        self.output().touch()
class GridRootTask(luigi.WrapperTask):
    date = luigi.DateParameter(default=datetime.date.today())
    db_config_path = luigi.Parameter(default="mysqldb.config")
    production = luigi.BoolParameter(default=False)
    drop_and_recreate = luigi.BoolParameter(default=False)
    articles_from_date = luigi.Parameter(default=None)
    insert_batch_size = luigi.IntParameter(default=500)
    debug = luigi.BoolParameter(default=False)

    def requires(self):
        '''Collects the database configurations
        and executes the central task.'''
        logging.getLogger().setLevel(logging.INFO)
        _routine_id = "{}-{}".format(self.date, self.production)
        grid_task_kwargs = {
            '_routine_id': _routine_id,
            'db_config_path': self.db_config_path,
tasks needed to run the pipe. The WrapperTask is complete when all requirements are met.
Depending on the kind of pipe given, change our behavior between 1:1,
1:n, and n:1.
author: Kenneth Yocum
"""
from collections import defaultdict, deque
import luigi
from disdat.fs import PipeCacheEntry, DisdatFS
from disdat.pipe_base import PipeBase
class DriverTask(luigi.WrapperTask, PipeBase):
    """
    Properties:
        output_bundle: The name of the collection of resulting data items
        param_bundles: A dictionary of arguments to the first underlying Luigi Task
        pipe_cls: The pipe's class, type[disdat.Pipe.PipeTask]
        input_tags:
        output_tags:
        force: Force recompute of dependencies (requires)
    """
    output_bundle = luigi.Parameter(default=None)
    pipe_params = luigi.Parameter(default=None)
    pipe_cls = luigi.Parameter(default=None)
    input_tags = luigi.DictParameter()
    output_tags = luigi.DictParameter()
    force = luigi.BoolParameter(default=False)
    data_context = luigi.Parameter(significant=False)
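# As the module docstring notes, a WrapperTask produces no output of its own:
# it is complete exactly when everything it requires is complete. A minimal
# illustration of that contract (hypothetical tasks, not part of disdat):
#
#     class Leaf(luigi.Task):
#         def output(self):
#             return luigi.LocalTarget('/tmp/leaf.done')
#         def run(self):
#             with self.output().open('w') as f:
#                 f.write('done')
#
#     class Wrapper(luigi.WrapperTask):
#         def requires(self):
#             return [Leaf()]  # Wrapper.complete() <=> Leaf().complete()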
        else:
            return []
    def complete(self):
        if self.output():
            return super(luigi.WrapperTask, self).complete()
        else:
            # then this didn't do anything, so check that
            # the requires are complete
            # ... which is exactly what luigi.WrapperTask does.
            # TODO: This requires knowledge of prepare_ifg,
            # that is opaque. Address with refactoring.
            return super(PrepareInterferogram, self).complete()
class PrepareInterferograms(IfgListMixin, luigi.WrapperTask):
    """
    Wrapper task to prepare a sequence of interferogram files for
    PyRate analysis using Luigi tasks. See pyrate.prepifg.prepare_ifgs() and
    pyrate.tasks.prepifg.PrepareInterferogram() for further documentation.
    """

    def __init__(self, *args, **kwargs):
        super(PrepareInterferograms, self).__init__(*args, **kwargs)
        self.extents_removed = False

    def requires(self):
        return [PrepareInterferogram(ifg=Ifg(path))
                for path in self.ifg_tiff_list()]

    def run(self):
        try:
            reqs.append(RunTCBand(self.level1, self.work_root, self.granule,
                                  self.group, band_num=band))
        return reqs

    def output(self):
        out_path = acquisitions(self.level1).get_root(self.work_root,
                                                      self.group, self.granule)
        return luigi.LocalTarget(pjoin(out_path, 'reflectance.h5'))

    def run(self):
        # temporary_path() renames the file into place only on success, so a
        # partially written file never satisfies output()
        with self.output().temporary_path() as out_fname:
            fnames = [target.path for target in self.input()]
            link_reflectance_data(fnames, out_fname)
class NBAR(luigi.WrapperTask):
    """Kicks off NBAR tasks for each level1 entry."""

    level1_csv = luigi.Parameter()
    output_directory = luigi.Parameter()
    work_extension = luigi.Parameter(default='.gaip-work', significant=False)

    def requires(self):
        with open(self.level1_csv) as src:
            level1_scenes = [scene.strip() for scene in src.readlines()]
        for scene in level1_scenes:
            work_name = basename(scene) + self.work_extension
            work_root = pjoin(self.output_directory, work_name)
            container = acquisitions(scene)
            for granule in container.granules:
'''
Root Task (generic)
===================

Luigi routine to collect all GtR data, geocode and load to MySQL.
'''
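# The routine is normally launched through the luigi command line, along these
# lines (the module path and parameter values are illustrative):
#
#   python -m luigi --module <path.to.this.module> RootTask \
#       --production --local-scheduler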
import datetime
import logging
import luigi
from nesta.core.routines.gtr.gtr_geocode import GtrGeocode
class RootTask(luigi.WrapperTask):
    '''A dummy root task, which collects the database configurations
    and executes the central task.

    Args:
        date (datetime): Date used to label the outputs
    '''
    date = luigi.DateParameter(default=datetime.date.today())
    page_size = luigi.IntParameter(default=10)
    production = luigi.BoolParameter(default=False)

    def requires(self):
        '''Collects the database configurations and executes the central task.'''
        _routine_id = "{}-{}".format(self.date, self.production)
        log_stream_handler = logging.StreamHandler()
        log_file_handler = logging.FileHandler('logs.log')
        logging.basicConfig(handlers=(log_stream_handler, log_file_handler),
    fold = luigi.Parameter()
    weight = luigi.IntParameter()

    def requires(self):
        return SparkMergeFeatures()

    def output(self):
        return (LocalTarget('output/vw_input/{0}.sentence.{1}.vw_input.gz'.format(self.fold, self.weight)),
                LocalTarget('output/vw_input/{0}.sentence.{1}.meta'.format(self.fold, self.weight)))

    def run(self):
        call(['bash', 'bin/vw_merge.sh', self.fold, str(self.weight)])
class VWMergeAllFeatures(luigi.WrapperTask):
    def requires(self):
        for fold, weight in product(C.FOLDS, C.NEGATIVE_WEIGHTS):
            yield VWMergeFeature(fold=fold, weight=weight)
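# The cross product fans out one VWMergeFeature task per (fold, weight) pair.
# With illustrative values (the real lists live in the project's C config
# module):
#
# >>> from itertools import product
# >>> list(product(['dev', 'test'], [1, 10]))
# [('dev', 1), ('dev', 10), ('test', 1), ('test', 10)]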
class VWModel(luigi.Task):
    weight = luigi.IntParameter()

    def requires(self):
        return VWMergeFeature(fold='dev', weight=self.weight)

    def output(self):
        return LocalTarget('output/models/sentence.{0}.vw'.format(self.weight))

    def run(self):
        call([
'''
arXiv data collection and processing
====================================

Luigi routine to collect all data from the arXiv API and load it to MySQL.
'''
import datetime
import logging
import luigi
from nesta.production.routines.arxiv.arxiv_grid_task import GridTask
class RootTask(luigi.WrapperTask):
    '''A dummy root task, which collects the database configurations
    and executes the central task.

    Args:
        date (datetime): Date used to label the outputs
        db_config_path (str): Path to the MySQL database configuration
        production (bool): Flag indicating whether running in testing
            mode (False, default), or production mode (True).
    '''
    date = luigi.DateParameter(default=datetime.date.today())
    db_config_path = luigi.Parameter(default="mysqldb.config")
    production = luigi.BoolParameter(default=False)
    articles_from_date = luigi.Parameter(default=None)
    insert_batch_size = luigi.IntParameter(default=500)
    debug = luigi.BoolParameter(default=False)
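# A quick local test run might look like this (a sketch; assumes the module is
# importable and a MySQL config exists at the default path):
#
# >>> import luigi
# >>> luigi.build([RootTask(debug=True, insert_batch_size=50)],
# ...             local_scheduler=True)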
        while True:
            logger.info('times: %s', times)
            self.heartbeat()  # report liveness to SWF between iterations
            if self.cancel_requested:
                logger.info('cancel requested')
                self.ack_cancel()
                return
            times -= 1
            if times == 0:
                break
            sleep(15 * seconds)
        logger.info('done')
        call(['touch', self.output().path])
class DemoCancelWorkflow(luigi.WrapperTask):
    swf_wf_start_to_close_timeout = 1 * hours

    def requires(self):
        return DemoCancelTask()


if __name__ == '__main__':
    task = DemoCancelWorkflow()
    domain = 'development'
    version = 'unspecified'
    ex = LuigiSwfExecutor(domain, version, task)
    ex.register()
    ex.execute()