Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_fit_for_each_should_fit_all_steps_for_each_data_inputs_expected_outputs():
    """Check that fitting through ``ForEachDataInput`` fits each wrapped step per item.

    Two ``FitCallbackStep`` instances share one tape. After fitting on two
    (data input, expected output) pairs, the tape must show both steps being
    called for the first pair and then both steps for the second pair.
    """
    tape = TapeCallbackFunction()
    p = Pipeline([
        ForEachDataInput(Pipeline([
            FitCallbackStep(tape.callback, ["1"]),
            FitCallbackStep(tape.callback, ["2"]),
        ]))
    ])
    data_inputs = [[0, 1], [1, 2]]
    expected_outputs = [[2, 3], [4, 5]]

    p = p.fit(data_inputs, expected_outputs)

    # fit() must hand back a Pipeline so calls can be chained.
    assert isinstance(p, Pipeline)
    # Steps "1" and "2" are both recorded for item 0 before item 1 is processed.
    assert tape.get_name_tape() == ["1", "2", "1", "2"]
    # Each recorded call carries the (data input, expected output) pair of its item.
    assert tape.data == [([0, 1], [2, 3]), ([0, 1], [2, 3]), ([1, 2], [4, 5]), ([1, 2], [4, 5])]
def choose_one_step_single_step_chosen_fit():
    """Build the test case for ``ChooseOneStepOf`` with step ``'a'`` selected.

    The pipeline holds two ``FitTransformCallbackStep`` candidates, ``'a'`` and
    ``'b'``, each doubling its data inputs in its transform function. The
    hyperparameters select ``'a'``.

    :return: a ``NeuraxleTestCase`` bundling the pipeline, the four callbacks,
        the data each callback is expected to have recorded, the
        hyperparameters and the expected processed outputs.
    """
    a_callback = TapeCallbackFunction()
    b_callback = TapeCallbackFunction()
    c_callback = TapeCallbackFunction()
    d_callback = TapeCallbackFunction()

    return NeuraxleTestCase(
        pipeline=Pipeline([
            ChooseOneStepOf([
                ('a', FitTransformCallbackStep(a_callback, c_callback, transform_function=lambda di: di * 2)),
                ('b', FitTransformCallbackStep(b_callback, d_callback, transform_function=lambda di: di * 2))
            ]),
        ]),
        # Order matters: expected_callbacks_data below is matched positionally.
        callbacks=[a_callback, c_callback, b_callback, d_callback],
        expected_callbacks_data=[
            [],                                # a_callback: nothing recorded
            (DATA_INPUTS, EXPECTED_OUTPUTS),   # c_callback: second callback of step 'a'
            [],                                # b_callback: step 'b' not chosen
            []                                 # d_callback: step 'b' not chosen
        ],
        hyperparams={
            'ChooseOneOrManyStepsOf__choice': 'a'
        },
        # NOTE(review): matches DATA_INPUTS doubled if DATA_INPUTS is range(10) -- confirm.
        expected_processed_outputs=np.array([0, 2, 4, 6, 8, 10, 12, 14, 16, 18])
    )
def choose_one_step_single_step_chosen_fit():
    """Return a ``NeuraxleTestCase`` in which ``ChooseOneStepOf`` picks step ``'a'``.

    Both candidate steps are ``FitTransformCallbackStep`` instances whose
    transform function multiplies the data inputs by two; only the
    ``'ChooseOneOrManyStepsOf__choice'`` hyperparameter decides which one runs.

    :return: the assembled ``NeuraxleTestCase`` (pipeline, callbacks, expected
        callback data, hyperparameters, expected processed outputs).
    """
    a_callback = TapeCallbackFunction()
    b_callback = TapeCallbackFunction()
    c_callback = TapeCallbackFunction()
    d_callback = TapeCallbackFunction()

    return NeuraxleTestCase(
        pipeline=Pipeline([
            ChooseOneStepOf([
                ('a', FitTransformCallbackStep(a_callback, c_callback, transform_function=lambda di: di * 2)),
                ('b', FitTransformCallbackStep(b_callback, d_callback, transform_function=lambda di: di * 2))
            ]),
        ]),
        # The callbacks list and expected_callbacks_data are aligned by index.
        callbacks=[a_callback, c_callback, b_callback, d_callback],
        expected_callbacks_data=[
            [],                                # a_callback
            (DATA_INPUTS, EXPECTED_OUTPUTS),   # c_callback (belongs to chosen step 'a')
            [],                                # b_callback (step 'b' is not selected)
            []                                 # d_callback (step 'b' is not selected)
        ],
        hyperparams={
            'ChooseOneOrManyStepsOf__choice': 'a'
        },
        # NOTE(review): presumably DATA_INPUTS * 2 with DATA_INPUTS == range(10) -- verify.
        expected_processed_outputs=np.array([0, 2, 4, 6, 8, 10, 12, 14, 16, 18])
    )
