Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def _test_run_with_empty_data_frame(cmdline_args: List[str], test_run_params: test_run):
    """Run the workflow once for real, then re-run each task against empty data frames.

    The workflow described by ``cmdline_args`` is first executed through
    ``gokart.run``. Afterwards every task returned by ``_get_all_tasks`` (optionally
    restricted to ``test_run_params.namespace``) is executed through
    ``_run_with_test_status`` while ``gokart.TaskOnKart.load_data_frame`` is patched
    to return an empty ``pd.DataFrame`` carrying only ``required_columns`` and
    ``gokart.TaskOnKart.dump`` is patched to do nothing.

    Args:
        cmdline_args: Command line arguments forwarded to ``gokart.run`` and
            ``CmdlineParser.global_instance``.
        test_run_params: Object whose ``namespace`` attribute, when not ``None``,
            selects the tasks (by ``task_namespace``) that are tested.

    Raises:
        AssertionError: If the initial ``gokart.run`` exits with a non-zero code.
        SystemExit: With code 1 when at least one test status reports failure.
    """
    import sys
    from unittest.mock import patch

    try:
        gokart.run(cmdline_args=cmdline_args)
    except SystemExit as e:
        # Raised explicitly (instead of via `assert`) so the check still runs under `python -O`.
        if e.code != 0:
            raise AssertionError(f'original workflow does not run properly. It exited with error code {e}.')

    with CmdlineParser.global_instance(cmdline_args) as cp:
        all_tasks = _get_all_tasks(cp.get_task_obj())

    if test_run_params.namespace is not None:
        all_tasks = [t for t in all_tasks if t.task_namespace == test_run_params.namespace]

    def _empty_data_frame(*args, required_columns=None, **kwargs):
        # Stand-in for load_data_frame: no rows, only the requested columns.
        return pd.DataFrame(columns=required_columns)

    def _skip_dump(*args, **kwargs):
        # Stand-in for dump: discard whatever the task produced.
        return None

    with patch('gokart.TaskOnKart.load_data_frame', new=_empty_data_frame), \
            patch('gokart.TaskOnKart.dump', new=_skip_dump):
        test_status_list = [_run_with_test_status(t) for t in all_tasks]

    test_logger.info('gokart test results:\n' + '\n'.join(s.format() for s in test_status_list))
    if any(s.fail() for s in test_status_list):
        # `sys.exit` instead of the `site`-provided `exit`, which is not guaranteed to exist.
        sys.exit(1)
def apply(self, x):
    """Return ``x`` increased by one."""
    incremented = x + 1
    return incremented
def get(self):
    """Return the constant ``2``."""
    constant = 2
    return constant
class DummyModelTask(gokart.TaskOnKart):
    """Task in the ``<module>.dummy`` namespace that dumps a new ``DummyModel``."""

    task_namespace = f'{__name__}.dummy'
    rerun = True

    def run(self):
        """Create a ``DummyModel`` and dump it as this task's output."""
        model = DummyModel()
        self.dump(model)
class DummyPandasDataFrameTask(gokart.TaskOnKart):
    """Task that dumps a fixed single-column data frame."""

    task_namespace = __name__
    param = luigi.Parameter()
    rerun = True

    def run(self):
        """Dump a data frame whose column ``x`` holds 1, 3 and 4."""
        frame = pd.DataFrame({'x': [1, 3, 4]})
        self.dump(frame)
class DummyWorkFlowWithError(gokart.TaskOnKart):
    """Workflow task that depends on a model task and a data frame task."""

    task_namespace = __name__
    rerun = True

    def requires(self):
        """Return the upstream tasks keyed by ``model`` and ``data_a``."""
        return {
            'model': DummyModelTask(),
            'data_a': DummyPandasDataFrameTask(param='a'),
        }
import unittest
import luigi
import luigi.mock
from luigi.cmdline_parser import CmdlineParser
import gokart
def in_parse(cmds, deferred_computation):
    """Call ``deferred_computation`` with the task parsed from ``cmds``.

    The callback is invoked while the ``CmdlineParser`` global instance created
    from ``cmds`` is still active.
    """
    with CmdlineParser.global_instance(cmds) as parser:
        task = parser.get_task_obj()
        deferred_computation(task)
class WithDefaultTrue(gokart.TaskOnKart):
    """Task whose ``param`` is an ``ExplicitBoolParameter`` defaulting to ``True``."""

    param = gokart.ExplicitBoolParameter(default=True)
class WithDefaultFalse(gokart.TaskOnKart):
    """Task whose ``param`` is an ``ExplicitBoolParameter`` defaulting to ``False``."""

    param = gokart.ExplicitBoolParameter(default=False)
class ExplicitParsing(gokart.TaskOnKart):
    """Task with a default-less ``ExplicitBoolParameter`` that records its value when run."""

    param = gokart.ExplicitBoolParameter()

    def run(self):
        """Store this instance's ``param`` on the class attribute ``_param``."""
        received = self.param
        ExplicitParsing._param = received
