Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
queue_name = os.path.basename(queue.url)
if queue_name.startswith(ARN(topic.arn).resource):
row = dict(Topic=topic, Queue=queue)
status_object = bucket.Object(os.path.join(queue_name, "status"))
if status_object.key not in recent_status_objects:
continue
try:
github, owner, repo, events, instance = os.path.dirname(status_object.key).split("-", 4)
status = json.loads(status_object.get()["Body"].read().decode("utf-8"))
row.update(status, Owner=owner, Repo=repo, Instance=instance,
Updated=status_object.last_modified)
except Exception:
pass
table.append(row)
args.columns = ["Owner", "Repo", "Instance", "Status", "Ref", "Commit", "Updated", "Topic", "Queue"]
page_output(tabulate(table, args))
def certificates(args):
page_output(tabulate(paginate(clients.acm.get_paginator("list_certificates")), args))
def cmks(args):
aliases = {alias.get("TargetKeyId"): alias for alias in paginate(clients.kms.get_paginator("list_aliases"))}
table = []
for key in paginate(clients.kms.get_paginator("list_keys")):
key.update(aliases.get(key["KeyId"], {}))
table.append(key)
page_output(tabulate(table, args))
def ls(args):
table = []
describe_repositories_args = dict(repositoryNames=args.repositories) if args.repositories else {}
for repo in paginate(clients.ecr.get_paginator("describe_repositories"), **describe_repositories_args):
try:
res = clients.ecr.get_repository_policy(repositoryName=repo["repositoryName"])
repo["policy"] = json.loads(res["policyText"])
except clients.ecr.exceptions.RepositoryPolicyNotFoundException:
pass
orig_len = len(table)
for image in paginate(clients.ecr.get_paginator("describe_images"), repositoryName=repo["repositoryName"]):
table.append(dict(image, **repo))
if len(table) == orig_len:
table.append(repo)
page_output(tabulate(table, args))
def cost_forecast(args):
get_cost_forecast_args = dict(get_common_method_args(args), Metric=args.metric, PredictionIntervalLevel=75)
res = clients.ce.get_cost_forecast(**get_cost_forecast_args)
args.columns = ["TimePeriod.Start", "MeanValue", "PredictionIntervalLowerBound", "PredictionIntervalUpperBound"]
cell_transforms = {col: format_float
for col in ["MeanValue", "PredictionIntervalLowerBound", "PredictionIntervalUpperBound"]}
title = "TOTAL ({})".format(boto3.session.Session().profile_name)
table = res["ForecastResultsByTime"] + [{"TimePeriod": {"Start": title}, "MeanValue": res["Total"]["Amount"]}]
page_output(tabulate(table, args, cell_transforms=cell_transforms))
def filter_and_tabulate(collection, args, **kwargs):
return tabulate(filter_collection(collection, args), args, **kwargs)
def event_source_mappings(args):
paginator = getattr(clients, "lambda").get_paginator("list_event_source_mappings")
page_output(tabulate(paginate(paginator), args))
args.columns, cell_transforms = [title], {"TOTAL": format_float}
for page in clients.ce.get_cost_and_usage(**get_cost_and_usage_args)["ResultsByTime"]:
args.columns.append(page["TimePeriod"]["Start"])
cell_transforms[page["TimePeriod"]["Start"]] = format_float
for i, group in enumerate(page["Groups"]):
value = group["Metrics"][args.metrics[0]]
if isinstance(value, dict) and "Amount" in value:
value = float(value["Amount"])
rows[group["Keys"][0]].setdefault(title, group["Keys"][0])
rows[group["Keys"][0]].setdefault("TOTAL", 0)
rows[group["Keys"][0]]["TOTAL"] += value
rows[group["Keys"][0]][page["TimePeriod"]["Start"]] = value
args.columns.append("TOTAL")
rows = [row for row in rows.values() if row["TOTAL"] > args.min_total]
rows = sorted(rows, key=lambda row: -row["TOTAL"])
page_output(tabulate(rows, args, cell_transforms=cell_transforms))
def instances(args):
page_output(tabulate(list_rds_instances(), args))