Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_event_message(event_name, bucket_name, file_name='testfile.txt', version_id=None, file_size=1024):
# Based on: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
bucket_name = normalize_bucket_name(bucket_name)
return {
'Records': [{
'eventVersion': '2.0',
'eventSource': 'aws:s3',
'awsRegion': aws_stack.get_region(),
'eventTime': timestamp(format=TIMESTAMP_FORMAT_MILLIS),
'eventName': event_name,
'userIdentity': {
'principalId': 'AIDAJDPLRKLG7UEXAMPLE'
},
'requestParameters': {
'sourceIPAddress': '127.0.0.1' # TODO determine real source IP
},
'responseElements': {
'x-amz-request-id': short_uid(),
'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2' # Amazon S3 host that processed the request
},
's3': {
's3SchemaVersion': '1.0',
'configurationId': 'testConfigRule',
'bucket': {
def get_domain_status(domain_name, deleted=False):
return {
'DomainStatus': {
'ARN': 'arn:aws:es:%s:%s:domain/%s' % (aws_stack.get_region(), TEST_AWS_ACCOUNT_ID, domain_name),
'Created': True,
'Deleted': deleted,
'DomainId': '%s/%s' % (TEST_AWS_ACCOUNT_ID, domain_name),
'DomainName': domain_name,
'ElasticsearchClusterConfig': {
'DedicatedMasterCount': 1,
'DedicatedMasterEnabled': True,
'DedicatedMasterType': 'm3.medium.elasticsearch',
'InstanceCount': 1,
'InstanceType': 'm3.medium.elasticsearch',
'ZoneAwarenessEnabled': False
},
'ElasticsearchVersion': '6.7',
'Endpoint': aws_stack.get_elasticsearch_endpoint(domain_name),
'Processing': False,
'EBSOptions': {
def describe_stack_resource(self):
stack_name = self._get_param('StackName')
stack = self.cloudformation_backend.get_stack(stack_name)
logical_resource_id = self._get_param('LogicalResourceId')
if not stack:
msg = ('Unable to find CloudFormation stack "%s" in region %s' %
(stack_name, aws_stack.get_region()))
if aws_stack.get_region() != self.region:
msg = '%s/%s' % (msg, self.region)
LOG.warning(msg)
response = aws_responses.flask_error_response(msg, code=404, error_type='ResourceNotFoundException')
return 404, response.headers, response.data
for stack_resource in stack.stack_resources:
# Note: Line below has been patched
# if stack_resource.logical_resource_id == logical_resource_id:
if stack_resource and stack_resource.logical_resource_id == logical_resource_id:
resource = stack_resource
break
else:
raise ValidationError(logical_resource_id)
template = self.response_template(
responses.DESCRIBE_STACK_RESOURCE_RESPONSE_TEMPLATE)
def Resource_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
props = cloudformation_json['Properties']
region_name = props.get('Region') or aws_stack.get_region()
path_part = props.get('PathPart')
api_id = props.get('RestApiId')
parent_id = props.get('ParentId')
id = props.get('Id') or short_uid()
return apigw_models.Resource(id, region_name, api_id, path_part, parent_id)
def create_config_file(config_file, executableName, streamName, applicationName,
credentialsProvider=None, region_name=None, **kwargs):
if not credentialsProvider:
credentialsProvider = 'DefaultAWSCredentialsProviderChain'
region_name = region_name or aws_stack.get_region()
content = """
executableName = %s
streamName = %s
applicationName = %s
AWSCredentialsProvider = %s
processingLanguage = python/2.7
parentShardPollIntervalMillis = 2000
regionName = %s
""" % (executableName, streamName, applicationName, credentialsProvider, region_name)
# optional properties
for key, value in iteritems(kwargs):
content += """
%s = %s""" % (key, value)
content = content.replace(' ', '')
save_file(config_file, content)
def get_stream_info(stream_name, log_file=None, shards=None, env=None, endpoint_url=None,
ddb_lease_table_suffix=None, env_vars={}):
if not ddb_lease_table_suffix:
ddb_lease_table_suffix = DEFAULT_DDB_LEASE_TABLE_SUFFIX
# construct stream info
env = aws_stack.get_environment(env)
props_file = os.path.join(tempfile.gettempdir(), 'kclipy.%s.properties' % short_uid())
app_name = '%s%s' % (stream_name, ddb_lease_table_suffix)
stream_info = {
'name': stream_name,
'region': aws_stack.get_region(),
'shards': shards,
'properties_file': props_file,
'log_file': log_file,
'app_name': app_name,
'env_vars': env_vars
}
# set local connection
if aws_stack.is_local_env(env):
stream_info['conn_kwargs'] = {
'host': HOSTNAME,
'port': config.PORT_KINESIS,
'is_secure': bool(USE_SSL)
}
if endpoint_url:
if 'conn_kwargs' not in stream_info:
stream_info['conn_kwargs'] = {}
def resolve_ref(stack_name, ref, resources, attribute):
if ref == 'AWS::Region':
return aws_stack.get_region()
resource_status = {}
if stack_name:
resource_status = describe_stack_resource(stack_name, ref)
if not resource_status:
return
attr_value = resource_status.get(attribute)
if attr_value not in [None, '']:
return attr_value
elif ref in resources:
resource_status = resources[ref]['__details__']
# fetch resource details
resource = resources.get(ref)
resource_new = retrieve_resource_details(ref, resource_status, resources, stack_name)
if not resource_new:
return
resource_type = get_resource_type(resource)
def start_stepfunctions(port=None, asynchronous=False, update_listener=None):
port = port or config.PORT_STEPFUNCTIONS
install.install_stepfunctions_local()
backend_port = DEFAULT_PORT_STEPFUNCTIONS_BACKEND
# TODO: local port is currently hard coded in Step Functions Local :/
backend_port = 8083
lambda_endpoint = aws_stack.get_local_service_url('lambda')
dynamodb_endpoint = aws_stack.get_local_service_url('dynamodb')
sns_endpoint = aws_stack.get_local_service_url('sns')
sqs_endpoint = aws_stack.get_local_service_url('sqs')
sfn_endpoint = aws_stack.get_local_service_url('stepfunctions')
cmd = ('cd %s; java -Dcom.amazonaws.sdk.disableCertChecking -Xmx%s -jar StepFunctionsLocal.jar '
'--lambda-endpoint %s --dynamodb-endpoint %s --sns-endpoint %s '
'--sqs-endpoint %s --aws-region %s --aws-account %s --step-functions-endpoint %s') % (
install.INSTALL_DIR_STEPFUNCTIONS, MAX_HEAP_SIZE, lambda_endpoint, dynamodb_endpoint,
sns_endpoint, sqs_endpoint, aws_stack.get_region(), TEST_AWS_ACCOUNT_ID, sfn_endpoint)
print('Starting mock StepFunctions (%s port %s)...' % (get_service_protocol(), port))
start_proxy_for_service('stepfunctions', port, backend_port, update_listener)
return do_run(cmd, asynchronous)