Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_cognito_trigger_existing(self, session):
    """Invoke ``Zappa.update_cognito`` with a single ``PreSignUp`` trigger.

    No assertion is made; the test passes as long as the call does not raise.
    """
    zappa = Zappa(session)

    # Positional arguments, in the order they are handed to update_cognito.
    name = 'Zappa-Trigger-Test'
    pool_id = 'us-east-1_9jUv74DH8'
    triggers = {'PreSignUp': 'test.tasks.pre_signup'}
    function_arn = 'arn:aws:lambda:us-east-1:12345:function:Zappa-Trigger-Test'

    zappa.update_cognito(name, pool_id, triggers, function_arn)
def test_rollback_lambda_function_version(self, session):
    """Call ``Zappa.rollback_lambda_function_version`` with two arguments sets.

    With ``99999`` as the second argument the call must return a falsy value.
    The call with ``1`` is only required not to raise.
    """
    zappa = Zappa(session)
    zappa.credentials_arn = 'arn:aws:iam::724336686645:role/ZappaLambdaExecution'
    name = 'django-helloworld-unicode'

    self.assertFalse(zappa.rollback_lambda_function_version(name, 99999))

    # The return value of this second call is not inspected.
    zappa.rollback_lambda_function_version(name, 1)
def test_get_api_url(self, session):
    """Call ``Zappa.get_api_url`` and require only that it does not raise."""
    zappa = Zappa(session)
    zappa.credentials_arn = 'arn:aws:iam::724336686645:role/ZappaLambdaExecution'

    # The returned value is not checked by this test.
    zappa.get_api_url('Spheres-demonstration', 'demonstration')
def test_create_lambda_function_local(self, session):
    """Create, then update, a Lambda function using the ``local_zip`` argument.

    Both calls receive the same bucket, zip file and function name; only the
    create call is given a handler. Neither return value is asserted.
    """
    zappa = Zappa(session)
    zappa.aws_region = 'us-east-1'
    zappa.load_credentials(session)
    zappa.credentials_arn = 'arn:aws:iam::12345:role/ZappaLambdaExecution'

    # Keyword arguments common to the create and update calls.
    shared = {
        'bucket': 'lmbda',
        'local_zip': 'Spheres-dev-1454694878.zip',
        'function_name': 'test_lmbda_function55',
    }

    zappa.create_lambda_function(handler='runme.lambda_handler', **shared)
    zappa.update_lambda_function(**shared)
def test_create_lambda_function_s3(self, session):
    """Create, then update, a Lambda function using the ``s3_key`` argument.

    Both calls receive the same bucket, key and function name; only the
    create call is given a handler. Neither return value is asserted.
    """
    zappa = Zappa(session)
    zappa.aws_region = 'us-east-1'
    zappa.load_credentials(session)
    zappa.credentials_arn = 'arn:aws:iam::12345:role/ZappaLambdaExecution'

    # Keyword arguments common to the create and update calls.
    shared = {
        'bucket': 'lmbda',
        's3_key': 'Spheres-dev-1454694878.zip',
        'function_name': 'test_lmbda_function55',
    }

    zappa.create_lambda_function(handler='runme.lambda_handler', **shared)
    zappa.update_lambda_function(**shared)
def test_upload_remove_s3(self, session):
    """Upload a freshly built zip to S3, confirm it is there, then remove it.

    Both ``upload_to_s3`` and ``remove_from_s3`` must return a truthy value.
    """
    bucket = 'test_zappa_upload_s3'
    zappa = Zappa(session)
    archive = zappa.create_lambda_zip(minify=False)

    self.assertTrue(zappa.upload_to_s3(archive, bucket))

    client = session.resource('s3').meta.client
    # Raises ClientError with 404 if the bucket doesn't exist.
    client.head_bucket(Bucket=bucket)
    # Raises ClientError with 404 if the object doesn't exist.
    client.head_object(Bucket=bucket, Key=archive)

    self.assertTrue(zappa.remove_from_s3(archive, bucket))
def test_fetch_logs(self, session):
    """Check that ``Zappa.fetch_logs`` returns something other than ``None``."""
    z = Zappa(session)
    z.credentials_arn = 'arn:aws:iam::12345:role/ZappaLambdaExecution'
    events = z.fetch_logs('Spheres-demonstration')
    # assertIsNotNone produces a descriptive failure message, whereas
    # assertTrue(x is not None) would only report "False is not true".
    self.assertIsNotNone(events)
def test_copy_on_s3(self, session):
    """Upload a freshly built zip to S3 and copy it to a ``copy_``-prefixed key.

    The upload must return a truthy value and the bucket/object must be
    reachable via ``head_bucket`` / ``head_object``. The local zip produced by
    ``create_lambda_zip`` is deleted when the test finishes, whether it
    passes or fails.
    """
    bucket_name = 'test_zappa_upload_s3'
    z = Zappa(session)
    zip_path = z.create_lambda_zip(minify=False)
    try:
        res = z.upload_to_s3(zip_path, bucket_name)
        self.assertTrue(res)

        s3 = session.resource('s3')
        # will throw ClientError with 404 if bucket doesn't exist
        s3.meta.client.head_bucket(Bucket=bucket_name)
        # will throw ClientError with 404 if object doesn't exist
        s3.meta.client.head_object(
            Bucket=bucket_name,
            Key=zip_path,
        )

        zp = 'copy_' + zip_path
        # NOTE(review): the result of copy_on_s3 is not asserted here --
        # confirm whether a truthy return should be required.
        z.copy_on_s3(zip_path, zp, bucket_name)
    finally:
        # Clean up the local archive even if an assertion or S3 call above
        # raised, so failed runs do not leave zip files behind.
        os.remove(zip_path)
def test_cognito_trigger(self, session):
    """Invoke ``Zappa.update_cognito`` with a ``PreSignUp`` trigger mapping.

    The test makes no assertions; it succeeds if the call completes.
    """
    zappa = Zappa(session)
    zappa.update_cognito(
        'Zappa-Trigger-Test',
        'us-east-1_9jUv74DH8',
        {'PreSignUp': 'test.tasks.pre_signup'},
        'arn:aws:lambda:us-east-1:12345:function:Zappa-Trigger-Test',
    )
u"apiId": u"1234567890",
u"resourcePath": u"/{proxy+}",
u"httpMethod": u"POST",
u"requestId": u"c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
u"accountId": u"123456789012",
u"identity": {
u"userAgent": u"Custom User Agent String",
u"cognitoIdentityPoolId": u"userpoolID",
u"cognitoIdentityId": u"myCognitoID",
u"sourceIp": u"127.0.0.1",
},
"stage": "prod"
},
u'query': {}
}
environ = create_wsgi_request(event, script_name='http://zappa.com/',
trailing_slash=False)
self.assertEqual(environ['QUERY_STRING'], u'')