                                 max_cache_age_days=args.max_cache_age_days)
        page_output(tabulate(table, args))
    else:
        args.columns = ["ServiceCode", "AttributeNames"]
        page_output(tabulate(describe_services(), args))
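
# Command-line registration for the pricing command: the positional service_code is optional,
# and tab completion for it is driven by describe_services().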
parser = register_parser(pricing, help="List AWS prices")
pricing_arg = parser.add_argument("service_code", nargs="?", help="""
AWS product offer to list prices for. Run without this argument to see the list of available product service codes.""")
pricing_arg.completer = lambda **kwargs: [service["ServiceCode"] for service in describe_services()]
parser.add_argument("--columns", nargs="+")
parser.add_argument("--filters", nargs="+", metavar="NAME=VALUE", type=lambda x: x.split("=", 1), default=[])
parser.add_argument("--terms", nargs="+", default=["OnDemand"])
parser.add_argument("--sort-by")
parser.add_argument("--spot", action="store_true", help="Display AWS EC2 Spot Instance pricing history")
parser.add_argument("--spot-start-time", type=Timestamp, default=Timestamp("-1h"), metavar="START",
help="Time to start spot price history." + Timestamp.__doc__)
parser.add_argument("-columns-spot")
def ls(args):
    paginator = getattr(clients, "lambda").get_paginator("list_functions")
    page_output(tabulate(paginate(paginator), args, cell_transforms={"LastModified": Timestamp}))
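
# Stream events from a CloudWatch Logs log stream and print each one as "<timestamp> <message>".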
def get_logs(args):
    for event in CloudwatchLogReader(args.log_stream_name, head=args.head, tail=args.tail):
        print(str(Timestamp(event["timestamp"])), event["message"])
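
# Export CloudWatch Logs to S3. The destination bucket is created (or reused) with a policy letting
# logs.amazonaws.com read the bucket ACL and write objects, plus a lifecycle rule expiring objects
# after 30 days. A SHA-256 digest of the export parameters becomes the destination prefix, so an
# identical export that already left objects under that prefix is reused instead of starting a new task.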
def export_log_files(args):
    bucket_name = "aegea-cloudwatch-log-export-{}-{}".format(ARN.get_account_id(), clients.logs.meta.region_name)
    bucket_arn = ARN(service="s3", region="", account_id="", resource=bucket_name)
    logs_principal = {"Service": "logs.amazonaws.com"}
    policy = IAMPolicyBuilder(action="s3:GetBucketAcl", resource=str(bucket_arn), principal=logs_principal)
    policy.add_statement(action="s3:PutObject", resource=str(bucket_arn) + "/*", principal=logs_principal)
    lifecycle = S3BucketLifecycleBuilder(expiration=dict(Days=30))
    lifecycle.add_rule(abort_incomplete_multipart_upload=20)
    bucket = ensure_s3_bucket(bucket_name, policy=policy, lifecycle=lifecycle)
    if not args.end_time:
        args.end_time = Timestamp.match_precision(Timestamp("-0s"), args.start_time)
    export_task_args = dict(logGroupName=args.log_group,
                            fromTime=int(timestamp(args.start_time) * 1000),
                            to=int(timestamp(args.end_time) * 1000),
                            destination=bucket.name)
    if args.log_stream:
        export_task_args.update(logStreamNamePrefix=args.log_stream)
    cache_key = hashlib.sha256(json.dumps(export_task_args, sort_keys=True).encode()).hexdigest()[:32]
    export_task_args.update(destinationPrefix=cache_key)
    for log_object in bucket.objects.filter(Prefix=cache_key):
        logger.debug("Reusing completed export task %s", log_object.key)
        break
    else:
        logger.debug("Starting new log export task %s", export_task_args)
        task_desc = clients.logs.create_export_task(**export_task_args)
        try:
            while task_desc.get("status", {}).get("code") != "COMPLETED":
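
# Illustrative sketch only (not part of this file): the loop above keeps re-checking the export task
# status; with plain boto3 this can be done by polling the DescribeExportTasks API until the task leaves
# the PENDING/RUNNING states. The names wait_for_export_task and poll_seconds are hypothetical.
import time

import boto3

def wait_for_export_task(task_id, poll_seconds=10):
    """Poll a CloudWatch Logs export task until it reaches a terminal status, then return that status."""
    logs = boto3.client("logs")
    while True:
        tasks = logs.describe_export_tasks(taskId=task_id)["exportTasks"]
        status = tasks[0]["status"]["code"] if tasks else "UNKNOWN"
        if status in ("COMPLETED", "CANCELLED", "FAILED"):
            return status
        time.sleep(poll_seconds)

# The fragment below is from the cost report command: it folds Cost Explorer group metrics into
# per-key rows, appends a TOTAL column, filters rows by --min-total, and sorts by total descending.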
        cell_transforms[page["TimePeriod"]["Start"]] = format_float
        for i, group in enumerate(page["Groups"]):
            value = group["Metrics"][args.metrics[0]]
            if isinstance(value, dict) and "Amount" in value:
                value = float(value["Amount"])
            rows[group["Keys"][0]].setdefault(title, group["Keys"][0])
            rows[group["Keys"][0]].setdefault("TOTAL", 0)
            rows[group["Keys"][0]]["TOTAL"] += value
            rows[group["Keys"][0]][page["TimePeriod"]["Start"]] = value
    args.columns.append("TOTAL")
    rows = [row for row in rows.values() if row["TOTAL"] > args.min_total]
    rows = sorted(rows, key=lambda row: -row["TOTAL"])
    page_output(tabulate(rows, args, cell_transforms=cell_transforms))
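
# Illustrative sketch only (not part of this file): the "page" dicts consumed above have the shape of
# the per-period entries in a Cost Explorer GetCostAndUsage response (TimePeriod, Groups, Metrics).
# A minimal call producing such entries might look like this; the time period and grouping are
# placeholder values.
import boto3

def example_cost_pages():
    ce = boto3.client("ce")
    response = ce.get_cost_and_usage(
        TimePeriod={"Start": "2023-01-01", "End": "2023-01-08"},
        Granularity="DAILY",
        Metrics=["AmortizedCost"],
        GroupBy=[{"Type": "DIMENSION", "Key": "SERVICE"}],
    )
    # Each entry carries page["TimePeriod"]["Start"] and page["Groups"], as used in the aggregation above.
    return response["ResultsByTime"]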
parser_cost = register_parser(cost, help="List AWS costs")
parser_cost.add_argument("--time-period-start", type=Timestamp, default=Timestamp("-7d"),
                         help="Time to start cost history." + Timestamp.__doc__)
parser_cost.add_argument("--time-period-end", type=Timestamp, default=Timestamp("-0d"),
                         help="Time to end cost history." + Timestamp.__doc__)
parser_cost.add_argument("--granularity", choices={"HOURLY", "DAILY", "MONTHLY"}, help="AWS cost granularity")
parser_cost.add_argument("--metrics", nargs="+", default=["AmortizedCost"],
                         choices={"AmortizedCost", "BlendedCost", "NetAmortizedCost", "NetUnblendedCost",
                                  "NormalizedUsageAmount", "UnblendedCost", "UsageQuantity"})
parser_cost.add_argument("--group-by", nargs="+", default=[],
                         choices={"AZ", "INSTANCE_TYPE", "LINKED_ACCOUNT", "OPERATION", "PURCHASE_TYPE", "SERVICE",
                                  "REGION", "USAGE_TYPE", "PLATFORM", "TENANCY", "RECORD_TYPE", "LEGAL_ENTITY_NAME",
                                  "DEPLOYMENT_OPTION", "DATABASE_ENGINE", "CACHE_ENGINE", "INSTANCE_TYPE_FAMILY",
                                  "BILLING_ENTITY", "RESERVATION_ID", "SAVINGS_PLANS_TYPE", "SAVINGS_PLAN_ARN"})
parser_cost.add_argument("--group-by-tag", nargs="+", default=[])
parser_cost.add_argument("--min-total", type=int, help="Omit rows that total below this number")
def cost_forecast(args):
value = group["Metrics"][args.metrics[0]]
if isinstance(value, dict) and "Amount" in value:
value = float(value["Amount"])
rows[group["Keys"][0]].setdefault(title, group["Keys"][0])
rows[group["Keys"][0]].setdefault("TOTAL", 0)
rows[group["Keys"][0]]["TOTAL"] += value
rows[group["Keys"][0]][page["TimePeriod"]["Start"]] = value
args.columns.append("TOTAL")
rows = [row for row in rows.values() if row["TOTAL"] > args.min_total]
rows = sorted(rows, key=lambda row: -row["TOTAL"])
page_output(tabulate(rows, args, cell_transforms=cell_transforms))
parser_cost = register_parser(cost, help="List AWS costs")
parser_cost.add_argument("--time-period-start", type=Timestamp, default=Timestamp("-7d"),
help="Time to start cost history." + Timestamp.__doc__)
parser_cost.add_argument("--time-period-end", type=Timestamp, default=Timestamp("-0d"),
help="Time to end cost history." + Timestamp.__doc__)
parser_cost.add_argument("--granularity", choices={"HOURLY", "DAILY", "MONTHLY"}, help="AWS cost granularity")
parser_cost.add_argument("--metrics", nargs="+", default=["AmortizedCost"],
choices={"AmortizedCost", "BlendedCost", "NetAmortizedCost", "NetUnblendedCost",
"NormalizedUsageAmount", "UnblendedCost", "UsageQuantity"})
parser_cost.add_argument("--group-by", nargs="+", default=[],
choices={"AZ", "INSTANCE_TYPE", "LINKED_ACCOUNT", "OPERATION", "PURCHASE_TYPE", "SERVICE",
"REGION", "USAGE_TYPE", "PLATFORM", "TENANCY", "RECORD_TYPE", "LEGAL_ENTITY_NAME",
"DEPLOYMENT_OPTION", "DATABASE_ENGINE", "CACHE_ENGINE", "INSTANCE_TYPE_FAMILY",
"BILLING_ENTITY", "RESERVATION_ID", "SAVINGS_PLANS_TYPE", "SAVINGS_PLAN_ARN"})
parser_cost.add_argument("--group-by-tag", nargs="+", default=[])
parser_cost.add_argument("--min-total", type=int, help="Omit rows that total below this number")
def cost_forecast(args):
    get_cost_forecast_args = dict(get_common_method_args(args), Metric=args.metric, PredictionIntervalLevel=75)
    res = clients.ce.get_cost_forecast(**get_cost_forecast_args)
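
# Illustrative sketch only (not part of this file): a GetCostForecast response carries a Total amount
# plus per-period ForecastResultsByTime entries with MeanValue and prediction-interval bounds.
# print_forecast is a hypothetical helper showing how such a response could be read.
def print_forecast(res):
    print("Total forecast:", res["Total"]["Amount"], res["Total"]["Unit"])
    for period in res["ForecastResultsByTime"]:
        print(period["TimePeriod"]["Start"], period["MeanValue"],
              period["PredictionIntervalLowerBound"], period["PredictionIntervalUpperBound"])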