Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_checkpoint_test_case(tmpdir):
    """Build a two-step resumable pipeline with a checkpoint between the steps.

    Each ``FitTransformCallbackStep`` gets its own (transform tape, fit tape)
    pair, so a test can inspect the two steps independently.

    :param tmpdir: folder passed to the pipeline as ``cache_folder``.
    :return: a ``CheckpointTest`` bundling the four tapes followed by the
        pipeline.
    """
    first_transform_tape = TapeCallbackFunction()
    first_fit_tape = TapeCallbackFunction()
    second_transform_tape = TapeCallbackFunction()
    second_fit_tape = TapeCallbackFunction()

    steps = [
        ('step1', FitTransformCallbackStep(first_transform_tape, first_fit_tape)),
        ('checkpoint', DefaultCheckpoint()),
        ('step2', FitTransformCallbackStep(second_transform_tape, second_fit_tape)),
    ]
    pipeline = ResumablePipeline(steps, cache_folder=tmpdir)

    return CheckpointTest(
        first_transform_tape,
        first_fit_tape,
        second_transform_tape,
        second_fit_tape,
        pipeline,
    )
def test_resumable_pipeline_transform_should_not_save_steps(tmpdir: LocalPath):
    """A transform-only pass must produce ``EXPECTED_OUTPUTS`` and save nothing.

    The nested sub-pipeline contains a ``DefaultCheckpoint``. After calling
    ``transform`` alone, none of the root, sub-pipeline, step or checkpoint
    paths under ``tmpdir`` may exist.
    """
    p = ResumablePipeline([
        (SOME_STEP_1, MultiplyByN(multiply_by=2)),
        (PIPELINE_2, ResumablePipeline([
            (SOME_STEP_2, MultiplyByN(multiply_by=4)),
            (CHECKPOINT, DefaultCheckpoint()),
            (SOME_STEP_3, MultiplyByN(multiply_by=6)),
        ])),
    ], cache_folder=tmpdir)
    p.name = ROOT

    outputs = p.transform(np.array(range(10)))

    # These are the paths that must NOT have been written.
    unexpected_paths = [
        create_root_path(tmpdir),
        create_pipeline2_path(tmpdir),
        create_some_step1_path(tmpdir),
        create_some_step2_path(tmpdir),
        create_some_step3_path(tmpdir),
        create_some_checkpoint_path(tmpdir),
    ]
    assert np.array_equal(outputs, EXPECTED_OUTPUTS)
    # Distinct loop name: reusing ``p`` would rebind the pipeline variable.
    for path in unexpected_paths:
        assert not os.path.exists(path), path
def given_saved_pipeline(tmpdir):
    """Persist a nested resumable pipeline under ``tmpdir`` and return a fresh twin.

    Dumps, in order:

    * the root ``ResumablePipeline`` named ``ROOT``, whose sub-step savers
      list ``SOME_STEP_1`` and ``PIPELINE_2``;
    * the sub-pipeline named ``'pipeline2'``, with savers for ``SOME_STEP_2``,
      ``CHECKPOINT`` and ``SOME_STEP_3``;
    * the three steps, via ``given_saved_some_step``, with multipliers 2, 4
      and 6;
    * a ``DefaultCheckpoint`` named ``CHECKPOINT``.

    :param tmpdir: cache folder that receives the dumped objects.
    :return: a new ``ResumablePipeline`` named ``ROOT`` with the same step
        names, whose ``MultiplyByN`` steps all use ``multiply_by=1``. This is
        presumably so a test can tell whether the saved steps were reloaded;
        confirm with callers.
    """
    step_savers = [(SOME_STEP_1, []), (PIPELINE_2, [TruncableJoblibStepSaver()])]
    # NOTE(review): meaning of the ``True`` flag is defined by the path helpers — confirm.
    path = create_root_path(tmpdir, True)
    root = ResumablePipeline([], cache_folder=tmpdir)
    root.sub_steps_savers = step_savers
    root.name = ROOT
    dump(root, path)

    pipeline_2 = ResumablePipeline([], cache_folder=tmpdir)
    pipeline_2.name = 'pipeline2'
    pipeline_2.sub_steps_savers = [
        (SOME_STEP_2, []),
        (CHECKPOINT, []),
        (SOME_STEP_3, []),
    ]
    dump(pipeline_2, create_pipeline2_path(tmpdir, True))

    given_saved_some_step(multiply_by=2, name=SOME_STEP_1, path=create_some_step1_path(tmpdir, True))
    given_saved_some_step(multiply_by=4, name=SOME_STEP_2, path=create_some_step2_path(tmpdir, True))
    given_saved_some_step(multiply_by=6, name=SOME_STEP_3, path=create_some_step3_path(tmpdir, True))

    checkpoint = DefaultCheckpoint()
    checkpoint.name = CHECKPOINT
    dump(checkpoint, create_some_checkpoint_path(tmpdir, True))

    p = ResumablePipeline([
        (SOME_STEP_1, MultiplyByN(multiply_by=1)),
        (PIPELINE_2, ResumablePipeline([
            (SOME_STEP_2, MultiplyByN(multiply_by=1)),
            (CHECKPOINT, DefaultCheckpoint()),
            (SOME_STEP_3, MultiplyByN(multiply_by=1)),
        ])),
    ], cache_folder=tmpdir)
    p.name = ROOT
    return p
