Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_attach_history_handler_when_command_is_history(
self, mock_recorder, mock_db_sqlite3, mock_sqlite3):
mock_session = mock.Mock(Session)
mock_session.get_scoped_config.return_value = {
'cli_history': 'enabled'
}
parsed_args = argparse.Namespace()
parsed_args.command = 'history'
attach_history_handler(session=mock_session, parsed_args=parsed_args)
self.assertFalse(mock_recorder.add_handler.called)
self.assertFalse(mock_db_sqlite3.connect.called)
def setUp(self):
self.session = mock.Mock(Session)
self.output_stream_factory = mock.Mock(OutputStreamFactory)
# MagicMock is needed because it can handle context managers.
# Normal Mock will throw AttributeErrors
output_stream_context = mock.MagicMock()
self.output_stream = mock.Mock()
output_stream_context.__enter__.return_value = self.output_stream
self.output_stream_factory.get_pager_stream.return_value = \
output_stream_context
self.output_stream_factory.get_stdout_stream.return_value = \
output_stream_context
self.db_reader = mock.Mock(DatabaseRecordReader)
def __init__(self, session, aws_region, domain, task_list):
if not isinstance(session, Session):
raise TypeError("session must be an instance "
"of botocore.session.Session")
self._identity = None
self._session = session
self._aws_region = aws_region
self._domain = domain
self._task_list = task_list
# set user agent to botoflow
# import here to avoid import cycles
from .. import __version__
session.user_agent_name = 'botoflow'
session.user_agent_version = __version__
self._fix_endpoint()
region = os.environ['AWS_DEFAULT_REGION']
elif 'EC2_REGION' in os.environ:
region = os.environ['EC2_REGION']
else:
if not boto3:
if HAS_BOTO:
# boto.config.get returns None if config not found
region = boto.config.get('Boto', 'aws_region')
if not region:
region = boto.config.get('Boto', 'ec2_region')
else:
module.fail_json(msg=missing_required_lib('boto'), exception=BOTO_IMP_ERR)
elif HAS_BOTO3:
# here we don't need to make an additional call, will default to 'us-east-1' if the below evaluates to None.
try:
region = botocore.session.Session(profile=profile_name).get_config_variable('region')
except botocore.exceptions.ProfileNotFound as e:
pass
else:
module.fail_json(msg=missing_required_lib('boto3'), exception=BOTO3_IMP_ERR)
if not security_token:
if os.environ.get('AWS_SECURITY_TOKEN'):
security_token = os.environ['AWS_SECURITY_TOKEN']
elif os.environ.get('AWS_SESSION_TOKEN'):
security_token = os.environ['AWS_SESSION_TOKEN']
elif os.environ.get('EC2_SECURITY_TOKEN'):
security_token = os.environ['EC2_SECURITY_TOKEN']
elif HAS_BOTO and boto.config.get('Credentials', 'aws_security_token'):
security_token = boto.config.get('Credentials', 'aws_security_token')
elif HAS_BOTO and boto.config.get('default', 'aws_security_token'):
security_token = boto.config.get('default', 'aws_security_token')
def mk_boto_session(profile: Optional[str] = None,
creds: Optional[ReadOnlyCredentials] = None,
region_name: Optional[str] = None) -> Session:
""" Get botocore session with correct `region` configured
:param profile: profile name to lookup
:param creds: Override credentials with supplied data
:param region_name: default region_name to use if not configured for a given profile
"""
session = botocore.session.Session(profile=profile)
if creds is not None:
session.set_credentials(creds.access_key,
creds.secret_key,
creds.token)
_region = session.get_config_variable("region")
if _region is None:
if region_name is None or region_name == "auto":
_region = auto_find_region(session, default='us-west-2')
else:
_region = region_name
session.set_config_variable("region", _region)
return session
def post_to_es(payload):
# Post data to ES cluster with exponential backoff
# Get aws_region and credentials to post signed URL to ES
es_region = ES_REGION or os.environ['AWS_REGION']
session = botocore.session.Session({'region': es_region})
creds = botocore.credentials.get_credentials(session)
es_url = urlparse.urlparse(ES_ENDPOINT)
es_endpoint = es_url.netloc or es_url.path # Extract the domain name in ES_ENDPOINT
# Post data with exponential backoff
retries = 0
while (retries < ES_MAX_RETRIES):
if retries > 0:
millis = 2 ** retries * .100
if DEBUG:
print('DEBUG: Wait for {:.1f} seconds'.format(millis))
time.sleep(millis)
try:
es_ret_str = post_data_to_es(payload, es_region, creds, es_endpoint, '/_bulk')
if DEBUG:
def _create_and_verify(profile_to_use=None):
session = botocore.session.Session(profile=profile_to_use)
session.get_config_variable('region')
return session
def __init__(self, configuration):
super(AmazonElasticsearchService, self).__init__(configuration)
region = configuration['region']
cred = None
if configuration.get('use_aws_iam_profile', False):
cred = credentials.get_credentials(session.Session())
else:
cred = credentials.Credentials(access_key=configuration.get('access_key', ''),
secret_key=configuration.get('secret_key', ''))
self.auth = AWSV4Sign(cred, region, 'es')