Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def start_profiling(*args):
    """Keep a profiled function running while the profiler is enabled.

    Positional arguments are accepted but ignored. When
    ``config.USE_PROFILER`` is falsy the function returns immediately
    without printing anything.
    """
    if config.USE_PROFILER:
        @profiled()
        def profile_func():
            # Acquiring the mutex inside the profiled function keeps the
            # profiler active until tests have finished.
            mutex.acquire()

        print('Start profiling...')
        profile_func()
        print('Done profiling...')
def test_create_bucket_via_host_name(self):
    """Create a bucket addressed through the ``Host`` header and verify it.

    Sends a raw PUT to ``config.TEST_S3_URL`` whose ``Host`` header carries a
    freshly generated bucket name, then checks via ``self.s3_client`` that
    the bucket location can be retrieved.
    """
    # NOTE(review): the body holds only the region name; any surrounding
    # markup appears to be missing -- confirm against the intended payload.
    body = """
eu-central-1
"""
    headers = aws_stack.mock_aws_request_headers('s3')
    bucket_name = 'test-%s' % short_uid()
    # The bucket name is conveyed through the Host header, not the URL path.
    headers['Host'] = '%s.s3.amazonaws.com' % bucket_name
    response = requests.put(config.TEST_S3_URL, data=body, headers=headers, verify=False)
    # assertEqual replaces the deprecated assertEquals alias (removed in
    # Python 3.12) and matches the assertion style used below.
    self.assertEqual(response.status_code, 200)
    response = self.s3_client.get_bucket_location(Bucket=bucket_name)
    self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
    self.assertIn('LocationConstraint', response)
def _update_location(self, content, bucket_name):
    """Rewrite URLs found in ``content`` to point at the external S3 host.

    Every ``<scheme>://<authority>/<rest>`` match is replaced with
    ``<service protocol>://<host>/<normalized bucket>/<rest>``, where the
    host is ``config.HOSTNAME_EXTERNAL`` (with ``config.PORT_S3`` appended
    when it carries no port). Returns the rewritten content.
    """
    bucket = normalize_bucket_name(bucket_name)

    external_host = config.HOSTNAME_EXTERNAL
    if ':' not in external_host:
        # no explicit port configured -> append the S3 port
        external_host = '%s:%s' % (external_host, config.PORT_S3)

    pattern = r'\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*'
    # \2 keeps the part of the original URL that follows the authority
    replacement = r'%s://%s/%s/\2' % (get_service_protocol(), external_host, bucket)
    return re.sub(pattern, replacement, content, flags=re.MULTILINE)
def start_stepfunctions(port=None, asynchronous=False, update_listener=None):
    """Install Step Functions Local and start the proxy in front of it.

    ``port`` defaults to ``config.PORT_STEPFUNCTIONS``. ``update_listener``
    is passed through to ``start_proxy_for_service``. ``asynchronous`` is
    not used by the code shown here.
    """
    port = port or config.PORT_STEPFUNCTIONS
    install.install_stepfunctions_local()

    backend_port = DEFAULT_PORT_STEPFUNCTIONS_BACKEND
    # TODO: local port is currently hard coded in Step Functions Local :/
    backend_port = 8083

    # Resolve the local endpoints the Step Functions backend is pointed at.
    lambda_url, dynamodb_url, sns_url, sqs_url, sfn_url = [
        aws_stack.get_local_service_url(service)
        for service in ('lambda', 'dynamodb', 'sns', 'sqs', 'stepfunctions')
    ]

    cmd_template = (
        'cd %s; java -Dcom.amazonaws.sdk.disableCertChecking -Xmx%s -jar StepFunctionsLocal.jar '
        '--lambda-endpoint %s --dynamodb-endpoint %s --sns-endpoint %s '
        '--sqs-endpoint %s --aws-region %s --aws-account %s --step-functions-endpoint %s')
    cmd_args = (
        install.INSTALL_DIR_STEPFUNCTIONS, MAX_HEAP_SIZE, lambda_url, dynamodb_url,
        sns_url, sqs_url, aws_stack.get_region(), TEST_AWS_ACCOUNT_ID, sfn_url)
    # NOTE(review): cmd is assembled but never executed in this block as
    # shown -- confirm whether the call that launches it is missing.
    cmd = cmd_template % cmd_args

    print('Starting mock StepFunctions (%s port %s)...' % (get_service_protocol(), port))
    start_proxy_for_service('stepfunctions', port, backend_port, update_listener)
