Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
PREFIX = CONFIG.get('general', 'resource_prefix')
WINDOWS_AMI_ID = CONFIG.get('windows', 'windows2016.{}'.format(REGION))
INSTANCE_TYPE = CONFIG.get('windows', 'instance_type')
LINUX_AMI_ID = CONFIG.get('linux', 'ami')
LINUX_INSTANCE_TYPE = CONFIG.get('linux', 'instance_type')
SSM_DOC_NAME = PREFIX + 'delete-snapshot'
CFN_STACK_NAME = PREFIX + 'delete-snapshot'
TEST_CFN_STACK_NAME = PREFIX + 'delete-snapshot'
logging.basicConfig(level=CONFIG.get('general', 'log_level').upper())
LOGGER = logging.getLogger(__name__)
logging.getLogger('botocore').setLevel(level=logging.WARNING)
boto3.setup_default_session(region_name=REGION)
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
sts_client = boto3.client('sts')
ec2_client = boto3.client('ec2')
def verify_role_created(role_arn):
LOGGER.info("Verifying that role exists: " + role_arn)
# For what ever reason assuming a role that got created too fast fails, so we just wait until we can.
retry_count = 12
while True:
try:
sts_client.assume_role(RoleArn=role_arn, RoleSessionName="checking_assume")
break
def set_up_mock_queue():
# set up mock queue
boto3.setup_default_session(region_name='eu-west-1')
conn = boto3.resource('sqs')
q = conn.create_queue(QueueName='gov_uk_notify_sms_queue')
return q
if len(terms) > 1:
for param in terms[1:]:
if "=" in param:
key, value = param.split('=')
else:
raise AnsibleError("ssm parameter store plugin needs key=value pairs, but received %s" % param)
if key == "region" or key == "aws_profile":
session[key] = value
# decrypt the value or not
if key == "decrypt" and value.lower() == "false":
ssm_dict['WithDecryption'] = False
if "aws_profile" in session:
boto3.setup_default_session(profile_name=session['aws_profile'])
if "region" in session:
client = boto3.client('ssm', region_name=session['region'])
else:
client = boto3.client('ssm')
try:
response = client.get_parameters(**ssm_dict)
except ClientError as e:
raise AnsibleError("SSM lookup exception: {0}".format(e))
ret.update(response)
if ret['Parameters']:
return [ret['Parameters'][0]['Value']]
else:
def key(self, request):
account = AccountApp().get(request.sourceProfile)
boto3.setup_default_session(profile_name=request.sourceProfile)
sts = boto3.client('sts')
try:
response = sts.assume_role(RoleArn=account.trustRoleArn, RoleSessionName=request.profile)
credentials = Credentials(
secretAccessKey=response['Credentials']['SecretAccessKey'],
sessionToken=response['Credentials']['SessionToken'],
accessKeyId=response['Credentials']['AccessKeyId']
)
return credentials
except ClientError:
print('')
print('\tThe request signature we calculated does not match the signature you provided.')
print('\tCheck your AWS Secret Access Key and signing method on profile [' + request.profile + ']')
print('\tCheck your Arn role [' + account.trustRoleArn + ']')
print('')
sys.exit(1)
def setup_session_and_refresh_connections(self):
if self.profile_name and self.region_name:
boto3.setup_default_session(
profile_name = self.profile_name,
region_name = self.region_name,
)
elif self.profile_name:
boto3.setup_default_session(profile_name = self.profile_name)
elif self.region_name:
boto3.setup_default_session(region_name = self.region_name)
else:
return None
self.refresh_boto_connections()
wait_sec=dict(default=1200, type='int'),
),
mutually_exclusive=[['kms_arn', 'kms_alias']],
#supports_check_mode=True
)
state = self.module.params['state']
ami_id = self.module.params['ami_id']
aws_access_key = self.module.params['aws_access_key']
aws_secret_key = self.module.params['aws_secret_key']
wait_sec = self.module.params['wait_sec']
tag_name = self.module.params['tag_name']
tag_value = self.module.params['tag_value']
if aws_access_key and aws_secret_key:
boto3.setup_default_session(aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
region_name=self.module.params['region'])
else:
boto3.setup_default_session(region_name=self.module.params['region'])
self.ec2_client = boto3.client('ec2')
self.account_id = boto3.client('sts').get_caller_identity()['Account']
if state == 'list':
if ami_id != None:
ami = self.ec2_client.describe_images(ImageIds=[ami_id])['Images'][0]
self.module.exit_json(changed=False, results=ami,
state="list")
if state == 'present':
def get_credentials(profile, args, config):
console_input = prompter()
mfa_token = console_input('Enter AWS MFA code for device [%s] '
'(renewing for %s seconds): ' %
(args.device, args.duration))
boto3.setup_default_session(profile_name='default')
client = boto3.client('sts')
if args.assume_role:
logging.info("Assuming Role - Profile: %s, Role: %s, Duration: %s",
profile, args.assume_role, args.duration)
try:
print((args.assume_role, args.role_session_name, args.device, mfa_token))
response = client.assume_role(
RoleArn=args.assume_role,
RoleSessionName=args.role_session_name,
SerialNumber=args.device,
TokenCode=mfa_token
)
except ClientError as e:
def _refresh_session(self):
"""
Update boto3's default session by creating a new session based on values set in the context. Some properties of
the Boto3's session object are read-only. Therefore when Click parses new AWS session related properties (like
region & profile), it will call this method to create a new session with latest values for these properties.
"""
try:
boto3.setup_default_session(region_name=self._aws_region, profile_name=self._aws_profile)
except botocore.exceptions.ProfileNotFound as ex:
raise CredentialsError(str(ex))