Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_bucket_via_host_name(self):
    """Create a bucket through a virtual-host-style PUT request.

    The bucket name is carried in the ``Host`` header rather than in the
    request path. The test then verifies via the S3 client that the bucket
    exists and reports a location constraint.
    """
    body = """
eu-central-1
"""
    headers = aws_stack.mock_aws_request_headers('s3')
    bucket_name = 'test-%s' % short_uid()
    # address the bucket via the host name instead of the URL path
    headers['Host'] = '%s.s3.amazonaws.com' % bucket_name
    response = requests.put(config.TEST_S3_URL, data=body, headers=headers, verify=False)
    # assertEquals is a deprecated alias (removed in Python 3.12) - use assertEqual
    self.assertEqual(response.status_code, 200)
    response = self.s3_client.get_bucket_location(Bucket=bucket_name)
    self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
    self.assertIn('LocationConstraint', response)
    # clean up, consistent with the other bucket tests in this file
    self._delete_bucket(bucket_name)
def test_dead_letter_queue(self):
    """Asynchronously invoke a failing Lambda that has a dead letter queue configured."""
    sqs_client = aws_stack.connect_to_service('sqs')
    lambda_client = aws_stack.connect_to_service('lambda')

    # create the DLQ first, so that its ARN can be wired into the function
    queue_name = 'test-%s' % short_uid()
    lambda_name = 'test-%s' % short_uid()
    queue_url = sqs_client.create_queue(QueueName=queue_name)['QueueUrl']
    queue_arn = aws_stack.sqs_queue_arn(queue_name)

    # package and deploy the Lambda function with the DLQ as its target
    handler_code = load_file(TEST_LAMBDA_PYTHON)
    zip_file = testutil.create_lambda_archive(
        handler_code, get_content=True, libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON36)
    dead_letter_config = {'TargetArn': queue_arn}
    testutil.create_lambda_function(
        func_name=lambda_name, zip_file=zip_file,
        runtime=LAMBDA_RUNTIME_PYTHON36, DeadLetterConfig=dead_letter_config)

    # fire an asynchronous invocation carrying the flag that triggers an error
    error_payload = json.dumps({lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1})
    lambda_client.invoke(
        FunctionName=lambda_name, Payload=error_payload, InvocationType='Event')

    # assert that message has been received on the DLQ
def test_s3_get_response_default_content_type(self):
    """Objects uploaded without a content type are served as 'binary/octet-stream'.

    Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
    """
    bucket_name = 'test-bucket-%s' % short_uid()
    object_key = 'key-by-hostname'

    # upload an object without specifying any content type
    self.s3_client.create_bucket(Bucket=bucket_name)
    self.s3_client.put_object(Bucket=bucket_name, Key=object_key, Body='something')

    # download it through a presigned URL, so the response headers can be inspected
    request_params = {'Bucket': bucket_name, 'Key': object_key}
    presigned_url = self.s3_client.generate_presigned_url('get_object', Params=request_params)
    http_response = requests.get(presigned_url, verify=False)
    content_type = http_response.headers['content-type']
    self.assertEqual(content_type, 'binary/octet-stream')

    # clean up
    self._delete_bucket(bucket_name, [object_key])
def test_sam_template(self):
    """Deploy a template that defines a Lambda function, then invoke that function."""
    cloudformation = aws_stack.connect_to_service('cloudformation')
    awslambda = aws_stack.connect_to_service('lambda')

    # deploy the template, with the generated function name substituted in
    stack_name = 'stack-%s' % short_uid()
    func_name = 'test-%s' % short_uid()
    cloudformation.create_stack(
        StackName=stack_name, TemplateBody=TEST_TEMPLATE_4 % func_name)

    # invoke the deployed function and decode its JSON payload
    invocation = awslambda.invoke(FunctionName=func_name)
    raw_payload = invocation['Payload'].read()
    decoded_payload = json.loads(to_str(raw_payload))
    self.assertEqual(decoded_payload, {'hello': 'world'})
def test_delete_non_existing_keys(self):
    """A batch delete reports keys that never existed as deleted, without errors."""
    bucket_name = 'test-%s' % short_uid()
    object_key = 'test-key'
    self.s3_client.create_bucket(Bucket=bucket_name)
    self.s3_client.put_object(Bucket=bucket_name, Key=object_key, Body='something')

    # one existing key plus two keys that were never created
    keys_to_delete = (object_key, 'dummy1', 'dummy2')
    delete_request = {'Objects': [{'Key': key} for key in keys_to_delete]}
    response = self.s3_client.delete_objects(Bucket=bucket_name, Delete=delete_request)

    self.assertEqual(len(response['Deleted']), 3)
    self.assertNotIn('Errors', response)

    # clean up
    self._delete_bucket(bucket_name)
# NOTE(review): fragment - the enclosing function's `def` line (and the original
# indentation) is not visible here, so the names used below (stream_name,
# delivery_stream_type, tags, region_name, ...) are presumably its parameters.
# Build the metadata record describing the new delivery stream.
stream = {
'DeliveryStreamType': delivery_stream_type,
'KinesisStreamSourceConfiguration': delivery_stream_type_configuration,
'HasMoreDestinations': False,
'VersionId': '1',
'CreateTimestamp': time.time(),
'DeliveryStreamARN': firehose_stream_arn(stream_name),
'DeliveryStreamStatus': 'ACTIVE',
'DeliveryStreamName': stream_name,
'Destinations': [],
'Tags': tags
}
# register the stream under its name before any destinations are attached
DELIVERY_STREAMS[stream_name] = stream
# attach each configured destination under a freshly generated destination id
if elasticsearch_destination:
update_destination(stream_name=stream_name,
destination_id=short_uid(),
elasticsearch_update=elasticsearch_destination)
if s3_destination:
update_destination(stream_name=stream_name, destination_id=short_uid(), s3_update=s3_destination)
# record event
event_publisher.fire_event(event_publisher.EVENT_FIREHOSE_CREATE_STREAM,
payload={'n': event_publisher.get_hash(stream_name)})
if delivery_stream_type == 'KinesisStreamAsSource':
# the Kinesis stream name is taken as the ARN segment after the first '/'
# NOTE(review): if 'KinesisStreamARN' is missing, .get() presumably returns None
# and .split() raises AttributeError - confirm the caller validates this
kinesis_stream_name = delivery_stream_type_configuration.get('KinesisStreamARN').split('/')[1]
# subscribe to the source stream, with process_records as the listener function
kinesis_connector.listen_to_kinesis(
stream_name=kinesis_stream_name, fh_d_stream=stream_name,
listener_func=process_records, wait_until_started=True,
ddb_lease_table_suffix='-firehose', region_name=region_name)
return stream
def make_error(message, code=400, code_string='InvalidParameter'):
    """Build an error ``Response`` object.

    :param message: error message to embed in the response body
    :param code: status code to set on the response (default 400)
    :param code_string: error code string to embed in the response body
    :returns: a ``Response`` with the formatted body and the given status code
    """
    body_template = """
Sender
<code>{code_string}</code>
{message}
{req_id}
"""
    error_response = Response()
    # each error body carries a freshly generated request id
    error_response._content = body_template.format(
        code_string=code_string, message=message, req_id=short_uid())
    error_response.status_code = code
    return error_response
# NOTE(review): fragment - the enclosing function's `def` line and the original
# indentation are not visible here, so the exact nesting of the statements
# below (in particular of `break`) cannot be confirmed from this view.
# Look for a previously stored machine id in the candidate config files.
for config_file in (config_file_home, config_file_tmp):
if config_file:
# the config file content is parsed as JSON and remembered per file path
local_configs = load_file(config_file)
local_configs = json.loads(to_str(local_configs))
configs_map[config_file] = local_configs
if 'machine_id' in local_configs:
MACHINE_ID = local_configs['machine_id']
break
# if we can neither find NOR create the config files, fall back to process id
if not configs_map:
return PROCESS_ID
# assign default id if empty
if not MACHINE_ID:
MACHINE_ID = short_uid()
# update MACHINE_ID in all config files
for config_file, configs in configs_map.items():
configs['machine_id'] = MACHINE_ID
save_file(config_file, json.dumps(configs))
return MACHINE_ID
def Deployment_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
    """Create an ``apigw_models.Deployment`` from a CloudFormation resource definition.

    Reads ``StageName`` (required), ``Id`` and ``Description`` from the
    resource's ``Properties``. A missing or empty ``Id`` is replaced by a
    generated short uid, and a missing or empty ``Description`` by ``''``.
    """
    properties = cloudformation_json['Properties']
    stage_name = properties['StageName']

    deployment_id = properties.get('Id')
    if not deployment_id:
        deployment_id = short_uid()

    description = properties.get('Description')
    if not description:
        description = ''

    return apigw_models.Deployment(deployment_id, stage_name, description)
# NOTE(review): fragment - the enclosing function's `def` line and the original
# indentation are not visible here; env_vars, env, log_file, kcl_log_level,
# log_subscribers etc. are presumably its parameters or earlier locals.
# Role assumption is only enabled when BOTH the role ARN and the session name
# are available, either from the process environment or from env_vars.
if (('AWS_ASSUME_ROLE_ARN' in os.environ or 'AWS_ASSUME_ROLE_ARN' in env_vars) and
('AWS_ASSUME_ROLE_SESSION_NAME' in os.environ or 'AWS_ASSUME_ROLE_SESSION_NAME' in env_vars)):
# use special credentials provider that can assume IAM roles and handle temporary STS auth tokens
credentialsProvider = 'cloud.localstack.DefaultSTSAssumeRoleSessionCredentialsProvider'
# pass through env variables to child process
for var_name in ['AWS_ASSUME_ROLE_ARN', 'AWS_ASSUME_ROLE_SESSION_NAME',
'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN']:
# values already present in env_vars take precedence over os.environ
if var_name in os.environ and var_name not in env_vars:
env_vars[var_name] = os.environ[var_name]
if aws_stack.is_local_env(env):
# need to disable CBOR protocol, enforce use of plain JSON,
# see https://github.com/mhart/kinesalite/issues/31
env_vars['AWS_CBOR_DISABLE'] = 'true'
if kcl_log_level or (len(log_subscribers) > 0):
if not log_file:
# no log file given: derive one from the pattern and track it in TMP_FILES
log_file = LOG_FILE_PATTERN.replace('*', short_uid())
TMP_FILES.append(log_file)
# NOTE(review): log_file is interpolated into a command string - confirm that
# run() is safe for paths containing spaces or shell metacharacters
run('touch %s' % log_file)
# start log output reader thread which will read the KCL log
# file and print each line to stdout of this process...
reader_thread = OutputReaderThread({'file': log_file, 'level': kcl_log_level,
'log_prefix': 'KCL', 'log_subscribers': log_subscribers})
reader_thread.start()
# construct stream info
stream_info = get_stream_info(stream_name, log_file, env=env, endpoint_url=endpoint_url,
ddb_lease_table_suffix=ddb_lease_table_suffix, env_vars=env_vars)
props_file = stream_info['properties_file']
# set kcl config options
kwargs = {
'metricsLevel': 'NONE',
'initialPositionInStream': 'LATEST'