Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# make test request to gateway and check response
path = path.replace('{test_param1}', 'foo1')
path = path + '?foo=foo&bar=bar&bar=baz'
url = self.gateway_request_url(
api_id=api_id, stage_name=self.TEST_STAGE_NAME, path=path)
data = {'return_status_code': 203, 'return_headers': {'foo': 'bar123'}}
result = requests.post(url, data=json.dumps(data),
headers={'User-Agent': 'python-requests/testing'})
self.assertEqual(result.status_code, 203)
self.assertEqual(result.headers.get('foo'), 'bar123')
parsed_body = json.loads(to_str(result.content))
self.assertEqual(parsed_body.get('return_status_code'), 203)
self.assertDictEqual(parsed_body.get('return_headers'), {'foo': 'bar123'})
self.assertDictEqual(parsed_body.get('queryStringParameters'), {'foo': 'foo', 'bar': ['bar', 'baz']})
request_context = parsed_body.get('requestContext')
source_ip = request_context['identity'].pop('sourceIp')
self.assertTrue(re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', source_ip))
self.assertEqual(request_context['path'], '/lambda/foo1')
self.assertEqual(request_context['accountId'], TEST_AWS_ACCOUNT_ID)
self.assertEqual(request_context['resourceId'], resource.get('id'))
self.assertEqual(request_context['stage'], self.TEST_STAGE_NAME)
self.assertEqual(request_context['identity']['userAgent'], 'python-requests/testing')
result = requests.delete(url, data=json.dumps(data))
def test_random_error_on_put_record(self):
    """PutRecord must fail with a throughput error when the error probability is 1.0.

    The probability is a module-level setting on ``config``, so it is restored
    once the assertions have run; otherwise every later test in the same
    process would keep running with forced errors.
    """
    put_record_header = {'X-Amz-Target': 'Kinesis_20131202.PutRecord'}
    previous_probability = config.KINESIS_ERROR_PROBABILITY
    config.KINESIS_ERROR_PROBABILITY = 1.0
    try:
        response = UPDATE_KINESIS.forward_request('POST', '/', TEST_DATA, put_record_header)
        self.assertEqual(response.status_code, 400)
        resp_json = json.loads(to_str(response.content))
        self.assertEqual(resp_json['ErrorCode'], 'ProvisionedThroughputExceededException')
        self.assertEqual(resp_json['ErrorMessage'], 'Rate exceeded for shard X in stream Y under account Z.')
    finally:
        # Undo the global override even when an assertion above fails.
        config.KINESIS_ERROR_PROBABILITY = previous_probability
def _receive_assert_delete(self, queue_url, assertions, sqs_client=None, required_subject=None):
    """Receive messages from a queue, check them against ``assertions``, then delete them.

    :param queue_url: URL of the SQS queue to read from.
    :param assertions: expected objects, handed to ``testutil.assert_objects``
        together with the JSON-decoded message bodies.
    :param sqs_client: SQS client to use; one is created via ``aws_stack`` when
        omitted.
    :param required_subject: accepted for interface compatibility; not used by
        this helper.
    """
    if not sqs_client:
        sqs_client = aws_stack.connect_to_service('sqs')
    response = sqs_client.receive_message(QueueUrl=queue_url)
    # The 'Messages' key may be missing when nothing was received; fall back to
    # an empty list so the assertion helper reports the mismatch instead of
    # this method dying with a KeyError.
    received = response.get('Messages', [])
    messages = [json.loads(to_str(m['Body'])) for m in received]
    testutil.assert_objects(assertions, messages)
    for message in received:
        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
# Dispatch on the Kinesis action of the request that was just processed.
# NOTE(review): this is a fragment; `action`, `data`, `payload`, `event_type`
# and `response` are bound outside the visible lines.
if action == ACTION_CREATE_STREAM:
# The requested shard count is added to the event payload under key 's'.
payload['s'] = data.get('ShardCount')
event_publisher.fire_event(event_type, payload=payload)
elif action == ACTION_PUT_RECORD:
# Single record: combine the request's data/partition key with the sequence
# number taken from the response body, then hand it to the Lambda API.
response_body = json.loads(to_str(response.content))
event_record = {
'data': data['Data'],
'partitionKey': data['PartitionKey'],
'sequenceNumber': response_body.get('SequenceNumber')
}
event_records = [event_record]
stream_name = data['StreamName']
lambda_api.process_kinesis_records(event_records, stream_name)
elif action == ACTION_PUT_RECORDS:
# Batch variant: request records and response records are paired by index,
# so the i-th response entry supplies the sequence number of the i-th record.
event_records = []
response_body = json.loads(to_str(response.content))
if 'Records' in response_body:
response_records = response_body['Records']
records = data['Records']
for index in range(0, len(records)):
record = records[index]
event_record = {
'data': record['Data'],
'partitionKey': record['PartitionKey'],
'sequenceNumber': response_records[index].get('SequenceNumber')
}
event_records.append(event_record)
stream_name = data['StreamName']
lambda_api.process_kinesis_records(event_records, stream_name)
elif action == ACTION_UPDATE_SHARD_COUNT:
# Currently kinesalite, which backs the Kinesis implementation for localstack, does
# not support UpdateShardCount:
def forward_request(self, method, path, data, **kwargs):
    """Serve requests addressed to ``API_PATH_SERVERS``.

    A POST starts an API server from the JSON request body; a GET returns the
    JSON-serialised ``API_SERVERS``. Every other request yields the default
    ``200`` / ``'{}'`` response. Any exception is logged and turned into a
    ``500`` response whose body is the exception text.
    """
    result = Response()
    result.status_code = 200
    result._content = '{}'
    try:
        is_servers_path = path == API_PATH_SERVERS
        if is_servers_path and method == 'POST':
            payload = json.loads(to_str(data))
            start_api_server_locally(payload)
        elif is_servers_path and method == 'GET':
            result._content = json.dumps(json_safe(API_SERVERS))
    except Exception as err:
        LOG.error('Unable to process request: %s' % err)
        result.status_code = 500
        result._content = str(err)
    return result
# Post-processing steps applied to an S3 response.
# NOTE(review): this is a fragment; `response`, `method`, `path`, `data`,
# `headers`, `parsed` and `bucket_name` are bound outside the visible lines.
append_last_modified_headers(response=response)
append_list_objects_marker(method, path, data, response)
fix_location_constraint(response)
fix_range_content_type(bucket_name, path, headers, response)
fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)
# Remove body from PUT response on presigned URL
# https://github.com/localstack/localstack/issues/1317
if method == 'PUT' and ('X-Amz-Security-Token=' in path or
'X-Amz-Credential=' in path or 'AWSAccessKeyId=' in path):
response._content = ''
# Flag is not read within the visible lines; presumably consumed further down.
reset_content_length = True
# Best-effort conversion of the body to str; stays None when conversion fails.
response_content_str = None
try:
response_content_str = to_str(response._content)
except Exception:
pass
# Honor response header overrides
# https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
if method == 'GET':
# keep_blank_values=True keeps parameters that were sent with an empty value.
query_map = urlparse.parse_qs(parsed.query, keep_blank_values=True)
for param_name, header_name in ALLOWED_HEADER_OVERRIDES.items():
if param_name in query_map:
# Only the first value of a repeated query parameter is applied.
response.headers[header_name] = query_map[param_name][0]
# Bodies that start with '<' (presumably XML) are replaced by their str form.
if response_content_str and response_content_str.startswith('<'):
# Records whether the body was binary before replacement; not read within
# the visible lines.
is_bytes = isinstance(response._content, six.binary_type)
response._content = response_content_str
append_last_modified_headers(response=response, content=response_content_str)
def fix_delete_objects_response(bucket_name, method, parsed_path, data, headers, response):
    """Report keys that failed only because they do not exist as deleted.

    Applies to multi-object delete requests (``POST ...?delete``) whose response
    contains ``<Error>`` entries. An error entry that carries nothing but a
    ``Key`` is moved from the ``Error`` list to the ``Deleted`` list, and the
    response body is re-serialised. Responses that do not match are left
    untouched.

    :param bucket_name: name of the target bucket (not used here).
    :param method: HTTP method of the request.
    :param parsed_path: parsed request URL; only ``.query`` is read.
    :param data: request body.
    :param headers: request headers (not used here).
    :param response: response object; ``_content`` is rewritten in place.
    """
    # Deleting non-existing keys should not result in errors.
    # Fixes https://github.com/localstack/localstack/issues/1893
    if not (method == 'POST' and parsed_path.query == 'delete' and '<Delete' in to_str(data or '')):
        return
    content = to_str(response._content)
    if '<Error>' not in content:
        return
    result = xmltodict.parse(content).get('DeleteResult')
    if not result:
        # The body is not a DeleteResult document (e.g. a top-level <Error>
        # response), so there is nothing to rewrite.
        return
    errors = result.get('Error')
    if not errors:
        return
    # xmltodict yields a single dict for one element and a list for several;
    # normalise both 'Error' and 'Deleted' to lists before moving entries.
    errors = errors if isinstance(errors, list) else [errors]
    deleted = result.get('Deleted')
    if not isinstance(deleted, list):
        deleted = result['Deleted'] = [deleted] if deleted else []
    # Iterate over a copy because entries are removed from `errors` in the loop.
    for entry in list(errors):
        if set(entry.keys()) == {'Key'}:
            errors.remove(entry)
            deleted.append(entry)
    if not errors:
        result.pop('Error')
    response._content = xmltodict.unparse({'DeleteResult': result})
