Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_when_hyperparams_and_saved_no_pipeline_should_not_load_checkpoint_pickle(tmpdir: LocalPath):
    """Fit two pipelines on the same ``tmpdir``, the first created with ``save_pipeline=False``.

    The first pipeline is built with an ``Identity`` step in the checkpoint
    slot, ``different=True`` and ``save_pipeline=False``; the second one is
    built with a ``DefaultCheckpoint``. Both receive the same
    ``a__learning_rate`` hyperparameter sample.
    """
    # Arrange: this tape and checkpoint belong to the second pipeline only.
    load_tape = TapeCallbackFunction()
    load_checkpoint = DefaultCheckpoint()

    # Act, first run: throw-away tape, no checkpoint step.
    first_pipeline = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=Identity(),
        tape=TapeCallbackFunction(),
        hyperparameters=HyperparameterSamples({"a__learning_rate": 1}),
        different=True,
        save_pipeline=False,
    )
    first_pipeline.fit_transform(data_inputs, expected_outputs)

    # Act, second run: same directory, same hyperparameters.
    second_pipeline = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=load_checkpoint,
        tape=load_tape,
        hyperparameters=HyperparameterSamples({"a__learning_rate": 1}),
    )
    second_pipeline, actual_data_inputs = second_pipeline.fit_transform(data_inputs, expected_outputs)

    # Collect the names recorded by the second run.
    actual_tape = load_tape.get_name_tape()
    # NOTE(review): no assertion on actual_tape / actual_data_inputs is visible
    # in this snippet — confirm the expected values against the full test file.
def test_when_no_hyperparams_and_saved_same_pipeline_should_load_checkpoint_pickle(tmpdir: LocalPath):
    """Fit the same pipeline configuration twice on one ``tmpdir`` and check the second run.

    Both pipelines are created with a ``DefaultCheckpoint`` and no
    hyperparameters. The second run must return the data inputs unchanged and
    its tape must equal ``EXPECTED_TAPE_AFTER_CHECKPOINT``.
    """
    # Only the tape handed to the second pipeline is inspected.
    recorded = TapeCallbackFunction()

    # First run, with its own throw-away tape.
    first_run = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=TapeCallbackFunction(),
    )
    first_run.fit_transform(data_inputs, expected_outputs)

    # Second run, identically configured, recording into the inspected tape.
    second_run = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=recorded,
    )
    second_run, transformed = second_run.fit_transform(data_inputs, expected_outputs)

    names_on_tape = recorded.get_name_tape()
    assert np.array_equal(transformed, data_inputs)
    assert names_on_tape == EXPECTED_TAPE_AFTER_CHECKPOINT
# Builds a NeuraxleTestCase around a Pipeline holding one ChooseOneOrManyStepsOf
# with two callback steps, 'a' and 'b'. The hyperparams enable 'a' and disable 'b'.
# NOTE(review): this snippet is cut off — the closing brackets of `hyperparams`
# and of the NeuraxleTestCase(...) call are not visible, nor any further arguments.
def create_test_case_single_step_choosen():
# One tape per candidate step; both are also passed to the test case via `callbacks`.
a_callback = TapeCallbackFunction()
b_callback = TapeCallbackFunction()
return NeuraxleTestCase(
pipeline=Pipeline([
ChooseOneOrManyStepsOf([
# Both candidate steps use the same transform: multiply the data inputs by 2.
('a', TransformCallbackStep(a_callback, transform_function=lambda di: di * 2)),
('b', TransformCallbackStep(b_callback, transform_function=lambda di: di * 2))
]),
]),
callbacks=[a_callback, b_callback],
# Listed in the same order as `callbacks`; presumably 'a' is expected to record
# DATA_INPUTS and 'b' nothing — confirm against NeuraxleTestCase.
expected_callbacks_data=[
DATA_INPUTS,
[]
],
hyperparams={
'ChooseOneOrManyStepsOf__a__enabled': True,
'ChooseOneOrManyStepsOf__b__enabled': False
def test_mini_batch_sequential_pipeline_should_fit_transform_steps_sequentially_for_each_barrier_for_each_batch():
    """Set up a ``MiniBatchSequentialPipeline`` of four callback steps and two joiners.

    Steps "1" and "2" are followed by a ``Joiner(batch_size=10)``, then steps
    "3" and "4" by a second ``Joiner(batch_size=10)``.

    NOTE(review): only the setup is visible in this snippet; the fit/transform
    call and the assertions are not — confirm against the full test file.
    """
    # One transform tape and one fit tape per step, created in step order.
    tape1, tape1_fit = TapeCallbackFunction(), TapeCallbackFunction()
    tape2, tape2_fit = TapeCallbackFunction(), TapeCallbackFunction()
    tape3, tape3_fit = TapeCallbackFunction(), TapeCallbackFunction()
    tape4, tape4_fit = TapeCallbackFunction(), TapeCallbackFunction()

    steps = [
        MultiplyBy2FitTransformCallbackStep(tape1, tape1_fit, ["1"]),
        MultiplyBy2FitTransformCallbackStep(tape2, tape2_fit, ["2"]),
        Joiner(batch_size=10),
        MultiplyBy2FitTransformCallbackStep(tape3, tape3_fit, ["3"]),
        MultiplyBy2FitTransformCallbackStep(tape4, tape4_fit, ["4"]),
        Joiner(batch_size=10),
    ]
    p = MiniBatchSequentialPipeline(steps)
# Test case: checkpoint step "b" is constructed with data_container=None.
# The final list argument names steps "1", "2", "3" — presumably the tape
# expected after the run; confirm against ResumablePipelineTestCase.
tape2, tape2_fit = TapeCallbackFunction(), TapeCallbackFunction()
tape_checkpoint_not_saved_test_arguments = ResumablePipelineTestCase(
    tape2,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape2.callback, tape2_fit.callback, ["1"])),
        ("b", SomeCheckpointStep(data_container=None)),
        ("c", FitTransformCallbackStep(tape2.callback, tape2_fit.callback, ["2"])),
        ("d", FitTransformCallbackStep(tape2.callback, tape2_fit.callback, ["3"])),
    ],
    ["1", "2", "3"],
)

