Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Builds the 'part_nine_final_pipeline' PipelineDefinition from the solids
# returned by define_contextful_solids(). Both 'add_ints' and 'mult_ints' take
# 'num_one' from 'ingest_a' and 'num_two' from 'ingest_b'. The 'local' context
# is created by ExecutionContext.console_logging(log_level=DEBUG, ...) with
# PartNineResources wrapping an InMemoryStore.
# NOTE(review): this excerpt is cut off inside the 'cloud' context_fn lambda,
# so the call is never closed here -- recover the remainder from the original
# file before relying on this definition.
def define_part_nine_final_pipeline():
return PipelineDefinition(
name='part_nine_final_pipeline',
solids=define_contextful_solids(),
dependencies={
'add_ints': {
'num_one': DependencyDefinition('ingest_a'),
'num_two': DependencyDefinition('ingest_b'),
},
'mult_ints': {
'num_one': DependencyDefinition('ingest_a'),
'num_two': DependencyDefinition('ingest_b'),
},
},
context_definitions={
'local': PipelineContextDefinition(
context_fn=lambda *_args: ExecutionContext.console_logging(
log_level=DEBUG,
resources=PartNineResources(InMemoryStore()),
)
),
'cloud': PipelineContextDefinition(
context_fn=lambda info: ExecutionContext.console_logging(
def diamond_deps():
    """Return the dependency mapping for a diamond-shaped solid graph.

    The mapping is keyed by solid name, then by input name:

    * ``A`` reads ``A_input`` from ``A_source``.
    * ``B`` and ``C`` each read their ``A`` input from ``A``.
    * ``D`` reads its ``B`` and ``C`` inputs from ``B`` and ``C``.

    Returns:
        A fresh ``dict`` of ``{solid: {input: DependencyDefinition}}`` on
        every call.
    """
    return {
        'A': {'A_input': DependencyDefinition('A_source')},
        'B': {'A': DependencyDefinition('A')},
        'C': {'A': DependencyDefinition('A')},
        'D': {'B': DependencyDefinition('B'), 'C': DependencyDefinition('C')},
    }
@lambda_solid
def return_one():
    """Return the constant ``1``; takes no inputs."""
    return 1
@lambda_solid(input_defs=[InputDefinition('num', Int)], output_def=OutputDefinition(Int))
def add_one(num):
    """Return ``num`` incremented by one.

    Args:
        num: Value declared as ``Int`` by the solid's input definition.

    Returns:
        ``num + 1`` (declared as ``Int`` by the output definition).
    """
    return num + 1
@lambda_solid
def user_throw_exception():
    """Unconditionally fail.

    Raises:
        Exception: Always, with the message ``'whoops'``.
    """
    raise Exception('whoops')
pipeline = PipelineDefinition(
name='basic_external_plan_execution',
solid_defs=[return_one, add_one, user_throw_exception],
dependencies={'add_one': {'num': DependencyDefinition('return_one')}},
)
return pipeline
)
def adder(_context, one, two, three):
    """Check that the inputs are exactly 1, 2 and 3, then return their sum.

    NOTE(review): the leading ``_context`` parameter suggests this is a solid
    compute function whose decorator was cut off in this excerpt -- confirm
    against the original file.

    Args:
        _context: Unused.
        one: Must compare equal to ``1``.
        two: Must compare equal to ``2``.
        three: Must compare equal to ``3``.

    Returns:
        ``one + two + three``.

    Raises:
        AssertionError: If any input does not match its expected value.
    """
    assert one == 1
    assert two == 2
    assert three == 3
    return one + two + three
# Wiring for adder: the three underscore-prefixed inputs come from the aliased
# emit_nothing invocations, the three value inputs from the emit_* solids.
adder_inputs = {
    '_one': DependencyDefinition('_one'),
    '_two': DependencyDefinition('_two'),
    '_three': DependencyDefinition('_three'),
    'one': DependencyDefinition('emit_one'),
    'two': DependencyDefinition('emit_two'),
    'three': DependencyDefinition('emit_three'),
}

pipeline = PipelineDefinition(
    name='input_test',
    solid_defs=[emit_one, emit_two, emit_three, emit_nothing, adder],
    dependencies={
        # emit_nothing is invoked three times under distinct aliases, each
        # with no upstream dependencies of its own.
        SolidInvocation('emit_nothing', '_one'): {},
        SolidInvocation('emit_nothing', '_two'): {},
        SolidInvocation('emit_nothing', '_three'): {},
        'adder': adder_inputs,
    },
)

# The run must finish successfully.
result = execute_pipeline(pipeline)
assert result.success
def diamond_deps():
    """Build the input wiring for four solids arranged as a diamond.

    ``A`` takes ``A_input`` from ``A_source``; ``B`` and ``C`` both take an
    input named ``A`` from solid ``A``; ``D`` takes inputs ``B`` and ``C``
    from the solids of the same names.

    Returns:
        A new ``dict`` mapping solid name to ``{input name:
        DependencyDefinition}``.
    """
    return {
        'A': {'A_input': DependencyDefinition('A_source')},
        'B': {'A': DependencyDefinition('A')},
        'C': {'A': DependencyDefinition('A')},
        'D': {'B': DependencyDefinition('B'), 'C': DependencyDefinition('C')},
    }
# Builds the 'part_thirteen_step_one_pipeline' PipelineDefinition from the
# solids load_a, load_b and a_plus_b; a_plus_b's 'a' input is wired to
# 'load_a' and its 'b' input to 'load_b'.
# NOTE(review): this excerpt stops before the dependencies dict and the
# PipelineDefinition call are closed -- recover the remainder from the
# original file.
def define_part_thirteen_step_one_pipeline():
return PipelineDefinition(
name='part_thirteen_step_one_pipeline',
solids=[load_a, load_b, a_plus_b],
dependencies={
'a_plus_b': {
'a': DependencyDefinition('load_a'),
'b': DependencyDefinition('load_b'),
}
# Returns the 'test_events' PipelineDefinition. optional_only_one has no
# upstream dependencies; should_fail reads 'some_input' from its 'output_one'
# output and should_be_skipped reads 'some_input' from its 'output_two' output.
# materialization_and_expectation is registered without a dependency entry.
# NOTE(review): the enclosing def header is not part of this excerpt and the
# call is never closed -- the fragment is incomplete as shown.
return PipelineDefinition(
name='test_events',
solid_defs=[
materialization_and_expectation,
optional_only_one,
should_fail,
should_be_skipped,
],
dependencies={
'optional_only_one': {},
'should_fail': {
'some_input': DependencyDefinition(optional_only_one.name, 'output_one')
},
'should_be_skipped': {
'some_input': DependencyDefinition(optional_only_one.name, 'output_two')
},
# Returns the 'test_events' PipelineDefinition over four solids. The two
# named outputs of optional_only_one fan out to different consumers:
# 'output_one' feeds should_fail's 'some_input' and 'output_two' feeds
# should_be_skipped's 'some_input'.
# NOTE(review): this fragment has no enclosing def header and ends before the
# dependencies dict is closed -- incomplete as shown.
return PipelineDefinition(
name='test_events',
solid_defs=[
materialization_and_expectation,
optional_only_one,
should_fail,
should_be_skipped,
],
dependencies={
'optional_only_one': {},
'should_fail': {
'some_input': DependencyDefinition(optional_only_one.name, 'output_one')
},
'should_be_skipped': {
'some_input': DependencyDefinition(optional_only_one.name, 'output_two')
},
def test_pandas_multiple_inputs():
    """Run a solid that adds two DataFrame inputs and check the summed frame.

    Two ``load_num_csv_solid`` instances ('load_one' and 'load_two') feed the
    'num_csv1' and 'num_csv2' inputs of 'double_sum'. The output must be
    non-empty and equal ``{'num1': [2, 6], 'num2': [4, 8]}``.
    """

    def compute_fn(_context, inputs):
        # Add the two upstream inputs together; the context is unused.
        return inputs['num_csv1'] + inputs['num_csv2']

    double_sum = _dataframe_solid(
        name='double_sum',
        input_defs=[InputDefinition('num_csv1', DataFrame), InputDefinition('num_csv2', DataFrame)],
        compute_fn=compute_fn,
    )

    pipe = PipelineDefinition(
        solid_defs=[load_num_csv_solid('load_one'), load_num_csv_solid('load_two'), double_sum],
        dependencies={
            'double_sum': {
                'num_csv1': DependencyDefinition('load_one'),
                'num_csv2': DependencyDefinition('load_two'),
            }
        },
    )

    output_df = execute_pipeline(pipe).result_for_solid('double_sum').output_value()

    assert not output_df.empty
    assert output_df.to_dict('list') == {'num1': [2, 6], 'num2': [4, 8]}
def define_diamond_dag_pipeline():
return PipelineDefinition(
name='actual_dag_pipeline',
context_definitions=context_definitions,
solids=[solid_a, solid_b, solid_c, solid_d],
dependencies={
'solid_b': {'arg_a': DependencyDefinition('solid_a')},
'solid_c': {'arg_a': DependencyDefinition('solid_a')},
'solid_d': {
'arg_b': DependencyDefinition('solid_b'),
'arg_c': DependencyDefinition('solid_c'),
},