How to use the localstack.config function in localstack

To help you get started, we’ve selected a few localstack examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github localstack / localstack / tests / integration / __init__.py View on Github external
def start_profiling(*args):
    if not config.USE_PROFILER:
        return

    @profiled()
    def profile_func():
        # keep profiler active until tests have finished
        mutex.acquire()

    print('Start profiling...')
    profile_func()
    print('Done profiling...')
github localstack / localstack / tests / integration / test_s3.py View on Github external
def test_create_bucket_via_host_name(self):
        body = """
            
                eu-central-1
            """
        headers = aws_stack.mock_aws_request_headers('s3')
        bucket_name = 'test-%s' % short_uid()
        headers['Host'] = '%s.s3.amazonaws.com' % bucket_name
        response = requests.put(config.TEST_S3_URL, data=body, headers=headers, verify=False)
        self.assertEquals(response.status_code, 200)
        response = self.s3_client.get_bucket_location(Bucket=bucket_name)
        self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
        self.assertIn('LocationConstraint', response)
github localstack / localstack / localstack / services / s3 / s3_listener.py View on Github external
def _update_location(self, content, bucket_name):
        bucket_name = normalize_bucket_name(bucket_name)

        host = config.HOSTNAME_EXTERNAL
        if ':' not in host:
            host = '%s:%s' % (host, config.PORT_S3)
        return re.sub(r'\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*',
            r'%s://%s/%s/\2' % (get_service_protocol(), host, bucket_name),
            content, flags=re.MULTILINE)
github localstack / localstack / localstack / services / stepfunctions / stepfunctions_starter.py View on Github external
def start_stepfunctions(port=None, asynchronous=False, update_listener=None):
    port = port or config.PORT_STEPFUNCTIONS
    install.install_stepfunctions_local()
    backend_port = DEFAULT_PORT_STEPFUNCTIONS_BACKEND
    # TODO: local port is currently hard coded in Step Functions Local :/
    backend_port = 8083
    lambda_endpoint = aws_stack.get_local_service_url('lambda')
    dynamodb_endpoint = aws_stack.get_local_service_url('dynamodb')
    sns_endpoint = aws_stack.get_local_service_url('sns')
    sqs_endpoint = aws_stack.get_local_service_url('sqs')
    sfn_endpoint = aws_stack.get_local_service_url('stepfunctions')
    cmd = ('cd %s; java -Dcom.amazonaws.sdk.disableCertChecking -Xmx%s -jar StepFunctionsLocal.jar '
           '--lambda-endpoint %s --dynamodb-endpoint %s --sns-endpoint %s '
           '--sqs-endpoint %s --aws-region %s --aws-account %s --step-functions-endpoint %s') % (
        install.INSTALL_DIR_STEPFUNCTIONS, MAX_HEAP_SIZE, lambda_endpoint, dynamodb_endpoint,
        sns_endpoint, sqs_endpoint, aws_stack.get_region(), TEST_AWS_ACCOUNT_ID, sfn_endpoint)
    print('Starting mock StepFunctions (%s port %s)...' % (get_service_protocol(), port))
    start_proxy_for_service('stepfunctions', port, backend_port, update_listener)
github localstack / localstack / localstack / utils / bootstrap.py View on Github external
env_dict = os.environ.copy() if inherit_env else {}
    if env_vars:
        env_dict.update(env_vars)

    if tty:
        asynchronous = True
        stdin = True

    try:
        cwd = os.getcwd() if inherit_cwd else None
        if not asynchronous:
            if stdin:
                return subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict,
                    stdin=subprocess.PIPE, cwd=cwd)
            output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
            return output.decode(config.DEFAULT_ENCODING)

        # subprocess.Popen is not thread-safe, hence use a mutex here.. (TODO: mutex still needed?)
        with mutex_popen:
            stdin_arg = subprocess.PIPE if stdin else None
            stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
            stderr_arg = stderr
            if tty:
                # Note: leave the "pty" import here (not supported in Windows)
                import pty
                master_fd, slave_fd = pty.openpty()
                stdin_arg = slave_fd
                stdout_arg = stderr_arg = None

            # start the actual sub process
            kwargs = {}
            if is_linux() or is_mac_os():