# Test case: checkpoint step "b", placed right after step "a", is constructed
# with data_container=dc. The final list argument names only "2" and "3".
# NOTE(review): `dc` is not assigned anywhere above this point in the visible
# text — confirm it is defined before this statement runs.
tape3, tape3_fit = TapeCallbackFunction(), TapeCallbackFunction()
tape_checkpoint_saved_after_first_step_test_arguments = ResumablePipelineTestCase(
    tape3,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape3.callback, tape3_fit.callback, ["1"])),
        ("b", SomeCheckpointStep(data_container=dc)),
        ("c", FitTransformCallbackStep(tape3.callback, tape3_fit.callback, ["2"])),
        ("d", FitTransformCallbackStep(tape3.callback, tape3_fit.callback, ["3"])),
    ],
    ["2", "3"],
)

# Tapes for a further test case that is not visible in this span.
tape4, tape4_fit = TapeCallbackFunction(), TapeCallbackFunction()
# Data container built from the module's data inputs and expected outputs,
# with one id per data input.
dc = DataContainer(
    current_ids=range(len(data_inputs)),
    data_inputs=data_inputs,
    expected_outputs=expected_outputs,
)

# Test case: three callback steps and no checkpoint step.
# The final list argument names all three steps.
tape, tape_fit = TapeCallbackFunction(), TapeCallbackFunction()
tape_without_checkpoint_test_arguments = ResumablePipelineTestCase(
    tape,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape.callback, tape_fit.callback, ["1"])),
        ("b", FitTransformCallbackStep(tape.callback, tape_fit.callback, ["2"])),
        ("c", FitTransformCallbackStep(tape.callback, tape_fit.callback, ["3"])),
    ],
    ["1", "2", "3"],
)

# Test case: checkpoint step "b" is constructed with data_container=None.
# The final list argument names all three callback steps.
tape2, tape2_fit = TapeCallbackFunction(), TapeCallbackFunction()
tape_checkpoint_not_saved_test_arguments = ResumablePipelineTestCase(
    tape2,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape2.callback, tape2_fit.callback, ["1"])),
        ("b", SomeCheckpointStep(data_container=None)),
        ("c", FitTransformCallbackStep(tape2.callback, tape2_fit.callback, ["2"])),
        ("d", FitTransformCallbackStep(tape2.callback, tape2_fit.callback, ["3"])),
    ],
    ["1", "2", "3"],
)

# Tapes for a further test case that is not visible in this span.
tape3, tape3_fit = TapeCallbackFunction(), TapeCallbackFunction()
def test_pipeline_nested_mutate_inverse_transform_without_identities():
    """
    Regression test for a strange bug that showed up at the border between
    nested pipelines when no identity steps were used.

    NOTE(review): only the setup, the first fit_transform and a print are
    visible in this snippet; the mutate/inverse steps and the assertion on
    `expected_tape` are not — confirm against the full test file.
    """
    # "1".."7" followed by the same names in reverse order.
    ascending = [str(number) for number in range(1, 8)]
    expected_tape = ascending + ascending[::-1]
    tape = TapeCallbackFunction()

    # Steps 3-5 live in a nested Pipeline between steps 2 and 6.
    p = Pipeline([
        TransformCallbackStep(tape.callback, ["1"]),
        TransformCallbackStep(tape.callback, ["2"]),
        Pipeline([
            TransformCallbackStep(tape.callback, ["3"]),
            TransformCallbackStep(tape.callback, ["4"]),
            TransformCallbackStep(tape.callback, ["5"]),
        ]),
        TransformCallbackStep(tape.callback, ["6"]),
        TransformCallbackStep(tape.callback, ["7"]),
    ])

    single_sample = np.ones((1, 1))
    p, _ = p.fit_transform(single_sample)  # per the original note: will add range(1, 8) to tape.

    print("[mutating, inversing, and calling each inverse_transform]")
