Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@task
def parse_to_string(p_date=None):
    # type: (datetime ) -> str
    """Render ``p_date`` in ISO format with the "T" delimiter swapped out.

    The ISO string produced by ``p_date.isoformat()`` has every "T"
    replaced by the literal text "SEPARATOR". The body does not handle
    the ``None`` default itself.
    """
    iso_text = p_date.isoformat()
    return iso_text.replace("T", "SEPARATOR")
def test_log_metric(self):
    """Check that a metric logged inside a task is readable after the run."""

    @task
    def t_f_metric(a=5):
        # Records the parameter value under the metric key "t_f".
        log_metric("t_f", a)

    executed = assert_run_task(t_f_metric.t())
    metric_target = executed.ctrl.last_task_run.meta_files.get_metric_target("t_f")
    # The second whitespace-separated field of the metric file must be "5".
    recorded_fields = metric_target.read().split()
    assert recorded_fields[1] == "5"
@task()
def t_f_2nd(a):
    # type: (DataList[str])-> List[str]
    """Forward ``a`` to ``t_f_b`` and hand back whatever it returns."""
    delegated = t_f_b(a)
    return delegated
@task
def task_from_config(parameter_from_config, expected):
    """Assert that ``parameter_from_config`` equals ``expected``."""
    values_match = parameter_from_config == expected
    assert values_match
@task
def t_f_df(a):
    # type: (DataFrame) -> int
    """Verify that ``a`` matches ``pandas_data_frame`` and return 1."""
    # ``pandas_data_frame`` is the reference frame defined outside this function.
    assert_frame_equal(pandas_data_frame, a)
    outcome = 1
    return outcome
@task
def t_A(p_str="check", p_int=2):
    # type: (str,int) -> str
    """Log a progress message and return ``p_str`` repeated ``p_int`` times."""
    logging.info("I am running")
    repeated = p_str * p_int
    return repeated
@task
def t_f_a(t_input, t_param, t_default="d1"):
    # type: (DataList[str], str, str) -> DataList[str]
    """Validate the parameters, record the input size, return the first two items.

    Asserts that ``t_default`` is "d1" and ``t_param`` is "d2", logs the
    length of ``t_input`` as the "t_input" metric, and returns the slice
    ``t_input[:2]``.
    """
    assert t_default == "d1"
    assert t_param == "d2"

    input_size = len(t_input)
    log_metric("t_input", input_size)
    logger.info("Got string: %s", t_input)

    leading_pair = t_input[0:2]
    return leading_pair
@task
def inline_dict_of_dict(dfs):
    """Log the shape of ``dfs["df_a"]["df_b"]`` and return ``dfs["df_a"]``."""
    inner_mapping = dfs["df_a"]
    logging.info("Shape: %s", inner_mapping["df_b"].shape)
    return inner_mapping
@task
def t_f(t_input, t_int, t_time):
    # type: (DataList[str], int, datetime.datetime) -> List[str]
    """Return the constant list ``["1"]``; the arguments are not read."""
    constant_result = ["1"]
    return constant_result
from dbnd import auto_namespace, band, parameter
from dbnd._core.constants import CloudType
from dbnd_test_scenarios.test_common.task.factories import FooConfig, TTask
# Call dbnd's auto_namespace with this module's name as the scope.
# NOTE(review): presumably this makes the task classes defined below use the
# module name as their namespace -- confirm against the dbnd documentation.
auto_namespace(scope=__name__)
class FirstTask(TTask):
    """TTask subclass declaring a ``FooConfig`` parameter and a string parameter."""

    # exists only for local
    foo = parameter(default="FooConfig")[FooConfig]

    # Inline default; callers may pass a different ``param`` value.
    param = parameter(default="FirstTask.inline.param")[str]
class SecondTask(FirstTask):
    """FirstTask variant carrying class-level defaults for ``FooConfig`` fields."""

    # Maps FooConfig.bar and FooConfig.quz to the values this class supplies.
    defaults = {
        FooConfig.bar: "SecondTask.defaults.bar",
        FooConfig.quz: "SecondTask.defaults.quz",
    }
@band(defaults={FooConfig.bar: "first_pipeline.defaults.bar"})
def first_pipeline():
    """Build a ``SecondTask`` with an explicit ``param`` and return its ``t_output``."""
    second_task = SecondTask(param="first_pipeline.band.param")
    return second_task.t_output
@band(defaults={FooConfig.quz: "second_pipeline.defaults.quz"})