# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import unittest

import luigi

import gokart
from gokart import TaskOnKart
class _DummySubTask(TaskOnKart):
    """Minimal task used as the default of a TaskInstanceParameter in tests."""

    task_namespace = __name__
class _DummyTask(TaskOnKart):
    """Task fixture with one int parameter and one task-instance parameter."""

    task_namespace = __name__
    param = luigi.IntParameter()
    task = gokart.TaskInstanceParameter(default=_DummySubTask())
class ListTaskInstanceParameterTest(unittest.TestCase):
    """Round-trip (serialize then parse) tests for gokart.ListTaskInstanceParameter."""

    def setUp(self):
        # Instance caching would otherwise leak task state between tests.
        _DummyTask.clear_instance_cache()

    def test_serialize_and_parse(self):
        source = [_DummyTask(param=3), _DummyTask(param=3)]
        serialized = gokart.ListTaskInstanceParameter().serialize(source)
        restored = gokart.ListTaskInstanceParameter().parse(serialized)
        for before, after in zip(source, restored):
            self.assertEqual(after.task_id, before.task_id)
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()
def requires(self):
    """Declare the upstream task producing the item2embedding data."""
    return self.item2embedding_task
def output(self):
    """Materialize the task's target at the configured output path."""
    return self.make_target(self.output_file_path)
def run(self):
    """Fit a DimensionReductionModel on the loaded embedding vectors and dump it."""
    embeddings = self.load()  # type: Dict[Any, np.ndarray]
    reducer = DimensionReductionModel(dimension_size=self.dimension_size)
    reducer.fit(np.array(list(embeddings.values())))
    self.dump(reducer)
class ApplyDimensionReductionModel(gokart.TaskOnKart):
    """Apply a trained `DimensionReductionModel` to item embedding vectors."""
    task_namespace = 'redshells.word_item_similarity'
    item2embedding_task = gokart.TaskInstanceParameter(
        description='A task outputs item2embedding data with type = Dict[Any, np.ndarray].')
    # BUG FIX: the explanatory text was passed as ``default=`` (making a plain
    # string the default "task instance"); it belongs in ``description=``.
    dimension_reduction_model_task = gokart.TaskInstanceParameter(
        description='A task outputs a model instance of `DimensionReductionModel`.')
    # Whether to L2-normalize the reduced embeddings before dumping.
    l2_normalize = luigi.BoolParameter()  # type: bool
    output_file_path = luigi.Parameter(default='app/word_item_similarity/dimension_reduction_model.pkl')  # type: str
def requires(self):
    """Require both the embedding producer and the trained reduction model."""
    return {
        'item2embedding': self.item2embedding_task,
        'model': self.dimension_reduction_model_task,
    }
def output(self):
    """Build the output target from the configured file path."""
    destination = self.output_file_path
    return self.make_target(destination)
def run(self):
# NOTE(review): this fragment is truncated at a file seam — the body continues
# beyond the visible chunk (the duplicate at a later fragment shows model.apply
# and an l2_normalize branch follow).
item2embedding = self.load('item2embedding') # type: Dict[Any, np.ndarray]
model = self.load('model')
# Keys are the item identifiers; order matches the values() embedding matrix.
items = list(item2embedding.keys())
def output(self):
    """Return the file target this task writes its result to."""
    return self.make_target(self.output_file_path)
def run(self):
    """Train a DimensionReductionModel on all loaded embeddings and persist it."""
    item_vectors = self.load()  # type: Dict[Any, np.ndarray]
    matrix = np.array(list(item_vectors.values()))
    model = DimensionReductionModel(dimension_size=self.dimension_size)
    model.fit(matrix)
    self.dump(model)
class ApplyDimensionReductionModel(gokart.TaskOnKart):
    """Apply a trained `DimensionReductionModel` to item embedding vectors."""
    task_namespace = 'redshells.word_item_similarity'
    item2embedding_task = gokart.TaskInstanceParameter(
        description='A task outputs item2embedding data with type = Dict[Any, np.ndarray].')
    # BUG FIX: the explanatory text was passed as ``default=`` (making a plain
    # string the default "task instance"); it belongs in ``description=``.
    dimension_reduction_model_task = gokart.TaskInstanceParameter(
        description='A task outputs a model instance of `DimensionReductionModel`.')
    # Whether to L2-normalize the reduced embeddings before dumping.
    l2_normalize = luigi.BoolParameter()  # type: bool
    output_file_path = luigi.Parameter(default='app/word_item_similarity/dimension_reduction_model.pkl')  # type: str
def requires(self):
    """Upstream dependencies: embedding data plus the fitted reduction model."""
    dependencies = {
        'item2embedding': self.item2embedding_task,
        'model': self.dimension_reduction_model_task,
    }
    return dependencies
def output(self):
    """Target for the reduced (and optionally normalized) embeddings."""
    path = self.output_file_path
    return self.make_target(path)
