Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'DeliveryStreamARN': firehose_stream_arn(stream_name),
'DeliveryStreamStatus': 'ACTIVE',
'DeliveryStreamName': stream_name,
'Destinations': [],
'Tags': tags
}
DELIVERY_STREAMS[stream_name] = stream
if elasticsearch_destination:
update_destination(stream_name=stream_name,
destination_id=short_uid(),
elasticsearch_update=elasticsearch_destination)
if s3_destination:
update_destination(stream_name=stream_name, destination_id=short_uid(), s3_update=s3_destination)
# record event
event_publisher.fire_event(event_publisher.EVENT_FIREHOSE_CREATE_STREAM,
payload={'n': event_publisher.get_hash(stream_name)})
if delivery_stream_type == 'KinesisStreamAsSource':
kinesis_stream_name = delivery_stream_type_configuration.get('KinesisStreamARN').split('/')[1]
kinesis_connector.listen_to_kinesis(
stream_name=kinesis_stream_name, fh_d_stream=stream_name,
listener_func=process_records, wait_until_started=True,
ddb_lease_table_suffix='-firehose', region_name=region_name)
return stream
def return_response(self, method, path, data, headers, response):
    """Publish an event after a state machine create or delete request.

    The request body is parsed as JSON (an empty body is treated as ``{}``).
    The state machine name comes from the ``name`` field or, when that is
    missing or empty, from the last ``:``-separated segment of
    ``stateMachineArn``. Requests whose ``X-Amz-Target`` header names any
    other action publish nothing. Always returns ``None``.
    """
    request_body = json.loads(to_str(data or '{}'))
    sm_name = request_body.get('name')
    if not sm_name:
        # fall back to the trailing segment of the ARN (empty string if absent)
        sm_arn = request_body.get('stateMachineArn') or ''
        sm_name = sm_arn.split(':')[-1]

    # publish event
    target = headers.get('X-Amz-Target')
    if target == 'AWSStepFunctions.CreateStateMachine':
        event_type = event_publisher.EVENT_STEPFUNCTIONS_CREATE_SM
    elif target == 'AWSStepFunctions.DeleteStateMachine':
        event_type = event_publisher.EVENT_STEPFUNCTIONS_DELETE_SM
    else:
        # any other action: nothing to publish
        return
    event_publisher.fire_event(event_type, payload={'m': event_publisher.get_hash(sm_name)})
def delete_stream(stream_name):
    """Remove a delivery stream from ``DELIVERY_STREAMS`` and publish a delete event.

    Returns an empty dict on success. When no non-empty entry is registered
    under ``stream_name``, returns the result of ``error_not_found`` instead
    and publishes nothing.
    """
    removed = DELIVERY_STREAMS.pop(stream_name, None)
    if removed:
        # record event
        name_hash = event_publisher.get_hash(stream_name)
        event_publisher.fire_event(event_publisher.EVENT_FIREHOSE_DELETE_STREAM,
            payload={'n': name_hash})
        return {}
    return error_not_found(stream_name)
elif action == '%s.DeleteItem' % ACTION_PREFIX:
if response.status_code == 200:
old_item = self._thread_local('existing_item')
record['eventName'] = 'REMOVE'
record['dynamodb']['Keys'] = data['Key']
record['dynamodb']['OldImage'] = old_item
elif action == '%s.CreateTable' % ACTION_PREFIX:
if 'StreamSpecification' in data:
if response.status_code == 200:
content = json.loads(to_str(response._content))
create_dynamodb_stream(data, content['TableDescription'].get('LatestStreamLabel'))
event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_CREATE_TABLE,
payload={'n': event_publisher.get_hash(data['TableName'])})
return
elif action == '%s.DeleteTable' % ACTION_PREFIX:
event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_DELETE_TABLE,
payload={'n': event_publisher.get_hash(data['TableName'])})
return
elif action == '%s.UpdateTable' % ACTION_PREFIX:
if 'StreamSpecification' in data:
if response.status_code == 200:
content = json.loads(to_str(response._content))
create_dynamodb_stream(data, content['TableDescription'].get('LatestStreamLabel'))
return
else:
# nothing to do
return
if len(records) > 0 and 'eventName' in records[0]:
if 'TableName' in data:
records[0]['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])
forward_to_lambda(records)
def create_domain():
    """Register a new domain from the JSON body of the current request.

    The body must contain ``DomainName``. If that name is already a key of
    ``ES_DOMAINS``, an error response of type
    ``ResourceAlreadyExistsException`` is returned and nothing else happens.
    Otherwise the parsed body is stored under the name, the Elasticsearch
    instance is started, a create-domain event is published, and the domain
    status is returned via ``jsonify``.
    """
    body = json.loads(to_str(request.data))
    name = body['DomainName']
    already_registered = name in ES_DOMAINS
    if already_registered:
        return error_response(error_type='ResourceAlreadyExistsException')

    ES_DOMAINS[name] = body
    # start actual Elasticsearch instance
    start_elasticsearch_instance()
    status = get_domain_status(name)

    # record event
    name_hash = event_publisher.get_hash(name)
    event_publisher.fire_event(event_publisher.EVENT_ES_CREATE_DOMAIN, payload={'n': name_hash})
    return jsonify(status)
def return_response(self, method, path, data, headers, response):
    """Patch selected backend responses and publish REST API lifecycle events.

    * A 404 from the backend for a ``/restapis/<id>/documentation/versions``
      path is replaced by an empty listing built with ``requests_response``.
    * ``POST /restapis`` publishes a create event keyed on the hash of the
      ``id`` field of the JSON response body.
    * ``DELETE /restapis/<id>`` publishes a delete event keyed on the hash
      of the API id taken from the path.

    Returns the replacement response in the first case, otherwise ``None``.
    """
    # fix backend issue (missing support for API documentation)
    is_doc_versions = re.match(r'/restapis/[^/]+/documentation/versions', path)
    if is_doc_versions and response.status_code == 404:
        return requests_response({'position': '1', 'items': []})

    # publish event
    if method == 'POST' and path == '/restapis':
        created = json.loads(to_str(response.content))
        created_hash = event_publisher.get_hash(created['id'])
        event_publisher.fire_event(event_publisher.EVENT_APIGW_CREATE_API, payload={'a': created_hash})

    if method == 'DELETE':
        api_regex = r'^/restapis/([a-zA-Z0-9\-]+)$'
        if re.match(api_regex, path):
            # the path is exactly /restapis/<id>; keep only the id
            api_id = re.sub(api_regex, r'\1', path)
            deleted_hash = event_publisher.get_hash(api_id)
            event_publisher.fire_event(event_publisher.EVENT_APIGW_DELETE_API, payload={'a': deleted_hash})
def return_response(self, method, path, data, headers, response):
action = headers.get('X-Amz-Target')
data = json.loads(to_str(data or '{}'))
records = []
if action in (ACTION_CREATE_STREAM, ACTION_DELETE_STREAM):
event_type = (event_publisher.EVENT_KINESIS_CREATE_STREAM if action == ACTION_CREATE_STREAM
else event_publisher.EVENT_KINESIS_DELETE_STREAM)
payload = {'n': event_publisher.get_hash(data.get('StreamName'))}
if action == ACTION_CREATE_STREAM:
payload['s'] = data.get('ShardCount')
event_publisher.fire_event(event_type, payload=payload)
elif action == ACTION_PUT_RECORD:
response_body = json.loads(to_str(response.content))
event_record = {
'data': data['Data'],
'partitionKey': data['PartitionKey'],
'sequenceNumber': response_body.get('SequenceNumber')
}
event_records = [event_record]
stream_name = data['StreamName']
lambda_api.process_kinesis_records(event_records, stream_name)
elif action == ACTION_PUT_RECORDS:
def create_function():
""" Create new function
---
operationId: 'createFunction'
parameters:
- name: 'request'
in: body
"""
arn = 'n/a'
try:
data = json.loads(to_str(request.data))
lambda_name = data['FunctionName']
event_publisher.fire_event(event_publisher.EVENT_LAMBDA_CREATE_FUNC,
payload={'n': event_publisher.get_hash(lambda_name)})
arn = func_arn(lambda_name)
if arn in arn_to_lambda:
return error_response('Function already exist: %s' %
lambda_name, 409, error_type='ResourceConflictException')
arn_to_lambda[arn] = func_details = LambdaFunction(arn)
func_details.versions = {'$LATEST': {'RevisionId': str(uuid.uuid4())}}
func_details.last_modified = isoformat_milliseconds(datetime.utcnow()) + '+0000'
func_details.description = data.get('Description', '')
func_details.handler = data['Handler']
func_details.runtime = data['Runtime']
func_details.envvars = data.get('Environment', {}).get('Variables', {})
func_details.tags = data.get('Tags', {})
func_details.timeout = data.get('Timeout', LAMBDA_DEFAULT_TIMEOUT)
func_details.role = data['Role']
func_details.memory_size = data.get('MemorySize')