Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
aws_stack.create_kinesis_stream(TEST_CHAIN_STREAM2_NAME, delete=True)
s3.create_bucket(Bucket=TEST_BUCKET_NAME)
# deploy test lambdas connected to Kinesis streams
zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON), get_content=True,
libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON27)
testutil.create_lambda_function(func_name=TEST_CHAIN_LAMBDA1_NAME, zip_file=zip_file,
event_source_arn=get_event_source_arn(TEST_CHAIN_STREAM1_NAME), runtime=LAMBDA_RUNTIME_PYTHON27)
testutil.create_lambda_function(func_name=TEST_CHAIN_LAMBDA2_NAME, zip_file=zip_file,
event_source_arn=get_event_source_arn(TEST_CHAIN_STREAM2_NAME), runtime=LAMBDA_RUNTIME_PYTHON27)
# publish test record
test_data = {'test_data': 'forward_chain_data_%s with \'quotes\\"' % short_uid()}
data = clone(test_data)
data[lambda_integration.MSG_BODY_MESSAGE_TARGET] = 'kinesis:%s' % TEST_CHAIN_STREAM2_NAME
kinesis.put_record(Data=to_bytes(json.dumps(data)), PartitionKey='testId', StreamName=TEST_CHAIN_STREAM1_NAME)
# check results
time.sleep(5)
all_objects = testutil.list_all_s3_objects()
testutil.assert_objects(test_data, all_objects)
# clean up
kinesis.delete_stream(StreamName=TEST_CHAIN_STREAM1_NAME)
kinesis.delete_stream(StreamName=TEST_CHAIN_STREAM2_NAME)
def _perform_multipart_upload(self, bucket, key, data=None, zip=False, acl=None):
kwargs = {'ACL': acl} if acl else {}
multipart_upload_dict = self.s3_client.create_multipart_upload(Bucket=bucket, Key=key, **kwargs)
uploadId = multipart_upload_dict['UploadId']
# Write contents to memory rather than a file.
data = data or (5 * short_uid())
data = to_bytes(data)
upload_file_object = BytesIO(data)
if zip:
upload_file_object = BytesIO()
with gzip.GzipFile(fileobj=upload_file_object, mode='w') as filestream:
filestream.write(data)
response = self.s3_client.upload_part(Bucket=bucket, Key=key,
Body=upload_file_object, PartNumber=1, UploadId=uploadId)
multipart_upload_parts = [{'ETag': response['ETag'], 'PartNumber': 1}]
return self.s3_client.complete_multipart_upload(Bucket=bucket,
Key=key, MultipartUpload={'Parts': multipart_upload_parts}, UploadId=uploadId)
def random_id(stream_arn, kinesis_shard_id):
namespace = uuid.UUID(bytes=hashlib.sha1(to_bytes(stream_arn)).digest()[:16])
if six.PY2:
kinesis_shard_id = to_bytes(kinesis_shard_id, 'utf-8')
return uuid.uuid5(namespace, kinesis_shard_id).hex
if isinstance(result, FlaskResponse):
return flask_to_requests_response(result)
if isinstance(result, Response):
return result
response = Response()
parsed_result = result if isinstance(result, dict) else json.loads(str(result))
parsed_result = common.json_safe(parsed_result)
parsed_result = {} if parsed_result is None else parsed_result
response.status_code = int(parsed_result.get('statusCode', 200))
response.headers.update(parsed_result.get('headers', {}))
try:
if isinstance(parsed_result['body'], dict):
response._content = json.dumps(parsed_result['body'])
else:
response._content = to_bytes(parsed_result['body'])
except Exception:
response._content = '{}'
response.headers['Content-Length'] = len(response._content)
return response
else:
msg = 'API Gateway action uri "%s" not yet implemented' % uri
LOGGER.warning(msg)
return make_error_response(msg, 404)
elif integration['type'] == 'HTTP':
function = getattr(requests, method.lower())
if isinstance(data, dict):
data = json.dumps(data)
result = function(integration['uri'], data=data, headers=headers)
return result
modified_data = None
# check bucket name
bucket_name = get_bucket_name(path, headers)
if method == 'PUT' and not re.match(BUCKET_NAME_REGEX, bucket_name):
if len(parsed_path.path) <= 1:
return error_response('Unable to extract valid bucket name. Please ensure that your AWS SDK is ' +
'configured to use path style addressing, or send a valid .s3.amazonaws.com "Host" header',
'InvalidBucketName', status_code=400)
return error_response('The specified bucket is not valid.', 'InvalidBucketName', status_code=400)
# TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
to_find = to_bytes('us-east-1')
if data and data.startswith(to_bytes('<')) and to_find in data:
modified_data = data.replace(to_find, to_bytes(''))
# If this request contains streaming v4 authentication signatures, strip them from the message
# Related isse: https://github.com/localstack/localstack/issues/98
# TODO we should evaluate whether to replace moto s3 with scality/S3:
# https://github.com/scality/S3/issues/237
if headers.get('x-amz-content-sha256') == 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD':
modified_data = strip_chunk_signatures(modified_data or data)
headers['content-length'] = headers.get('x-amz-decoded-content-length')
# POST requests to S3 may include a "${filename}" placeholder in the
# key, which should be replaced with an actual file name before storing.
if method == 'POST':
original_data = modified_data or data
expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
if expanded_data is not original_data:
modified_data = expanded_data
def fix_headers_for_updated_response(response):
response.headers['content-length'] = len(to_bytes(response.content))
response.headers['x-amz-crc32'] = calculate_crc32(response)
content_length_sent = False
for header_key, header_value in iteritems(response.headers):
# filter out certain headers that we don't want to transmit
if header_key.lower() not in ('transfer-encoding', 'date', 'server'):
self.send_header(header_key, header_value)
content_length_sent = content_length_sent or header_key.lower() == 'content-length'
if not content_length_sent:
self.send_header('Content-Length', '%s' % len(response.content) if response.content else 0)
# allow pre-flight CORS headers by default
self._send_cors_headers(response)
self.end_headers()
if response.content and len(response.content):
self.wfile.write(to_bytes(response.content))
except Exception as e:
trace = str(traceback.format_exc())
conn_errors = ('ConnectionRefusedError', 'NewConnectionError',
'Connection aborted', 'Unexpected EOF', 'Connection reset by peer')
conn_error = any(e in trace for e in conn_errors)
error_msg = 'Error forwarding request: %s %s' % (e, trace)
if 'Broken pipe' in trace:
LOG.warn('Connection prematurely closed by client (broken pipe).')
elif not self.proxy.quiet or not conn_error:
LOG.error(error_msg)
if os.environ.get(ENV_INTERNAL_TEST_RUN):
# During a test run, we also want to print error messages, because
# log messages are delayed until the entire test run is over, and
# hence we are missing messages if the test hangs for some reason.
print('ERROR: %s' % error_msg)
self.send_response(502) # bad gateway
if response is not None:
return response
modified_data = None
# check bucket name
bucket_name = get_bucket_name(path, headers)
if method == 'PUT' and not re.match(BUCKET_NAME_REGEX, bucket_name):
if len(parsed_path.path) <= 1:
return error_response('Unable to extract valid bucket name. Please ensure that your AWS SDK is ' +
'configured to use path style addressing, or send a valid .s3.amazonaws.com "Host" header',
'InvalidBucketName', status_code=400)
return error_response('The specified bucket is not valid.', 'InvalidBucketName', status_code=400)
# TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
to_find = to_bytes('us-east-1')
if data and data.startswith(to_bytes('<')) and to_find in data:
modified_data = data.replace(to_find, to_bytes(''))
# If this request contains streaming v4 authentication signatures, strip them from the message
# Related isse: https://github.com/localstack/localstack/issues/98
# TODO we should evaluate whether to replace moto s3 with scality/S3:
# https://github.com/scality/S3/issues/237
if headers.get('x-amz-content-sha256') == 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD':
modified_data = strip_chunk_signatures(modified_data or data)
headers['content-length'] = headers.get('x-amz-decoded-content-length')
# POST requests to S3 may include a "${filename}" placeholder in the
# key, which should be replaced with an actual file name before storing.
if method == 'POST':
original_data = modified_data or data
expanded_data = multipart_content.expand_multipart_filename(original_data, headers)