def create_pipeline(tmpdir, pickle_checkpoint_step, tape, hyperparameters=None, different=False, save_pipeline=True):
    """Build a four-step resumable pipeline around ``pickle_checkpoint_step``.

    Steps run ``'a'`` -> ``'pickle_checkpoint'`` -> ``'c'`` -> ``'d'``. Steps
    ``a``, ``c`` and ``d`` are each given ``tape.callback`` with the markers
    ``["1"]``, ``["2"]`` and ``["3"]``. Only step ``a`` receives
    ``hyperparameters``.

    :param tmpdir: cache folder for the pipeline.
    :param pickle_checkpoint_step: step inserted between ``a`` and ``c``.
    :param tape: object whose ``callback`` is shared by the callback steps.
    :param hyperparameters: hyperparameters passed to step ``a``.
    :param different: when true, step ``a`` is a ``DifferentCallbackStep``
        instead of a ``TransformCallbackStep``.
    :param save_pipeline: not used by this function; kept for caller
        compatibility.
    :return: the constructed ``ResumablePipeline``.
    """
    # Only the class of step 'a' depends on ``different``; the rest is shared.
    # The conditional expression evaluates just the selected class name.
    first_step_class = DifferentCallbackStep if different else TransformCallbackStep
    return ResumablePipeline(
        steps=[
            ('a', first_step_class(tape.callback, ["1"], hyperparams=hyperparameters)),
            ('pickle_checkpoint', pickle_checkpoint_step),
            ('c', TransformCallbackStep(tape.callback, ["2"])),
            ('d', TransformCallbackStep(tape.callback, ["3"])),
        ],
        cache_folder=tmpdir,
    )
