def ls(args):
    bucket = resources.s3.Bucket(args.billing_reports_bucket.format(account_id=ARN.get_account_id()))
    now = datetime.utcnow()
    year = args.year or now.year
    month_num = args.month or now.month
    month = str(month_num).zfill(2)
    # Roll over to January of the following year after December; the original
    # "(month + 1) % 12" arithmetic mapped November to month "00".
    next_year = year + month_num // 12
    next_month = str(month_num % 12 + 1).zfill(2)
    manifest_name = "aegea/{report}/{yr}{mo}01-{next_yr}{next_mo}01/{report}-Manifest.json"
    manifest_name = manifest_name.format(report=__name__, yr=year, mo=month, next_yr=next_year, next_mo=next_month)
    try:
        manifest = json.loads(bucket.Object(manifest_name).get().get("Body").read())
        for report_key in manifest["reportKeys"]:
            report = BytesIO(bucket.Object(report_key).get().get("Body").read())
            with gzip.GzipFile(fileobj=report) as fh:
                # GzipFile yields bytes; csv.DictReader needs text, so wrap the handle.
                reader = csv.DictReader(io.TextIOWrapper(fh))
                # Paginate once per report file; iterating the reader line by line
                # here as well would exhaust it before filter_line_items saw it.
                page_output(tabulate(filter_line_items(reader, args), args))
    except ClientError as e:
        raise AegeaException("Error retrieving billing report {}: {}".format(manifest_name, e))
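The manifest parsed above is a standard AWS Cost and Usage Report manifest; the only field this code relies on is reportKeys. A minimal sketch of its shape, with hypothetical report keys:

# Hypothetical manifest excerpt; only "reportKeys" is consumed by ls() above.
manifest = {
    "reportKeys": [
        "aegea/billing/20230101-20230201/report-1.csv.gz",
        "aegea/billing/20230101-20230201/report-2.csv.gz",
    ]
}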
'description="Built by {} for {}"'.format(__name__, ARN.get_iam_username())]
ensure_ecr_repo(args.name, read_access=args.read_access)
with tempfile.NamedTemporaryFile(mode="wt") as exec_fh:
exec_fh.write(build_docker_image_shellcode % (encode_dockerfile(args), ))
exec_fh.flush()
submit_args = submit_parser.parse_args(["--execute", exec_fh.name])
submit_args.volumes = [["/var/run/docker.sock", "/var/run/docker.sock"]]
submit_args.privileged = True
submit_args.watch = True
submit_args.dry_run = args.dry_run
submit_args.image = args.builder_image
submit_args.environment = [
dict(name="TAG", value="latest"),
dict(name="REPO", value=args.name),
dict(name="AWS_DEFAULT_REGION", value=ARN.get_region()),
dict(name="AWS_ACCOUNT_ID", value=ARN.get_account_id())
]
builder_iam_role = ensure_iam_role(__name__, trust=["ecs-tasks"], policies=args.builder_iam_policies)
submit_args.job_role = builder_iam_role.name
job = submit(submit_args)
return dict(job=job)

def ensure_deploy_iam_policy():
    sqs_arn = ARN(service="sqs", region="*", resource="github-*")
    policy_doc = IAMPolicyBuilder(action="sqs:*", resource=str(sqs_arn))
    sns_arn = ARN(service="sns", resource="github-*")
    policy_doc.add_statement(action="sns:Subscribe", resource=str(sns_arn))
    s3_arn = ARN(service="s3", region="", account_id="", resource="deploy-status-{}/*".format(ARN.get_account_id()))
    policy_doc.add_statement(action="s3:PutObject", resource=str(s3_arn))
    return ensure_iam_policy(__name__, policy_doc)
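IAMPolicyBuilder is an aegea helper; assuming it renders one allow statement per call into standard IAM policy JSON, the document built above would look roughly like this (account ID and region are placeholders):

# Hypothetical rendering of the policy assembled by ensure_deploy_iam_policy().
policy_json = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": "sqs:*", "Resource": "arn:aws:sqs:*:123456789012:github-*"},
        {"Effect": "Allow", "Action": "sns:Subscribe", "Resource": "arn:aws:sns:us-east-1:123456789012:github-*"},
        {"Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::deploy-status-123456789012/*"}
    ]
}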

if args.execute:
    bucket = ensure_s3_bucket(args.staging_s3_bucket or "aegea-batch-jobs-" + ARN.get_account_id())
    # Content-addressed key: the payload is staged under its own SHA-256 digest.
    key_name = hashlib.sha256(args.execute.read()).hexdigest()
    args.execute.seek(0)
    bucket.upload_fileobj(args.execute, key_name)
    payload_url = clients.s3.generate_presigned_url(
        ClientMethod='get_object',
        Params=dict(Bucket=bucket.name, Key=key_name),
        ExpiresIn=3600 * 24 * 7
    )
    tmpdir_fmt = "${AWS_BATCH_CE_NAME:-$AWS_EXECUTION_ENV}.${AWS_BATCH_JQ_NAME:-}.${AWS_BATCH_JOB_ID:-}.XXXXX"
    shellcode += ['BATCH_SCRIPT=$(mktemp --tmpdir "{tmpdir_fmt}")'.format(tmpdir_fmt=tmpdir_fmt),
                  "apt-get update -qq",
                  "apt-get install -qqy --no-install-suggests --no-install-recommends curl ca-certificates gnupg",
                  "curl -L '{payload_url}' > $BATCH_SCRIPT".format(payload_url=payload_url),
                  "chmod +x $BATCH_SCRIPT",
                  "$BATCH_SCRIPT"]
elif args.cwl:
    ensure_dynamodb_table("aegea-batch-jobs", hash_key_name="job_id")
    bucket = ensure_s3_bucket(args.staging_s3_bucket or "aegea-batch-jobs-" + ARN.get_account_id())
    args.environment.append(dict(name="AEGEA_BATCH_S3_BASE_URL", value="s3://" + bucket.name))
    from cwltool.main import main as cwltool_main
    with io.BytesIO() as preprocessed_cwl:
        if cwltool_main(["--print-pre", args.cwl], stdout=preprocessed_cwl) != 0:
            raise AegeaException("Error while running cwltool")
        # safe_load avoids constructing arbitrary Python objects from the CWL document.
        cwl_spec = yaml.safe_load(preprocessed_cwl.getvalue())
        payload = base64.b64encode(preprocessed_cwl.getvalue()).decode()
        args.environment.append(dict(name="AEGEA_BATCH_CWL_DEF_B64", value=payload))
        payload = base64.b64encode(args.cwl_input.read()).decode()
        args.environment.append(dict(name="AEGEA_BATCH_CWL_JOB_B64", value=payload))
        for requirement in cwl_spec.get("requirements", []):
            if requirement["class"] == "DockerRequirement":
                # FIXME: dockerFile support: ensure_ecr_image(...)
                # container_props is defined by surrounding code not shown in this fragment.
                container_props["image"] = requirement["dockerPull"]
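In the if args.execute: branch above, ensure_s3_bucket and clients.s3 are aegea helpers. A minimal sketch of the same staging step in plain boto3, assuming a placeholder bucket and payload file:

import hashlib

import boto3

s3 = boto3.client("s3")
with open("my_script.sh", "rb") as fh:  # placeholder payload
    body = fh.read()
key = hashlib.sha256(body).hexdigest()  # content-addressed key, as above
s3.put_object(Bucket="my-staging-bucket", Key=key, Body=body)
url = s3.generate_presigned_url("get_object",
                                Params={"Bucket": "my-staging-bucket", "Key": key},
                                ExpiresIn=3600 * 24 * 7)  # one week, matching the code above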

def export_log_files(args):
    bucket_name = "aegea-cloudwatch-log-export-{}-{}".format(ARN.get_account_id(), clients.logs.meta.region_name)
    bucket_arn = ARN(service="s3", region="", account_id="", resource=bucket_name)
    logs_principal = {"Service": "logs.amazonaws.com"}
    policy = IAMPolicyBuilder(action="s3:GetBucketAcl", resource=str(bucket_arn), principal=logs_principal)
    policy.add_statement(action="s3:PutObject", resource=str(bucket_arn) + "/*", principal=logs_principal)
    lifecycle = S3BucketLifecycleBuilder(expiration=dict(Days=30))
    lifecycle.add_rule(abort_incomplete_multipart_upload=20)
    bucket = ensure_s3_bucket(bucket_name, policy=policy, lifecycle=lifecycle)
    if not args.end_time:
        args.end_time = Timestamp.match_precision(Timestamp("-0s"), args.start_time)
    export_task_args = dict(logGroupName=args.log_group,
                            fromTime=int(timestamp(args.start_time) * 1000),
                            to=int(timestamp(args.end_time) * 1000),
                            destination=bucket.name)
    if args.log_stream:
        export_task_args.update(logStreamNamePrefix=args.log_stream)
    cache_key = hashlib.sha256(json.dumps(export_task_args, sort_keys=True).encode()).hexdigest()[:32]
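The fragment ends before the export task is started; presumably the args assembled above feed CloudWatch Logs' create_export_task, along these lines (the taskName choice is an assumption):

# Presumed continuation, not shown in the fragment.
response = clients.logs.create_export_task(taskName=cache_key, **export_task_args)
task_id = response["taskId"]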

# Assumed context: "--efs-storage" accepts either "MOUNTPOINT=FS_ID" or a bare
# mountpoint; the if-branch restored here parses the first form.
if "=" in args.efs_storage:
    mountpoint, efs_id = args.efs_storage.split("=")
else:
    mountpoint, efs_id = args.efs_storage, __name__
if not efs_id.startswith("fs-"):
    for filesystem in clients.efs.describe_file_systems()["FileSystems"]:
        if filesystem["Name"] == efs_id:
            efs_id = filesystem["FileSystemId"]
            break
    else:  # for/else: no filesystem matched the given name
        raise AegeaException('Could not resolve "{}" to a valid EFS filesystem ID'.format(efs_id))
mount_targets = clients.efs.describe_mount_targets(FileSystemId=efs_id)["MountTargets"]
args.environment.append(dict(name="AEGEA_EFS_DESC", value=json.dumps(mount_targets)))
# Use the parsed mountpoint, not the raw argument, which may still contain "NAME=FS_ID".
commands = efs_vol_shellcode.format(efs_mountpoint=mountpoint, efs_id=efs_id).splitlines()
shellcode += commands
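efs_vol_shellcode is an aegea-internal template not shown in this snippet. Given the AEGEA_EFS_DESC variable and standard NFSv4 mounting of EFS, its expansion is presumably a short list of shell commands along these lines (entirely hypothetical):

# Hypothetical sketch of what the template could expand to; the real template
# is not shown here, and resolving $MOUNT_TARGET_IP from AEGEA_EFS_DESC is assumed.
efs_vol_shellcode = """mkdir -p {efs_mountpoint}
mount -t nfs4 -o nfsvers=4.1 "$MOUNT_TARGET_IP:/" {efs_mountpoint}"""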

def get_ecr_image_uri(tag):
    return "{}.dkr.ecr.{}.amazonaws.com/{}".format(ARN.get_account_id(), ARN.get_region(), tag)