# NOTE(review): this is a fragment of a command-runner function. Its header
# (presumably declaring cmd, inherit_env, env_vars, tty, stdin, stderr,
# outfile, inherit_cwd and asynchronous) and the rest of its body are not
# part of this excerpt.
#
# Child environment: start from the parent's environment only when
# inherit_env is set, then overlay any caller-supplied env_vars.
env_dict = os.environ.copy() if inherit_env else {}
if env_vars:
env_dict.update(env_vars)
# A tty request forces asynchronous execution with stdin enabled.
if tty:
asynchronous = True
stdin = True
try:
# Run in the current working directory only when inherit_cwd is set.
cwd = os.getcwd() if inherit_cwd else None
# Synchronous path: run the command through the shell and return its output.
if not asynchronous:
if stdin:
# With stdin requested the check_output result is returned as-is (not decoded).
return subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict,
stdin=subprocess.PIPE, cwd=cwd)
# Without stdin the output is decoded using config.DEFAULT_ENCODING.
output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
return output.decode(config.DEFAULT_ENCODING)
# subprocess.Popen is not thread-safe, hence use a mutex here.. (TODO: mutex still needed?)
with mutex_popen:
stdin_arg = subprocess.PIPE if stdin else None
# A string outfile is treated as a path and opened for binary writing;
# any other value is used unchanged.
stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
stderr_arg = stderr
if tty:
# Note: leave the "pty" import here (not supported in Windows)
import pty
# In tty mode the slave end of a new pty pair becomes stdin and
# stdout/stderr are reset to None.
master_fd, slave_fd = pty.openpty()
stdin_arg = slave_fd
stdout_arg = stderr_arg = None
# start the actual sub process
kwargs = {}
# NOTE(review): the body of this platform check is cut off in this excerpt.
if is_linux() or is_mac_os():
def set_archive_code(code, lambda_name, zip_file_content=None):
    """Prepare the code archive for the Lambda function ``lambda_name``.

    ``code`` is a mapping; its ``S3Bucket`` entry is compared against
    ``BUCKET_MARKER_LOCAL`` to detect a local mount. For a local mount the
    value of ``code['S3Key']`` is returned. Raises ``Exception`` when a
    local mount is combined with ``config.LAMBDA_REMOTE_DOCKER``.
    """
    # get metadata
    arn = func_arn(lambda_name)
    details = arn_to_lambda[arn]  # also fails early for an unregistered ARN

    uses_local_mount = code.get('S3Bucket') == BUCKET_MARKER_LOCAL
    if uses_local_mount and config.LAMBDA_REMOTE_DOCKER:
        template = 'Please note that Lambda mounts (bucket name "%s") cannot be used with LAMBDA_REMOTE_DOCKER=1'
        raise Exception(template % BUCKET_MARKER_LOCAL)

    # Stop/remove any containers that this arn uses.
    LAMBDA_EXECUTOR.cleanup(arn)

    if uses_local_mount:
        # Mount or use a local folder lambda executors can reference
        # WARNING: this means we're pointing lambda_cwd to a local path in the user's
        # file system! We must ensure that there is no data loss (i.e., we must *not* add
        # this folder to TMP_FILES or similar).
        return code['S3Key']

    # get file content
    # NOTE(review): nothing consumes zip_file_content (or the function
    # details) after this point in the block as shown -- confirm whether
    # the remainder of the function is missing.
    if not zip_file_content:
        zip_file_content = get_zip_bytes(code)
def start_cloudformation(port=None, asynchronous=False, update_listener=None):
    """Start the CloudFormation proxy and the backend server behind it.

    ``port`` defaults to ``config.PORT_CLOUDFORMATION``. When
    ``RUN_SERVER_IN_PROCESS`` is truthy, this file is launched via
    ``python`` through ``do_run`` and that call's result is returned.
    Otherwise ``start_up`` runs in a ``FuncThread``, which is started and
    returned.
    """
    port = port or config.PORT_CLOUDFORMATION
    backend_port = DEFAULT_PORT_CLOUDFORMATION_BACKEND
    print('Starting mock CloudFormation (%s port %s)...' % (get_service_protocol(), port))
    start_proxy_for_service('cloudformation', port, backend_port, update_listener)

    if not RUN_SERVER_IN_PROCESS:
        server_args = ['cloudformation', '-p', str(backend_port), '-H', '0.0.0.0']
        server_thread = FuncThread(start_up, server_args)
        server_thread.start()
        return server_thread

    command = 'python "%s" cloudformation -p %s -H 0.0.0.0' % (__file__, backend_port)
    # hand the current import path to the child interpreter
    child_env = {'PYTHONPATH': ':'.join(sys.path)}
    return do_run(command, asynchronous, env_vars=child_env)
def get_local_region():
    """Return the region of the default boto3 session, cached after first use.

    The session's region name (or ``''`` when it has none) is stored in the
    module-level ``LOCAL_REGION``. When the cached value is empty,
    ``config.DEFAULT_REGION`` is returned.
    """
    global LOCAL_REGION
    if LOCAL_REGION is None:
        # '' marks "looked up, nothing configured" so the lookup runs only once
        LOCAL_REGION = boto3.session.Session().region_name or ''
    return LOCAL_REGION if LOCAL_REGION else config.DEFAULT_REGION
def _queue_url(self, path, req_data, headers):
    """Determine the queue URL targeted by a request.

    The first element of ``req_data['QueueUrl']`` is returned when present.
    Otherwise the URL is built from the ``Host`` header (or
    ``config.TEST_SQS_URL`` when that header is absent or empty) followed by
    ``path`` with its query string removed.
    """
    explicit_url = req_data.get('QueueUrl')
    if explicit_url:
        return explicit_url[0]

    base_url = config.TEST_SQS_URL
    host = headers.get('Host')
    if host:
        # scheme follows the USE_SSL setting
        base_url = 'http%s://%s' % ('s' if config.USE_SSL else '', host)

    path_without_query, _, _ = path.partition('?')
    return '%s%s' % (base_url, path_without_query)
def forward_to_fallback_url(func_arn, data):
    """ Forward the invocation of a non-existing Lambda to LAMBDA_FALLBACK_URL, if configured.

    Returns None when no fallback URL is set. For a ``dynamodb://`` URL the
    payload is stored as an item in the named table and '' is returned. For
    an ``http(s)://`` URL the data is POSTed and the response content is
    returned. Any other value raises ClientError. ``func_arn`` is not used. """
    fallback_url = config.LAMBDA_FALLBACK_URL
    if not fallback_url:
        return None

    if fallback_url.startswith('dynamodb://'):
        # swap in an http:// scheme so the table name can be read from the netloc
        table_name = urlparse(fallback_url.replace('dynamodb://', 'http://')).netloc
        dynamodb = aws_stack.connect_to_service('dynamodb')
        record = {
            'id': {'S': short_uid()},
            'timestamp': {'N': str(now_utc())},
            'payload': {'S': str(data)}
        }
        aws_stack.create_dynamodb_table(table_name, partition_key='id')
        dynamodb.put_item(TableName=table_name, Item=record)
        return ''

    if re.match(r'^https?://.+', fallback_url):
        return safe_requests.post(fallback_url, data).content

    raise ClientError('Unexpected value for LAMBDA_FALLBACK_URL: %s' % fallback_url)