Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def apply(self, x):
    """Return the input incremented by one."""
    incremented = x + 1
    return incremented
def get(self):
    """Return the fixed value 2."""
    value = 2
    return value
class DummyModelTask(gokart.TaskOnKart):
    """Fixture task that dumps a freshly constructed DummyModel."""

    task_namespace = f'{__name__}.dummy'
    rerun = True

    def run(self):
        model = DummyModel()
        self.dump(model)
class DummyPandasDataFrameTask(gokart.TaskOnKart):
    """Fixture task that dumps a small fixed pandas DataFrame."""

    task_namespace = __name__
    param = luigi.Parameter()
    rerun = True

    def run(self):
        frame = pd.DataFrame({'x': [1, 3, 4]})
        self.dump(frame)
class DummyWorkFlowWithError(gokart.TaskOnKart):
    """Fixture workflow depending on a model task and one DataFrame task."""

    task_namespace = __name__
    rerun = True

    def requires(self):
        dependencies = {
            'model': DummyModelTask(),
            'data_a': DummyPandasDataFrameTask(param='a'),
        }
        return dependencies
import unittest
import luigi
import luigi.mock
from luigi.cmdline_parser import CmdlineParser
import gokart
def in_parse(cmds, deferred_computation):
    """Parse `cmds` through luigi's CmdlineParser and pass the resulting task object to `deferred_computation`."""
    with CmdlineParser.global_instance(cmds) as parser:
        task_obj = parser.get_task_obj()
        deferred_computation(task_obj)
class WithDefaultTrue(gokart.TaskOnKart):
    """Fixture task whose ExplicitBoolParameter defaults to True."""
    param = gokart.ExplicitBoolParameter(default=True)
class WithDefaultFalse(gokart.TaskOnKart):
    """Fixture task whose ExplicitBoolParameter defaults to False."""
    param = gokart.ExplicitBoolParameter(default=False)
class ExplicitParsing(gokart.TaskOnKart):
    """Fixture task that records its parsed boolean parameter for later inspection."""
    param = gokart.ExplicitBoolParameter()

    def run(self):
        # Stash the parsed value on the class so a test can read it after
        # the task has been executed by the scheduler.
        ExplicitParsing._param = self.param
class TestExplicitBoolParameter(unittest.TestCase):
    """Tests for gokart.ExplicitBoolParameter parsing and defaults."""

    def test_bool_default(self):
        # NOTE(review): method body is missing in this chunk — truncated;
        # the original presumably asserts the default values of the fixture
        # tasks above. Confirm against the full source file.
def test_repr(self):
    """str() of a task embeds its parameter values and the unique ids of task-instance parameters."""
    class _SubTask(gokart.TaskOnKart):
        task_namespace = __name__

    class _Task(gokart.TaskOnKart):
        task_namespace = __name__
        int_param = luigi.IntParameter()
        task_param = TaskInstanceParameter()
        list_task_param = ListTaskInstanceParameter()

    task = _Task(int_param=1, task_param=_SubTask(), list_task_param=[_SubTask(), _SubTask()])

    # Every parameterless _SubTask() shares a single unique id, so one
    # rendered sub-task repr can be reused for all three occurrences.
    sub_task_id = _SubTask().make_unique_id()
    sub_repr = f'{__name__}._SubTask({sub_task_id})'
    expected = f'{__name__}._Task(int_param=1, task_param={sub_repr}, list_task_param=[{sub_repr}, {sub_repr}])'
    self.assertEqual(expected, str(task))
from typing import Any
from typing import Dict
import luigi
import sklearn
import gokart
import redshells
import redshells.train.utils
class _ClassificationModelTask(gokart.TaskOnKart):
    """Base task declaring the inputs of a classification-model pipeline.

    The model class is looked up by ``model_name`` and instantiated with
    ``model_kwargs``; training data comes from ``train_data_task``.
    NOTE(review): run() is not visible in this chunk.
    """
    train_data_task = gokart.TaskInstanceParameter(
        description='A task outputs a pd.DataFrame with columns={`target_column_name`}.')
    target_column_name = luigi.Parameter(default='category', description='Category column names.')  # type: str
    model_name = luigi.Parameter(
        default='XGBClassifier',
        description='A model name which has "fit" interface, and must be registered by "register_prediction_model".'
    )  # type: str
    model_kwargs = luigi.DictParameter(
        default=dict(), description='Arguments of the model which are created with model_name.')  # type: Dict[str, Any]

    def requires(self):
        # Sole dependency: the task instance that provides the training data.
        return self.train_data_task

    def output(self):
        # NOTE(review): self.output_file_path is not declared in this chunk —
        # presumably defined on a subclass; confirm against the full file.
        return self.make_target(self.output_file_path)
