# Legacy dagster API (imports assumed for this snippet's era):
import pandas as pd
from dagster import Field, OutputDefinition, Result, SolidDefinition, types


def define_read_csv_solid(name):
    def _t_fn(info, _inputs):
        # Emit the CSV at the config-specified path as the solid's output.
        yield Result(pd.read_csv(info.config['path']))

    return SolidDefinition(
        name=name,
        inputs=[],
        outputs=[OutputDefinition()],
        config_field=types.Field(types.Dict({'path': Field(types.Path)})),
        transform_fn=_t_fn,
    )
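For orientation, a minimal usage sketch of the solid factory above, assuming the legacy dagster API of the same era; `PipelineDefinition`, `execute_pipeline`, and the environment-dict shape are assumptions, not part of the original snippet:

from dagster import PipelineDefinition, execute_pipeline

read_csv = define_read_csv_solid('read_csv')
pipeline = PipelineDefinition(name='csv_pipeline', solids=[read_csv])

# The 'solids' -> <solid name> -> 'config' nesting is an assumption about
# this dagster version's environment-dict format.
result = execute_pipeline(
    pipeline,
    {'solids': {'read_csv': {'config': {'path': 'data/input.csv'}}}},
)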
# Config-schema hashes must change when any attribute of a field changes:
# its name, its type, its optionality, its default value, or its description.
assert _hash({'some_int': Field(int)}) != _hash({'another_int': Field(int)})
assert _hash({'same_name': Field(int)}) != _hash({'same_name': Field(str)})
assert _hash({'same_name': Field(int)}) != _hash({'same_name': Field(int, is_optional=True)})
assert _hash({'same_name': Field(int)}) != _hash(
    {'same_name': Field(int, is_optional=True, default_value=2)}
)
assert _hash({'same_name': Field(int, is_optional=True)}) != _hash(
    {'same_name': Field(int, is_optional=True, default_value=2)}
)
assert _hash({'same_name': Field(int)}) != _hash({'same_name': Field(int, description='desc')})
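For illustration, one way a `_hash` helper could satisfy these assertions: fold every attribute of every field into the digest. The attribute names used here are hypothetical, not dagster's actual internals:

def _hash(fields):
    # Hypothetical implementation: any change to a field's name, type,
    # optionality, default, or description changes the resulting hash.
    return hash(
        tuple(
            (
                name,
                str(f.dagster_type),                # assumed attribute
                f.is_optional,                      # assumed attribute
                getattr(f, 'default_value', None),  # assumed attribute
                f.description,                      # assumed attribute
            )
            for name, f in sorted(fields.items())
        )
    )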
@solid(
    config={'some_config': Field(String)},
)
def solid_with_context(context):
    # did_get is a dict defined in the enclosing test (not shown in this snippet)
    did_get['yep'] = context.solid_config
@solid(config={'foo': Field(String)})
def node_a(context):
    return context.solid_config['foo']
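An illustrative environment dict that would feed node_a's `context.solid_config`; the nesting is an assumption about this API era:

environment_dict = {'solids': {'node_a': {'config': {'foo': 'bar'}}}}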
# The decorator head of this snippet was truncated in the source; the
# `@solid(config={...` opening is reconstructed, and the imports are assumed.
import os
from typing import List

from pandas import DataFrame, concat, read_csv


@solid(
    config={
        'delimiter': Field(
            String,
            default_value=',',
            is_optional=True,
            description='A one-character string used to separate fields.',
        )
    },
    required_resource_keys={'volume'},
)
def consolidate_csv_files(context, input_file_names: List[str], source_dir: str) -> DataFrame:
    # Mount dirs onto the volume resource.
    csv_file_location = os.path.join(context.resources.volume, source_dir)
    if not os.path.exists(csv_file_location):
        os.mkdir(csv_file_location)
    # Every input must have a header row, or pandas won't know how to
    # concatenate the dataframes.
    dataset = concat(
        [
            # the comprehension body was truncated in the source; reading each
            # named file from the mounted location is an assumption
            read_csv(os.path.join(csv_file_location, file_name))
            for file_name in input_file_names
        ]
    )
    return dataset
dimensions = Field(
    List[
        Dict(
            fields={
                'Key': Field(String, description='The dimension name.', is_optional=False),
                'Value': Field(String, description='The dimension value.', is_optional=True),
            }
        )
    ],
    description='''A CloudWatch metric dimension.''',
    is_optional=True,
)
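A sample value satisfying the `dimensions` schema above; the data is purely illustrative:

dimensions_value = [
    {'Key': 'JobFlowId', 'Value': 'j-1ABCDEFGHIJKL'},  # illustrative IDs
    {'Key': 'Environment'},  # 'Value' may be omitted: it is declared is_optional=True
]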
trigger = Field(
    Dict(
        fields={
            'CloudWatchAlarmDefinition': Field(
                Dict(
                    fields={
                        'ComparisonOperator': Field(
                            EmrComparisonOperator,
                            description='''Determines how the metric specified by MetricName is
                            compared to the value specified by Threshold.''',
                            is_optional=False,
                        ),
                        'EvaluationPeriods': Field(
                            Int,
                            description='''The number of periods, expressed in seconds using
                            Period, during which the alarm condition must exist before the alarm
                            triggers automatic scaling activity. The default value is 1.''',
                            is_optional=True,
                        ),
                        'MetricName': Field(
                            String,
                            description='''The name of the CloudWatch metric that is watched to
                            determine an alarm condition.''',
                            # remaining attributes of this field are truncated in the source
                        ),
                        # further CloudWatchAlarmDefinition fields are truncated in the source
                    }
                ),
                # description and trailing attributes truncated in the source
            )
        }
    ),
    # trailing attributes of `trigger` truncated in the source
)
# New snippet (Google Cloud Dataproc job config). Its opening lines were lost in
# extraction; the 'pigJob' head and the 'loggingConfig' field name below are
# reconstructed assumptions made to balance the surviving fragment.
'pigJob': Field(
    Dict(
        fields={
            'loggingConfig': Field(
                Dict(fields={}),  # inner fields truncated in the source
                description='''The runtime logging config of the job.''',
                is_optional=True,
            ),
            'properties': Field(
                PermissiveDict(),
                description='''Optional. A mapping of property names to values, used
                to configure Pig. Properties that conflict with values set by the
                Cloud Dataproc API may be overwritten. Can include properties set in
                /etc/hadoop/conf/*-site.xml, /etc/pig/conf/pig.properties, and
                classes in user code.''',
                is_optional=True,
            ),
            'continueOnFailure': Field(
                Bool,
                description='''Optional. Whether to continue executing queries if a
                query fails. The default value is false. Setting to true can be
                useful when executing independent parallel queries.''',
                is_optional=True,
            ),
        }
    ),
    description='''A Cloud Dataproc job for running Apache Pig
    (https://pig.apache.org/) queries on YARN.''',
    is_optional=True,
),
'hiveJob': Field(
    Dict(
        fields={
            'continueOnFailure': Field(
                Bool,  # truncated in the source; type assumed to mirror pigJob's field above
            ),
        }
    ),
),
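An illustrative config value exercising the pigJob fields shown above; the property name is a hypothetical Pig setting:

pig_job_config = {
    'continueOnFailure': True,
    'properties': {'pig.exec.reducers.max': '100'},  # hypothetical Pig property
}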
@solid(config_field=Field(types.Any))
def double_the_word(info):
    return info.config['word'] * 2
def _define_shared_fields():
    '''The following fields are shared between both QueryJobConfig and LoadJobConfig.'''
    clustering_fields = Field(
        List[String],
        description='''Fields defining clustering for the table
        (Defaults to None).
        Clustering fields are immutable after table creation.
        ''',
        is_optional=True,
    )
    create_disposition = Field(
        BQCreateDisposition,
        description='''Specifies behavior for creating tables.
        See https://g.co/cloud/bigquery/docs/reference/rest/v2/jobs#configuration.query.createDisposition
        ''',
        is_optional=True,
    )  # closing paren restored; the remaining shared fields are truncated in the source
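Illustrative values for the two shared fields above; the column names are hypothetical, and 'CREATE_IF_NEEDED' is one of BigQuery's documented create dispositions:

shared_field_values = {
    'clustering_fields': ['customer_id', 'region'],  # hypothetical column names
    'create_disposition': 'CREATE_IF_NEEDED',
}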