Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_lambda_function(self, fn_name):
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_PYTHON),
get_content=True,
libs=TEST_LAMBDA_LIBS,
runtime=LAMBDA_RUNTIME_PYTHON27
)
testutil.create_lambda_function(
func_name=fn_name,
zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON27
)
def test_invocation_with_qualifier(self):
lambda_name = 'test_lambda_%s' % short_uid()
bucket_name = 'test-bucket-lambda2'
bucket_key = 'test_lambda.zip'
# upload zip file to S3
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_PYTHON),
get_content=True,
libs=TEST_LAMBDA_LIBS,
runtime=LAMBDA_RUNTIME_PYTHON27
)
self.s3_client.create_bucket(Bucket=bucket_name)
self.s3_client.upload_fileobj(
BytesIO(zip_file), bucket_name, bucket_key)
# create lambda function
response = self.lambda_client.create_function(
FunctionName=lambda_name, Handler='handler.handler',
Runtime=lambda_api.LAMBDA_RUNTIME_PYTHON27, Role='r1',
Code={
'S3Bucket': bucket_name,
'S3Key': bucket_key
sqs_client = aws_stack.connect_to_service('sqs')
lambda_client = aws_stack.connect_to_service('lambda')
# create SQS queue with DLQ redrive policy
queue_name1 = 'test-%s' % short_uid()
queue_name2 = 'test-%s' % short_uid()
queue_url1 = sqs_client.create_queue(QueueName=queue_name1)['QueueUrl']
queue_arn1 = aws_stack.sqs_queue_arn(queue_name1)
policy = {'deadLetterTargetArn': queue_arn1, 'maxReceiveCount': 1}
queue_url2 = sqs_client.create_queue(QueueName=queue_name2,
Attributes={'RedrivePolicy': json.dumps(policy)})['QueueUrl']
queue_arn2 = aws_stack.sqs_queue_arn(queue_name2)
# create Lambda and add source mapping
lambda_name = 'test-%s' % short_uid()
zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON),
get_content=True, libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON36)
testutil.create_lambda_function(func_name=lambda_name, zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON36)
lambda_client.create_event_source_mapping(EventSourceArn=queue_arn2, FunctionName=lambda_name)
# add message to SQS, which will trigger the Lambda, resulting in an error
payload = {
lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1
}
sqs_client.send_message(QueueUrl=queue_url2, MessageBody=json.dumps(payload))
# assert that message has been received on the DLQ
def receive_dlq():
result = sqs_client.receive_message(QueueUrl=queue_url1, MessageAttributeNames=['All'])
self.assertGreater(len(result['Messages']), 0)
msg_attrs = result['Messages'][0]['MessageAttributes']
def test_kinesis_lambda_forward_chain(self):
kinesis = aws_stack.connect_to_service('kinesis')
s3 = aws_stack.connect_to_service('s3')
aws_stack.create_kinesis_stream(TEST_CHAIN_STREAM1_NAME, delete=True)
aws_stack.create_kinesis_stream(TEST_CHAIN_STREAM2_NAME, delete=True)
s3.create_bucket(Bucket=TEST_BUCKET_NAME)
# deploy test lambdas connected to Kinesis streams
zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON), get_content=True,
libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON27)
testutil.create_lambda_function(func_name=TEST_CHAIN_LAMBDA1_NAME, zip_file=zip_file,
event_source_arn=get_event_source_arn(TEST_CHAIN_STREAM1_NAME), runtime=LAMBDA_RUNTIME_PYTHON27)
testutil.create_lambda_function(func_name=TEST_CHAIN_LAMBDA2_NAME, zip_file=zip_file,
event_source_arn=get_event_source_arn(TEST_CHAIN_STREAM2_NAME), runtime=LAMBDA_RUNTIME_PYTHON27)
# publish test record
test_data = {'test_data': 'forward_chain_data_%s with \'quotes\\"' % short_uid()}
data = clone(test_data)
data[lambda_integration.MSG_BODY_MESSAGE_TARGET] = 'kinesis:%s' % TEST_CHAIN_STREAM2_NAME
kinesis.put_record(Data=to_bytes(json.dumps(data)), PartitionKey='testId', StreamName=TEST_CHAIN_STREAM1_NAME)
# check results
time.sleep(5)
all_objects = testutil.list_all_s3_objects()
testutil.assert_objects(test_data, all_objects)
def create_function(cls, file, name, runtime=None, libs=None):
runtime = runtime or LAMBDA_RUNTIME_PYTHON27
zip_file = testutil.create_lambda_archive(
load_file(file), get_content=True, libs=libs, runtime=runtime)
testutil.create_lambda_function(
func_name=name, zip_file=zip_file, runtime=runtime)
LOGGER.info('Kinesis consumer initialized.')
# create table with stream forwarding config
aws_stack.create_dynamodb_table(table_name, partition_key=PARTITION_KEY,
stream_view_type='NEW_AND_OLD_IMAGES')
# list DDB streams and make sure the table stream is there
streams = dynamodbstreams.list_streams()
ddb_event_source_arn = None
for stream in streams['Streams']:
if stream['TableName'] == table_name:
ddb_event_source_arn = stream['StreamArn']
self.assertTrue(ddb_event_source_arn)
# deploy test lambda connected to DynamoDB Stream
zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON), get_content=True,
libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON27)
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_DDB,
zip_file=zip_file, event_source_arn=ddb_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27, delete=True)
# make sure we cannot create Lambda with same name twice
assert_raises(Exception, testutil.create_lambda_function, func_name=TEST_LAMBDA_NAME_DDB,
zip_file=zip_file, event_source_arn=ddb_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)
# deploy test lambda connected to Kinesis Stream
kinesis_event_source_arn = kinesis.describe_stream(
StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)['StreamDescription']['StreamARN']
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_STREAM,
zip_file=zip_file, event_source_arn=kinesis_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)
# deploy test lambda connected to SQS queue
sqs_queue_info = testutil.create_sqs_queue(TEST_LAMBDA_NAME_QUEUE)
testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_QUEUE,
def test_upload_lambda_from_s3(self):
lambda_name = 'test_lambda_%s' % short_uid()
bucket_name = 'test-bucket-lambda'
bucket_key = 'test_lambda.zip'
# upload zip file to S3
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_PYTHON),
get_content=True,
libs=TEST_LAMBDA_LIBS,
runtime=LAMBDA_RUNTIME_PYTHON27
)
self.s3_client.create_bucket(Bucket=bucket_name)
self.s3_client.upload_fileobj(
BytesIO(zip_file), bucket_name, bucket_key)
# create lambda function
self.lambda_client.create_function(
FunctionName=lambda_name, Handler='handler.handler',
Runtime=lambda_api.LAMBDA_RUNTIME_PYTHON27, Role='r1',
Code={
'S3Bucket': bucket_name,
'S3Key': bucket_key
def test_python_lambda_running_in_docker(self):
if not use_docker():
return
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_PYTHON3),
get_content=True,
libs=TEST_LAMBDA_LIBS,
runtime=LAMBDA_RUNTIME_PYTHON36
)
testutil.create_lambda_function(
func_name=TEST_LAMBDA_NAME_PY3,
zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON36
)
result = self.lambda_client.invoke(
FunctionName=TEST_LAMBDA_NAME_PY3, Payload=b'{}')
result_data = result['Payload'].read()
self.assertEqual(result['StatusCode'], 200)
def test_lambda_environment(self):
vars = {'Hello': 'World'}
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_ENV), get_content=True,
libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON27)
testutil.create_lambda_function(
func_name=TEST_LAMBDA_NAME_ENV, zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON27, envvars=vars)
# invoke function and assert result contains env vars
result = self.lambda_client.invoke(
FunctionName=TEST_LAMBDA_NAME_ENV, Payload=b'{}')
result_data = result['Payload']
self.assertEqual(result['StatusCode'], 200)
self.assertDictEqual(json.load(result_data), vars)
# get function config and assert result contains env vars
result = self.lambda_client.get_function_configuration(
FunctionName=TEST_LAMBDA_NAME_ENV)