import gokart
from gokart.info import tree_info
class SampleTaskLog(gokart.TaskOnKart):
    """Example task demonstrating writes to gokart's per-task log dict."""

    def run(self):
        # Entries placed in task_log are persisted with the task's output.
        self.task_log['sample key'] = 'sample value'
if __name__ == '__main__':
    # Run the sample task directly once, then again via gokart's CLI entry
    # point with tree-info reporting enabled.
    SampleTaskLog().run()
    tree_info()
    cli_args = [
        '--tree-info-mode=all',
        '--tree-info-output-path=sample_task_log.txt',
        'SampleTaskLog',
        '--local-scheduler',
    ]
    gokart.run(cli_args)
import itertools
import luigi
import pandas as pd
import gokart
import redshells
class FindItemKeywordByMatching(gokart.TaskOnKart):
    """
    Find items which include keywords in its value of 'item_keyword_column_name'.
    Output pd.DataFrame with columns [item_id, keyword].
    """
    task_namespace = 'redshells.word_item_similarity'
    target_keyword_task = gokart.TaskInstanceParameter(
        description='A task outputs keywords as type `List[Any]` or `Set[Any]`.')
    item_task = gokart.TaskInstanceParameter(
        description='A task outputs item data as type `pd.DataFrame` which has `item_id_column_name`.')
    tfidf_task = gokart.TaskInstanceParameter(description='A task instance of TrainTfidf.')
    # Fraction of words retained when filtering texts — presumably scored by
    # the tfidf_task model; confirm against the full implementation.
    keep_top_rate = luigi.FloatParameter(description='A rate to filter words in texts.')  # type: float
    item_id_column_name = luigi.Parameter()  # type: str
    item_keyword_column_name = luigi.Parameter()  # type: str
    output_file_path = luigi.Parameter(
        default='app/word_item_similarity/find_item_by_keyword_matching.pkl')  # type: str
    # NOTE(review): requires()/run() are not visible in this chunk.