# unittest test case; judging by its name it targets gokart.ExplicitBoolParameter.
# NOTE(review): only the header of test_bool_default is visible in this excerpt —
# its body is missing here, so confirm the test against the full file.
class TestExplicitBoolParameter(unittest.TestCase):
def test_bool_default(self):
def test_repr(self):
    """``str(task)`` renders task-valued parameters as ``<namespace>.<name>(<unique id>)``."""

    class _SubTask(gokart.TaskOnKart):
        task_namespace = __name__

    class _Task(gokart.TaskOnKart):
        task_namespace = __name__
        int_param = luigi.IntParameter()
        task_param = TaskInstanceParameter()
        list_task_param = ListTaskInstanceParameter()

    task = _Task(
        int_param=1,
        task_param=_SubTask(),
        list_task_param=[_SubTask(), _SubTask()],
    )
    sub_task_id = _SubTask().make_unique_id()

    # Every sub task is rendered the same way, so build that fragment once.
    sub_task_repr = f'{__name__}._SubTask({sub_task_id})'
    expected = (
        f'{__name__}._Task(int_param=1, task_param={sub_task_repr}, '
        f'list_task_param=[{sub_task_repr}, {sub_task_repr}])'
    )
    self.assertEqual(expected, str(task))
import luigi
import gokart
from gokart import TaskOnKart
class _DummySubTask(TaskOnKart):
    """Parameter-less task used as the default value of ``_DummyTask.task``."""

    task_namespace = __name__
class _DummyTask(TaskOnKart):
    """Task with an integer parameter and a task-instance parameter."""

    task_namespace = __name__
    param = luigi.IntParameter()
    # Defaults to a `_DummySubTask` instance created when this class body is evaluated.
    task = gokart.TaskInstanceParameter(default=_DummySubTask())
class ListTaskInstanceParameterTest(unittest.TestCase):
    """Round-trip test for ``gokart.ListTaskInstanceParameter``."""

    def setUp(self):
        """Clear the instance cache of ``_DummyTask`` before each test."""
        _DummyTask.clear_instance_cache()

    def test_serialize_and_parse(self):
        """Parsing a serialized task list yields tasks with the same ``task_id`` values."""
        original = [_DummyTask(param=3), _DummyTask(param=3)]
        serialized = gokart.ListTaskInstanceParameter().serialize(original)
        parsed = gokart.ListTaskInstanceParameter().parse(serialized)
        for index in (0, 1):
            self.assertEqual(parsed[index].task_id, original[index].task_id)
# Run the unittest runner when this module is executed as a script.
if __name__ == '__main__':
    unittest.main()
def test_save_pandas_series(self):
    """A named ``pd.Series`` dumped to a ``.csv`` target loads back under its name."""
    series = pd.Series(data=[1, 2], name='column_name')
    csv_path = os.path.join(_get_temporary_directory(), 'test.csv')
    csv_target = make_target(file_path=csv_path, unique_id=None)

    csv_target.dump(series)
    restored = csv_target.load()

    # The loaded object is indexed by the series name to recover the column.
    pd.testing.assert_series_equal(restored['column_name'], series)
def test_last_modified_time(self):
    """``last_modification_time`` of a dumped S3 target is a ``datetime``."""
    # NOTE(review): presumably runs against a mocked S3 backend — confirm in the enclosing class.
    s3 = boto3.resource('s3', region_name='us-east-1')
    s3.create_bucket(Bucket='test')

    value = 1
    pkl_path = os.path.join('s3://test/', 'test.pkl')
    pkl_target = make_target(file_path=pkl_path, unique_id=None)
    pkl_target.dump(value)

    modified_at = pkl_target.last_modification_time()
    self.assertIsInstance(modified_at, datetime)
def test_save_and_load_gzip(self):
    """An object dumped to a ``.gz`` target loads back as a list holding its ``str`` form."""
    value = 1
    gz_path = os.path.join(_get_temporary_directory(), 'test.gz')
    gz_target = make_target(file_path=gz_path, unique_id=None)

    gz_target.dump(value)
    restored = gz_target.load()

    expected = [str(value)]
    self.assertEqual(restored, expected, msg='should save an object as List[str].')
def test_save_and_load_csv(self):
    """A data frame dumped to a ``.csv`` target loads back equal to the original."""
    frame = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
    csv_path = os.path.join(_get_temporary_directory(), 'test.csv')
    csv_target = make_target(file_path=csv_path, unique_id=None)

    csv_target.dump(frame)
    restored = csv_target.load()

    pd.testing.assert_frame_equal(restored, frame)
def _load_function(path):
    """Build a target for ``path`` with ``make_target`` and return what it loads."""
    target = make_target(file_path=path)
    return target.load()