import datetime
import luigi
from luigi_swf import util

class MyDependency(luigi.Task):
    dt = luigi.DateParameter()

class MyTask(luigi.Task):
    dt = luigi.DateParameter()

    def requires(self):
        return MyDependency(dt=self.dt)

def test_fullname():
    # Setup
    t = MyTask(dt=datetime.date(2050, 1, 1))
    # Execute
    modname, clsname = util.fullname(t)
    # Test
    assert modname == 'test_util'
    assert clsname == 'MyTask'
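For reference, `util.fullname` here presumably returns the task's module and class names; a minimal sketch of such a helper (an assumption, not luigi_swf's actual code):

def fullname(task):
    # Return (module name, class name) for a task instance (assumed behavior)
    cls = type(task)
    return cls.__module__, cls.__name__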
        for source in ToastConfig().config['sources']:
            dest_name = build_dest_filename(source['url'],
                                            decompress=source['compression'])
            yield DownloadFileToDFSTask(
                source=source['url'],
                target=os.path.join(self.destination, dest_name),
                compression=source['compression'])

    def run(self):
        create_SUCCESS_file(self.destination)

    def output(self):
        return flag_target(self.destination)
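`create_SUCCESS_file` and `flag_target` are not shown above; they presumably implement the Hadoop-style `_SUCCESS` flag convention. A minimal sketch (names reused, bodies assumed):

import os
import luigi

def create_SUCCESS_file(path):
    # Write an empty _SUCCESS marker, as Hadoop jobs do on completion
    open(os.path.join(path, '_SUCCESS'), 'w').close()

def flag_target(path):
    # The directory counts as done once its _SUCCESS flag exists
    return luigi.LocalTarget(os.path.join(path, '_SUCCESS'))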
class PrepareHadoopDownloadTask(Task):
    hdfs_path = Parameter()

    def run(self):
        tmp_dir = mkdtemp(
            prefix='tmp_eggo_',
            dir=eggo_config.get('worker_env', 'work_path'))
        try:
            # build the remote command for each source
            tmp_command_file = '{0}/command_file'.format(tmp_dir)
            with open(tmp_command_file, 'w') as command_file:
                for source in ToastConfig().config['sources']:
                    command_file.write('{0}\n'.format(json.dumps(source)))
            # copy the command file to the Hadoop filesystem
            hdfs_client = HdfsClient()
            hdfs_client.mkdir(os.path.dirname(self.hdfs_path), True)
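            # The snippet is truncated here; the `try` needs at least a
            # cleanup clause to be valid Python. A plausible completion
            # (the put() call and shutil.rmtree cleanup are assumptions,
            # not the original eggo code):
            hdfs_client.put(tmp_command_file, self.hdfs_path)
        finally:
            rmtree(tmp_dir)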
----
To run:
(vm) $ python scaffold0_helloworld.py
Run with less noise:
(vm) $ python scaffold0_helloworld.py 2> /dev/null
"""
import luigi

class HelloWorldTask(luigi.Task):
    def run(self):
        """
        TODO:
        * Print 'HelloWorldTask says hello world' to stdout.
        """

if __name__ == '__main__':
    luigi.run(['HelloWorldTask', '--workers', '1', '--local-scheduler'])
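For comparison, the completed exercise would print the greeting in `run`:

class HelloWorldTask(luigi.Task):
    def run(self):
        # Fulfils the TODO above
        print('HelloWorldTask says hello world')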
        # collect per-k features, centroids and labels (the fragment starts
        # mid-method; centers and features must be initialized as well)
        features, centers, labels = [], [], []
        with self.input().open() as fobj:
            report = json.load(fobj)
        for k in range(self.nbmin_clusters, self.nbmax_clusters + 1):
            fpath = report['filelist'][str(k)]
            df = pd.read_hdf(fpath, "/individuals")
            center = pd.read_hdf(fpath, "/centroids")
            centers.append(center.drop("n_individuals", axis=1).values)
            features.append(df.drop("Xclust", axis=1).values)
            labels.append(df['Xclust'].copy().values)
        fig = ul.kmeans_elbow_silhouette(features, centers, labels,
                                         self.nbmin_clusters, self.nbmax_clusters)
        fig.savefig(self.output().path)
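`ul.kmeans_elbow_silhouette` is not shown; a hedged sketch of what such a helper could compute with scikit-learn and matplotlib (signature and details assumed, labels assumed 0-indexed):

import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_score

def kmeans_elbow_silhouette(features, centers, labels, kmin, kmax):
    # Plot inertia (elbow) and mean silhouette score for k in [kmin, kmax]
    inertias, silhouettes = [], []
    for X, C, y in zip(features, centers, labels):
        # inertia: sum of squared distances to each point's assigned centroid
        inertias.append(((X - C[y]) ** 2).sum())
        silhouettes.append(silhouette_score(X, y))
    ks = list(range(kmin, kmax + 1))
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
    ax1.plot(ks, inertias, marker='o')
    ax1.set(xlabel='number of clusters k', ylabel='inertia', title='Elbow')
    ax2.plot(ks, silhouettes, marker='o')
    ax2.set(xlabel='number of clusters k', ylabel='mean silhouette')
    return fig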
class AutoKMeans(luigi.Task):
    """Carry out an automatic KMeans, which depends on the AutoPCA task and on
    the KMeansReport. The report can suggest an optimal number of clusters
    based on the elbow computation; inertia, elbow and silhouette values are
    computed in order to choose the number of clusters.
    """
    datarep = luigi.Parameter("data")
    dsname = luigi.Parameter("bordeaux-metropole")
    metadata_type = luigi.Parameter("user")
    nbmin_clusters = luigi.parameter.IntParameter(3)
    nbmax_clusters = luigi.parameter.IntParameter(8)
    def outputpath(self):
        # Note: the data in this file will be the same as in the KMeans file
        # generated by the automatic selection of the number of clusters.
        out_path = container.get_root(self.nbar_root, granule=self.granule)
        modtran_root = pjoin(out_path, CONFIG.get('work', 'modtran_root'))
        out_fname = CONFIG.get('extract_flux', 'out_fname')
        out_fname = pjoin(modtran_root, out_fname)
        return luigi.LocalTarget(out_fname)
    def run(self):
        compression = CONFIG.get('work', 'compression')
        with self.output().temporary_path() as out_fname:
            for target in self.input():
                gaip.create_solar_irradiance_tables(target.path, out_fname,
                                                    compression=compression)
                target.remove()
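`temporary_path()` is luigi's idiom for atomic outputs: the file is written to a temporary location and only moved to the target path if the block exits without error. A minimal self-contained illustration:

import luigi

class AtomicWrite(luigi.Task):
    def output(self):
        return luigi.LocalTarget('/tmp/result.txt')

    def run(self):
        # /tmp/result.txt only appears if this block succeeds
        with self.output().temporary_path() as tmp_path:
            with open(tmp_path, 'w') as f:
                f.write('done\n')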
class CalculateCoefficients(luigi.Task):
    """
    Calculate the atmospheric parameters needed by the BRDF and
    atmospheric correction model.
    """
    level1 = luigi.Parameter()
    nbar_root = luigi.Parameter()
    granule = luigi.Parameter()

    def requires(self):
        args = [self.level1, self.nbar_root, self.granule]
        return AccumulateSolarIrradiance(*args)

    def output(self):
        container = gaip.acquisitions(self.level1)
import random as rnd
import time
import luigi

class Configuration(luigi.Task):
    seed = luigi.IntParameter()

    def output(self):
        """
        Returns the target output for this task.
        In this case, a successful execution of this task will create a
        file on the local filesystem.
        :return: the target output for this task.
        :rtype: object (:py:class:`luigi.target.Target`)
        """
        return luigi.LocalTarget('/tmp/Config_%d.txt' % self.seed)

    def run(self):
        time.sleep(5)
        rnd.seed(self.seed)
        # write the output file so luigi can mark the task complete
        # (without this, output() never exists and the task never finishes)
        with self.output().open('w') as f:
            f.write('seed: %d\n' % self.seed)
    db_user = luigi.Parameter(default="root")
    db_pass = luigi.Parameter(default="mypass")

    def requires(self):
        return ItemSimilaritySparkJob(client=self.client, startDay=self.startDay)

    def run(self):
        u = ItemSimilarityUploadMysql(self.client, self.db_host,
                                      self.db_user, self.db_pass)
        u.stream_and_upload(self.input().path)
#
# MF
#

class SeldonMatrixFactorization(luigi.Task):
    """
    Matrix factorization using Spark
    """
    inputPath = luigi.Parameter(default="/seldon-data/seldon-models/")
    outputPath = luigi.Parameter(default="/seldon-data/seldon-models/")
    client = luigi.Parameter(default="test")
    sparkDriverMemory = luigi.Parameter(default="1g")
    sparkExecutorMemory = luigi.Parameter(default="1g")
    startDay = luigi.IntParameter(default=1)
    days = luigi.IntParameter(default=1)
    rank = luigi.IntParameter(default=30)
    mf_lambda = luigi.FloatParameter(default=0.01)
    alpha = luigi.FloatParameter(default=1.0)
    iterations = luigi.IntParameter(default=5)
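The `rank`, `mf_lambda`, `alpha` and `iterations` parameters line up with Spark MLlib's implicit-feedback ALS; roughly how they would be passed (a sketch with toy data, not Seldon's actual job):

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating

sc = SparkContext(appName='mf-sketch')
# toy (user, item, implicit count) triples
ratings = sc.parallelize([Rating(0, 0, 3.0), Rating(0, 1, 1.0),
                          Rating(1, 1, 2.0)])
# the task's defaults mapped onto MLlib's implicit ALS
model = ALS.trainImplicit(ratings, rank=30, iterations=5,
                          lambda_=0.01, alpha=1.0)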
"outinfo": f"{S3_PREFIX}_{self.date}_{key}",
"test": self.test,
"done": key in DONE_KEYS,
"config": "mysqldb.config",
}
logging.info(params)
job_params.append(params)
return job_params
def combine(self, job_params):
""" Touch the checkpoint """
self.output().touch()
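`touch()` implies the output is a marker-style target such as `luigi.contrib.mysqldb.MySqlTarget`, which records completion as a row in a marker table; a plausible `output` method (connection details are placeholders):

from luigi.contrib.mysqldb import MySqlTarget

def output(self):
    # touch() inserts a row keyed by update_id to mark the task done
    return MySqlTarget(host='localhost', database='production',
                       user='root', password='', table='batch',
                       update_id=self.task_id)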
class RootTask(luigi.Task):
    """
    Args:
        date (`datetime`): Date used to label the outputs
    """
    date = luigi.DateParameter(default=datetime.datetime.today())

    def requires(self):
        """Get the output from the batch task"""
        return CHBatchQuery(
            date=self.date,
            batchable=(find_filepath_from_pathstub("batchables/companies_house/")),
            job_def="py36_amzn1_image",
            job_name="ch-batch-api-%s" % self.date,
            env_files=(
                find_filepath_from_pathstub("nesta/nesta"),
#! /usr/bin/python

import os
import time
import argparse
import subprocess
import json
from concurrent import futures

import z5py
import nifty
import nifty.distributed as ndist
import luigi

class MergeSubgraphScalesTask(luigi.Task):
    """
    Merge subgraphs on scale level
    """
    path = luigi.Parameter()
    ws_key = luigi.Parameter()
    out_path = luigi.Parameter()
    scale = luigi.IntParameter()
    max_jobs = luigi.IntParameter()
    config_path = luigi.Parameter()
    tmp_folder = luigi.Parameter()
    dependency = luigi.TaskParameter()
    # FIXME default does not work; this still needs to be specified
    time_estimate = luigi.IntParameter(default=10)
    run_local = luigi.BoolParameter(default=False)
    def output(self):
        return luigi_bigquery.ResultTarget('MyQueryStep1.job')

class MyQueryStep2(luigi.Task):
    def requires(self):
        return MyQueryStep1()

    def output(self):
        return luigi.LocalTarget('MyQueryStep2.csv')

    def run(self):
        # retrieve the result and save it as a CSV file
        with self.output().open('w') as f:
            self.input().result.to_csv(f)

class MyQueryStep3(luigi.Task):
    def requires(self):
        return MyQueryStep2()

    def output(self):
        return luigi.LocalTarget('MyQueryStep3.txt')

    def run(self):
        with self.input().open() as f:
            # process the result here
            print(f.read())
        with self.output().open('w') as f:
            # create the final output
            f.write('done')
## Templating Queries