from typing import Any, Dict, List
import gensim
import luigi
import numpy as np
import gokart
import redshells.model
class TrainSCDV(gokart.TaskOnKart):
task_namespace = 'redshells'
tokenized_text_data_task = gokart.TaskInstanceParameter(
description='A task outputs tokenized texts with type "List[List[str]]".')
dictionary_task = gokart.TaskInstanceParameter(description='A task outputs gensim.corpora.Dictionary.')
word2vec_task = gokart.TaskInstanceParameter(
description='A task outputs gensim.models.Word2Vec, gensim.models.FastText or models with the same interface.')
cluster_size = luigi.IntParameter(
default=60, description='A cluster size of Gaussian mixture model in SCDV.') # type: int
sparsity_percentage = luigi.FloatParameter(
default=0.04, description='A percentage of sparsity in SCDV') # type: float
gaussian_mixture_kwargs = luigi.DictParameter(
default=dict(),
description='Arguments for Gaussian mixture model except for cluster size.') # type: Dict[str, Any]
output_file_path = luigi.Parameter(default='model/scdv.pkl') # type: str
text_sample_size = luigi.IntParameter(
default=10000,
def requires(self):
    # Sole dependency: the task instance supplied via the `data_task` parameter.
    return self.data_task
def output(self):
    # Target at self.output_file_path (declared on the enclosing class,
    # which is not visible in this chunk).
    return self.make_target(self.output_file_path)
def run(self):
    """Load the input frame, cast each configured column to pandas 'category' dtype, and dump the result."""
    columns = list(self.categorical_column_names)
    data = self.load_data_frame(required_columns=set(columns))
    for column in columns:
        data[column] = data[column].astype('category')
    self.dump(data)
class SplitTrainTestData(gokart.TaskOnKart):
    """Shuffle the upstream data frame and expose train/test output targets.

    NOTE(review): run() is truncated in this chunk, so the actual split is
    not visible — only the shuffle step appears below.
    """
    task_namespace = 'redshells.data_frame_utils'
    data_task = gokart.TaskInstanceParameter()
    # Fraction of rows destined for the test split — presumably; the code
    # consuming it is not visible here.
    test_size_rate = luigi.FloatParameter()
    train_output_file_path = luigi.Parameter(default='data/train_data.pkl')  # type: str
    test_output_file_path = luigi.Parameter(default='data/test_data.pkl')  # type: str

    def requires(self):
        return self.data_task

    def output(self):
        # One pickle target per split.
        return dict(
            train=self.make_target(self.train_output_file_path), test=self.make_target(self.test_output_file_path))

    def run(self):
        data = self.load_data_frame()
        data = sklearn.utils.shuffle(data)
        # NOTE(review): body truncated here — splitting/dumping not visible.
# Module-level logger; assumes `from logging import getLogger` appears in an
# import section not visible in this chunk.
logger = getLogger(__name__)
def _get_target_column() -> str:
return 'label'
def _get_integer_columns() -> List[str]:
return [f'int_feat_{i}' for i in range(13)]
def _get_categorical_columns() -> List[str]:
return [f'cat_feat_{i}' for i in range(26)]
class SampleCriteo(gokart.TaskOnKart):
    """Randomly subsample rows of a Criteo text data file.

    NOTE(review): run() is truncated in this chunk — `columns` is built but
    the remainder (writing the sampled data) is not visible.
    """
    task_namespace = 'examples'
    text_data_file_path = luigi.Parameter()  # type: str
    # Probability with which each row is kept in the sample.
    data_size_rate = luigi.FloatParameter()  # type: float

    def requires(self):
        return redshells.data.LoadExistingFile(file_path=self.text_data_file_path)

    def output(self):
        return self.make_target('criteo/data_samples.tsv')

    def run(self):
        logger.info('loading...')
        data = self.load()
        logger.info('sampling...')
        # Keep each row independently with probability `data_size_rate`.
        data = [data[i] for i in np.where(np.random.uniform(size=len(data)) < self.data_size_rate)[0]]
        columns = [_get_target_column()] + _get_integer_columns() + _get_categorical_columns()
        # NOTE(review): body truncated here.
# NOTE(review): fragment — the enclosing function's signature is not visible
# in this chunk. Judging by the recursive call below it is `make_tree_info`;
# `result`, `indent`, `task`, `details` and `is_complete` are defined above
# this point in the original file. Confirm against the full source.
name = task.__class__.__name__
result += f'({is_complete}) {name}[{task.make_unique_id()}]'
if details:
    # Verbose mode: append parameters, output paths, timing and task log.
    params = task.get_info(only_significant=True)
    output_paths = [t.path() for t in luigi.task.flatten(task.output())]
    processing_time = task.get_processing_time()
    # Append a seconds suffix only when a numeric duration is available.
    if type(processing_time) == float:
        processing_time = str(processing_time) + 's'
    result += f'(parameter={params}, output={output_paths}, time={processing_time}, task_log={dict(task.get_task_log())})'
children = luigi.task.flatten(task.requires())
for index, child in enumerate(children):
    # Recurse into dependencies; the third argument flags the last child.
    result += make_tree_info(child, indent, (index + 1) == len(children), details=details)
return result
class tree_info(gokart.TaskOnKart):
    """Task holding configuration for the dependency-tree report.

    NOTE(review): run() is not visible in this chunk; presumably the report
    text is written to `output_path` elsewhere.
    """
    mode = luigi.Parameter(default='', description='This must be in ["simple", "all"].')  # type: str
    output_path = luigi.Parameter(default='tree.txt', description='Output file path.')  # type: str

    def output(self):
        # use_unique_id=False: write to the fixed path rather than a
        # per-task-unique-id path.
        return self.make_target(self.output_path, use_unique_id=False)