Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@solid
def cache_file_from_s3(context, s3_coord: S3Coordinate) -> str:
# we default the target_key to the last component of the s3 key.
target_key = s3_coord['key'].split('/')[-1]
with get_temp_file_name() as tmp_file:
boto3.client('s3').download_file(
Bucket=s3_coord['bucket'], Key=s3_coord['key'], Filename=tmp_file
)
file_cache = context.resources.file_cache
with open(tmp_file, 'rb') as tmp_file_object:
# returns a handle rather than a path
file_handle = file_cache.write_file_object(target_key, tmp_file_object)
return file_handle.path
@solid(config_field=Field(Int))
def add_n(context, x: Int) -> int:
return x + context.solid_config
@solid(config=Field(String))
def configured(context):
called['configured'] = True
assert context.solid_config == 'yes'
@solid(outputs=[OutputDefinition(types.String)])
def str_as_output(_info):
return 'bar'
@solid(config={'inner': Field(String)})
def inner_solid(context):
return context.solid_config['inner']
@solid(output_defs=[])
def success_expectation_solid(_context):
yield ExpectationResult(success=True, description='This is always true.')
@solid
def check_context_stack(info):
assert info.context.get_context_value('foo') == 'bar'
assert info.context.get_context_value('quux') == 'baaz'
called['yup'] = True
@solid(
input_defs=[
InputDefinition('in_1', Int),
InputDefinition('in_2', Int),
InputDefinition('in_3', Int),
InputDefinition('in_4', Int),
],
output_defs=[OutputDefinition(Int)],
)
def total(_, in_1, in_2, in_3, in_4):
return in_1 + in_2 + in_3 + in_4
@solid(required_resource_keys={'volume'})
def unzip_files(context, file_names: List[str], source_dir: str, target_dir: str) -> List[str]:
# mount dirs onto volume
zipfile_location = os.path.join(context.resources.volume, source_dir)
csv_location = os.path.join(context.resources.volume, target_dir)
if not os.path.exists(csv_location):
os.mkdir(csv_location)
return [
_unzip_file(os.path.join(zipfile_location, file_name), csv_location)
for file_name in file_names
]
@solid(
name='download_from_s3_to_file',
config={
'bucket': Field(String, description='S3 bucket name'),
'key': Field(String, description='S3 key name'),
'target_folder': Field(
Path, description=('Specifies the path at which to download the object.')
),
'skip_if_present': Field(Bool, is_optional=True, default_value=False),
},
description='Downloads an object from S3 to a file.',
output_defs=[
OutputDefinition(FileExistsAtPath, description='The path to the downloaded object.')
],
required_resource_keys={'s3'},
)
def download_from_s3_to_file(context):