class SomeTruncableStep(TruncableSteps):
    """Test double: a ``TruncableSteps`` wrapping two ``SomeStepWithHyperparams``.

    It is initialised with the module-level ``HYPERPARAMETERS`` and
    ``HYPERPARAMETERS_SPACE``. ``transform`` and ``fit`` are deliberate no-ops
    that return ``None``.
    """

    def __init__(self):
        TruncableSteps.__init__(
            self,
            hyperparams=HYPERPARAMETERS,
            hyperparams_space=HYPERPARAMETERS_SPACE,
            steps_as_tuple=(SomeStepWithHyperparams(), SomeStepWithHyperparams())
        )

    def transform(self, data_inputs):
        """Do nothing; returns ``None``."""
        pass

    def fit(self, data_inputs, expected_outputs=None):
        """Do nothing; returns ``None``."""
        pass
class SomeSplitStep(NonFittableMixin, BaseStep):
    """Test double step whose ``fit``, ``fit_transform`` and ``transform`` are no-ops.

    Every method body is ``pass``, so each call returns ``None``.
    """

    def fit(self, data_inputs, expected_outputs=None) -> 'NonFittableMixin':
        """Do nothing.

        NOTE(review): annotated as returning ``NonFittableMixin`` but the body
        actually returns ``None``.
        """
        pass

    def fit_transform(self, data_inputs, expected_outputs=None):
        """Do nothing; returns ``None``."""
        pass

    def transform(self, data_inputs):
        """Do nothing; returns ``None``."""
        pass
def test_resumable_pipeline_fit_should_save_all_fitted_pipeline_steps(tmpdir: LocalPath):
    """Check which step folders exist on disk after fitting a nested ResumablePipeline.

    The pipeline has one step before a nested ``ResumablePipeline`` that holds
    a step, a ``DefaultCheckpoint`` and one more step after the checkpoint.
    After ``fit``, the root, the nested pipeline, both steps before the
    checkpoint and the checkpoint itself must exist under ``tmpdir``; the step
    after the checkpoint must not.

    :param tmpdir: pytest temporary directory used as the pipeline cache folder.
    """
    # Local import keeps this block self-contained.
    import os

    p = ResumablePipeline([
        (SOME_STEP_1, MultiplyByN(multiply_by=2)),
        (PIPELINE_2, ResumablePipeline([
            (SOME_STEP_2, MultiplyByN(multiply_by=4)),
            (CHECKPOINT, DefaultCheckpoint()),
            (SOME_STEP_3, MultiplyByN(multiply_by=6))
        ]))
    ], cache_folder=tmpdir)
    p.name = ROOT

    p = p.fit(
        np.array(range(10)),
        np.array(range(10))
    )

    not_saved_paths = [create_some_step3_path(tmpdir)]
    saved_paths = [create_root_path(tmpdir), create_pipeline2_path(tmpdir), create_some_step1_path(tmpdir),
                   create_some_step2_path(tmpdir), create_some_checkpoint_path(tmpdir)]

    # Without these checks the test would pass no matter what was written to disk.
    for saved_path in saved_paths:
        assert os.path.exists(saved_path)

    for not_saved_path in not_saved_paths:
        assert not os.path.exists(not_saved_path)
def test_automl_sequential_wrapper(tmpdir):
    """Run a ``RandomSearch`` over a three-step multiplication pipeline.

    Each ``MultiplyByN`` step gets a ``RandInt(1, 3)`` space for its
    ``multiply_by`` hyperparameter. The search runs for 100 iterations with a
    ``HyperparamsJSONRepository`` rooted at ``tmpdir``, then the best model is
    used to transform the data inputs.

    :param tmpdir: pytest temporary directory used both as pipeline cache
        folder and as hyperparameter repository location.
    """
    # Given
    data_inputs = np.array(range(100))
    expected_outputs = np.array(range(100, 200))

    hyperparameter_space = HyperparameterSpace({
        'multiplication_1__multiply_by': RandInt(1, 3),
        'multiplication_2__multiply_by': RandInt(1, 3),
        'multiplication_3__multiply_by': RandInt(1, 3),
    })

    pipeline = Pipeline([
        ('multiplication_1', MultiplyByN()),
        ('multiplication_2', MultiplyByN()),
        ('multiplication_3', MultiplyByN())
    ], cache_folder=tmpdir).set_hyperparams_space(hyperparameter_space)

    auto_ml = RandomSearch(pipeline, hyperparams_repository=HyperparamsJSONRepository(tmpdir), n_iter=100)

    # When
    auto_ml: AutoMLSequentialWrapper = auto_ml.fit(data_inputs, expected_outputs)
    best_model: Pipeline = auto_ml.get_best_model()
    predicted_outputs = best_model.transform(data_inputs)

    # NOTE(review): no "Then" assertions on predicted_outputs are visible in
    # this snippet -- confirm the expected check against the full test file.