github localstack / localstack / localstack / services / awslambda / lambda_api.py View on Github external
def set_archive_code(code, lambda_name, zip_file_content=None):

    # get metadata
    lambda_arn = func_arn(lambda_name)
    lambda_details = arn_to_lambda[lambda_arn]
    is_local_mount = code.get('S3Bucket') == BUCKET_MARKER_LOCAL

    if is_local_mount and config.LAMBDA_REMOTE_DOCKER:
        msg = 'Please note that Lambda mounts (bucket name "%s") cannot be used with LAMBDA_REMOTE_DOCKER=1'
        raise Exception(msg % BUCKET_MARKER_LOCAL)

    # Stop/remove any containers that this arn uses.
    LAMBDA_EXECUTOR.cleanup(lambda_arn)

    if is_local_mount:
        # Mount or use a local folder lambda executors can reference
        # WARNING: this means we're pointing lambda_cwd to a local path in the user's
        # file system! We must ensure that there is no data loss (i.e., we must *not* add
        # this folder to TMP_FILES or similar).
        return code['S3Key']

    # get file content
    zip_file_content = zip_file_content or get_zip_bytes(code)
github localstack / localstack / localstack / services / cloudformation / cloudformation_starter.py View on Github external
def start_cloudformation(port=None, asynchronous=False, update_listener=None):
    port = port or config.PORT_CLOUDFORMATION
    backend_port = DEFAULT_PORT_CLOUDFORMATION_BACKEND
    print('Starting mock CloudFormation (%s port %s)...' % (get_service_protocol(), port))
    start_proxy_for_service('cloudformation', port, backend_port, update_listener)
    if RUN_SERVER_IN_PROCESS:
        cmd = 'python "%s" cloudformation -p %s -H 0.0.0.0' % (__file__, backend_port)
        env_vars = {'PYTHONPATH': ':'.join(sys.path)}
        return do_run(cmd, asynchronous, env_vars=env_vars)
    else:
        argv = ['cloudformation', '-p', str(backend_port), '-H', '0.0.0.0']
        thread = FuncThread(start_up, argv)
        thread.start()
        return thread
github localstack / localstack / localstack / utils / aws / aws_stack.py View on Github external
def get_local_region():
    global LOCAL_REGION
    if LOCAL_REGION is None:
        session = boto3.session.Session()
        LOCAL_REGION = session.region_name or ''
    return LOCAL_REGION or config.DEFAULT_REGION
github localstack / localstack / localstack / services / sqs / sqs_listener.py View on Github external
def _queue_url(self, path, req_data, headers):
        queue_url = req_data.get('QueueUrl')
        if queue_url:
            return queue_url[0]
        url = config.TEST_SQS_URL
        if headers.get('Host'):
            url = 'http%s://%s' % ('s' if config.USE_SSL else '', headers['Host'])
        queue_url = '%s%s' % (url, path.partition('?')[0])
        return queue_url
github localstack / localstack / localstack / services / awslambda / lambda_api.py View on Github external
def forward_to_fallback_url(func_arn, data):
    """ If LAMBDA_FALLBACK_URL is configured, forward the invocation of this non-existing
        Lambda to the configured URL. """
    if not config.LAMBDA_FALLBACK_URL:
        return None
    if config.LAMBDA_FALLBACK_URL.startswith('dynamodb://'):
        table_name = urlparse(config.LAMBDA_FALLBACK_URL.replace('dynamodb://', 'http://')).netloc
        dynamodb = aws_stack.connect_to_service('dynamodb')
        item = {
            'id': {'S': short_uid()},
            'timestamp': {'N': str(now_utc())},
            'payload': {'S': str(data)}
        }
        aws_stack.create_dynamodb_table(table_name, partition_key='id')
        dynamodb.put_item(TableName=table_name, Item=item)
        return ''
    if re.match(r'^https?://.+', config.LAMBDA_FALLBACK_URL):
        response = safe_requests.post(config.LAMBDA_FALLBACK_URL, data)
        return response.content
    raise ClientError('Unexpected value for LAMBDA_FALLBACK_URL: %s' % config.LAMBDA_FALLBACK_URL)