Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def cp_object(self, src, dst):
    """Copy the blob stored under ``src`` to the key ``dst`` in the same bucket.

    Both ``src`` and ``dst`` are validated as strings. The copy is performed
    through ``self.bucket_obj`` and the call returns an
    ``ObjectStoreOperation`` of type ``CP_OBJECT`` whose ``key`` and
    ``dest_key`` are the URIs of the source and destination keys.
    """
    check.str_param(src, 'src')
    check.str_param(dst, 'dst')

    # Source and destination live in the same bucket object.
    bucket = self.bucket_obj
    bucket.copy_blob(bucket.blob(src), bucket, dst)

    source_uri = self.uri_for_key(src)
    destination_uri = self.uri_for_key(dst)
    return ObjectStoreOperation(
        op=ObjectStoreOperationType.CP_OBJECT,
        key=source_uri,
        dest_key=destination_uri,
        object_store_name=self.name,
    )
def __new__(cls, label, description, entry_data):
    """Create an ``EventMetadataEntry`` after validating each field.

    ``label`` goes through ``check.str_param``, ``description`` through
    ``check.opt_str_param`` and ``entry_data`` must be an instance of
    ``EntryDataUnion``. The validated values are passed positionally to the
    parent ``__new__``.
    """
    checked_label = check.str_param(label, 'label')
    checked_description = check.opt_str_param(description, 'description')
    checked_entry_data = check.inst_param(entry_data, 'entry_data', EntryDataUnion)

    return super(EventMetadataEntry, cls).__new__(
        cls, checked_label, checked_description, checked_entry_data
    )