# Test case: checkpoint step "b", placed right after step "a", is constructed
# with data_container=dc. The final list argument names only "2" and "3" —
# presumably the tape expected after the run; confirm against
# ResumablePipelineTestCase.
tape3, tape3_fit = TapeCallbackFunction(), TapeCallbackFunction()
tape_checkpoint_saved_after_first_step_test_arguments = ResumablePipelineTestCase(
    tape3,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape3.callback, tape3_fit.callback, ["1"])),
        ("b", SomeCheckpointStep(data_container=dc)),
        ("c", FitTransformCallbackStep(tape3.callback, tape3_fit.callback, ["2"])),
        ("d", FitTransformCallbackStep(tape3.callback, tape3_fit.callback, ["3"])),
    ],
    ["2", "3"],
)

# Test case: checkpoint step "c", placed after steps "a" and "b", is
# constructed with data_container=dc. The final list argument names only "3".
tape4, tape4_fit = TapeCallbackFunction(), TapeCallbackFunction()
tape_checkpoint_saved_after_second_step_test_arguments = ResumablePipelineTestCase(
    tape4,
    data_inputs,
    expected_outputs,
    [
        ("a", FitTransformCallbackStep(tape4.callback, tape4_fit.callback, ["1"])),
        ("b", FitTransformCallbackStep(tape4.callback, tape4_fit.callback, ["2"])),
        ("c", SomeCheckpointStep(data_container=dc)),
        ("d", FitTransformCallbackStep(tape4.callback, tape4_fit.callback, ["3"])),
    ],
    ["3"],
)

# Tapes for a further test case that is not visible in this span.
tape5, tape5_fit = TapeCallbackFunction(), TapeCallbackFunction()
# NOTE(review): the lines down to `["3", "4"])` are the tail of a call whose
# opening line (and the assignments of tape8 / tape8_fit) is not visible in this
# snippet — presumably another ResumablePipelineTestCase(...); confirm.
tape8,
data_inputs,
expected_outputs,
[
("a", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["1"])),
# Nested ResumablePipeline holding the checkpoint step "d" between steps "b" and "e".
ResumablePipeline([
("b", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["2"])),
("d", SomeCheckpointStep(data_container=dc)
),
("e", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["3"])),
]),
("f", FitTransformCallbackStep(tape8.callback, tape8_fit.callback, ["4"])),
],
["3", "4"])
# Test case with the checkpoint step "d" two ResumablePipeline levels deep,
# between steps "e" and "f".
tape9 = TapeCallbackFunction()
tape9_fit = TapeCallbackFunction()
tape_checkpoint_saved_inside_subpipeline_of_subpipeline = ResumablePipelineTestCase(
tape9,
data_inputs,
expected_outputs,
[
("a", FitTransformCallbackStep(tape9.callback, tape9_fit.callback, ["1"])),
ResumablePipeline([
("b", FitTransformCallbackStep(tape9.callback, tape9_fit.callback, ["2"])),
ResumablePipeline([
("e", FitTransformCallbackStep(tape9.callback, tape9_fit.callback, ["3"])),
("d", SomeCheckpointStep(data_container=dc)
),
("f", FitTransformCallbackStep(tape9.callback, tape9_fit.callback, ["4"])),
]),
("g", FitTransformCallbackStep(tape9.callback, tape9_fit.callback, ["5"])),
# NOTE(review): the snippet is cut off here — the closing brackets and the
# remaining arguments of this call are not visible.
# Fits a pipeline with a DefaultCheckpoint on `tmpdir`, then builds and fits a
# second, identically configured pipeline on the same `tmpdir`.
# NOTE(review): a function with this exact name is already defined earlier in the
# visible text; if both live in one module, this later definition replaces it.
# No assertions are visible here — the snippet may be cut off; confirm.
def test_when_no_hyperparams_and_saved_same_pipeline_should_load_checkpoint_pickle(tmpdir: LocalPath):
# Given
# Only this tape, handed to the second pipeline, is kept in a variable.
tape = TapeCallbackFunction()
# When
# First run uses its own throw-away tape.
pipeline_save = create_pipeline(
tmpdir=tmpdir,
pickle_checkpoint_step=DefaultCheckpoint(),
tape=TapeCallbackFunction()
)
pipeline_save.fit_transform(data_inputs, expected_outputs)
# Second run on the same directory, recording into `tape`.
pipeline_load = create_pipeline(
tmpdir=tmpdir,
pickle_checkpoint_step=DefaultCheckpoint(),
tape=tape
)
pipeline_load, actual_data_inputs = pipeline_load.fit_transform(data_inputs, expected_outputs)