Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_delete_stack(self):
cloudformation = aws_stack.connect_to_resource('cloudformation')
cf_client = aws_stack.connect_to_service('cloudformation')
s3 = aws_stack.connect_to_service('s3')
sns = aws_stack.connect_to_service('sns')
apigateway = aws_stack.connect_to_service('apigateway')
template = template_deployer.template_to_json(load_file(TEST_TEMPLATE_1))
# deploy template
stack_name = 'stack-%s' % short_uid()
cloudformation.create_stack(StackName=stack_name, TemplateBody=template)
# wait for deployment to finish
def check_stack():
stack = get_stack_details(stack_name)
self.assertEqual(stack['StackStatus'], 'CREATE_COMPLETE')
retry(check_stack, retries=3, sleep=2)
# assert that resources have been created
assert bucket_exists('cf-test-bucket-1')
assert queue_exists('cf-test-queue-1')
topic_arn = topic_exists('%s-test-topic-1-1' % stack_name)
def test_java_runtime_with_lib(self):
java_jar_with_lib = load_file(TEST_LAMBDA_JAVA_WITH_LIB, mode='rb')
# create ZIP file from JAR file
jar_dir = new_tmp_dir()
zip_dir = new_tmp_dir()
unzip(TEST_LAMBDA_JAVA_WITH_LIB, jar_dir)
shutil.move(os.path.join(jar_dir, 'lib'), os.path.join(zip_dir, 'lib'))
jar_without_libs_file = testutil.create_zip_file(jar_dir)
shutil.copy(jar_without_libs_file, os.path.join(zip_dir, 'lib', 'lambda.jar'))
java_zip_with_lib = testutil.create_zip_file(zip_dir, get_content=True)
for archive in [java_jar_with_lib, java_zip_with_lib]:
lambda_name = 'test-%s' % short_uid()
testutil.create_lambda_function(func_name=lambda_name,
zip_file=archive, runtime=LAMBDA_RUNTIME_JAVA8,
handler='cloud.localstack.sample.LambdaHandlerWithLib')
def test_dead_letter_queue(self):
sqs_client = aws_stack.connect_to_service('sqs')
lambda_client = aws_stack.connect_to_service('lambda')
# create DLQ and Lambda function
queue_name = 'test-%s' % short_uid()
lambda_name = 'test-%s' % short_uid()
queue_url = sqs_client.create_queue(QueueName=queue_name)['QueueUrl']
queue_arn = aws_stack.sqs_queue_arn(queue_name)
zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON),
get_content=True, libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON36)
testutil.create_lambda_function(func_name=lambda_name, zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON36, DeadLetterConfig={'TargetArn': queue_arn})
# invoke Lambda, triggering an error
payload = {
lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1
}
lambda_client.invoke(FunctionName=lambda_name,
Payload=json.dumps(payload), InvocationType='Event')
# assert that message has been received on the DLQ
def receive_dlq():
result = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=['All'])
self.assertGreater(len(result['Messages']), 0)
msg_attrs = result['Messages'][0]['MessageAttributes']
def create_lambda_function(self, fn_name):
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_PYTHON),
get_content=True,
libs=TEST_LAMBDA_LIBS,
runtime=LAMBDA_RUNTIME_PYTHON27
)
testutil.create_lambda_function(
func_name=fn_name,
zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON27
)
def test_handler_in_submodule(self):
func_name = 'lambda-%s' % short_uid()
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_PYTHON), get_content=True,
libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON36,
file_name='abc/def/main.py')
testutil.create_lambda_function(func_name=func_name, zip_file=zip_file,
handler='abc.def.main.handler', runtime=LAMBDA_RUNTIME_PYTHON36)
# invoke function and assert result
result = self.lambda_client.invoke(FunctionName=func_name, Payload=b'{}')
result_data = json.loads(result['Payload'].read())
self.assertEqual(result['StatusCode'], 200)
self.assertEqual(result_data['event'], json.loads('{}'))
# run these tests only for the "reuse containers" Lambda executor
if not isinstance(lambda_api.LAMBDA_EXECUTOR,
lambda_executors.LambdaExecutorReuseContainers):
return
executor = lambda_api.LAMBDA_EXECUTOR
func_name = 'test_prime_and_destroy_containers'
func_arn = lambda_api.func_arn(func_name)
# make sure existing containers are gone
executor.cleanup()
self.assertEqual(len(executor.get_all_container_names()), 0)
# deploy and invoke lambda without Docker
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_ENV),
get_content=True,
libs=TEST_LAMBDA_LIBS,
runtime=LAMBDA_RUNTIME_PYTHON27
)
testutil.create_lambda_function(
func_name=func_name,
zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON27,
envvars={'Hello': 'World'}
)
self.assertEqual(len(executor.get_all_container_names()), 0)
self.assertDictEqual(executor.function_invoke_times, {})
# invoke a few times.
durations = []
print('WARN: %s' % e)
rm_rf(archive)
if retries > 0:
return get_lambda_code(func_name, retries=retries - 1, cache_time=1, env=env)
else:
print('WARNING: Unable to retrieve lambda code: %s' % e)
# traverse subdirectories and get script sources
result = {}
for root, subdirs, files in os.walk(folder):
for file in files:
prefix = root.split(folder)[-1]
key = '%s/%s' % (prefix, file)
if re.match(r'.+\.py$', key) or re.match(r'.+\.js$', key):
codefile = '%s/%s' % (root, file)
result[key] = load_file(codefile)
# cleanup cache
clean_cache(file_pattern=TMP_DOWNLOAD_FILE_PATTERN,
last_clean_time=last_cache_cleanup_time,
max_age=TMP_DOWNLOAD_CACHE_MAX_AGE)
# TODO: delete only if cache_time is over
rm_rf(folder)
return result
# running Lambdas locally (not in Docker), or (3) we're using remote Docker.
# -> We do *not* want to raise an error if we're using local mount in non-remote Docker
if not is_local_mount or not use_docker() or config.LAMBDA_REMOTE_DOCKER:
file_list = run('cd "%s"; du -d 3 .' % lambda_cwd)
config_debug = ('Config for local mount, docker, remote: "%s", "%s", "%s"' %
(is_local_mount, use_docker(), config.LAMBDA_REMOTE_DOCKER))
LOG.debug('Lambda archive content:\n%s' % file_list)
raise ClientError(error_response(
'Unable to find handler script (%s) in Lambda archive. %s' % (main_file, config_debug),
400, error_type='ValidationError'))
if runtime.startswith('python') and not use_docker():
try:
# make sure the file is actually readable, then read contents
ensure_readable(main_file)
zip_file_content = load_file(main_file, mode='rb')
# extract handler
lambda_handler = exec_lambda_code(
zip_file_content,
handler_function=handler_function,
lambda_cwd=lambda_cwd,
lambda_env=lambda_environment)
except Exception as e:
raise ClientError('Unable to get handler function from lambda code.', e)
add_function_mapping(lambda_name, lambda_handler, lambda_cwd)
return {'FunctionName': lambda_name}