Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return num
@lambda_solid(input_defs=[InputDefinition('num')], output_def=OutputDefinition(Int))
def load_num(num):
return num + 3
@composite_solid(output_defs=[OutputDefinition(Int)])
def test():
return load_num(
num=canonicalize_num(
num=subsample_num(num=ingest_num(num=unzip_num(num=download_num())))
)
)
result = execute_pipeline(
PipelineDefinition(solid_defs=[test]),
{'solids': {'test': {'solids': {'download_num': {'config': 123}}}}},
)
assert result.result_for_handle('test.canonicalize_num').output_value() == 123
assert result.result_for_handle('test.load_num').output_value() == 126
def define_part_eight_step_three_pipeline():
return PipelineDefinition(
name='part_eight_step_three_pipeline',
solids=[typed_double_word_mismatch, count_letters],
dependencies={
'count_letters': {
'word': DependencyDefinition('typed_double_word_mismatch')
}
def define_part_nine_step_one_pipeline():
return PipelineDefinition(
name='part_nine_step_one_pipeline',
solids=define_contextless_solids(),
dependencies={
'add_ints': {
'num_one': DependencyDefinition('ingest_a'),
'num_two': DependencyDefinition('ingest_b'),
},
'mult_ints': {
'num_one': DependencyDefinition('ingest_a'),
'num_two': DependencyDefinition('ingest_b'),
},
def create_hello_world_solid_composed_pipeline():
def transform_fn(_context, inputs):
num_df = inputs['num_df']
num_df['sum'] = num_df['num1'] + num_df['num2']
return num_df
hello_world = single_output_transform(
name='hello_world',
inputs=[InputDefinition('num_df')],
transform_fn=transform_fn,
output=OutputDefinition(),
)
return PipelineDefinition(
solids=[define_read_csv_solid('read_hello_world'), hello_world],
dependencies={'hello_world': {
'num_df': DependencyDefinition('read_hello_world')
}}
def test_files_default_config():
pipeline_def = PipelineDefinition(name='pipeline', solid_defs=[])
env_type = create_environment_type(pipeline_def)
assert 'storage' in env_type.fields
config_value = throwing_validate_config_value(env_type, {})
assert 'storage' not in config_value
def setup_json_file_logger(tf_name, name='foo', level=logging.DEBUG):
logger_def = define_json_file_logger(name, tf_name, level)
init_logger_context = InitLoggerContext({}, PipelineDefinition([]), logger_def, '')
return logger_def.logger_fn(init_logger_context)
def test_custom_contexts():
@solid(inputs=[], outputs=[OutputDefinition()])
def custom_context_transform(context):
assert context.resources == {'field_one': 'value_two'}
pipeline = PipelineDefinition(
solids=[custom_context_transform],
context_definitions={
'custom_one': PipelineContextDefinition(
config_field=Field(Dict({'field_one': Field(dagster_type=String)})),
context_fn=lambda init_context: ExecutionContext(
resources=init_context.context_config
),
),
'custom_two': PipelineContextDefinition(
config_field=Field(Dict({'field_one': Field(dagster_type=String)})),
context_fn=lambda init_context: ExecutionContext(
resources=init_context.context_config
),
),
},
)
def define_part_eight_step_one_pipeline():
return PipelineDefinition(
name='part_eight_step_one_pipeline',
solids=[double_the_word_with_typed_config, count_letters],
dependencies={
'count_letters': {
'word': DependencyDefinition(
'double_the_word_with_typed_config'
)
def define_hello_world_explicit_yield_pipeline():
return PipelineDefinition(
name='hello_world_explicit_yield_pipeline', solid_defs=[define_hello_world_explicit_yield()]
)
def execute_pipeline(self, _, pipeline, pipeline_run, raise_on_error):
check.inst_param(pipeline, 'pipeline', PipelineDefinition)
try:
return execute_pipeline(
pipeline,
pipeline_run.config,
run_config=RunConfig(
pipeline_run.run_id,
mode=pipeline_run.mode,
event_callback=pipeline_run.handle_new_event,
executor_config=InProcessExecutorConfig(raise_on_error=raise_on_error),
reexecution_config=pipeline_run.reexecution_config,
step_keys_to_execute=pipeline_run.step_keys_to_execute,
),
)
except Exception: # pylint: disable=broad-except
if raise_on_error:
six.reraise(*sys.exc_info())