Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_release_not_implemented(self):
"""Check that `release` does nothing by default"""
data_set = LambdaDataSet(None, None)
data_set.release()
def test_save_not_callable(self):
pattern = (
r"`save` function for LambdaDataSet must be a Callable\. "
r"Object of type `str` provided instead\."
)
with pytest.raises(DataSetError, match=pattern):
LambdaDataSet(None, "save")
first_task: first_op,
middle_task: middle_op,
last_task: last_op,
}[python_callable]
def operator_arguments(task_id):
args = {"lambda-none-a": {"retries": 1}, "lambda-b-none": {"retries": 2}}
return args.get(task_id, {})
# actually call the runner to do the conversion
dag = Mock()
pipeline = Pipeline([first_node, last_node, middle_node])
catalog = DataCatalog(
{
"a": LambdaDataSet(load=None, save=None),
"b": LambdaDataSet(load=None, save=None),
}
)
AirflowRunner(dag, None, operator_arguments).run(pipeline, catalog)
# check the create task calls
create_task.assert_has_calls(
[
call(first_node, catalog),
call(middle_node, catalog),
call(last_node, catalog),
],
any_order=True,
)
# check the operator constructor calls
operator.assert_has_calls(
def mocked_dataset(mocker):
load = mocker.Mock(return_value=42)
save = mocker.Mock()
return LambdaDataSet(load, save)
def test_release_invocation(self, mocker):
"""Test the basic `release` method invocation"""
mocked_release = mocker.Mock()
data_set = LambdaDataSet(None, None, None, mocked_release)
data_set.release()
mocked_release.assert_called_once_with()
def test_exists_not_implemented(self, caplog):
"""Test calling `exists` on the data set, which didn't implement it"""
catalog = DataCatalog(data_sets={"test": LambdaDataSet(None, None)})
result = catalog.exists("test")
log_record = caplog.records[0]
assert log_record.levelname == "WARNING"
assert (
"`exists()` not implemented for `LambdaDataSet`. "
"Assuming output does not exist." in log_record.message
)
assert result is False
pass # pragma: no cover
def _dummy_save():
pass # pragma: no cover
def _dummy_exists():
return False # pragma: no cover
def _dummy_release():
pass # pragma: no cover
assert "LambdaDataSet(load=)" in str(
LambdaDataSet(_dummy_load, None)
)
assert "LambdaDataSet(save=)" in str(
LambdaDataSet(None, _dummy_save)
)
assert "LambdaDataSet(exists=)" in str(
LambdaDataSet(None, None, _dummy_exists)
)
assert (
"LambdaDataSet(release=)"
in str(LambdaDataSet(None, None, None, _dummy_release))
)
# __init__ keys alphabetically sorted, None values not shown
expected = (
"LambdaDataSet(exists=, "
"load=, "
"save=)"
)
actual = str(LambdaDataSet(_dummy_load, _dummy_save, _dummy_exists, None))
def test_exists_invocation(self, mocker):
"""Test the basic `exists` method invocation"""
mocked_exists = mocker.Mock(return_value=True)
data_set = LambdaDataSet(None, None, mocked_exists)
result = data_set.exists()
mocked_exists.assert_called_once_with()
assert result is True
def prepare_missing_csv(context):
sample_csv = "/var/missing_csv_file.csv"
context.csv_data_set = LambdaDataSet(
load=lambda: pd.read_csv(sample_csv), save=None
)
def data_set_with_no_save(context):
context.csv_data_set = LambdaDataSet(load=None, save=None)