# Builds an ExecutionStep for an input whose value is constructed from config:
# the step's compute function asks the input's runtime type input schema to
# construct the value from ``input_spec`` and yields it as a single Result.
# NOTE(review): this snippet is truncated -- the ExecutionStep(...) call below
# is never closed in the visible source, and the original indentation is lost.
def _create_input_thunk_execution_step(pipeline_name, solid, input_def, input_spec, handle):
# Argument validation; ``input_spec`` is not checked here.
check.str_param(pipeline_name, 'pipeline_name')
check.inst_param(solid, 'solid', Solid)
check.inst_param(input_def, 'input_def', InputDefinition)
check.opt_inst_param(handle, 'handle', SolidHandle)
# The runtime type must provide an input schema, because _fn relies on it.
check.invariant(input_def.runtime_type.input_schema)
# Compute function for the step: generator yielding exactly one Result under
# the INPUT_THUNK_OUTPUT output name. The ``_inputs`` argument is unused.
def _fn(step_context, _inputs):
value = input_def.runtime_type.input_schema.construct_from_config_value(
step_context, input_spec
)
yield Result(output_name=INPUT_THUNK_OUTPUT, value=value)
return ExecutionStep(
pipeline_name=pipeline_name,
# Key suffix has the form 'inputs.<input name>.read'.
key_suffix='inputs.' + input_def.name + '.read',
# The step itself takes no step inputs.
step_inputs=[],
def format_description(desc, indent):
    """Collapse whitespace in ``desc`` and wrap it into a filled paragraph.

    Every run of whitespace in ``desc`` is replaced by a single space, the
    result is dedented, and the text is wrapped with ``textwrap`` defaults.
    The first line gets no indent; every following line is prefixed with
    ``indent``. Both arguments must be strings.
    """
    check.str_param(desc, 'desc')
    check.str_param(indent, 'indent')

    single_spaced = re.sub(r'\s+', ' ', desc)
    # After collapsing there is at most one leading space; dedent removes it.
    flattened = textwrap.dedent(single_spaced)
    return textwrap.fill(flattened, initial_indent='', subsequent_indent=indent)
# Returns a SystemNamedDict called ``name`` describing the config of a specific
# context. The visible fields are 'config', 'resources' and 'persistence'.
# NOTE(review): this snippet is truncated -- the fields dict and the enclosing
# calls are not closed in the visible source, and indentation is lost.
def define_specific_context_config_cls(name, config_field, resources):
check.str_param(name, 'name')
check_opt_field_param(config_field, 'config_field')
# ``resources`` must map str keys to ResourceDefinition values.
check.dict_param(resources, 'resources', key_type=str, value_type=ResourceDefinition)
return SystemNamedDict(
name,
# The dict is passed through remove_none_entries before being used as fields;
# presumably this drops 'config' when config_field is None -- TODO confirm.
fields=remove_none_entries(
{
'config': config_field,
# Resource config type is named '<name>.Resources'.
'resources': Field(
define_resource_dictionary_cls('{name}.Resources'.format(name=name), resources)
),
# Persistence selector is named '<name>.Persistence' with a single 'file' option.
'persistence': Field(
SystemNamedSelector(
'{name}.Persistence'.format(name=name), {'file': Field(Dict({}))}
)
def config_type_keyed(self, key):
    """Return the config type registered under ``key``.

    ``key`` must be a string. The lookup is a plain index into
    ``self.config_type_dict_by_key``, so a missing key propagates whatever
    error that mapping raises.
    """
    check.str_param(key, 'key')
    types_by_key = self.config_type_dict_by_key
    return types_by_key[key]
def __init__(self, name, fields, description=None, type_attributes=DEFAULT_TYPE_ATTRIBUTES):
    """Initialise a named selector config type.

    ``name`` must be a string and is used as both the type's key and its
    name. ``fields`` is run through ``process_user_facing_fields_dict``
    before being handed to the parent initialiser, together with the
    ``SELECTOR`` kind, ``description`` and ``type_attributes``.
    """
    check.str_param(name, 'name')

    # Label passed to the field processor to identify this selector.
    selector_label = 'NamedSelector named "{}"'.format(name)
    processed_fields = process_user_facing_fields_dict(fields, selector_label)

    super(NamedSelector, self).__init__(
        key=name,
        name=name,
        kind=ConfigTypeKind.SELECTOR,
        fields=processed_fields,
        description=description,
        type_attributes=type_attributes,
    )
# Evaluates ``environment_dict`` against the environment schema of the pipeline
# for the given ``mode``; when evaluation is unsuccessful, raises a
# UserFacingGraphQLError wrapping a 'PipelineConfigValidationInvalid' object.
# NOTE(review): this snippet is truncated -- the ``errors=[`` list and the
# enclosing calls are not closed in the visible source, so the success path
# is not shown here. Original indentation is lost as well.
def get_validated_config(graphene_info, dauphin_pipeline, environment_dict, mode):
# Only ``mode`` is type-checked in the visible code.
check.str_param(mode, 'mode')
pipeline = dauphin_pipeline.get_dagster_pipeline()
environment_schema = create_environment_schema(pipeline, mode)
validated_config = evaluate_config(
environment_schema.environment_type, environment_dict, pipeline
)
if not validated_config.success:
# GraphQL types are resolved by name from the schema on ``graphene_info``.
raise UserFacingGraphQLError(
graphene_info.schema.type_named('PipelineConfigValidationInvalid')(
pipeline=dauphin_pipeline,
errors=[
graphene_info.schema.type_named(
'PipelineConfigValidationError'
def output_def_named(self, name):
    """Return the output definition stored under ``name``.

    ``name`` must be a string. The lookup is a plain index into
    ``self._output_dict``, so a missing name propagates whatever error that
    mapping raises.
    """
    check.str_param(name, 'name')
    outputs_by_name = self._output_dict
    return outputs_by_name[name]
def format_description(desc, indent):
    """Normalise whitespace in ``desc`` and wrap it as one paragraph.

    Runs of whitespace are collapsed to single spaces, the text is
    dedented, and the result is wrapped using the default ``TextWrapper``
    settings. Continuation lines are prefixed with ``indent``; the first
    line is not indented. Both arguments must be strings.
    """
    check.str_param(desc, 'desc')
    check.str_param(indent, 'indent')

    # Collapse first, then dedent: dedent strips the one leading space that
    # can remain when ``desc`` started with whitespace.
    normalised = textwrap.dedent(re.sub(r'\s+', ' ', desc))
    return textwrap.TextWrapper(initial_indent='', subsequent_indent=indent).fill(normalised)