Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_resumable_pipeline_fit_should_save_all_fitted_pipeline_steps(tmpdir: LocalPath):
    """Fit a nested resumable pipeline and check which step files exist afterwards.

    The root pipeline holds one MultiplyByN step followed by a nested
    ResumablePipeline made of a step, a DefaultCheckpoint and a final step.
    After ``fit``, the root, the nested pipeline, the first two steps and the
    checkpoint must exist on disk, while the path of the step placed after the
    checkpoint must not.

    :param tmpdir: temporary directory used as the pipeline ``cache_folder``
    """
    pipeline = ResumablePipeline([
        (SOME_STEP_1, MultiplyByN(multiply_by=2)),
        (PIPELINE_2, ResumablePipeline([
            (SOME_STEP_2, MultiplyByN(multiply_by=4)),
            (CHECKPOINT, DefaultCheckpoint()),
            (SOME_STEP_3, MultiplyByN(multiply_by=6))
        ]))
    ], cache_folder=tmpdir)
    pipeline.name = ROOT

    pipeline.fit(
        np.array(range(10)),
        np.array(range(10))
    )

    not_saved_paths = [create_some_step3_path(tmpdir)]
    saved_paths = [create_root_path(tmpdir), create_pipeline2_path(tmpdir), create_some_step1_path(tmpdir),
                   create_some_step2_path(tmpdir), create_some_checkpoint_path(tmpdir)]

    # A dedicated loop variable keeps the fitted pipeline from being shadowed.
    for saved_path in saved_paths:
        assert os.path.exists(saved_path)

    # NOTE(review): the excerpt ended on a body-less loop; the negative check
    # below is what the `not_saved_paths` name calls for — confirm upstream.
    for not_saved_path in not_saved_paths:
        assert not os.path.exists(not_saved_path)
def create_checkpoint_test_case(tmpdir):
    """Assemble a two-step resumable pipeline with a checkpoint between the steps.

    Four TapeCallbackFunction instances are created, a transform tape and a fit
    tape for each of the two FitTransformCallbackStep steps.

    :param tmpdir: directory passed to ResumablePipeline as ``cache_folder``
    :return: a CheckpointTest built from the four tapes followed by the pipeline
    """
    step1_transform_tape = TapeCallbackFunction()
    step1_fit_tape = TapeCallbackFunction()
    step2_transform_tape = TapeCallbackFunction()
    step2_fit_tape = TapeCallbackFunction()

    named_steps = [
        ('step1', FitTransformCallbackStep(step1_transform_tape, step1_fit_tape)),
        ('checkpoint', DefaultCheckpoint()),
        ('step2', FitTransformCallbackStep(step2_transform_tape, step2_fit_tape))
    ]
    resumable_pipeline = ResumablePipeline(named_steps, cache_folder=tmpdir)

    return CheckpointTest(
        step1_transform_tape,
        step1_fit_tape,
        step2_transform_tape,
        step2_fit_tape,
        resumable_pipeline
    )
# NOTE(review): the enclosing `def` header is not part of this excerpt; `root`,
# `path` and `tmpdir` are presumably bound earlier in that function — confirm.
# Write the `root` object to `path`.
dump(root, path)
# Empty nested pipeline named 'pipeline2'; each of its three sub steps is
# registered in `sub_steps_savers` with an empty list.
pipeline_2 = ResumablePipeline([], cache_folder=tmpdir)
pipeline_2.name = 'pipeline2'
pipeline_2.sub_steps_savers = [
(SOME_STEP_2, []),
(CHECKPOINT, []),
(SOME_STEP_3, []),
]
dump(pipeline_2, create_pipeline2_path(tmpdir, True))
# One given_saved_some_step call per named step, with multiply_by 2, 4 and 6.
given_saved_some_step(multiply_by=2, name=SOME_STEP_1, path=create_some_step1_path(tmpdir, True))
given_saved_some_step(multiply_by=4, name=SOME_STEP_2, path=create_some_step2_path(tmpdir, True))
given_saved_some_step(multiply_by=6, name=SOME_STEP_3, path=create_some_step3_path(tmpdir, True))
# The checkpoint is dumped under the CHECKPOINT name as well.
checkpoint = DefaultCheckpoint()
checkpoint.name = CHECKPOINT
dump(checkpoint, create_some_checkpoint_path(tmpdir, True))
# The returned pipeline uses multiply_by=1 everywhere, unlike the 2/4/6 values
# passed to given_saved_some_step above — presumably so a caller can tell
# whether the saved steps were loaded; TODO confirm against the tests using it.
p = ResumablePipeline([
(SOME_STEP_1, MultiplyByN(multiply_by=1)),
(PIPELINE_2, ResumablePipeline([
(SOME_STEP_2, MultiplyByN(multiply_by=1)),
(CHECKPOINT, DefaultCheckpoint()),
(SOME_STEP_3, MultiplyByN(multiply_by=1))
]))
], cache_folder=tmpdir)
p.name = ROOT
return p
def test_when_hyperparams_should_save_checkpoint_pickle(tmpdir: LocalPath):
    """Run fit_transform with hyperparameters and check the checkpoint pickles exist.

    The pipeline is built by ``create_pipeline`` with a DefaultCheckpoint and
    ``a__learning_rate`` set to 1. The outputs must equal ``data_inputs``, the
    tape must read "1", "2", "3", and two hash-named pickle files must exist in
    both the 'di' and 'eo' folders of the checkpoint.

    :param tmpdir: temporary directory handed to ``create_pipeline``
    """
    recorder = TapeCallbackFunction()
    checkpoint_step = DefaultCheckpoint()
    hyperparams = HyperparameterSamples({"a__learning_rate": 1})
    pipeline = create_pipeline(tmpdir, checkpoint_step, recorder, hyperparams)

    pipeline, actual_data_inputs = pipeline.fit_transform(data_inputs, expected_outputs)
    recorded_names = recorder.get_name_tape()

    assert np.array_equal(actual_data_inputs, data_inputs)
    assert recorded_names == ["1", "2", "3"]

    # Same four files as before, visited in the same order: both 'di' files, then both 'eo' files.
    checkpoint_folder = os.path.join(tmpdir, 'ResumablePipeline', 'pickle_checkpoint')
    pickle_names = (
        '44f9d6dd8b6ccae571ca04525c3eaffa.pickle',
        '898a67b2f5eeae6393ca4b3162ba8e3d.pickle',
    )
    for sub_folder in ('di', 'eo'):
        for pickle_name in pickle_names:
            assert os.path.exists(os.path.join(checkpoint_folder, sub_folder, pickle_name))
def test_when_no_hyperparams_and_saved_same_pipeline_should_load_checkpoint_pickle(tmpdir: LocalPath):
    """Run the same pipeline twice on one folder and check the tape of the second run.

    A first pipeline (with its own throwaway tape) runs fit_transform. A second
    pipeline built the same way on the same ``tmpdir`` then runs fit_transform;
    its outputs must equal ``data_inputs`` and its tape must equal
    ``EXPECTED_TAPE_AFTER_CHECKPOINT``.

    :param tmpdir: temporary directory shared by both pipelines
    """
    # Given: the tape observed by the second pipeline only
    observed_tape = TapeCallbackFunction()

    # When: first run, recorded on a separate tape that is never inspected
    first_run = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=TapeCallbackFunction()
    )
    first_run.fit_transform(data_inputs, expected_outputs)

    # When: second run, same construction and same folder
    second_run = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=observed_tape
    )
    fit_result = second_run.fit_transform(data_inputs, expected_outputs)
    second_run, actual_data_inputs = fit_result

    # Then
    recorded_names = observed_tape.get_name_tape()
    assert np.array_equal(actual_data_inputs, data_inputs)
    assert recorded_names == EXPECTED_TAPE_AFTER_CHECKPOINT
def test_when_hyperparams_and_saved_same_pipeline_should_load_checkpoint_pickle(tmpdir: LocalPath):
    """Run the same hyperparameterised pipeline twice and check the second run's tape.

    Both pipelines are built by ``create_pipeline`` on the same ``tmpdir`` with
    ``a__learning_rate`` set to 1. The first run uses a throwaway tape; the
    second run's outputs must equal ``data_inputs`` and its tape must equal
    ``EXPECTED_TAPE_AFTER_CHECKPOINT``, as in the no-hyperparams variant of
    this test.

    :param tmpdir: temporary directory shared by both pipelines
    """
    # Given
    tape = TapeCallbackFunction()

    # When
    pipeline_save = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=TapeCallbackFunction(),
        hyperparameters=HyperparameterSamples({"a__learning_rate": 1})
    )
    pipeline_save.fit_transform(data_inputs, expected_outputs)

    pipeline_load = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=tape,
        hyperparameters=HyperparameterSamples({"a__learning_rate": 1})
    )
    pipeline_load, actual_data_inputs = pipeline_load.fit_transform(data_inputs, expected_outputs)

    # Then
    actual_tape = tape.get_name_tape()
    assert np.array_equal(actual_data_inputs, data_inputs)
    # The tape was previously fetched but never checked, so the test could not
    # notice whether the checkpoint was actually used on the second run.
    assert actual_tape == EXPECTED_TAPE_AFTER_CHECKPOINT
