Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Create the log test bucket and remove any objects left in it.

    Stores the S3 client on ``self.s3_client`` and the bucket name on
    ``self.bucket``. Emptying the bucket is best-effort: a failure while
    listing or deleting objects is logged at debug level and does not abort
    the fixture.
    """
    import logging

    self.s3_client = aws_stack.connect_to_service('s3')
    self.bucket = "test_log_bucket"
    self.s3_client.create_bucket(Bucket=self.bucket)
    try:
        # The listing carries no 'Contents' entry when the bucket is empty.
        contents = self.s3_client.list_objects(Bucket=self.bucket).get('Contents') or []
        b_objects = [{'Key': o['Key']} for o in contents]
        if b_objects:
            self.s3_client.delete_objects(Bucket=self.bucket, Delete={
                'Objects': b_objects
            })
    except Exception:
        # Deliberately non-fatal, but no longer swallows KeyboardInterrupt /
        # SystemExit the way a bare ``except:`` does.
        logging.getLogger(__name__).debug(
            'Unable to empty bucket %s during setUp', self.bucket, exc_info=True)
def _run_forward_to_fallback_url(self, url, num_requests=3):
    """Invoke non-existing Lambda functions while ``url`` is the fallback URL.

    ``config.LAMBDA_FALLBACK_URL`` is set to ``url`` for the duration of the
    invocations and reset to an empty string afterwards, even when an
    invocation raises.

    :param url: value assigned to ``config.LAMBDA_FALLBACK_URL``.
    :param num_requests: number of invocations; each one targets a function
        named ``non-existing-lambda-<index>``.
    """
    client = aws_stack.connect_to_service('lambda')
    config.LAMBDA_FALLBACK_URL = url
    try:
        function_names = ['non-existing-lambda-%s' % idx for idx in range(num_requests)]
        for function_name in function_names:
            client.invoke(
                FunctionName=function_name,
                InvocationType='RequestResponse',
                Payload=b'{}',
            )
    finally:
        # Always clear the fallback URL again so later tests are unaffected.
        config.LAMBDA_FALLBACK_URL = ''
def _delete_notification_config(self):
    """Clear the notification configuration of the test bucket and verify it.

    Writes an empty ``NotificationConfiguration`` to
    ``TEST_BUCKET_NAME_WITH_NOTIFICATIONS``, reads the configuration back and
    asserts that it contains neither queue nor topic configurations.
    """
    s3_client = aws_stack.connect_to_service('s3')
    s3_client.put_bucket_notification_configuration(
        Bucket=TEST_BUCKET_NAME_WITH_NOTIFICATIONS, NotificationConfiguration={})
    # Named ``notification_config`` so the local does not shadow the
    # ``config`` name used elsewhere in this file.
    notification_config = s3_client.get_bucket_notification_configuration(
        Bucket=TEST_BUCKET_NAME_WITH_NOTIFICATIONS)
    self.assertFalse(notification_config.get('QueueConfigurations'))
    # The response key is plural; the singular 'TopicConfiguration' is never
    # present in this response, which made the previous check vacuous.
    self.assertFalse(notification_config.get('TopicConfigurations'))
def test_large_data_download(self):
    """Store several large items in DynamoDB and read them back in one scan.

    Creates ``TEST_DDB_TABLE_NAME_2``, inserts ``num_items`` items whose
    ``data1`` attribute is a 10,000-character string, scans the table and
    checks that all items are returned. The table is deleted afterwards,
    including when the scan or the assertion fails.
    """
    dynamodb = aws_stack.connect_to_resource('dynamodb')
    dynamodb_client = aws_stack.connect_to_service('dynamodb')
    aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME_2, partition_key=PARTITION_KEY)
    try:
        table = dynamodb.Table(TEST_DDB_TABLE_NAME_2)

        # Create a large amount of items
        num_items = 20
        for i in range(0, num_items):
            item = {PARTITION_KEY: 'id%s' % i, 'data1': 'foobar123 ' * 1000}
            table.put_item(Item=item)

        # Retrieve the items. The data will be transmitted to the client with chunked transfer encoding
        result = table.scan(TableName=TEST_DDB_TABLE_NAME_2)
        assert len(result['Items']) == num_items
    finally:
        # Clean up even on failure so the table does not leak into other tests
        dynamodb_client.delete_table(TableName=TEST_DDB_TABLE_NAME_2)
def setUpClass(cls):
# Class-level fixture: creates the service clients and deploys the Lambda
# functions used by the tests.
# NOTE(review): this definition is cut off in this view (the last call below
# is never closed) and no @classmethod decorator is visible - confirm against
# the full file.
# Clients are stored on the class so that all tests can share them.
cls.lambda_client = aws_stack.connect_to_service('lambda')
cls.s3_client = aws_stack.connect_to_service('s3')
cls.sfn_client = aws_stack.connect_to_service('stepfunctions')
# Build one Lambda archive from the TEST_LAMBDA_PYTHON file; the same archive
# is passed to every create_lambda_function call below.
zip_file = testutil.create_lambda_archive(
load_file(TEST_LAMBDA_PYTHON),
get_content=True,
runtime=LAMBDA_RUNTIME_PYTHON36
)
# First function: receives TEST_RESULT_VALUE through the 'Hello' env variable.
testutil.create_lambda_function(
func_name=TEST_LAMBDA_NAME_1,
zip_file=zip_file,
runtime=LAMBDA_RUNTIME_PYTHON36,
envvars={'Hello': TEST_RESULT_VALUE}
)
# Second function: the remaining arguments are not visible in this view.
testutil.create_lambda_function(
func_name=TEST_LAMBDA_NAME_2,
zip_file=zip_file,
def stream_exists(name):
    """Return True if a Kinesis stream called ``name`` exists.

    All pages of the stream listing are inspected, so a stream is still found
    when there are more streams than a single ``list_streams`` call returns.

    :param name: stream name to look for.
    """
    kinesis_client = aws_stack.connect_to_service('kinesis')
    paginator = kinesis_client.get_paginator('list_streams')
    # any() stops at the first page that contains the name.
    return any(name in page['StreamNames'] for page in paginator.paginate())
def test_time_to_live(self):
    """Populate a DynamoDB table and describe its TTL settings while unset.

    Creates ``TEST_DDB_TABLE_NAME_3``, stores three items in it and checks
    that the describe-TTL request is answered with HTTP status 200.
    """
    dynamodb = aws_stack.connect_to_resource('dynamodb')
    dynamodb_client = aws_stack.connect_to_service('dynamodb')
    aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME_3, partition_key=PARTITION_KEY)
    table = dynamodb.Table(TEST_DDB_TABLE_NAME_3)

    # Insert some items to the table
    payloads = (
        ('id1', 'IT IS'),
        ('id2', 'TIME'),
        ('id3', 'TO LIVE!'),
    )
    for item_id, data in payloads:
        table.put_item(Item={PARTITION_KEY: item_id, 'data': data})

    # Describe TTL when still unset.
    ttl_response = testutil.send_describe_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3)
    assert ttl_response.status_code == 200
def _receive_assert_delete(self, queue_url, assertions, sqs_client=None, required_subject=None):
    """Receive messages from a queue, check them, then delete them.

    Each message body is decoded as JSON and the decoded objects are passed,
    together with ``assertions``, to ``testutil.assert_objects``. Every
    received message is deleted from the queue afterwards.

    :param queue_url: URL of the queue to read from.
    :param assertions: expected objects, forwarded to ``testutil.assert_objects``.
    :param sqs_client: SQS client to use; a new one is created when omitted.
    :param required_subject: not used by the code visible here.
    """
    if not sqs_client:
        sqs_client = aws_stack.connect_to_service('sqs')

    response = sqs_client.receive_message(QueueUrl=queue_url)
    # The 'Messages' key is missing when nothing was received; fall back to
    # an empty list so the assertion helper reports the mismatch instead of
    # this method failing with a KeyError.
    received = response.get('Messages', [])

    messages = [json.loads(to_str(m['Body'])) for m in received]
    testutil.assert_objects(assertions, messages)

    for message in received:
        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
def test_stream_consumers(self):
    """Register a consumer on a new Kinesis stream and verify the listing.

    A stream with a unique name starts with no consumers; after registering
    ``cons1`` exactly one consumer is listed, carrying that name and an ARN
    that contains ``/cons1``.
    """
    kinesis = aws_stack.connect_to_service('kinesis')
    stream_name = 'test-%s' % short_uid()
    stream_arn = aws_stack.kinesis_stream_arn(stream_name)

    def assert_consumers(count):
        # Fetch the consumers of the stream, check how many there are and
        # hand them back to the caller.
        listing = kinesis.list_stream_consumers(StreamARN=stream_arn)
        found = listing.get('Consumers')
        self.assertEqual(len(found), count)
        return found

    # create stream and assert 0 consumers
    kinesis.create_stream(StreamName=stream_name, ShardCount=1)
    assert_consumers(0)

    # create consumer and assert 1 consumer
    consumer_name = 'cons1'
    kinesis.register_stream_consumer(StreamARN=stream_arn, ConsumerName=consumer_name)
    registered = assert_consumers(1)
    first = registered[0]
    self.assertEqual(first['ConsumerName'], consumer_name)
    self.assertIn('/%s' % consumer_name, first['ConsumerARN'])
def setUp(self):
    """Create the SNS topic and the two SQS queues used by the tests.

    Stores the SQS and SNS clients, the topic ARN and both queue URLs on the
    test instance.
    """
    self.sqs_client = aws_stack.connect_to_service('sqs')
    self.sns_client = aws_stack.connect_to_service('sns')

    topic = self.sns_client.create_topic(Name=TEST_TOPIC_NAME)
    self.topic_arn = topic['TopicArn']

    first_queue = self.sqs_client.create_queue(QueueName=TEST_QUEUE_NAME)
    self.queue_url = first_queue['QueueUrl']

    second_queue = self.sqs_client.create_queue(QueueName=TEST_QUEUE_NAME_2)
    self.queue_url_2 = second_queue['QueueUrl']