Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_time_to_live(self):
    """Verify that describing TTL on a freshly created table succeeds.

    Creates a table, inserts a few items, then issues a describe-TTL
    request while TTL is still unset and expects an HTTP 200 response.
    """
    dynamodb = aws_stack.connect_to_resource('dynamodb')
    aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME_3, partition_key=PARTITION_KEY)
    table = dynamodb.Table(TEST_DDB_TABLE_NAME_3)
    # Insert some items to the table
    items = {
        'id1': {PARTITION_KEY: 'id1', 'data': 'IT IS'},
        'id2': {PARTITION_KEY: 'id2', 'data': 'TIME'},
        'id3': {PARTITION_KEY: 'id3', 'data': 'TO LIVE!'}
    }
    # only the item payloads are needed, so iterate values() (the keys were unused)
    for item in items.values():
        table.put_item(Item=item)
    # Describe TTL when still unset.
    response = testutil.send_describe_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3)
    assert response.status_code == 200
def _receive_assert_delete(self, queue_url, assertions, sqs_client=None, required_subject=None):
    """Receive messages from an SQS queue, assert their content, then delete them.

    The message bodies are JSON-decoded and checked against *assertions*
    via testutil.assert_objects before each message is removed from the queue.
    """
    client = sqs_client or aws_stack.connect_to_service('sqs')
    received = client.receive_message(QueueUrl=queue_url)['Messages']
    payloads = [json.loads(to_str(msg['Body'])) for msg in received]
    testutil.assert_objects(assertions, payloads)
    # clean up: delete every message we just consumed
    for msg in received:
        client.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
def test_stream_consumers(self):
    """Register a consumer on a new Kinesis stream and verify it gets listed."""
    kinesis = aws_stack.connect_to_service('kinesis')
    stream_name = 'test-%s' % short_uid()
    stream_arn = aws_stack.kinesis_stream_arn(stream_name)

    def _check_consumer_count(expected):
        # list the stream's consumers and assert how many are registered
        found = kinesis.list_stream_consumers(StreamARN=stream_arn).get('Consumers')
        self.assertEqual(len(found), expected)
        return found

    # a freshly created stream has no consumers
    kinesis.create_stream(StreamName=stream_name, ShardCount=1)
    _check_consumer_count(0)
    # after registering one consumer it must show up in the listing
    consumer_name = 'cons1'
    kinesis.register_stream_consumer(StreamARN=stream_arn, ConsumerName=consumer_name)
    consumers = _check_consumer_count(1)
    self.assertEqual(consumers[0]['ConsumerName'], consumer_name)
    self.assertIn('/%s' % consumer_name, consumers[0]['ConsumerARN'])
def setUp(self):
    """Create the SQS/SNS clients plus the test topic and queues used below."""
    self.sqs_client = aws_stack.connect_to_service('sqs')
    self.sns_client = aws_stack.connect_to_service('sns')
    topic = self.sns_client.create_topic(Name=TEST_TOPIC_NAME)
    self.topic_arn = topic['TopicArn']
    queue_1 = self.sqs_client.create_queue(QueueName=TEST_QUEUE_NAME)
    queue_2 = self.sqs_client.create_queue(QueueName=TEST_QUEUE_NAME_2)
    self.queue_url = queue_1['QueueUrl']
    self.queue_url_2 = queue_2['QueueUrl']
def update_physical_resource_id(resource):
    """Backfill ``physical_resource_id`` on *resource* if it is not set yet.

    The id is derived per resource type: an ARN for Lambda functions,
    Step Functions state machines and activities, and the stream name
    for Kinesis streams. Unknown types are logged and left untouched.
    """
    # guard clause: nothing to do when the id is already populated
    if getattr(resource, 'physical_resource_id', None):
        return
    if isinstance(resource, lambda_models.LambdaFunction):
        func_arn = aws_stack.lambda_function_arn(resource.function_name)
        # Lambda keeps the ARN in two places; set both to the same value
        resource.function_arn = resource.physical_resource_id = func_arn
    elif isinstance(resource, sfn_models.StateMachine):
        resource.physical_resource_id = aws_stack.state_machine_arn(resource.name)
    elif isinstance(resource, service_models.StepFunctionsActivity):
        resource.physical_resource_id = aws_stack.stepfunctions_activity_arn(resource.params.get('Name'))
    elif isinstance(resource, kinesis_models.Stream):
        resource.physical_resource_id = resource.stream_name
    else:
        # lazy %-style args: the message is only rendered if the record is emitted
        LOG.warning('Unable to determine physical_resource_id for resource %s', type(resource))
def check_infra_dynamodb(expect_shutdown=False, print_error=False):
    """Health-check the local DynamoDB endpoint by listing its tables.

    :param expect_shutdown: when True, assert the service is DOWN
        (list_tables must have failed); otherwise assert it is up and
        returned a list of table names.
    :param print_error: when True, log the failure with a traceback.
    """
    out = None
    try:
        # check DynamoDB
        out = aws_stack.connect_to_service(service_name='dynamodb', client=True, env=ENV_DEV).list_tables()
    except Exception as e:
        if print_error:
            # lazy %-style args avoid building the message unless it is emitted
            LOGGER.error('DynamoDB health check failed: %s %s', e, traceback.format_exc())
    if expect_shutdown:
        assert out is None
    else:
        assert isinstance(out['TableNames'], list)
