def __init__(
self,
key,
name,
kind,
type_attributes=DEFAULT_TYPE_ATTRIBUTES,
description=None,
type_params=None,
):
self.key = check.str_param(key, 'key')
self.name = check.opt_str_param(name, 'name')
self.kind = check.inst_param(kind, 'kind', ConfigTypeKind)
self._description = check.opt_str_param(description, 'description')
self.type_attributes = check.inst_param(
type_attributes, 'type_attributes', ConfigTypeAttributes
)
self.type_params = (
check.list_param(type_params, 'type_params', of_type=ConfigType)
if type_params
else None
)
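The snippets on this page lean heavily on dagster's check module. As a rough sketch of the behavior those calls assume (CheckError is the base exception in dagster.check; treat the exact values as illustrative):

from dagster import check

check.str_param('solid_name', 'key')                    # returns 'solid_name' unchanged
check.opt_str_param(None, 'name')                       # optional variant: None passes through
check.opt_str_param(None, 'protocol', default='s3://')  # falls back to the supplied default

try:
    check.str_param(42, 'key')                          # wrong type: raises a CheckError subclass
except check.CheckError as exc:
    print(exc)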
def __new__(
cls, name, type_ref, is_optional, default_provided, default_value_as_str, description
):
return super(ConfigFieldMeta, cls).__new__(
cls,
name=check.opt_str_param(name, 'name'),
type_ref=check.inst_param(type_ref, 'type_ref', TypeRef),
is_optional=check.bool_param(is_optional, 'is_optional'),
default_provided=check.bool_param(default_provided, 'default_provided'),
default_value_as_str=check.opt_str_param(default_value_as_str, 'default_value_as_str'),
description=check.opt_str_param(description, 'description'),
)
def create_execution_plan(pipeline, environment_dict=None, mode=None):
check.inst_param(pipeline, 'pipeline', PipelineDefinition)
environment_dict = check.opt_dict_param(environment_dict, 'environment_dict', key_type=str)
check.opt_str_param(mode, 'mode')
environment_config = create_environment_config(pipeline, environment_dict)
return ExecutionPlan.build(pipeline, environment_config)
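For orientation, a minimal sketch of driving create_execution_plan against a trivially small pipeline; the @solid/@pipeline decorators and the empty environment_dict reflect the dagster API of this era and are assumptions, not part of the snippet above:

from dagster import pipeline, solid, create_execution_plan

@solid
def say_hello(context):
    context.log.info('hello')

@pipeline
def hello_pipeline():
    say_hello()

# Builds (but does not execute) the step plan for the pipeline.
plan = create_execution_plan(hello_pipeline, environment_dict={}, mode='default')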
def __new__(
cls,
event_type_value,
pipeline_name,
step_key=None,
solid_handle=None,
step_kind_value=None,
logging_tags=None,
event_specific_data=None,
message=None,
):
return super(DagsterEvent, cls).__new__(
cls,
check.str_param(event_type_value, 'event_type_value'),
check.str_param(pipeline_name, 'pipeline_name'),
check.opt_str_param(step_key, 'step_key'),
check.opt_inst_param(solid_handle, 'solid_handle', SolidHandle),
check.opt_str_param(step_kind_value, 'step_kind_value'),
check.opt_dict_param(logging_tags, 'logging_tags'),
_validate_event_specific_data(DagsterEventType(event_type_value), event_specific_data),
check.opt_str_param(message, 'message'),
)
def _make_airflow_dag(
handle,
pipeline_name,
environment_dict=None,
mode=None,
instance=None,
dag_id=None,
dag_description=None,
dag_kwargs=None,
op_kwargs=None,
operator=DagsterPythonOperator,
):
check.inst_param(handle, 'handle', ExecutionTargetHandle)
check.str_param(pipeline_name, 'pipeline_name')
environment_dict = check.opt_dict_param(environment_dict, 'environment_dict', key_type=str)
mode = check.opt_str_param(mode, 'mode')
# Default to use the (persistent) system temp directory rather than a seven.TemporaryDirectory,
# which would not be consistent between Airflow task invocations.
instance = (
check.inst_param(instance, 'instance', DagsterInstance)
if instance
else DagsterInstance.get(fallback_storage=seven.get_system_temp_directory())
)
# Only used for Airflow; internally we continue to use pipeline.name
dag_id = check.opt_str_param(dag_id, 'dag_id', _rename_for_airflow(pipeline_name))
dag_description = check.opt_str_param(
dag_description, 'dag_description', _make_dag_description(pipeline_name)
)
check.subclass_param(operator, 'operator', BaseOperator)
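The snippet above is cut off before the DAG is assembled; roughly, it would be driven as sketched below. The ExecutionTargetHandle.for_pipeline_module classmethod, the module/function names, and the returned (dag, tasks) pair are recalled from this era of dagster_airflow and should be read as assumptions:

from dagster import ExecutionTargetHandle

# Handle pointing at a module-level pipeline definition (names are hypothetical).
handle = ExecutionTargetHandle.for_pipeline_module('my_project.pipelines', 'define_my_pipeline')

dag, tasks = _make_airflow_dag(
    handle=handle,
    pipeline_name='my_pipeline',
    environment_dict={'storage': {'filesystem': {}}},
)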
def uri_for_key(self, key, protocol=None):
check.str_param(key, 'key')
protocol = check.opt_str_param(protocol, 'protocol', default='s3://')
return protocol + self.bucket + '/' + key
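A quick illustration of what uri_for_key produces (the bucket name is made up):

# file_manager.bucket == 'my-bucket' (hypothetical)
file_manager.uri_for_key('reports/2019/summary.csv')
# -> 's3://my-bucket/reports/2019/summary.csv'
file_manager.uri_for_key('reports/2019/summary.csv', protocol='https://')
# -> 'https://my-bucket/reports/2019/summary.csv'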
def __new__(cls, success, label=None, description=None, metadata_entries=None):
return super(ExpectationResult, cls).__new__(
cls,
success=check.bool_param(success, 'success'),
label=check.opt_str_param(label, 'label', 'result'),
description=check.opt_str_param(description, 'description'),
metadata_entries=check.opt_list_param(
metadata_entries, 'metadata_entries', of_type=EventMetadataEntry
),
)
def __new__(cls, run_id=None, pipeline=None, status=None, tag_key=None, tag_value=None):
return super(PipelineRunsFilter, cls).__new__(
cls,
run_id=check.opt_str_param(run_id, 'run_id'),
tag_key=check.opt_str_param(tag_key, 'tag_key'),
tag_value=check.opt_str_param(tag_value, 'tag_value'),
pipeline=check.opt_str_param(pipeline, 'pipeline'),
status=status,
)
def __new__(cls, path, description=None):
return super(Materialization, cls).__new__(
cls,
path=check.opt_str_param(path, 'path'),
description=check.opt_str_param(description, 'description'),
)
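ExpectationResult and Materialization are typically yielded from inside a solid alongside its outputs. A hedged sketch of that pattern, assuming the Output class and @solid decorator from the same dagster release:

from dagster import solid, Output, ExpectationResult, Materialization

@solid
def write_report(context):
    path = '/tmp/report.csv'  # hypothetical output location
    # ... write the file here ...
    yield ExpectationResult(success=True, label='file_written')
    yield Materialization(path=path, description='Report written to local disk')
    yield Output(path)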
name='PandasDataFrame',
description=\'\'\'Two-dimensional size-mutable, potentially heterogeneous
tabular data structure with labeled axes (rows and columns).
See http://pandas.pydata.org/\'\'\',
input_hydration_config=dataframe_input_schema,
output_materialization_config=dataframe_output_schema,
type_check=df_type_check
)
See, e.g., ``dagster_pandas.DataFrame`` and ``dagster_pyspark.SparkRDD`` for fuller worked
examples.
'''
return _decorate_as_dagster_type(
bare_cls=check.type_param(existing_type, 'existing_type'),
name=check.opt_str_param(name, 'name', existing_type.__name__),
description=description,
input_hydration_config=input_hydration_config,
output_materialization_config=output_materialization_config,
serialization_strategy=serialization_strategy,
auto_plugins=auto_plugins,
type_check=type_check,
)