Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_nodejs_lambda_running_in_docker(self):
    """Deploy the Node.js test Lambda, invoke it and check response and logs.

    The function is created from ``TEST_LAMBDA_NODEJS`` with the
    ``LAMBDA_RUNTIME_NODEJS810`` runtime and invoked with an empty JSON
    object as payload. The test asserts a 200 status code, an ``{}``
    response body, and that the function logs match the expected pattern.

    The test is reported as *skipped* when ``use_docker()`` is falsy.
    """
    if not use_docker():
        # Report a skip rather than returning silently: a bare ``return``
        # would make the run count this test as passed although nothing
        # was executed.
        self.skipTest('use_docker() is false; skipping Docker-based Node.js Lambda test')

    zip_file = testutil.create_zip_file(TEST_LAMBDA_NODEJS, get_content=True)
    testutil.create_lambda_function(
        func_name=TEST_LAMBDA_NAME_JS,
        zip_file=zip_file,
        handler='lambda_integration.handler',
        runtime=LAMBDA_RUNTIME_NODEJS810,
    )

    result = self.lambda_client.invoke(
        FunctionName=TEST_LAMBDA_NAME_JS, Payload=b'{}')
    result_data = result['Payload'].read()

    self.assertEqual(result['StatusCode'], 200)
    self.assertEqual(to_str(result_data).strip(), '{}')

    # assert that logs are present
    expected = ['.*Node.js Lambda handler executing.']
    self.check_lambda_logs(TEST_LAMBDA_NAME_JS, expected_lines=expected)
# NOTE(review): this is a headerless excerpt; `table_name`, `dynamodb`,
# `dynamodbstreams` and `self` are expected to be bound by code that is not
# visible here.

# create the table with a stream that carries both new and old images
aws_stack.create_dynamodb_table(
    table_name,
    partition_key=PARTITION_KEY,
    stream_view_type='NEW_AND_OLD_IMAGES',
)

# look up the stream that belongs to the table; it must exist
streams = dynamodbstreams.list_streams()
matching_arns = [
    stream['StreamArn']
    for stream in streams['Streams']
    if stream['TableName'] == table_name
]
# if several streams match, the last one listed is used
ddb_event_source_arn = matching_arns[-1] if matching_arns else None
self.assertTrue(ddb_event_source_arn)

# deploy the Python test lambda with the DynamoDB stream as event source
zip_file = testutil.create_lambda_archive(
    load_file(TEST_LAMBDA_PYTHON),
    get_content=True,
    libs=TEST_LAMBDA_LIBS,
    runtime=LAMBDA_RUNTIME_PYTHON27,
)
testutil.create_lambda_function(
    func_name=TEST_LAMBDA_NAME_DDB,
    zip_file=zip_file,
    event_source_arn=ddb_event_source_arn,
    runtime=LAMBDA_RUNTIME_PYTHON27,
    delete=True,
)