# NOTE(review): `tmpdir` is not bound anywhere in this excerpt — presumably the
# lambda lives inside a test function that receives it; confirm.
# Zero-argument factory: every call builds a new ResumablePipeline with a
# DefaultCheckpoint placed between two MultiplyBy2OutputTransformer steps.
create_pipeline_output_transformer = lambda: ResumablePipeline(
[
('output_transformer_1', MultiplyBy2OutputTransformer()),
('pickle_checkpoint', DefaultCheckpoint()),
('output_transformer_2', MultiplyBy2OutputTransformer()),
], cache_folder=tmpdir)
# NOTE(review): this excerpt starts mid-statement; the `]` below presumably
# closes a list literal whose opening line is not visible, and `pipeline_2`
# and `tmpdir` are presumably bound earlier in the enclosing function — confirm.
]
dump(pipeline_2, create_pipeline2_path(tmpdir, True))
# One given_saved_some_step call per named step, with multiply_by 2, 4 and 6.
given_saved_some_step(multiply_by=2, name=SOME_STEP_1, path=create_some_step1_path(tmpdir, True))
given_saved_some_step(multiply_by=4, name=SOME_STEP_2, path=create_some_step2_path(tmpdir, True))
given_saved_some_step(multiply_by=6, name=SOME_STEP_3, path=create_some_step3_path(tmpdir, True))
# The checkpoint is dumped under the CHECKPOINT name as well.
checkpoint = DefaultCheckpoint()
checkpoint.name = CHECKPOINT
dump(checkpoint, create_some_checkpoint_path(tmpdir, True))
# The returned pipeline uses multiply_by=1 everywhere, unlike the 2/4/6 values
# passed to given_saved_some_step above — presumably so a caller can tell
# whether the saved steps were loaded; TODO confirm against the tests using it.
p = ResumablePipeline([
(SOME_STEP_1, MultiplyByN(multiply_by=1)),
(PIPELINE_2, ResumablePipeline([
(SOME_STEP_2, MultiplyByN(multiply_by=1)),
(CHECKPOINT, DefaultCheckpoint()),
(SOME_STEP_3, MultiplyByN(multiply_by=1))
]))
], cache_folder=tmpdir)
p.name = ROOT
return p
def test_when_no_hyperparams_should_save_checkpoint_pickle(tmpdir: LocalPath):
    """Run fit_transform without hyperparameters and check the checkpoint pickles exist.

    The pipeline is built by ``create_pipeline`` with a DefaultCheckpoint. The
    outputs must equal ``data_inputs``, the tape must read "1", "2", "3", and
    the files '0.pickle' and '1.pickle' must exist in both the 'di' and 'eo'
    folders of the checkpoint.

    :param tmpdir: temporary directory handed to ``create_pipeline``
    """
    recorder = TapeCallbackFunction()
    checkpoint_step = DefaultCheckpoint()
    pipeline = create_pipeline(tmpdir, checkpoint_step, recorder)

    pipeline, actual_data_inputs = pipeline.fit_transform(data_inputs, expected_outputs)
    recorded_names = recorder.get_name_tape()

    assert np.array_equal(actual_data_inputs, data_inputs)
    assert recorded_names == ["1", "2", "3"]

    # Same four files as before, visited in the same order: both 'di' files, then both 'eo' files.
    checkpoint_folder = os.path.join(tmpdir, 'ResumablePipeline', 'pickle_checkpoint')
    for sub_folder in ('di', 'eo'):
        for pickle_name in ('0.pickle', '1.pickle'):
            assert os.path.exists(os.path.join(checkpoint_folder, sub_folder, pickle_name))