def run(self):
# NOTE(review): fragment truncated — the l2_normalize branch body and the dump
# call are outside the visible chunk.
item2embedding = self.load('item2embedding') # type: Dict[Any, np.ndarray]
model = self.load('model')
# Item ids, in the same order as the stacked values() matrix below.
items = list(item2embedding.keys())
# Project every embedding through the trained reduction model.
embeddings = model.apply(np.array(list(item2embedding.values())))
if self.l2_normalize:
from typing import Any
from typing import Dict
import luigi
import sklearn
import tensorflow as tf
import gokart
from redshells.model import MatrixFactorization
class TrainMatrixFactorization(gokart.TaskOnKart):
# Gokart task configured to train a MatrixFactorization model (imported above)
# on the DataFrame produced by `train_data_task`.
# NOTE(review): run()/output() bodies are not visible in this chunk — training
# details are inferred from the class name and parameters; confirm in full file.
task_namespace = 'redshells'
# Upstream task that yields the training DataFrame.
train_data_task = gokart.TaskInstanceParameter(
description=
'A task outputs a pd.DataFrame with columns={`user_column_name`, `item_column_name`, `service_column_name`, `target_column_name`}.'
)
# Column-name knobs so the task adapts to differently-labelled DataFrames.
user_column_name = luigi.Parameter(default='user', description='The column name of user id.') # type: str
item_column_name = luigi.Parameter(default='item', description='The column name of item id') # type: str
service_column_name = luigi.Parameter(default='service', description='The column name of service id.') # type: str
rating_column_name = luigi.Parameter(
default='rating', description='The target column name to predict.') # type: str
# Extra keyword arguments forwarded to the model constructor.
model_kwargs = luigi.DictParameter(default=dict(), description='Arguments of the model.') # type: Dict[str, Any]
# Cap on rows used for training (presumably to bound memory — confirm).
max_data_size = luigi.IntParameter(default=50000000)
output_file_path = luigi.Parameter(default='model/matrix_factorization.zip') # type: str
def requires(self):
    """Depend on the task that provides the training DataFrame."""
    return self.train_data_task
def output(self):
from typing import Any, Dict
import gokart
import luigi
import sklearn
import redshells
import redshells.train.utils
class _BinaryClassificationModelTask(gokart.TaskOnKart):
# Base task wiring a training DataFrame to a registered prediction model.
# NOTE(review): `output_file_path` is referenced by output() below but is not
# declared here — presumably supplied by a subclass or base; verify.
train_data_task = gokart.TaskInstanceParameter(
description='A task outputs a pd.DataFrame with columns={`target_column_name`}.')
target_column_name = luigi.Parameter(default='category', description='Category column names.') # type: str
# Must match a model registered via "register_prediction_model".
model_name = luigi.Parameter(
description='A model name which has "fit" interface, and must be registered by "register_prediction_model".'
) # type: str
# Constructor kwargs forwarded when the named model is instantiated.
model_kwargs = luigi.DictParameter(
default=dict(), description='Arguments of the model which are created with model_name.') # type: Dict[str, Any]
def requires(self):
    """Upstream: the task whose output is the training DataFrame."""
    upstream = self.train_data_task
    return upstream
def output(self):
    """Target at ``output_file_path`` (presumably declared by a subclass)."""
    return self.make_target(self.output_file_path)
def create_model(self):
    """Instantiate the registered prediction model named by ``model_name``."""
    kwargs = dict(self.model_kwargs)
    return redshells.factory.create_prediction_model(self.model_name, **kwargs)
def output(self):
    """Placeholder target for the null task."""
    return self.make_target('none.pkl')
def run(self):
    """Dump ``None`` so downstream loads of this task resolve to nothing."""
    self.dump(None)
class TrainGraphConvolutionalMatrixCompletion(gokart.TaskOnKart):
# Task configured to train a graph-convolutional matrix-completion model on
# user/item rating data, with optional side features for users and items.
# NOTE(review): run()/output() bodies are truncated in this chunk.
task_namespace = 'redshells'
train_data_task = gokart.TaskInstanceParameter(
description='A task outputs a pd.DataFrame with columns={`user_column_name`, `item_column_name`, `target_column_name`}.')
user_column_name = luigi.Parameter(default='user', description='The column name of user id.') # type: str
item_column_name = luigi.Parameter(default='item', description='The column name of item id') # type: str
rating_column_name = luigi.Parameter(default='rating', description='The target column name to predict.') # type: str
# Side features are optional: NoneTask dumps None, so "no features" is the default.
user_feature_task = gokart.TaskInstanceParameter(default=NoneTask())
item_feature_task = gokart.TaskInstanceParameter(default=NoneTask())
model_kwargs = luigi.DictParameter(default=dict(), description='Arguments of the model.') # type: Dict[str, Any]
max_data_size = luigi.IntParameter(default=50000000) # type: int
output_file_path = luigi.Parameter(default='model/graph_convolutional_matrix_completion.zip') # type: str
# Training-loop knobs (retry count, lr decay, eval split).
try_count = luigi.IntParameter(default=10) # type: int
decay_speed = luigi.FloatParameter(default=2.0) # type: float
test_size = luigi.FloatParameter(default=0.2) # type: float
# data parameters
# Users with too few or too many clicks are filtered out of the training data.
min_user_click_count = luigi.IntParameter(default=5) # type: int
max_user_click_count = luigi.IntParameter(default=200) # type: int
def requires(self):
    """Require training data plus the (possibly null) user/item feature tasks."""
    return {
        'train_data': self.train_data_task,
        'user_features': self.user_feature_task,
        'item_features': self.item_feature_task,
    }