# NOTE(review): fragment of a larger request handler - the enclosing function
# (which provides method, path, data and headers) is outside this view.
try:
# validate/normalize the region carried in the request headers
aws_stack.check_valid_region(headers)
aws_stack.set_default_region_in_headers(headers)
except Exception as e:
# an invalid region is reported back to the caller as HTTP 400
return make_error(message=str(e), code=400)
if method == 'POST' and path == '/':
# parse payload and extract fields
req_data = urlparse.parse_qs(to_str(data))
# parse_qs maps each key to a LIST of values, hence the [0] indexing below
req_action = req_data['Action'][0]
# the target ARN may arrive under any of three alternative parameter names
topic_arn = req_data.get('TargetArn') or req_data.get('TopicArn') or req_data.get('ResourceArn')
if topic_arn:
topic_arn = topic_arn[0]
topic_arn = aws_stack.fix_account_id_in_arns(topic_arn)
if req_action == 'SetSubscriptionAttributes':
sub = get_subscription_by_arn(req_data['SubscriptionArn'][0])
if not sub:
return make_error(message='Unable to find subscription for given ARN', code=400)
# store the attribute directly on the subscription mapping
attr_name = req_data['AttributeName'][0]
attr_value = req_data['AttributeValue'][0]
sub[attr_name] = attr_value
return make_response(req_action)
elif req_action == 'GetSubscriptionAttributes':
sub = get_subscription_by_arn(req_data['SubscriptionArn'][0])
if not sub:
return make_error(message='Unable to find subscription for given ARN', code=400)
# NOTE(review): '%s%s\n' concatenates key and value with NO separator -
# looks suspicious; confirm the expected response format before changing.
content = ''
for key, value in sub.items():
content += '%s%s\n' % (key, value)
def start_stepfunctions(port=None, asynchronous=False, update_listener=None):
    """Install (if needed) and launch the StepFunctionsLocal mock behind a proxy.

    :param port: external port to expose (defaults to config.PORT_STEPFUNCTIONS)
    :param asynchronous: passed through to do_run - run the process in background
    :param update_listener: proxy listener used to rewrite requests/responses
    :return: result of do_run (handle of the started process)
    """
    port = port or config.PORT_STEPFUNCTIONS
    install.install_stepfunctions_local()
    # TODO: local port is currently hard coded in Step Functions Local :/
    # (the earlier assignment from DEFAULT_PORT_STEPFUNCTIONS_BACKEND was a
    # dead store - it was immediately overwritten - and has been removed)
    backend_port = 8083
    # endpoints of the sibling local services StepFunctionsLocal integrates with
    lambda_endpoint = aws_stack.get_local_service_url('lambda')
    dynamodb_endpoint = aws_stack.get_local_service_url('dynamodb')
    sns_endpoint = aws_stack.get_local_service_url('sns')
    sqs_endpoint = aws_stack.get_local_service_url('sqs')
    sfn_endpoint = aws_stack.get_local_service_url('stepfunctions')
    cmd = ('cd %s; java -Dcom.amazonaws.sdk.disableCertChecking -Xmx%s -jar StepFunctionsLocal.jar '
        '--lambda-endpoint %s --dynamodb-endpoint %s --sns-endpoint %s '
        '--sqs-endpoint %s --aws-region %s --aws-account %s --step-functions-endpoint %s') % (
        install.INSTALL_DIR_STEPFUNCTIONS, MAX_HEAP_SIZE, lambda_endpoint, dynamodb_endpoint,
        sns_endpoint, sqs_endpoint, aws_stack.get_region(), TEST_AWS_ACCOUNT_ID, sfn_endpoint)
    print('Starting mock StepFunctions (%s port %s)...' % (get_service_protocol(), port))
    start_proxy_for_service('stepfunctions', port, backend_port, update_listener)
    return do_run(cmd, asynchronous)
def get_stream_info(stream_name, log_file=None, shards=None, env=None, endpoint_url=None,
ddb_lease_table_suffix=None, env_vars={}):
# NOTE(review): env_vars={} is a mutable default argument - all callers that
# omit it share one dict instance; consider env_vars=None with a local default.
# Kept as-is here to avoid a behavior change.
if not ddb_lease_table_suffix:
ddb_lease_table_suffix = DEFAULT_DDB_LEASE_TABLE_SUFFIX
# construct stream info
env = aws_stack.get_environment(env)
# unique KCL properties file placed in the system temp directory
props_file = os.path.join(tempfile.gettempdir(), 'kclipy.%s.properties' % short_uid())
# app name doubles as the DynamoDB lease table name (stream name + suffix)
app_name = '%s%s' % (stream_name, ddb_lease_table_suffix)
stream_info = {
'name': stream_name,
'region': aws_stack.get_region(),
'shards': shards,
'properties_file': props_file,
'log_file': log_file,
'app_name': app_name,
'env_vars': env_vars
}
# set local connection
if aws_stack.is_local_env(env):
stream_info['conn_kwargs'] = {
'host': HOSTNAME,
'port': config.PORT_KINESIS,
# NOTE(review): this snippet is truncated here - the conn_kwargs dict and
# the function's return statement continue beyond the visible source.
def get_client(resource, func_config):
    """Return a boto client (or resource API) for deploying *resource*.

    :param resource: CloudFormation resource model instance
    :param func_config: deployment function config; its 'boto_client' key
        selects the resource API over a plain service client
    :return: connected client/resource, or None when connecting failed
    :raises Exception: if the resource type has no deployment mapping yet
    """
    resource_type = get_resource_type(resource)
    service = get_service_name(resource)
    resource_config = RESOURCE_TO_FUNCTION.get(resource_type)
    if resource_config is None:
        raise Exception('CloudFormation deployment for resource type %s not yet implemented' % resource_type)
    try:
        if func_config.get('boto_client') == 'resource':
            return aws_stack.connect_to_resource(service)
        return aws_stack.connect_to_service(service)
    except Exception as e:
        # lazy %-style args: message is only rendered if the warning is emitted
        LOG.warning('Unable to get client for "%s" API, skipping deployment: %s', service, e)
        return None