Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_zip_file(file_path, get_content=False):
    """Create a zip archive of ``file_path`` inside a fresh temporary directory.

    If ``file_path`` is not a directory, it is first copied into a temporary
    staging directory (registered in ``TMP_FILES``) which is then used as the
    base directory for the archive.

    :param file_path: path of the file or directory to archive
    :param get_content: if true, return the archive's bytes and remove the
        temporary archive directory; otherwise return the path of the zip
        file and register its directory in ``TMP_FILES``
    :returns: the zip file path, or the zip file content (bytes) if
        ``get_content`` is true
    """
    base_dir = file_path
    if not os.path.isdir(file_path):
        base_dir = tempfile.mkdtemp(prefix=ARCHIVE_DIR_PREFIX)
        # register before copying, so the staging dir is tracked even if the copy fails
        TMP_FILES.append(base_dir)
        shutil.copy(file_path, base_dir)

    tmp_dir = tempfile.mkdtemp(prefix=ARCHIVE_DIR_PREFIX)
    full_zip_file = os.path.join(tmp_dir, 'archive.zip')
    # only set once ownership of tmp_dir has been handed over to TMP_FILES
    keep_tmp_dir = False
    try:
        # create zip file
        if is_alpine():
            create_zip_file_cli(file_path, base_dir, zip_file=full_zip_file)
        else:
            create_zip_file_python(file_path, base_dir, zip_file=full_zip_file)

        if not get_content:
            TMP_FILES.append(tmp_dir)
            keep_tmp_dir = True
            return full_zip_file

        with open(full_zip_file, 'rb') as file_obj:
            return file_obj.read()
    finally:
        # remove the archive dir when the content was requested, and also when
        # archiving or reading failed (otherwise the directory would leak)
        if not keep_tmp_dir:
            rm_dir(tmp_dir)
def do_download(param):
    """Download ``/<param>`` from localhost on ``test_port`` into a temp file.

    The target path is derived from ``tmp_file_pattern`` and is registered in
    ``TMP_FILES`` before the download starts.
    """
    target_path = tmp_file_pattern % param
    TMP_FILES.append(target_path)
    source_url = 'http://localhost:%s/%s' % (test_port, param)
    download(source_url, target_path)
def execute_java_lambda(self, event, context, handler, main_file):
    """Run a Java Lambda handler through the Lambda executor and return its output.

    The event is serialized to a JSON file (registered in ``TMP_FILES``) whose
    path is passed to the executor on the command line. ``context`` is accepted
    but not used here.

    :param event: JSON-serializable event payload
    :param handler: handler string; the text before the first ``::`` is used
        as the class name
    :param main_file: path that is added to the classpath and passed to
        ``Util.get_java_classpath``
    :returns: tuple ``(result, log_output)`` as returned by
        ``self.run_lambda_executor``
    """
    # write the event to a file the executor can read
    event_path = EVENT_FILE_PATTERN.replace('*', short_uid())
    save_file(event_path, json.dumps(event))
    TMP_FILES.append(event_path)

    handler_class = handler.split('::')[0]
    classpath_entries = (LAMBDA_EXECUTOR_JAR, main_file, Util.get_java_classpath(main_file))
    classpath = '%s:%s:%s' % classpath_entries
    cmd_args = (classpath, LAMBDA_EXECUTOR_CLASS, handler_class, event_path)
    cmd = 'java -cp %s %s %s %s' % cmd_args

    result, log_output = self.run_lambda_executor(cmd)

    # prefix every log line with '> ' for the debug output
    result_text = result.strip()
    quoted_log = log_output.strip().replace('\n', '\n> ')
    LOG.debug('Lambda result / log output:\n%s\n> %s' % (result_text, quoted_log))
    return result, log_output
'metricsLevel': 'NONE',
'initialPositionInStream': 'LATEST'
}
# set parameters for local connection
if aws_stack.is_local_env(env):
kwargs['kinesisEndpoint'] = '%s:%s' % (HOSTNAME, config.PORT_KINESIS)
kwargs['dynamodbEndpoint'] = '%s:%s' % (HOSTNAME, config.PORT_DYNAMODB)
kwargs['kinesisProtocol'] = 'http%s' % ('s' if USE_SSL else '')
kwargs['dynamodbProtocol'] = 'http%s' % ('s' if USE_SSL else '')
kwargs['disableCertChecking'] = 'true'
kwargs.update(configs)
# create config file
kclipy_helper.create_config_file(config_file=props_file, executableName=listener_script,
streamName=stream_name, applicationName=stream_info['app_name'],
credentialsProvider=credentialsProvider, region_name=region_name, **kwargs)
TMP_FILES.append(props_file)
# start stream consumer
stream = KinesisStream(id=stream_name, params=stream_info)
thread_consumer = KinesisProcessorThread.start_consumer(stream)
TMP_THREADS.append(thread_consumer)
return thread_consumer
def exec_lambda_code(script, handler_function='handler', lambda_cwd=None, lambda_env=None):
    """Load ``script`` as a Python module and return its handler function.

    The script is written to a temporary file (registered in ``TMP_FILES``
    together with its ``.pyc`` counterpart) and loaded via ``imp.load_source``.
    If ``lambda_cwd`` and/or ``lambda_env`` is given, the working directory,
    ``sys.path`` and ``os.environ`` are changed while the module is loaded and
    restored afterwards; ``exec_mutex`` is held for that whole period.

    :param script: Python source code of the Lambda function
    :param handler_function: name of the handler to look up in the module
    :param lambda_cwd: optional directory to chdir into (also prepended to
        ``sys.path``) while loading
    :param lambda_env: optional dict of environment variables to set while
        loading
    :returns: the object named ``handler_function`` from the loaded module
    :raises KeyError: if the module does not define ``handler_function``
    :raises Exception: any error raised while loading the module is logged
        and re-raised
    """
    isolate = bool(lambda_cwd or lambda_env)
    previous_cwd = None
    previous_env = None
    path_added = False

    if isolate:
        exec_mutex.acquire()
    try:
        # All state changes happen inside the try block, so that a failure in
        # chdir/environ.update/save_file cannot leave the mutex locked or the
        # process state modified.
        if lambda_cwd:
            previous_cwd = os.getcwd()
            os.chdir(lambda_cwd)
            sys.path = [lambda_cwd] + sys.path
            path_added = True
        if lambda_env:
            previous_env = dict(os.environ)
            os.environ.update(lambda_env)

        # generate lambda file name
        lambda_id = 'l_%s' % short_uid()
        lambda_file = LAMBDA_SCRIPT_PATTERN.replace('*', lambda_id)
        save_file(lambda_file, script)
        # delete temporary .py and .pyc files on exit
        TMP_FILES.append(lambda_file)
        TMP_FILES.append('%sc' % lambda_file)

        try:
            # NOTE(review): the `imp` module is deprecated and was removed in
            # Python 3.12 - consider importlib once supported versions allow it.
            handler_module = imp.load_source(lambda_id, lambda_file)
        except Exception:
            LOG.error('Unable to exec: %s %s' % (script, traceback.format_exc()))
            raise
        return handler_module.__dict__[handler_function]
    finally:
        if isolate:
            try:
                if previous_cwd is not None:
                    os.chdir(previous_cwd)
                if path_added:
                    # remove our entry by value: the loaded script may have
                    # prepended its own entries, so index 0 is not reliable
                    try:
                        sys.path.remove(lambda_cwd)
                    except ValueError:
                        # entry already removed by the script - nothing to undo
                        pass
                if previous_env is not None:
                    # restore in place; rebinding os.environ to a plain dict
                    # would stop later changes from reaching the process env
                    for key in list(os.environ):
                        if key not in previous_env:
                            del os.environ[key]
                    os.environ.update(previous_env)
            finally:
                exec_mutex.release()
('AWS_ASSUME_ROLE_SESSION_NAME' in os.environ or 'AWS_ASSUME_ROLE_SESSION_NAME' in env_vars)):
# use special credentials provider that can assume IAM roles and handle temporary STS auth tokens
credentialsProvider = 'cloud.localstack.DefaultSTSAssumeRoleSessionCredentialsProvider'
# pass through env variables to child process
for var_name in ['AWS_ASSUME_ROLE_ARN', 'AWS_ASSUME_ROLE_SESSION_NAME',
'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN']:
if var_name in os.environ and var_name not in env_vars:
env_vars[var_name] = os.environ[var_name]
if aws_stack.is_local_env(env):
# need to disable CBOR protocol, enforce use of plain JSON,
# see https://github.com/mhart/kinesalite/issues/31
env_vars['AWS_CBOR_DISABLE'] = 'true'
if kcl_log_level or (len(log_subscribers) > 0):
if not log_file:
log_file = LOG_FILE_PATTERN.replace('*', short_uid())
TMP_FILES.append(log_file)
run('touch %s' % log_file)
# start log output reader thread which will read the KCL log
# file and print each line to stdout of this process...
reader_thread = OutputReaderThread({'file': log_file, 'level': kcl_log_level,
'log_prefix': 'KCL', 'log_subscribers': log_subscribers})
reader_thread.start()
# construct stream info
stream_info = get_stream_info(stream_name, log_file, env=env, endpoint_url=endpoint_url,
ddb_lease_table_suffix=ddb_lease_table_suffix, env_vars=env_vars)
props_file = stream_info['properties_file']
# set kcl config options
kwargs = {
'metricsLevel': 'NONE',
'initialPositionInStream': 'LATEST'
}
# file system! We must ensure that there is no data loss (i.e., we must *not* add
# this folder to TMP_FILES or similar).
return code['S3Key']
# get file content
zip_file_content = zip_file_content or get_zip_bytes(code)
# Save the zip file to a temporary file that the lambda executors can reference
code_sha_256 = base64.standard_b64encode(hashlib.sha256(zip_file_content).digest())
lambda_details.get_version('$LATEST')['CodeSize'] = len(zip_file_content)
lambda_details.get_version('$LATEST')['CodeSha256'] = code_sha_256.decode('utf-8')
tmp_dir = '%s/zipfile.%s' % (config.TMP_FOLDER, short_uid())
mkdir(tmp_dir)
tmp_file = '%s/%s' % (tmp_dir, LAMBDA_ZIP_FILE_NAME)
save_file(tmp_file, zip_file_content)
TMP_FILES.append(tmp_dir)
lambda_details.cwd = tmp_dir
return tmp_dir
try:
# records is a list of amazon_kclpy.messages.Record objects -> convert to JSON
records_dicts = [j._json_dict for j in records]
message_to_send = {'shard_id': shard_id, 'records': records_dicts}
string_to_send = '%%s\\n' %% json.dumps(message_to_send)
bytes_to_send = string_to_send.encode(DEFAULT_ENCODING)
sock.send(bytes_to_send)
except Exception as e:
msg = "WARN: Unable to forward event: %%s" %% e
print(msg)
subprocess.check_output('echo "%%s" >> %%s' %% (msg, error_log), shell=True)
kinesis_connector.KinesisProcessor.run_processor(log_file=log_file, processor_func=receive_msg)
""" % (LOCALSTACK_VENV_FOLDER, LOCALSTACK_ROOT_FOLDER, events_file, log_file)
save_file(script_file, content)
chmod_r(script_file, 0o755)
TMP_FILES.append(script_file)
return script_file