def output(self):
return dict(
from random import shuffle
from typing import Any
from typing import Dict
from typing import List
import gensim
import gokart
import luigi
class TrainWord2Vec(gokart.TaskOnKart):
# Task configured to train a gensim Word2Vec model on tokenized texts and save
# it with gensim's native save/load (see output() below).
task_namespace = 'redshells'
tokenized_text_data_task = gokart.TaskInstanceParameter(
description='The task outputs tokenized texts with type "List[List[str]]".')
output_file_path = luigi.Parameter(default='model/word2vec.zip') # type: str
# Forwarded verbatim to gensim.models.Word2Vec (except "sentences").
word2vec_kwargs = luigi.DictParameter(
default=dict(),
description='Arguments for Word2Vec except "sentences". Please see gensim.models.Word2Vec for more details.'
) # type: Dict[str, Any]
def requires(self):
    """Depend on the task producing the tokenized training corpus."""
    return self.tokenized_text_data_task
def output(self):
    """Persist the trained model using gensim's own save/load functions."""
    return self.make_model_target(
        self.output_file_path,
        save_function=gensim.models.Word2Vec.save,
        load_function=gensim.models.Word2Vec.load,
    )
def run(self):
# NOTE(review): truncated at a fragment seam — model construction and dump are
# outside the visible chunk.
texts = self.load() # type: List[List[str]]
* Calculate similarities between items using a matrix factorization method.
* Calculate similarities between items using keyword matching.
* Calculate document embeddings using the SCDV.
* Train XGBoost to predict similarities using elementwise product of document embeddings as input features.
*
"""
# Class-level parameters of a task whose `class` header is outside this chunk
# (presumably BuildWordItemSimilarity, per the __init__ below — verify).
task_namespace = 'redshells.word_item_similarity'
# Vocabulary source.
word_data_task = gokart.TaskInstanceParameter(description='A task which outputs `List[str]`.')
# Tokenized item data used for training.
item_train_data_task = gokart.TaskInstanceParameter(
description='A task which outputs `pd.DataFrame` with columns=["item_id", "token", "title_token"].')
# Click logs used to relate users, items and services.
click_data_task = gokart.TaskInstanceParameter(
description='A task which outputs `pd.DataFrame` with columns=["user_id", "item_id", "service_id"].')
# Tokenized item data to run predictions on.
item_predict_data_task = gokart.TaskInstanceParameter(
description='A task which outputs `pd.DataFrame` with columns=["item_id", "token", "title_token"].')
text_data_task = gokart.TaskInstanceParameter(
description='A task which outputs `List[List[str]]` for FastText training.')
# If True, only title tokens are used (per the column description above).
use_only_title = luigi.BoolParameter(default=False) # type: bool
word_embedding_type = luigi.Parameter(
default='average',
description='A type of word embedding in prediction. This must be "average" or "word"') # type: str
def __init__(self, *args, **kwargs) -> None:
    """Initialize lazily-computed intermediate results to ``None``."""
    super(BuildWordItemSimilarity, self).__init__(*args, **kwargs)
    # Each attribute caches one pipeline stage; all start unset.
    for attribute in ('scdv', 'word2items', 'word2embedding', 'item2embedding',
                      'similarity_train_data', 'similarity_model',
                      'word2average_embedding', 'predict_item2embedding'):
        setattr(self, attribute, None)
from logging import getLogger
import luigi
import numpy as np
import gokart
from redshells.model import FeatureAggregationSimilarityModel
from redshells.model.feature_aggregation_similarity_model import FeatureAggregationSimilarityDataset
logger = getLogger(__name__)
class TrainFeatureAggregationSimilarityModel(gokart.TaskOnKart):
    """Train a FeatureAggregationSimilarityModel on a dataset task's output."""
    dataset_task = gokart.TaskInstanceParameter(description='An instance of task which outputs `FeatureAggregationSimilarityDataset`.')
    # Model/optimization hyperparameters (all required — no defaults).
    embedding_size = luigi.IntParameter()  # type: int
    learning_rate = luigi.FloatParameter()  # type: float
    batch_size = luigi.IntParameter()  # type: int
    epoch_size = luigi.IntParameter()  # type: int
    test_size_rate = luigi.FloatParameter()  # type: float
    early_stopping_patience = luigi.IntParameter()  # type: int
    max_data_size = luigi.IntParameter()  # type: int
    # BUG FIX: the default path contained a stray ')' ("feature_aggregation)similarity_model.pkl");
    # normalized to the clearly intended underscore.
    output_file_path = luigi.Parameter(default='model/feature_aggregation_similarity_model.pkl')  # type: str
def requires(self):
    """Depend on the task that yields the training dataset."""
    return self.dataset_task
def output(self):
    """Target where the trained similarity model is written."""
    return self.make_target(self.output_file_path)
def run(self):