def given_saved_pipeline(tmpdir):
    """Persist a nested resumable pipeline under ``tmpdir`` and return a fresh twin.

    NOTE(review): this module defines ``given_saved_pipeline`` more than once.
    Python keeps only the last definition, so this one carries the complete
    behaviour: it also dumps the checkpoint and returns a pipeline. Consider
    deleting the duplicate definition.

    Dumps, in order:

    * the root ``ResumablePipeline`` named ``ROOT``, whose sub-step savers
      list ``SOME_STEP_1`` and ``PIPELINE_2``;
    * the sub-pipeline named ``'pipeline2'``, with savers for ``SOME_STEP_2``,
      ``CHECKPOINT`` and ``SOME_STEP_3``;
    * the three steps, via ``given_saved_some_step``, with multipliers 2, 4
      and 6;
    * a ``DefaultCheckpoint`` named ``CHECKPOINT``.

    :param tmpdir: cache folder that receives the dumped objects.
    :return: a new ``ResumablePipeline`` named ``ROOT`` with the same step
        names, whose ``MultiplyByN`` steps all use ``multiply_by=1``.
    """
    step_savers = [(SOME_STEP_1, []), (PIPELINE_2, [TruncableJoblibStepSaver()])]
    path = create_root_path(tmpdir, True)
    root = ResumablePipeline([], cache_folder=tmpdir)
    root.sub_steps_savers = step_savers
    root.name = ROOT
    dump(root, path)

    pipeline_2 = ResumablePipeline([], cache_folder=tmpdir)
    pipeline_2.name = 'pipeline2'
    pipeline_2.sub_steps_savers = [
        (SOME_STEP_2, []),
        (CHECKPOINT, []),
        (SOME_STEP_3, []),
    ]
    dump(pipeline_2, create_pipeline2_path(tmpdir, True))

    given_saved_some_step(multiply_by=2, name=SOME_STEP_1, path=create_some_step1_path(tmpdir, True))
    given_saved_some_step(multiply_by=4, name=SOME_STEP_2, path=create_some_step2_path(tmpdir, True))
    given_saved_some_step(multiply_by=6, name=SOME_STEP_3, path=create_some_step3_path(tmpdir, True))

    checkpoint = DefaultCheckpoint()
    checkpoint.name = CHECKPOINT
    dump(checkpoint, create_some_checkpoint_path(tmpdir, True))

    p = ResumablePipeline([
        (SOME_STEP_1, MultiplyByN(multiply_by=1)),
        (PIPELINE_2, ResumablePipeline([
            (SOME_STEP_2, MultiplyByN(multiply_by=1)),
            (CHECKPOINT, DefaultCheckpoint()),
            (SOME_STEP_3, MultiplyByN(multiply_by=1)),
        ])),
    ], cache_folder=tmpdir)
    p.name = ROOT
    return p
def test_resumable_pipeline_fit_transform_should_save_all_fitted_pipeline_steps(tmpdir: LocalPath):
p = ResumablePipeline([
(SOME_STEP_1, MultiplyByN(multiply_by=2)),
(PIPELINE_2, ResumablePipeline([
(SOME_STEP_2, MultiplyByN(multiply_by=4)),
(CHECKPOINT, DefaultCheckpoint()),
(SOME_STEP_3, MultiplyByN(multiply_by=6))
]))
], cache_folder=tmpdir)
p.name = ROOT
p, outputs = p.fit_transform(
np.array(range(10)),
np.array(range(10))
)
not_saved_paths = [create_some_step3_path(tmpdir)]
saved_paths = [create_root_path(tmpdir), create_pipeline2_path(tmpdir), create_some_step1_path(tmpdir),
),
]),
("e", FitTransformCallbackStep(tape6.callback, tape6_fit.callback, ["3"])),
("f", FitTransformCallbackStep(tape6.callback, tape6_fit.callback, ["4"])),
],
["3", "4"])
# Checkpoint placed as the FIRST step of a nested sub-pipeline.
# The final argument lists the tape markers the case expects
# (presumably the steps re-run after resuming — confirm with ResumablePipelineTestCase).
tape7 = TapeCallbackFunction()
tape7_fit = TapeCallbackFunction()
tape_checkpoint_saved_inside_subpipeline_first_step = ResumablePipelineTestCase(
    tape7,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape7.callback, tape7_fit.callback, ["1"])),
        ResumablePipeline([
            ("d", SomeCheckpointStep(data_container=dc)),
            ("b", FitTransformCallbackStep(tape7.callback, tape7_fit.callback, ["2"])),
        ]),
        ("e", FitTransformCallbackStep(tape7.callback, tape7_fit.callback, ["3"])),
        ("f", FitTransformCallbackStep(tape7.callback, tape7_fit.callback, ["4"])),
    ],
    ["2", "3", "4"])

# Checkpoint placed in the MIDDLE of a nested sub-pipeline (between "b" and "e").
tape8 = TapeCallbackFunction()
tape8_fit = TapeCallbackFunction()
tape_checkpoint_saved_inside_subpipeline_step_in_the_middle = ResumablePipelineTestCase(
    tape8,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["1"])),
        ResumablePipeline([
            ("b", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["2"])),
            ("d", SomeCheckpointStep(data_container=dc)),
            ("e", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["3"])),
        ]),
        ("f", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["4"])),
    ],
    ["3", "4"])
tape9 = TapeCallbackFunction()
tape9_fit = TapeCallbackFunction()
tape_checkpoint_saved_inside_subpipeline_of_subpipeline = ResumablePipelineTestCase(
tape9,
data_inputs,
expected_outputs,
[