def run_lambda_executor(self, cmd, event=None, env_vars=None):
    """Run a Lambda executor command and return its result and log output.

    :param cmd: command passed to ``run``.
    :param event: input written to the process's stdin.
    :param env_vars: environment variables for the process; defaults to an
        empty dict.
    :returns: tuple ``(result, log_output)`` where ``result`` is the last line
        of the process's primary output and ``log_output`` is everything else.
    :raises Exception: if the process exits with a non-zero status code.
    """
    # A `{}` default would be one dict shared by every call; build a fresh one.
    env_vars = {} if env_vars is None else env_vars
    process = run(cmd, asynchronous=True, stderr=subprocess.PIPE, outfile=subprocess.PIPE, env_vars=env_vars,
                  stdin=True)
    result, log_output = process.communicate(input=event)
    try:
        result = to_str(result).strip()
    except Exception:
        # Deliberate best effort: keep the raw result if it cannot be
        # converted to a string.
        pass
    log_output = to_str(log_output).strip()
    return_code = process.returncode
    # Note: The user's code may have been logging to stderr, in which case the logs
    # will be part of the "result" variable here. Hence, make sure that we extract
    # only the *last* line of "result" and consider anything above that as log output.
    if isinstance(result, six.string_types) and '\n' in result:
        additional_logs, _, result = result.rpartition('\n')
        log_output += '\n%s' % additional_logs
    if return_code != 0:
        raise Exception('Lambda process returned error status code: %s. Result: %s. Output:\n%s' %
                        (return_code, result, log_output))
    return result, log_output
def return_response(self, method, path, data, headers, response):
if path.startswith('/shell') or method == 'GET':
return
data = json.loads(to_str(data))
# update table definitions
if data and 'TableName' in data and 'KeySchema' in data:
TABLE_DEFINITIONS[data['TableName']] = data
if response._content:
# fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)
content_replaced = re.sub(r'("TableArn"|"LatestStreamArn"|"StreamArn")\s*:\s*"arn:aws:dynamodb:' +
'ddblocal:([^"]+)"', r'\1: "arn:aws:dynamodb:%s:\2"' % aws_stack.get_region(),
to_str(response._content))
if content_replaced != response._content:
response._content = content_replaced
fix_headers_for_updated_response(response)
action = headers.get('X-Amz-Target')
if not action: