Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@pipeline
def t_pipeline(p_str="from_pipeline_ctor"):
    # type: (str) -> str
    """Pass ``p_str`` to ``t_A`` and return the result of ``t_B`` on that output."""
    return t_B(t_A(p_str))
@pipeline
def my_xcom_pipeline(p_date=None):
    # type: ( datetime ) -> Tuple[str, str]
    """Correct ``p_date`` and derive two values from the corrected date.

    The corrected date is handed to ``convert_to_string`` and to
    ``my_second_pipeline``; the pair returned is
    ``(my_second_pipeline output, convert_to_string output)``.
    """
    fixed_date = date_correction(p_date=p_date)
    # Both downstream calls consume the corrected date, not the raw input.
    date_as_text = convert_to_string(p_date=fixed_date)
    parsed = my_second_pipeline(p_date=fixed_date)
    return parsed, date_as_text
@pipeline
def deep_wide_pipe_int(width=25, depth=25, pause=0):
    # type: (int, int, int) -> int
    """Delegate to ``large_pipe_int``, forwarding ``width``, ``depth`` and ``pause`` by keyword."""
    forwarded = dict(width=width, depth=depth, pause=pause)
    return large_pipe_int(**forwarded)
@pipeline
def my_pipeline_search(x_range=3):
    """Run ``my_pipeline`` once per value in ``range(x_range)``.

    Each run receives the loop value converted to a string as ``p_str``.
    Returns a dict mapping the integer loop value to the second element of
    the pair produced by ``my_pipeline``.
    """
    second_by_x = {}
    for value in range(x_range):
        # my_pipeline's result is unpacked as a pair; the first element is unused here.
        _, second = my_pipeline(p_str=str(value))
        second_by_x[value] = second
    return second_by_x
@pipeline
def experiment_concat():
    """Concatenate a literal token with the output of ``gen_token("bbb")``."""
    # When the type is specified, plain strings and task outputs may be mixed
    # in the same list.
    return concat_tokens(["aa", gen_token("bbb")])
@pipeline(result=("model", "validation"))
def train_model_for_customer(
task_target_date,
data: pd.DataFrame = None,
alpha: float = 1.0,
l1_ratio: float = 0.5,
period=datetime.timedelta(days=7),
selected_features: List[str] = None,
):
if data is None:
partners = fetch_partner_data(task_target_date=task_target_date, period=period)
data = calculate_features(
selected_features=selected_features, raw_data=partners
)
training_set, test_set, validation_set = split_data(raw_data=data)
model = train_model(
@pipeline
def ingest_partner_b(task_target_date):
    """Load the source named ``"b"`` for ``task_target_date`` and return it after ``clean_data_spark``."""
    return clean_data_spark(
        raw_data=data_source(name="b", task_target_date=task_target_date)
    )
@pipeline
def fetch_data(task_target_date, period=datetime.timedelta(days=7)):
    """Fetch wine-quality data for each date from ``period_dates`` and combine it.

    ``period_dates(task_target_date, period)`` supplies the dates; one
    ``fetch_wine_quality`` call is made per date, in iteration order, and the
    collected results are passed to ``data_combine`` with ``sort=True``.
    """
    per_date = [
        fetch_wine_quality(task_target_date=day)
        for day in period_dates(task_target_date, period)
    ]
    return data_combine(per_date, sort=True)
@pipeline(result=("model", "validation", "serving"))
def tran_model_and_package():
    """Run ``train_model_for_customer`` and package its model with ``package_as_docker``.

    Returns the model, the validation output and the packaged artifact, in
    the order declared by the decorator's ``result`` names.
    """
    fitted, report = train_model_for_customer()
    # Only the model is packaged; the validation output is passed through as-is.
    packaged = package_as_docker(model=fitted)
    return fitted, report, packaged
@pipeline(result=("model", "validation", "serving"))
def predict_wine_quality_package():
    """Run ``predict_wine_quality`` and package its model with ``package_as_docker``.

    Returns the model, the validation output and the packaged artifact, in
    the order declared by the decorator's ``result`` names.
    """
    fitted, report = predict_wine_quality()
    # Only the model is packaged; the validation output is passed through as-is.
    packaged = package_as_docker(model=fitted)
    return fitted, report, packaged