How to use the localstack.utils.common.short_uid function in localstack

To help you get started, we’ve selected a few localstack examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github localstack / localstack / tests / integration / test_s3.py View on Github external
def test_create_bucket_via_host_name(self):
        body = """
            
                eu-central-1
            """
        headers = aws_stack.mock_aws_request_headers('s3')
        bucket_name = 'test-%s' % short_uid()
        headers['Host'] = '%s.s3.amazonaws.com' % bucket_name
        response = requests.put(config.TEST_S3_URL, data=body, headers=headers, verify=False)
        self.assertEquals(response.status_code, 200)
        response = self.s3_client.get_bucket_location(Bucket=bucket_name)
        self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
        self.assertIn('LocationConstraint', response)
github localstack / localstack / tests / integration / test_lambda.py View on Github external
def test_dead_letter_queue(self):
        sqs_client = aws_stack.connect_to_service('sqs')
        lambda_client = aws_stack.connect_to_service('lambda')

        # create DLQ and Lambda function
        queue_name = 'test-%s' % short_uid()
        lambda_name = 'test-%s' % short_uid()
        queue_url = sqs_client.create_queue(QueueName=queue_name)['QueueUrl']
        queue_arn = aws_stack.sqs_queue_arn(queue_name)
        zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON),
            get_content=True, libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON36)
        testutil.create_lambda_function(func_name=lambda_name, zip_file=zip_file,
            runtime=LAMBDA_RUNTIME_PYTHON36, DeadLetterConfig={'TargetArn': queue_arn})

        # invoke Lambda, triggering an error
        payload = {
            lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1
        }
        lambda_client.invoke(FunctionName=lambda_name,
            Payload=json.dumps(payload), InvocationType='Event')

        # assert that message has been received on the DLQ
github localstack / localstack / tests / integration / test_s3.py View on Github external
def test_s3_get_response_default_content_type(self):
        # When no content type is provided by a PUT request
        # 'binary/octet-stream' should be used
        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html

        bucket_name = 'test-bucket-%s' % short_uid()
        self.s3_client.create_bucket(Bucket=bucket_name)

        # put object
        object_key = 'key-by-hostname'
        self.s3_client.put_object(Bucket=bucket_name, Key=object_key, Body='something')
        url = self.s3_client.generate_presigned_url(
            'get_object', Params={'Bucket': bucket_name, 'Key': object_key})

        # get object and assert headers
        response = requests.get(url, verify=False)
        self.assertEqual(response.headers['content-type'], 'binary/octet-stream')
        # clean up
        self._delete_bucket(bucket_name, [object_key])
github localstack / localstack / tests / integration / test_cloudformation.py View on Github external
def test_sam_template(self):
        cloudformation = aws_stack.connect_to_service('cloudformation')
        awslambda = aws_stack.connect_to_service('lambda')

        # deploy template
        stack_name = 'stack-%s' % short_uid()
        func_name = 'test-%s' % short_uid()
        template = TEST_TEMPLATE_4 % func_name
        cloudformation.create_stack(StackName=stack_name, TemplateBody=template)

        # run Lambda test invocation
        result = awslambda.invoke(FunctionName=func_name)
        result = json.loads(to_str(result['Payload'].read()))
        self.assertEqual(result, {'hello': 'world'})
github localstack / localstack / tests / integration / test_s3.py View on Github external
def test_delete_non_existing_keys(self):
        bucket_name = 'test-%s' % short_uid()
        self.s3_client.create_bucket(Bucket=bucket_name)
        object_key = 'test-key'
        self.s3_client.put_object(Bucket=bucket_name, Key=object_key, Body='something')
        response = self.s3_client.delete_objects(Bucket=bucket_name,
            Delete={'Objects': [{'Key': object_key}, {'Key': 'dummy1'}, {'Key': 'dummy2'}]})
        self.assertEqual(len(response['Deleted']), 3)
        self.assertNotIn('Errors', response)
        # clean up
        self._delete_bucket(bucket_name)
github localstack / localstack / localstack / services / firehose / firehose_api.py View on Github external
stream = {
        'DeliveryStreamType': delivery_stream_type,
        'KinesisStreamSourceConfiguration': delivery_stream_type_configuration,
        'HasMoreDestinations': False,
        'VersionId': '1',
        'CreateTimestamp': time.time(),
        'DeliveryStreamARN': firehose_stream_arn(stream_name),
        'DeliveryStreamStatus': 'ACTIVE',
        'DeliveryStreamName': stream_name,
        'Destinations': [],
        'Tags': tags
    }
    DELIVERY_STREAMS[stream_name] = stream
    if elasticsearch_destination:
        update_destination(stream_name=stream_name,
                           destination_id=short_uid(),
                           elasticsearch_update=elasticsearch_destination)
    if s3_destination:
        update_destination(stream_name=stream_name, destination_id=short_uid(), s3_update=s3_destination)

    # record event
    event_publisher.fire_event(event_publisher.EVENT_FIREHOSE_CREATE_STREAM,
        payload={'n': event_publisher.get_hash(stream_name)})

    if delivery_stream_type == 'KinesisStreamAsSource':
        kinesis_stream_name = delivery_stream_type_configuration.get('KinesisStreamARN').split('/')[1]
        kinesis_connector.listen_to_kinesis(
            stream_name=kinesis_stream_name, fh_d_stream=stream_name,
            listener_func=process_records, wait_until_started=True,
            ddb_lease_table_suffix='-firehose', region_name=region_name)
    return stream
github localstack / localstack / localstack / services / sns / sns_listener.py View on Github external
def make_error(message, code=400, code_string='InvalidParameter'):
    response = Response()
    response._content = """
        Sender
        <code>{code_string}</code>
        {message}
        {req_id}
        """.format(message=message, code_string=code_string, req_id=short_uid())
    response.status_code = code
    return response
github localstack / localstack / localstack / utils / analytics / event_publisher.py View on Github external
for config_file in (config_file_home, config_file_tmp):
        if config_file:
            local_configs = load_file(config_file)
            local_configs = json.loads(to_str(local_configs))
            configs_map[config_file] = local_configs
            if 'machine_id' in local_configs:
                MACHINE_ID = local_configs['machine_id']
                break

    # if we can neither find NOR create the config files, fall back to process id
    if not configs_map:
        return PROCESS_ID

    # assign default id if empty
    if not MACHINE_ID:
        MACHINE_ID = short_uid()

    # update MACHINE_ID in all config files
    for config_file, configs in configs_map.items():
        configs['machine_id'] = MACHINE_ID
        save_file(config_file, json.dumps(configs))

    return MACHINE_ID
github localstack / localstack / localstack / services / cloudformation / cloudformation_starter.py View on Github external
def Deployment_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
        props = cloudformation_json['Properties']
        name = props['StageName']
        deployment_id = props.get('Id') or short_uid()
        description = props.get('Description') or ''
        return apigw_models.Deployment(deployment_id, name, description)
github localstack / localstack / localstack / utils / kinesis / kinesis_connector.py View on Github external
if (('AWS_ASSUME_ROLE_ARN' in os.environ or 'AWS_ASSUME_ROLE_ARN' in env_vars) and
            ('AWS_ASSUME_ROLE_SESSION_NAME' in os.environ or 'AWS_ASSUME_ROLE_SESSION_NAME' in env_vars)):
        # use special credentials provider that can assume IAM roles and handle temporary STS auth tokens
        credentialsProvider = 'cloud.localstack.DefaultSTSAssumeRoleSessionCredentialsProvider'
        # pass through env variables to child process
        for var_name in ['AWS_ASSUME_ROLE_ARN', 'AWS_ASSUME_ROLE_SESSION_NAME',
                'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN']:
            if var_name in os.environ and var_name not in env_vars:
                env_vars[var_name] = os.environ[var_name]
    if aws_stack.is_local_env(env):
        # need to disable CBOR protocol, enforce use of plain JSON,
        # see https://github.com/mhart/kinesalite/issues/31
        env_vars['AWS_CBOR_DISABLE'] = 'true'
    if kcl_log_level or (len(log_subscribers) > 0):
        if not log_file:
            log_file = LOG_FILE_PATTERN.replace('*', short_uid())
            TMP_FILES.append(log_file)
        run('touch %s' % log_file)
        # start log output reader thread which will read the KCL log
        # file and print each line to stdout of this process...
        reader_thread = OutputReaderThread({'file': log_file, 'level': kcl_log_level,
            'log_prefix': 'KCL', 'log_subscribers': log_subscribers})
        reader_thread.start()

    # construct stream info
    stream_info = get_stream_info(stream_name, log_file, env=env, endpoint_url=endpoint_url,
        ddb_lease_table_suffix=ddb_lease_table_suffix, env_vars=env_vars)
    props_file = stream_info['properties_file']
    # set kcl config options
    kwargs = {
        'metricsLevel': 'NONE',
        'initialPositionInStream': 'LATEST'