def test_resumable_pipeline_fit_should_save_all_fitted_pipeline_steps(tmpdir: LocalPath):
    """Fit a nested ``ResumablePipeline`` and check which step folders were saved.

    The nested pipeline contains a ``DefaultCheckpoint`` between its second
    and third ``MultiplyByN`` steps. After ``fit``, the paths for the root, the
    nested pipeline, the two steps before the checkpoint and the checkpoint
    must exist; the path for the step after the checkpoint must not.

    :param tmpdir: pytest temporary directory used as the pipeline cache folder.
    """
    p = ResumablePipeline([
        (SOME_STEP_1, MultiplyByN(multiply_by=2)),
        (PIPELINE_2, ResumablePipeline([
            (SOME_STEP_2, MultiplyByN(multiply_by=4)),
            (CHECKPOINT, DefaultCheckpoint()),
            (SOME_STEP_3, MultiplyByN(multiply_by=6))
        ]))
    ], cache_folder=tmpdir)
    p.name = ROOT

    p = p.fit(
        np.array(range(10)),
        np.array(range(10))
    )

    not_saved_paths = [create_some_step3_path(tmpdir)]
    saved_paths = [create_root_path(tmpdir), create_pipeline2_path(tmpdir), create_some_step1_path(tmpdir),
                   create_some_step2_path(tmpdir), create_some_checkpoint_path(tmpdir)]

    # Distinct loop names so the fitted pipeline ``p`` is not overwritten.
    for saved_path in saved_paths:
        assert os.path.exists(saved_path)

    # The step after the checkpoint must not have been written to disk.
    for not_saved_path in not_saved_paths:
        assert not os.path.exists(not_saved_path)
def test_when_hyperparams_and_saved_no_pipeline_should_not_load_checkpoint_pickle(tmpdir: LocalPath):
    """Run two pipelines over the same ``tmpdir`` and capture the second one's tape.

    The first pipeline is built with ``Identity()`` in place of the checkpoint
    step, ``different=True`` and ``save_pipeline=False``. The second is built
    with a ``DefaultCheckpoint`` and the same hyperparameters, and records its
    calls on ``tape``.

    ``data_inputs`` and ``expected_outputs`` are not defined in this function;
    they are expected to come from module scope.

    :param tmpdir: pytest temporary directory shared by both pipelines.
    """
    # Given
    tape = TapeCallbackFunction()
    pickle_checkpoint_step = DefaultCheckpoint()

    # When
    pipeline_save = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=Identity(),
        tape=TapeCallbackFunction(),
        hyperparameters=HyperparameterSamples({"a__learning_rate": 1}),
        different=True,
        save_pipeline=False
    )
    pipeline_save.fit_transform(data_inputs, expected_outputs)

    pipeline_load = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=pickle_checkpoint_step,
        tape=tape,
        hyperparameters=HyperparameterSamples({"a__learning_rate": 1})
    )
    pipeline_load, actual_data_inputs = pipeline_load.fit_transform(data_inputs, expected_outputs)

    # Then
    actual_tape = tape.get_name_tape()
    # NOTE(review): the assertions on actual_data_inputs / actual_tape are cut
    # off in this snippet -- restore them from the full test file.
def test_when_no_hyperparams_and_saved_same_pipeline_should_load_checkpoint_pickle(tmpdir: LocalPath):
    """Re-running an identical pipeline must yield the after-checkpoint tape.

    The same pipeline (with a ``DefaultCheckpoint`` and no hyperparameters) is
    built and run twice over one ``tmpdir``. The second run must return the
    data inputs unchanged and record ``EXPECTED_TAPE_AFTER_CHECKPOINT``.

    ``data_inputs`` and ``expected_outputs`` are not defined in this function;
    they are expected to come from module scope.

    :param tmpdir: pytest temporary directory shared by both pipelines.
    """
    # Given
    tape = TapeCallbackFunction()

    # When
    pipeline_save = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=TapeCallbackFunction()
    )
    pipeline_save.fit_transform(data_inputs, expected_outputs)

    pipeline_load = create_pipeline(
        tmpdir=tmpdir,
        pickle_checkpoint_step=DefaultCheckpoint(),
        tape=tape
    )
    pipeline_load, actual_data_inputs = pipeline_load.fit_transform(data_inputs, expected_outputs)

    # Then
    actual_tape = tape.get_name_tape()
    assert np.array_equal(actual_data_inputs, data_inputs)
    assert actual_tape == EXPECTED_TAPE_AFTER_CHECKPOINT
def create_test_case_single_step_choosen():
a_callback = TapeCallbackFunction()
b_callback = TapeCallbackFunction()
return NeuraxleTestCase(
pipeline=Pipeline([
ChooseOneOrManyStepsOf([
('a', TransformCallbackStep(a_callback, transform_function=lambda di: di * 2)),
('b', TransformCallbackStep(b_callback, transform_function=lambda di: di * 2))
]),
]),
callbacks=[a_callback, b_callback],
expected_callbacks_data=[
DATA_INPUTS,
[]
],
hyperparams={
'ChooseOneOrManyStepsOf__a__enabled': True,
'ChooseOneOrManyStepsOf__b__enabled': False