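# S3 listener (fragment): derives the object path for bucket notifications, publishes
# bucket create/delete events, and works around moto S3 / static-website edge cases.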
# determine the object path used for S3 bucket notifications
if key:
    object_path = '/' + key
elif bucket_name_in_host:
    object_path = parsed.path
else:
    parts = parsed.path[1:].split('/', 1)
    object_path = parts[1] if parts[1][0] == '/' else '/%s' % parts[1]
version_id = response.headers.get('x-amz-version-id', None)
send_notifications(method, bucket_name, object_path, version_id)

# publish event for creation/deletion of buckets:
if method in ('PUT', 'DELETE') and ('/' not in path[1:] or len(path[1:].split('/')[1]) <= 0):
    event_type = (event_publisher.EVENT_S3_CREATE_BUCKET if method == 'PUT'
        else event_publisher.EVENT_S3_DELETE_BUCKET)
    event_publisher.fire_event(event_type, payload={'n': event_publisher.get_hash(bucket_name)})

# fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)
if method == 'PUT' and parsed.query == 'policy':
    response._content = ''
    response.status_code = 204
    return response

# emulate ErrorDocument functionality if a website is configured
if method == 'GET' and response.status_code == 404 and parsed.query != 'website':
    s3_client = aws_stack.connect_to_service('s3')
    try:
        # verify the bucket exists in the first place -- if not, we want normal processing of the 404
        s3_client.head_bucket(Bucket=bucket_name)
        website_config = s3_client.get_bucket_website(Bucket=bucket_name)
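
# SQS listener: publishes queue create/delete events, extracting the queue URL from the
# parsed request data or from the XML response body.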
def _fire_event(self, req_data, response):
    action = req_data.get('Action', [None])[0]
    event_type = None
    queue_url = None
    if action == 'CreateQueue':
        event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
        response_data = xmltodict.parse(response.content)
        if 'CreateQueueResponse' in response_data:
            queue_url = response_data['CreateQueueResponse']['CreateQueueResult']['QueueUrl']
    elif action == 'DeleteQueue':
        event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
        queue_url = req_data.get('QueueUrl', [None])[0]
    if event_type and queue_url:
        event_publisher.fire_event(event_type, payload={'u': event_publisher.get_hash(queue_url)})
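
# Elasticsearch Service API: deletes a domain and publishes the corresponding event.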
def delete_domain(domain_name):
    if domain_name not in ES_DOMAINS:
        return error_response(error_type='ResourceNotFoundException')
    result = get_domain_status(domain_name, deleted=True)
    ES_DOMAINS.pop(domain_name)
    if not ES_DOMAINS:
        cleanup_elasticsearch_instance()
    # record event
    event_publisher.fire_event(event_publisher.EVENT_ES_DELETE_DOMAIN,
        payload={'n': event_publisher.get_hash(domain_name)})
    return jsonify(result)
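
# Lambda API (fragment): registers a new function and publishes a create-function event.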
def create_function():
    """ Create new function
        ---
        operationId: 'createFunction'
        parameters:
            - name: 'request'
              in: body
    """
    arn = 'n/a'
    try:
        data = json.loads(to_str(request.data))
        lambda_name = data['FunctionName']
        event_publisher.fire_event(event_publisher.EVENT_LAMBDA_CREATE_FUNC,
            payload={'n': event_publisher.get_hash(lambda_name)})
        arn = func_arn(lambda_name)
        if arn in arn_to_lambda:
            return error_response('Function already exists: %s' %
                lambda_name, 409, error_type='ResourceConflictException')
        arn_to_lambda[arn] = func_details = LambdaFunction(arn)
        func_details.versions = {'$LATEST': {'RevisionId': str(uuid.uuid4())}}
        func_details.last_modified = isoformat_milliseconds(datetime.utcnow()) + '+0000'
        func_details.description = data.get('Description', '')
        func_details.handler = data['Handler']
        func_details.runtime = data['Runtime']
        func_details.envvars = data.get('Environment', {}).get('Variables', {})
        func_details.tags = data.get('Tags', {})
        func_details.timeout = data.get('Timeout', LAMBDA_DEFAULT_TIMEOUT)
        func_details.role = data['Role']
        func_details.memory_size = data.get('MemorySize')
        func_details.code = data['Code']
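
# API Gateway listener: patches the missing documentation-versions endpoint and publishes
# REST API create/delete events.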
def return_response(self, method, path, data, headers, response):
    # fix backend issue (missing support for API documentation)
    if re.match(r'/restapis/[^/]+/documentation/versions', path):
        if response.status_code == 404:
            return requests_response({'position': '1', 'items': []})

    # publish event
    if method == 'POST' and path == '/restapis':
        content = json.loads(to_str(response.content))
        event_publisher.fire_event(event_publisher.EVENT_APIGW_CREATE_API,
            payload={'a': event_publisher.get_hash(content['id'])})
    api_regex = r'^/restapis/([a-zA-Z0-9\-]+)$'
    if method == 'DELETE' and re.match(api_regex, path):
        api_id = re.sub(api_regex, r'\1', path)
        event_publisher.fire_event(event_publisher.EVENT_APIGW_DELETE_API,
            payload={'a': event_publisher.get_hash(api_id)})
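
# CloudFormation listener (fragment): publishes stack creation events, performs stack
# deletion, and normalizes stack ARNs for DescribeStackEvents.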
def forward_request(self, method, path, data, headers):
    if method == 'OPTIONS':
        return 200

    req_data = None
    if method == 'POST' and path == '/':
        req_data = urlparse.parse_qs(to_str(data))
        req_data = dict([(k, v[0]) for k, v in req_data.items()])
        action = req_data.get('Action')
        stack_name = req_data.get('StackName')

        if action == 'CreateStack':
            event_publisher.fire_event(event_publisher.EVENT_CLOUDFORMATION_CREATE_STACK,
                payload={'n': event_publisher.get_hash(stack_name)})

        if action == 'DeleteStack':
            client = aws_stack.connect_to_service('cloudformation')
            stack_resources = client.list_stack_resources(StackName=stack_name)['StackResourceSummaries']
            template_deployer.delete_stack(stack_name, stack_resources)

        if action == 'DescribeStackEvents':
            # fix an issue where moto cannot handle ARNs as stack names (or missing names)
            run_fix = not stack_name
            if stack_name:
                if stack_name.startswith('arn:aws:cloudformation'):
                    run_fix = True
                    stack_name = re.sub(r'arn:aws:cloudformation:[^:]+:[^:]+:stack/([^/]+)(/.+)?',
                        r'\1', stack_name)
            if run_fix:
                stack_names = [stack_name] if stack_name else self._list_stack_names()
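
# DynamoDB listener (fragment): assembles stream records for item changes, wires up
# DynamoDB Streams on table create/update, and publishes table create/delete events.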
            }
            response._content = json.dumps(content)
            fix_headers_for_updated_response(response)
elif action == '%s.DeleteItem' % ACTION_PREFIX:
    if response.status_code == 200:
        old_item = self._thread_local('existing_item')
        record['eventName'] = 'REMOVE'
        record['dynamodb']['Keys'] = data['Key']
        record['dynamodb']['OldImage'] = old_item
elif action == '%s.CreateTable' % ACTION_PREFIX:
    if 'StreamSpecification' in data:
        if response.status_code == 200:
            content = json.loads(to_str(response._content))
            create_dynamodb_stream(data, content['TableDescription'].get('LatestStreamLabel'))
    event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_CREATE_TABLE,
        payload={'n': event_publisher.get_hash(data['TableName'])})
    return
elif action == '%s.DeleteTable' % ACTION_PREFIX:
    event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_DELETE_TABLE,
        payload={'n': event_publisher.get_hash(data['TableName'])})
    return
elif action == '%s.UpdateTable' % ACTION_PREFIX:
    if 'StreamSpecification' in data:
        if response.status_code == 200:
            content = json.loads(to_str(response._content))
            create_dynamodb_stream(data, content['TableDescription'].get('LatestStreamLabel'))
    return
else:
    # nothing to do
    return

if len(records) > 0 and 'eventName' in records[0]:
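
# Kinesis listener (fragment): publishes stream create/delete events and forwards
# PutRecord/PutRecords payloads to the Lambda Kinesis record processor.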
def return_response(self, method, path, data, headers, response):
    action = headers.get('X-Amz-Target')
    data = json.loads(to_str(data or '{}'))
    records = []
    if action in (ACTION_CREATE_STREAM, ACTION_DELETE_STREAM):
        event_type = (event_publisher.EVENT_KINESIS_CREATE_STREAM if action == ACTION_CREATE_STREAM
            else event_publisher.EVENT_KINESIS_DELETE_STREAM)
        payload = {'n': event_publisher.get_hash(data.get('StreamName'))}
        if action == ACTION_CREATE_STREAM:
            payload['s'] = data.get('ShardCount')
        event_publisher.fire_event(event_type, payload=payload)
    elif action == ACTION_PUT_RECORD:
        response_body = json.loads(to_str(response.content))
        event_record = {
            'data': data['Data'],
            'partitionKey': data['PartitionKey'],
            'sequenceNumber': response_body.get('SequenceNumber')
        }
        event_records = [event_record]
        stream_name = data['StreamName']
        lambda_api.process_kinesis_records(event_records, stream_name)
    elif action == ACTION_PUT_RECORDS:
        event_records = []
        response_body = json.loads(to_str(response.content))
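
# Firehose API: deletes a delivery stream and publishes the corresponding event.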
def delete_stream(stream_name):
    stream = DELIVERY_STREAMS.pop(stream_name, {})
    if not stream:
        return error_not_found(stream_name)
    # record event
    event_publisher.fire_event(event_publisher.EVENT_FIREHOSE_DELETE_STREAM,
        payload={'n': event_publisher.get_hash(stream_name)})
    return {}
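
# A minimal sketch of the event-publishing pattern shared by the snippets above, using a
# hypothetical EVENT_FOO_CREATE_RESOURCE constant and helper (not part of LocalStack).
# Resource names are passed through event_publisher.get_hash(), presumably so that raw
# identifiers are not included in the analytics payload; the import path below is an
# assumption based on how LocalStack is commonly laid out.
from localstack.utils.analytics import event_publisher

EVENT_FOO_CREATE_RESOURCE = 'foo.cr'  # hypothetical event type

def publish_resource_created(resource_name):
    # fire a single analytics event carrying only a hashed resource identifier
    event_publisher.fire_event(EVENT_FOO_CREATE_RESOURCE,
        payload={'n': event_publisher.get_hash(resource_name)})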