from nesta.core.orms.orm_utils import insert_data
from nesta.core.orms.orm_utils import get_class_by_tablename
from nesta.core.orms.uk_geography_lookup_orm import Base
from nesta.core.luigihacks import misctools
from nesta.core.luigihacks.mysqldb import MySqlTarget
# Assumed location of get_gss_codes, which is called in run() below:
from nesta.packages.geographies.uk_geography_lookup import get_gss_codes
from collections import defaultdict
import datetime
import luigi

MYSQLDB_ENV = 'MYSQLDB'
class UkGeoLookupTask(luigi.Task):
    production = luigi.BoolParameter(default=False)
    date = luigi.DateParameter(default=datetime.date.today())

    def output(self):
        '''Points to the output database engine'''
        db_config = misctools.get_config("mysqldb.config", "mysqldb")
        db_config["database"] = "production" if self.production else "dev"
        db_config["table"] = "UK Geography Lookup (dummy) "
        update_id = db_config["table"] + str(self.date)
        return MySqlTarget(update_id=update_id, **db_config)

    def run(self):
        # Get all UK geographies, and group by country and base
        gss_codes = get_gss_codes()
        country_codes = defaultdict(lambda: defaultdict(list))
        for code in gss_codes:
            country = code[0]
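# --- Hedged sketch (not the original task code): a minimal illustration of the
# grouping hinted at above, assuming GSS codes such as 'E06000001' where the
# first character encodes the country and the first three characters the entity
# type. `group_gss_codes` is a hypothetical helper name.
def group_gss_codes(gss_codes):
    """Group codes by country letter, then by three-character entity prefix."""
    grouped = defaultdict(lambda: defaultdict(list))
    for code in gss_codes:
        grouped[code[0]][code[:3]].append(code)
    return grouped

# e.g. group_gss_codes(['E06000001', 'W06000011'])['E']['E06'] == ['E06000001']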
def kwarg_maker(dataset, routine_id):
    env_files = [f3p('config/mysqldb.config'),
                 f3p('config/elasticsearch.config'),
                 f3p('schema_transformations/eurito/'),
                 f3p('nesta')]
    batchable = f3p(f'batchables/eurito/{dataset}_eu')
    return dict(dataset=f'{dataset}-eu',
                routine_id=f'{dataset}-eu_{routine_id}',
                env_files=env_files,
                batchable=batchable)
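# --- Hedged usage sketch: the dict returned by kwarg_maker is presumably unpacked
# into a batch task alongside shared keyword arguments; the dataset name and task
# class below are illustrative, not the repo's actual names.
#
#     params = kwarg_maker('patstat', routine_id='2019-01-01-True')
#     yield SomeElasticsearchTask(**params, **default_kwargs)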
class RootTask(luigi.WrapperTask):
    process_batch_size = luigi.IntParameter(default=1000)
    production = luigi.BoolParameter(default=False)
    date = luigi.DateParameter(default=dt.now())
    drop_and_recreate = luigi.BoolParameter(default=False)

    def requires(self):
        test = not self.production
        set_log_level(True)
        routine_id = f'EURITO-ElasticsearchTask-{self.date}-{test}'
        default_kwargs = dict(date=self.date,
                              process_batch_size=self.process_batch_size,
                              drop_and_recreate=self.drop_and_recreate,
                              job_def='py36_amzn1_image',
                              job_name=routine_id,
                              job_queue='HighPriority',
                              region_name='eu-west-2',
                              poll_time=10,
                              max_live_jobs=300,
                              db_config_env='MYSQLDB')
        def closest_keyfun(date):
            # Dates after self.date are pushed "infinitely" far away, so min()
            # only selects shipments on or before the reference date.
            if date > self.date:
                return datetime.timedelta(999999999)
            return abs(date - self.date)

        closest = min(pathmap.keys(), key=closest_keyfun)
        if closest > self.date:
            raise RuntimeError('No shipment before: %s' % min(pathmap.keys()))
        with self.output().open('w') as output:
            output.write_tsv(closest, pathmap.get(closest))

    def output(self):
        return luigi.LocalTarget(path=self.path(), format=TSV)
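# --- Hedged, self-contained illustration of the date-selection pattern above:
# pick the latest key of a date->path mapping that does not fall after a
# reference date. The helper name is hypothetical, not part of the original task.
def closest_on_or_before(pathmap, ref_date):
    def keyfun(date):
        # push dates after the reference "infinitely" far away
        if date > ref_date:
            return datetime.timedelta(999999999)
        return abs(date - ref_date)
    closest = min(pathmap, key=keyfun)
    if closest > ref_date:
        raise RuntimeError('No shipment before: %s' % min(pathmap))
    return closest, pathmap[closest]

# e.g. closest_on_or_before({datetime.date(2020, 1, 1): 'a.tsv',
#                            datetime.date(2020, 2, 1): 'b.tsv'},
#                           datetime.date(2020, 1, 15))
# -> (datetime.date(2020, 1, 1), 'a.tsv')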
class LFERLatestDate(LFERTask):
    """ Only output the latest date. """
    date = luigi.DateParameter(default=datetime.date.today())
    indicator = luigi.Parameter(default=hourly(fmt='%s'))

    def requires(self):
        return LFERLatestDateAndPath(date=self.date, indicator=self.indicator)

    @timed
    def run(self):
        output = shellout("awk '{{print $1}}' {input} > {output}",
                          input=self.input().path)
        luigi.File(output).move(self.output().path)

    def output(self):
        return luigi.LocalTarget(path=self.path(), format=TSV)


class LFERMarc(LFERTask):
    """ Import a single date or fail if there was no shipment on that date. """
class DateTask(luigi.WrapperTask):
    '''Collect new data from the arXiv api and dump the
    data in the MySQL server.

    Args:
        date (datetime): Datetime used to label the outputs
        _routine_id (str): String used to label the AWS task
        db_config_env (str): environment variable pointing to the db config file
        db_config_path (str): The output database configuration
        insert_batch_size (int): number of records to insert into the database at once
        articles_from_date (str): new and updated articles from this date will be
                                  retrieved. Must be in YYYY-MM-DD format
    '''
    date = luigi.DateParameter()
    _routine_id = luigi.Parameter()
    test = luigi.BoolParameter(default=True)
    db_config_path = luigi.Parameter(default="mysqldb.config")
    db_config_env = luigi.Parameter()
    insert_batch_size = luigi.IntParameter(default=500)
    articles_from_date = luigi.Parameter(default=None)

    def requires(self):
        """
        Collects the last date of successful update from the database and launches the
        iterative data collection task.
        """
        # database setup
        database = 'dev' if self.test else 'production'
        logging.warning(f"Using {database} database")
        self.engine = get_mysql_engine(self.db_config_env, 'mysqldb', database)
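# --- Hedged invocation sketch (all values illustrative): a WrapperTask such as
# DateTask is typically triggered programmatically or via the luigi scheduler, e.g.
#
#     luigi.build([DateTask(date=datetime.date(2019, 1, 1),
#                           _routine_id='arxiv-2019-01-01',
#                           db_config_env='MYSQLDB',
#                           articles_from_date='2018-12-01')],
#                 local_scheduler=True)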
_BUCKET = S3.Bucket("nesta-production-intermediate")
DONE_KEYS = set(obj.key for obj in _BUCKET.objects.all())
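# --- Hedged sketch: `S3` above is presumably a boto3 resource created earlier in
# the module (e.g. `S3 = boto3.resource('s3')`; that import and call are an
# assumption, not shown in the snippet). Caching the bucket keys up front lets
# later code skip batches whose output already exists; the helper below is
# illustrative only.
def batch_already_done(batch_key):
    """Return True if this batch's S3 output key was written on a previous run."""
    return batch_key in DONE_KEYS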
class CrunchbaseSql2EsTask(autobatch.AutoBatchTask):
    '''Download tar file of csvs and load them into the MySQL server.

    Args:
        date (datetime): Datetime used to label the outputs
        _routine_id (str): String used to label the AWS task
        db_config_env (str): The output database environment variable
        process_batch_size (int): Number of rows to process in a batch
        insert_batch_size (int): Number of rows to insert into the db in a batch
        intermediate_bucket (str): S3 bucket where the list of ids for each batch are
                                   written
    '''
    date = luigi.DateParameter()
    _routine_id = luigi.Parameter()
    db_config_env = luigi.Parameter()
    process_batch_size = luigi.IntParameter(default=10000)
    insert_batch_size = luigi.IntParameter()
    intermediate_bucket = luigi.Parameter()
    drop_and_recreate = luigi.BoolParameter(default=False)

    def requires(self):
        yield DescriptionMeshTask(date=self.date,
                                  _routine_id=self._routine_id,
                                  test=self.test,
                                  insert_batch_size=self.insert_batch_size,
                                  db_config_path=self.db_config_path,
                                  db_config_env=self.db_config_env)

    def output(self):
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub as f3p
import luigi
from datetime import datetime as dt
import logging
class CrunchbaseLolveltyRootTask(luigi.WrapperTask):
    """Apply Lolvelty score to crunchbase data.

    Args:
        production (bool): Running in full production mode?
        index (str): Elasticsearch index to append Lolvelty score to.
        date (datetime): Date for timestamping this routine.
    """
    production = luigi.BoolParameter(default=False)
    index = luigi.Parameter(default=None)
    date = luigi.DateParameter(default=dt.now())

    def requires(self):
        logging.getLogger().setLevel(logging.INFO)
        kwargs = {'score_field': 'rank_rhodonite_organisation',
                  'fields': ['name_of_organisation',
                             'textBody_descriptive_organisation',
                             'terms_category_organisation']}
        test = not self.production
        routine_id = f"CrunchbaseLolveltyTask-{self.date}-{test}"
        index = self.index if self.production else 'companies_dev'
        assert index is not None
        return LazyElasticsearchTask(routine_id=routine_id,
                                     test=test,
                                     index=index,
                                     dataset='companies',
                                     endpoint='health-scanner',
                                     entity_type='company',
class GridTask(luigi.Task):
    """Join arxiv articles with GRID data for institute addresses and geocoding.

    Args:
        date (datetime): Datetime used to label the outputs
        _routine_id (str): String used to label the AWS task
        db_config_env (str): environment variable pointing to the db config file
        db_config_path (str): The output database configuration
        mag_config_path (str): Microsoft Academic Graph Api key configuration path
        insert_batch_size (int): number of records to insert into the database at once
            (not used in this task but passed down to others)
        articles_from_date (str): new and updated articles from this date will be
            retrieved. Must be in YYYY-MM-DD format
            (not used in this task but passed down to others)
    """
    date = luigi.DateParameter()
    _routine_id = luigi.Parameter()
    test = luigi.BoolParameter(default=True)
    db_config_env = luigi.Parameter()
    db_config_path = luigi.Parameter()
    mag_config_path = luigi.Parameter()
    insert_batch_size = luigi.IntParameter(default=500)
    articles_from_date = luigi.Parameter()

    def output(self):
        '''Points to the output database engine'''
        db_config = misctools.get_config(self.db_config_path, "mysqldb")
        db_config["database"] = 'dev' if self.test else 'production'
        db_config["table"] = "arXlive "  # Note, not a real table
        update_id = "ArxivGrid_{}".format(self.date)
        return MySqlTarget(update_id=update_id, **db_config)
    city : luigi.Parameter
        City of interest, *e.g.* Bordeaux or Lyon
    start : luigi.DateParameter
        Training start date
    stop : luigi.DateParameter
        Training stop date upper bound (actually the end date is computed with
        `validation`)
    validation : luigi.DateMinuteParameter
        Date that bounds the training set and the validation set during the
        XGBoost model training
    frequency : DateOffset, timedelta or str
        Indicates the prediction frequency
    """
    city = luigi.Parameter()
    train_start = luigi.DateParameter()
    train_stop = luigi.DateParameter()
    train_cut = luigi.DateMinuteParameter()
    start = luigi.DateMinuteParameter()
    stop = luigi.DateMinuteParameter()
    frequency = luigi.Parameter(default="30T")
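    # --- Hedged aside: the default frequency "30T" is the pandas offset alias for
    # 30-minute steps, e.g.
    #
    #     import pandas as pd
    #     pd.date_range("2019-02-02 09:00", periods=3, freq="30T")
    #     # -> 09:00, 09:30, 10:00 on 2019-02-02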
    def outputpath(self):
        fname = ("{}-to-{}-at-{}-freq-{}.model.{}-to-{}.predictions.csv"
                 "").format(self.train_start, self.train_stop,
                            self.train_cut.isoformat(),
                            self.frequency, self.start, self.stop)
        return os.path.join(DATADIR, self.city, 'xgboost-model', fname)

    def output(self):
        return luigi.LocalTarget(self.outputpath(), format=MixedUnicodeBytes)

    def requires(self):
        '''Create a composite key based on the abstract url'''
        ids = [int(i) for i in RE_COMP.findall(url)[0]]
        return pair(*ids)
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]
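# Usage sketch: chunks() batches a sequence into fixed-size slices, e.g.
#     list(chunks(list(range(7)), 3))  ->  [[0, 1, 2], [3, 4, 5], [6]]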
class WorldReporter(autobatch.AutoBatchTask):
    '''Batch task that pipes World RePORTER records to Elasticsearch.'''
    date = luigi.DateParameter()
    chunksize = luigi.IntParameter()
    _routine_id = luigi.Parameter()
    index = luigi.Parameter()
    doc_type = luigi.Parameter()

    def output(self):
        '''Points to the input database target'''
        update_id = "worldreporter-%s" % self._routine_id
        db_config = misctools.get_config("es.config", "es")
        return ElasticsearchTarget(update_id=update_id,
                                   index=self.index,
                                   doc_type=self.doc_type,
                                   extra_elasticsearch_args={"scheme": "https"},
                                   **db_config)