# submit a batch with writes (items testId0 .. testId2)
dynamodb.batch_write_item(RequestItems={table_name: [
    {'PutRequest': {'Item': {PARTITION_KEY: {'S': 'testId%s' % idx}, 'data': {'S': 'foobar123'}}}}
    for idx in range(3)
]})
# submit a batch with writes and deletes
dynamodb.batch_write_item(RequestItems={table_name: [
{'PutRequest': {'Item': {PARTITION_KEY: {'S': 'testId3'}, 'data': {'S': 'foobar123'}}}},
{'PutRequest': {'Item': {PARTITION_KEY: {'S': 'testId4'}, 'data': {'S': 'foobar123'}}}},
{'PutRequest': {'Item': {PARTITION_KEY: {'S': 'testId5'}, 'data': {'S': 'foobar123'}}}},
{'DeleteRequest': {'Key': {PARTITION_KEY: {'S': 'testId0'}}}},
{'DeleteRequest': {'Key': {PARTITION_KEY: {'S': 'testId1'}}}},
def test_handler_in_submodule(self):
    """Deploy a Python Lambda whose handler file sits in a nested package path.

    The handler source is packaged under ``abc/def/main.py`` and referenced
    as ``abc.def.main.handler``. The test invokes the function with an
    empty JSON object and asserts a 200 status code and that the ``event``
    entry of the response equals the empty object.
    """
    func_name = 'lambda-%s' % short_uid()

    handler_source = load_file(TEST_LAMBDA_PYTHON)
    archive = testutil.create_lambda_archive(
        handler_source,
        get_content=True,
        libs=TEST_LAMBDA_LIBS,
        runtime=LAMBDA_RUNTIME_PYTHON36,
        file_name='abc/def/main.py',
    )
    testutil.create_lambda_function(
        func_name=func_name,
        zip_file=archive,
        handler='abc.def.main.handler',
        runtime=LAMBDA_RUNTIME_PYTHON36,
    )

    # invoke function and assert result
    response = self.lambda_client.invoke(FunctionName=func_name, Payload=b'{}')
    body = json.loads(response['Payload'].read())
    self.assertEqual(response['StatusCode'], 200)
    self.assertEqual(body['event'], json.loads('{}'))
def test_java_runtime_with_lib(self):
    """Invoke the Java "with lib" handler deployed both as a JAR and as a ZIP.

    Two archives are exercised:

    * the JAR file ``TEST_LAMBDA_JAVA_WITH_LIB`` as loaded from disk, and
    * a ZIP built from it, in which the JAR's ``lib`` directory is moved to
      the ZIP root and the remaining JAR content is stored as
      ``lib/lambda.jar``.

    For each archive a function with a fresh name is created and invoked
    with ``{"echo":"echo"}``. The test asserts a 200 status code and that
    the response contains ``echo``. The function is deleted afterwards,
    including when the invocation or an assertion fails.
    """
    java_jar_with_lib = load_file(TEST_LAMBDA_JAVA_WITH_LIB, mode='rb')

    # create ZIP file from JAR file
    jar_dir = new_tmp_dir()
    zip_dir = new_tmp_dir()
    unzip(TEST_LAMBDA_JAVA_WITH_LIB, jar_dir)
    shutil.move(os.path.join(jar_dir, 'lib'), os.path.join(zip_dir, 'lib'))
    jar_without_libs_file = testutil.create_zip_file(jar_dir)
    shutil.copy(jar_without_libs_file, os.path.join(zip_dir, 'lib', 'lambda.jar'))
    java_zip_with_lib = testutil.create_zip_file(zip_dir, get_content=True)

    for archive in [java_jar_with_lib, java_zip_with_lib]:
        lambda_name = 'test-%s' % short_uid()
        testutil.create_lambda_function(
            func_name=lambda_name,
            zip_file=archive,
            runtime=LAMBDA_RUNTIME_JAVA8,
            handler='cloud.localstack.sample.LambdaHandlerWithLib',
        )

        try:
            result = self.lambda_client.invoke(
                FunctionName=lambda_name, Payload=b'{"echo":"echo"}')
            result_data = result['Payload'].read()

            self.assertEqual(result['StatusCode'], 200)
            self.assertIn('echo', to_str(result_data))
        finally:
            # clean up even if the invocation or an assertion failed, so a
            # failing run does not leave the function deployed
            testutil.delete_lambda_function(lambda_name)
# NOTE(review): this is a headerless excerpt; `sqs_client`, `lambda_client`
# and `self` are expected to be bound by code that is not visible here.

# create two SQS queues; queue 2 gets a redrive policy that names queue 1
# as its dead letter target
queue_name1 = 'test-%s' % short_uid()
queue_name2 = 'test-%s' % short_uid()
queue_url1 = sqs_client.create_queue(QueueName=queue_name1)['QueueUrl']
queue_arn1 = aws_stack.sqs_queue_arn(queue_name1)
policy = {'deadLetterTargetArn': queue_arn1, 'maxReceiveCount': 1}
queue_url2 = sqs_client.create_queue(
    QueueName=queue_name2,
    Attributes={'RedrivePolicy': json.dumps(policy)},
)['QueueUrl']
queue_arn2 = aws_stack.sqs_queue_arn(queue_name2)

# create Lambda and add queue 2 as its event source
lambda_name = 'test-%s' % short_uid()
zip_file = testutil.create_lambda_archive(
    load_file(TEST_LAMBDA_PYTHON),
    get_content=True,
    libs=TEST_LAMBDA_LIBS,
    runtime=LAMBDA_RUNTIME_PYTHON36,
)
testutil.create_lambda_function(
    func_name=lambda_name,
    zip_file=zip_file,
    runtime=LAMBDA_RUNTIME_PYTHON36,
)
lambda_client.create_event_source_mapping(EventSourceArn=queue_arn2, FunctionName=lambda_name)

# add message to SQS, which will trigger the Lambda, resulting in an error
payload = {
    lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1
}
sqs_client.send_message(QueueUrl=queue_url2, MessageBody=json.dumps(payload))


# assert that message has been received on the DLQ
# NOTE(review): no call to this helper is visible in this excerpt.
def receive_dlq():
    """Assert that queue 1 holds a message with RequestID and ErrorCode attributes."""
    received = sqs_client.receive_message(QueueUrl=queue_url1, MessageAttributeNames=['All'])
    self.assertGreater(len(received['Messages']), 0)
    attributes = received['Messages'][0]['MessageAttributes']
    self.assertIn('RequestID', attributes)
    self.assertIn('ErrorCode', attributes)
def create_function(cls, file, name, runtime=None, libs=None):
    """Package the handler source in ``file`` and deploy it as Lambda ``name``.

    NOTE(review): the ``cls`` parameter suggests this is a classmethod, but
    no decorator is visible here.

    :param file: path passed to ``load_file`` to obtain the handler source
    :param name: name of the Lambda function to create
    :param runtime: runtime used for both the archive and the function;
        ``LAMBDA_RUNTIME_PYTHON27`` is used when this is falsy
    :param libs: forwarded unchanged to ``testutil.create_lambda_archive``
    """
    if not runtime:
        runtime = LAMBDA_RUNTIME_PYTHON27

    handler_source = load_file(file)
    archive = testutil.create_lambda_archive(
        handler_source, get_content=True, libs=libs, runtime=runtime)
    testutil.create_lambda_function(func_name=name, zip_file=archive, runtime=runtime)
def test_dead_letter_queue(self):
    """Invoke a failing Lambda asynchronously with an SQS dead letter target.

    An SQS queue is created and configured as the function's
    ``DeadLetterConfig`` target. The function is then invoked with
    ``InvocationType='Event'`` and a payload that carries
    ``lambda_integration.MSG_BODY_RAISE_ERROR_FLAG``.
    """
    sqs_client = aws_stack.connect_to_service('sqs')
    lambda_client = aws_stack.connect_to_service('lambda')

    # create DLQ and Lambda function
    queue_name = 'test-%s' % short_uid()
    lambda_name = 'test-%s' % short_uid()
    queue_url = sqs_client.create_queue(QueueName=queue_name)['QueueUrl']
    queue_arn = aws_stack.sqs_queue_arn(queue_name)
    zip_file = testutil.create_lambda_archive(
        load_file(TEST_LAMBDA_PYTHON),
        get_content=True,
        libs=TEST_LAMBDA_LIBS,
        runtime=LAMBDA_RUNTIME_PYTHON36,
    )
    testutil.create_lambda_function(
        func_name=lambda_name,
        zip_file=zip_file,
        runtime=LAMBDA_RUNTIME_PYTHON36,
        DeadLetterConfig={'TargetArn': queue_arn},
    )

    # invoke Lambda, triggering an error
    payload = {
        lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1
    }
    lambda_client.invoke(
        FunctionName=lambda_name,
        Payload=json.dumps(payload),
        InvocationType='Event',
    )

    # assert that message has been received on the DLQ
    # NOTE(review): this helper is defined but never called in the visible
    # code - presumably a polling/retry call follows; confirm.
    def receive_dlq():
        """Assert that the DLQ holds a message with RequestID and ErrorCode attributes."""
        received = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=['All'])
        self.assertGreater(len(received['Messages']), 0)
        attributes = received['Messages'][0]['MessageAttributes']
        self.assertIn('RequestID', attributes)
        self.assertIn('ErrorCode', attributes)
# NOTE(review): this is a headerless excerpt; `cls` is expected to be bound
# by an enclosing definition that is not visible here.

# fetch the Java test JAR into TEST_LAMBDA_JAVA, creating its directory first
mkdir(os.path.dirname(TEST_LAMBDA_JAVA))
download(TEST_LAMBDA_JAR_URL, TEST_LAMBDA_JAVA)

# Lambda supports single JAR deployments without the zip,
# so we upload the JAR directly.
cls.test_java_jar = load_file(TEST_LAMBDA_JAVA, mode='rb')
cls.test_java_zip = testutil.create_zip_file(TEST_LAMBDA_JAVA, get_content=True)

# (function name, archive, handler) for each Java lambda to deploy:
# the plain handler and the stream handler use the JAR, the handler with
# a serializable input object uses the ZIP
java_deployments = [
    (TEST_LAMBDA_NAME_JAVA, cls.test_java_jar,
     'cloud.localstack.sample.LambdaHandler'),
    (TEST_LAMBDA_NAME_JAVA_STREAM, cls.test_java_jar,
     'cloud.localstack.sample.LambdaStreamHandler'),
    (TEST_LAMBDA_NAME_JAVA_SERIALIZABLE, cls.test_java_zip,
     'cloud.localstack.sample.SerializedInputLambdaHandler'),
]
for java_func_name, java_archive, java_handler in java_deployments:
    testutil.create_lambda_function(
        func_name=java_func_name,
        zip_file=java_archive,
        runtime=LAMBDA_RUNTIME_JAVA8,
        handler=java_handler,
    )
def test_lambda_environment(self):
    """Check that configured environment variables reach the Lambda.

    The function is created from ``TEST_LAMBDA_ENV`` with
    ``{'Hello': 'World'}`` as ``envvars``. The test asserts that invoking
    it returns exactly that mapping, and that
    ``get_function_configuration`` reports it under
    ``Environment -> Variables``.
    """
    env_vars = {'Hello': 'World'}

    archive = testutil.create_lambda_archive(
        load_file(TEST_LAMBDA_ENV),
        get_content=True,
        libs=TEST_LAMBDA_LIBS,
        runtime=LAMBDA_RUNTIME_PYTHON27,
    )
    testutil.create_lambda_function(
        func_name=TEST_LAMBDA_NAME_ENV,
        zip_file=archive,
        runtime=LAMBDA_RUNTIME_PYTHON27,
        envvars=env_vars,
    )

    # invoke function and assert result contains env vars
    response = self.lambda_client.invoke(
        FunctionName=TEST_LAMBDA_NAME_ENV, Payload=b'{}')
    payload_stream = response['Payload']
    self.assertEqual(response['StatusCode'], 200)
    self.assertDictEqual(json.load(payload_stream), env_vars)

    # get function config and assert result contains env vars
    config = self.lambda_client.get_function_configuration(
        FunctionName=TEST_LAMBDA_NAME_ENV)
    self.assertEqual(config['Environment'], {'Variables': env_vars})

    # clean up
    # NOTE(review): no cleanup code is visible after this marker.
def test_kinesis_lambda_forward_chain(self):
    """Send a record through two Kinesis-triggered Lambdas and check S3.

    Two Kinesis streams and an S3 bucket are created, and the Python test
    Lambda is deployed twice, once per stream as event source. A record
    whose ``lambda_integration.MSG_BODY_MESSAGE_TARGET`` entry names the
    second stream is put on the first stream. After a fixed wait the test
    asserts that the test data is found among all S3 objects.

    NOTE(review): the forwarding itself happens in the Lambda handler,
    which is not visible here - confirm against ``lambda_integration``.
    """
    kinesis = aws_stack.connect_to_service('kinesis')
    s3 = aws_stack.connect_to_service('s3')

    aws_stack.create_kinesis_stream(TEST_CHAIN_STREAM1_NAME, delete=True)
    aws_stack.create_kinesis_stream(TEST_CHAIN_STREAM2_NAME, delete=True)
    s3.create_bucket(Bucket=TEST_BUCKET_NAME)

    # deploy test lambdas connected to Kinesis streams
    archive = testutil.create_lambda_archive(
        load_file(TEST_LAMBDA_PYTHON),
        get_content=True,
        libs=TEST_LAMBDA_LIBS,
        runtime=LAMBDA_RUNTIME_PYTHON27,
    )
    testutil.create_lambda_function(
        func_name=TEST_CHAIN_LAMBDA1_NAME,
        zip_file=archive,
        event_source_arn=get_event_source_arn(TEST_CHAIN_STREAM1_NAME),
        runtime=LAMBDA_RUNTIME_PYTHON27,
    )
    testutil.create_lambda_function(
        func_name=TEST_CHAIN_LAMBDA2_NAME,
        zip_file=archive,
        event_source_arn=get_event_source_arn(TEST_CHAIN_STREAM2_NAME),
        runtime=LAMBDA_RUNTIME_PYTHON27,
    )

    # publish test record; the value deliberately contains quote characters
    test_data = {'test_data': 'forward_chain_data_%s with \'quotes\\"' % short_uid()}
    record = clone(test_data)
    record[lambda_integration.MSG_BODY_MESSAGE_TARGET] = 'kinesis:%s' % TEST_CHAIN_STREAM2_NAME
    encoded_record = to_bytes(json.dumps(record))
    kinesis.put_record(
        Data=encoded_record,
        PartitionKey='testId',
        StreamName=TEST_CHAIN_STREAM1_NAME,
    )

    # check results after giving the chain a fixed amount of time to run
    time.sleep(5)
    all_objects = testutil.list_all_s3_objects()
    testutil.assert_objects(test_data, all_objects)

    # clean up
    # NOTE(review): no cleanup code is visible after this marker.