# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_run_state_machine(self):
    """Create and start a Step Functions state machine, then exercise a
    Kinesis -> Lambda -> Kinesis forwarding chain and assert the published
    record ends up as an object in S3."""
    # snapshot existing state machines so we can assert exactly one was added
    machines_before = self.sfn_client.list_state_machines()['stateMachines']
    # build the definition, pointing both steps at the test Lambda functions
    sm_role_arn = aws_stack.role_arn('sfn_role')
    sm_definition = clone(STATE_MACHINE_DEF)
    sm_definition['States']['step1']['Resource'] = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_1)
    sm_definition['States']['step2']['Resource'] = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_2)
    result = self.sfn_client.create_state_machine(
        name=STATE_MACHINE_NAME, definition=json.dumps(sm_definition), roleArn=sm_role_arn)
    # assert that the state machine has been created
    machines_after = self.sfn_client.list_state_machines()['stateMachines']
    self.assertEqual(len(machines_after), len(machines_before) + 1)
    # look up the new machine's ARN and start an execution
    machines = self.sfn_client.list_state_machines()['stateMachines']
    sm_arn = [m['stateMachineArn'] for m in machines if m['name'] == STATE_MACHINE_NAME][0]
    result = self.sfn_client.start_execution(stateMachineArn=sm_arn)
    # (re)create the Kinesis streams and the target S3 bucket for the chain
    aws_stack.create_kinesis_stream(TEST_CHAIN_STREAM1_NAME, delete=True)
    aws_stack.create_kinesis_stream(TEST_CHAIN_STREAM2_NAME, delete=True)
    s3.create_bucket(Bucket=TEST_BUCKET_NAME)
    # deploy test lambdas connected to Kinesis streams
    zip_bytes = testutil.create_lambda_archive(
        load_file(TEST_LAMBDA_PYTHON), get_content=True,
        libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON27)
    for func_name, stream_name in (
            (TEST_CHAIN_LAMBDA1_NAME, TEST_CHAIN_STREAM1_NAME),
            (TEST_CHAIN_LAMBDA2_NAME, TEST_CHAIN_STREAM2_NAME)):
        testutil.create_lambda_function(
            func_name=func_name, zip_file=zip_bytes,
            event_source_arn=get_event_source_arn(stream_name),
            runtime=LAMBDA_RUNTIME_PYTHON27)
    # publish a test record that lambda 1 should forward into stream 2
    test_data = {'test_data': 'forward_chain_data_%s with \'quotes\\"' % short_uid()}
    record = clone(test_data)
    record[lambda_integration.MSG_BODY_MESSAGE_TARGET] = 'kinesis:%s' % TEST_CHAIN_STREAM2_NAME
    kinesis.put_record(Data=to_bytes(json.dumps(record)), PartitionKey='testId',
        StreamName=TEST_CHAIN_STREAM1_NAME)
    # give the chain time to propagate, then verify the object landed in S3
    time.sleep(5)
    testutil.assert_objects(test_data, testutil.list_all_s3_objects())
    # clean up
    kinesis.delete_stream(StreamName=TEST_CHAIN_STREAM1_NAME)
    kinesis.delete_stream(StreamName=TEST_CHAIN_STREAM2_NAME)
# Normalize SQS message-attribute data types in the raw request dict as a
# precursor to computing the attributes' MD5 checksum.
# NOTE(review): the actual MD5 computation and the restoration of the original
# type names appear to be truncated in this chunk — only the normalization
# pass is visible here; confirm against the full file.
def get_message_attributes_md5(self, req_data):
# work on a copy so the caller's request dict is not mutated
req_data = clone(req_data)
# original (fully-qualified) type names, keyed by attribute name
orig_types = {}
# iterate over a snapshot of the items, since req_data is modified below
for key, entry in dict(req_data).items():
# Fix an issue in moto where data types like 'Number.java.lang.Integer' are
# not supported: Keep track of the original data type, and temporarily change
# it to the short form (e.g., 'Number'), before changing it back again.
if key.endswith('DataType'):
parts = entry[0].split('.')
if len(parts) > 2:
# e.g. 'Number.java.lang.Integer' -> short form 'Number'
short_type_name = parts[0]
full_type_name = req_data[key][0]
# key has the form 'MessageAttribute.<N>.DataType' — extract <N>
attr_num = key.split('.')[1]
attr_name = req_data['MessageAttribute.%s.Name' % attr_num][0]
orig_types[attr_name] = full_type_name
req_data[key] = [short_type_name]
# register a transport encoding for the full type name, reusing the
# encoding of its short form, if none is registered yet
if full_type_name not in TRANSPORT_TYPE_ENCODINGS:
TRANSPORT_TYPE_ENCODINGS[full_type_name] = TRANSPORT_TYPE_ENCODINGS[short_type_name]
# NOTE(review): this span is a fragment of a larger function (presumably a
# patched CloudFormation resource-parsing hook) whose signature is not visible
# in this chunk — `resource_json`, `resources_map`, `logical_id`, `update`,
# `stack_name` and `region_name` are parameters/locals defined outside this view.
# add some missing default props which otherwise cause deployments to fail
props = resource_json['Properties'] = resource_json.get('Properties') or {}
if resource_json['Type'] == 'AWS::Lambda::EventSourceMapping' and not props.get('StartingPosition'):
props['StartingPosition'] = 'LATEST'
# check if this resource already exists in the resource map
resource = resources_map._parsed_resources.get(logical_id)
if resource and not update:
# already parsed and no update requested — reuse the cached resource
return resource
# check whether this resource needs to be deployed
resource_wrapped = {logical_id: resource_json}
should_be_created = template_deployer.should_be_deployed(logical_id, resource_wrapped, stack_name)
# fix resource ARNs, make sure to convert account IDs 000000000000 to 123456789012
resource_json_arns_fixed = clone(json_safe(convert_objs_to_ids(resource_json)))
set_moto_account_ids(resource_json_arns_fixed)
# create resource definition and store CloudFormation metadata in moto
if resource or update:
parse_and_update_resource_orig(logical_id,
resource_json_arns_fixed, resources_map, region_name)
elif not resource:
try:
resource = parse_and_create_resource_orig(logical_id,
resource_json_arns_fixed, resources_map, region_name)
except Exception as e:
if should_be_created:
# resource was expected to deploy — propagate the failure
raise
else:
# best-effort creation only; log and continue
LOG.info('Error on moto CF resource creation. Ignoring, as should_be_created=%s: %s' %
(should_be_created, e))
# NOTE(review): this span is a fragment of a larger function handling an S3
# bucket-notification configuration — `config`, `dest`, `bucket` and `response`
# are defined outside this view.
events = config.get('Event')
# a single event may be given as a bare string — normalize to a list
if isinstance(events, six.string_types):
events = [events]
event_filter = config.get('Filter', {})
# make sure FilterRule is an array
s3_filter = _get_s3_filter(event_filter)
if s3_filter and not isinstance(s3_filter.get('FilterRule', []), list):
s3_filter['FilterRule'] = [s3_filter['FilterRule']]
# create final details dict
notification_details = {
'Id': config.get('Id'),
'Event': events,
dest: config.get(dest),
'Filter': event_filter
}
# store a copy so later mutation of config does not affect the registry
S3_NOTIFICATIONS[bucket].append(clone(notification_details))
return response
def to_authorizer_response_json(api_id, data):
    """Return a copy of the authorizer *data* enriched with HAL '_links'
    metadata (self link and curies) for the REST API *api_id*.

    Bug fix: the original built `result` but never returned it, so callers
    always received None — add the missing `return result`.
    """
    result = common.clone(data)
    self_link = '/restapis/%s/authorizers/%s' % (api_id, data['id'])
    if '_links' not in result:
        result['_links'] = {}
    result['_links']['self'] = {
        'href': self_link
    }
    result['_links']['curies'] = {
        'href': 'https://docs.aws.amazon.com/apigateway/latest/developerguide/restapi-authorizer-latest.html',
        'name': 'authorizer',
        'templated': True
    }
    return result
def normalize_authorizer(data):
    """Return a copy of the authorizer *data* with normalized field types."""
    normalized = common.clone(data)
    # terraform sends this as a string in patch, so convert to int
    ttl = normalized.get('authorizerResultTtlInSeconds') or 300
    normalized['authorizerResultTtlInSeconds'] = int(ttl)
    return normalized
# Intercept Kinesis API requests to emulate stream-consumer operations that
# the backend does not support.
# NOTE(review): this method continues past the end of this chunk — the body of
# the final 'ListStreamConsumers' branch (and any further branches) is not
# visible here.
def forward_request(self, method, path, data, headers):
global STREAM_CONSUMERS
# request payload is a JSON document; tolerate an empty body
data = json.loads(to_str(data or '{}'))
# the Kinesis API action is carried in the X-Amz-Target header
action = headers.get('X-Amz-Target')
if action == '%s.RegisterStreamConsumer' % ACTION_PREFIX:
# register: echo the request back with status/ARN/timestamp filled in
consumer = clone(data)
consumer['ConsumerStatus'] = 'ACTIVE'
consumer['ConsumerARN'] = '%s/consumer/%s' % (data['StreamARN'], data['ConsumerName'])
consumer['ConsumerCreationTimestamp'] = datetime.now()
consumer = json_safe(consumer)
STREAM_CONSUMERS.append(consumer)
return {'Consumer': consumer}
elif action == '%s.DeregisterStreamConsumer' % ACTION_PREFIX:
# a consumer matches either by its ARN, or by stream ARN + name
def consumer_matches(c):
stream_arn = data.get('StreamARN')
cons_name = data.get('ConsumerName')
cons_arn = data.get('ConsumerARN')
return (c.get('ConsumerARN') == cons_arn or
(c.get('StreamARN') == stream_arn and c.get('ConsumerName') == cons_name))
# deregister: drop all matching consumers from the global registry
STREAM_CONSUMERS = [c for c in STREAM_CONSUMERS if not consumer_matches(c)]
return {}
elif action == '%s.ListStreamConsumers' % ACTION_PREFIX: