Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def aws_write(bucket, key, data, creds=(None, None), endpoint=None):
    """Upload a local file to an S3 bucket.

    Args:
        bucket: Name of the destination bucket.
        key: Object key the uploaded content is stored under.
        data: Path of the local file to upload; it is opened in binary
            read mode.
        creds: Indexable pair whose element 0 is passed as the access key
            id and element 1 as the secret access key. Defaults to
            ``(None, None)``.
        endpoint: Value forwarded as the client's ``endpoint_url``.
            Defaults to ``None``.
    """
    make_client = StorageEngines.boto3.client
    s3 = make_client(
        's3',
        aws_access_key_id=creds[0],
        aws_secret_access_key=creds[1],
        endpoint_url=endpoint,
    )
    # Stream from an open file handle; the handle is closed when the
    # ``with`` block exits, whether or not the upload succeeds.
    with open(data, 'rb') as source:
        s3.upload_fileobj(source, bucket, key)
def aws_list(bucket, key, creds=(None, None), limit=None, endpoint=None):
client = StorageEngines.boto3.client('s3',
aws_access_key_id=creds[0],
aws_secret_access_key=creds[1],
endpoint_url=endpoint
)
objs = client.list_objects_v2(Bucket=bucket, Prefix=key)
if objs and 'Contents' in objs:
ret = []
if limit:
for obj in objs['Contents']:
ret.append(obj['Key'])
limit -= 1
if not limit:
break
return ret
else: