Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup_method(self, test_method):
    """Build a fresh ConnectionManager before each test.

    Resets the ``_boto_sessions``, ``_clients`` and ``_stack_keys``
    attributes on the ConnectionManager class to empty dictionaries, makes
    sure placeholder AWS credentials are present in the environment, and
    stores a newly constructed manager on ``self.connection_manager``.

    :param test_method: Not used by this method.
    """
    self.region = "eu-west-1"
    self.stack_name = self.profile = self.iam_role = None

    # Give every class-level dictionary its own new, empty dict.
    for cache_name in ("_boto_sessions", "_clients", "_stack_keys"):
        setattr(ConnectionManager, cache_name, {})

    # Temporary workaround for https://github.com/spulec/moto/issues/1924
    # setdefault leaves any credentials already in the environment untouched.
    placeholder_credentials = (
        ("AWS_ACCESS_KEY_ID", "sceptre_test_key_id"),
        ("AWS_SECRET_ACCESS_KEY", "sceptre_test_access_key"),
    )
    for variable, value in placeholder_credentials:
        os.environ.setdefault(variable, value)

    self.connection_manager = ConnectionManager(
        region=self.region,
        stack_name=self.stack_name,
        profile=self.profile,
        iam_role=self.iam_role,
    )
def test_call_with_valid_service_and_stack_name_call(self):
    """Check that ``call`` for S3 ``list_buckets`` reports HTTP status 200.

    The manager is created for the stack name 'stack' and the same stack
    name is passed to ``call``.
    """
    manager = ConnectionManager(region=self.region, stack_name='stack')

    response = manager.call('s3', 'list_buckets', {}, stack_name='stack')

    status_code = response['ResponseMetadata']['HTTPStatusCode']
    assert status_code == 200
def test_connection_manager_initialised_with_no_optional_parameters(self):
    """Check the attributes of a manager built with only ``region``.

    ``stack_name`` and ``profile`` are expected to be None, ``region`` to be
    the value passed in, and the three dictionary attributes to be empty.
    """
    manager = ConnectionManager(region=sentinel.region)

    assert manager.stack_name is None
    assert manager.profile is None
    assert manager.region == sentinel.region

    # Each of these dictionary attributes must compare equal to {}.
    for cache_name in ("_boto_sessions", "_clients", "_stack_keys"):
        assert getattr(manager, cache_name) == {}
def setup_method(self, test_method):
    """Create the Template used by the tests, with a mocked manager.

    Stores fixed string values for region, bucket name, stack group path
    and stack name on ``self``, then builds ``self.template`` with a
    ``Mock(spec=ConnectionManager)`` as its connection manager.

    :param test_method: Not used by this method.
    """
    self.region = "region"
    self.bucket_name = "bucket_name"
    self.stack_group_path = "stack_group_path"
    self.stack_name = "stack_name"

    manager_double = Mock(spec=ConnectionManager)
    # The mock's create_bucket_lock is set to a real threading lock object.
    manager_double.create_bucket_lock = threading.Lock()

    template_kwargs = {
        "path": "/folder/template.py",
        "sceptre_user_data": {},
        "connection_manager": manager_double,
    }
    self.template = Template(**template_kwargs)
def setup_method(self, test_method):
    """Reset ConnectionManager class state and create a manager per test.

    The ``_boto_sessions``, ``_clients`` and ``_stack_keys`` class
    attributes are replaced with empty dictionaries, placeholder AWS
    credentials are added to the environment when none are set, and the
    resulting manager is kept on ``self.connection_manager``.

    :param test_method: Not used by this method.
    """
    self.stack_name = self.profile = self.iam_role = None
    self.region = "eu-west-1"

    ConnectionManager._boto_sessions, ConnectionManager._clients, \
        ConnectionManager._stack_keys = {}, {}, {}

    # Temporary workaround for https://github.com/spulec/moto/issues/1924
    # Existing environment values win; only missing ones are filled in.
    fallback_env = {
        "AWS_ACCESS_KEY_ID": "sceptre_test_key_id",
        "AWS_SECRET_ACCESS_KEY": "sceptre_test_access_key",
    }
    for env_name, env_value in fallback_env.items():
        os.environ.setdefault(env_name, env_value)

    manager_kwargs = {
        "region": self.region,
        "stack_name": self.stack_name,
        "profile": self.profile,
        "iam_role": self.iam_role,
    }
    self.connection_manager = ConnectionManager(**manager_kwargs)
def test_connection_manager_initialised_with_all_parameters(self):
    """Check the attributes of a manager built with every parameter.

    Besides the plain attributes, ``_stack_keys`` is expected to map the
    stack name to a ``(region, profile, iam_role)`` tuple, while
    ``_boto_sessions`` and ``_clients`` are expected to be empty.
    """
    manager = ConnectionManager(
        region=self.region,
        stack_name="stack",
        profile="profile",
        iam_role="iam_role",
    )
    expected_stack_keys = {"stack": (self.region, "profile", "iam_role")}

    assert manager.stack_name == "stack"
    assert manager.profile == "profile"
    assert manager.iam_role == "iam_role"
    assert manager.region == self.region
    assert manager._boto_sessions == {}
    assert manager._clients == {}
    assert manager._stack_keys == expected_stack_keys
def setup_method(self, test_method):
    """Create an ASGScalingProcesses instance bound to a mocked Stack.

    The mocked stack gets a mocked ConnectionManager and the external name
    "external_name"; it is stored on ``self.stack`` and passed as the
    second argument to ASGScalingProcesses (the first argument is None).

    :param test_method: Not used by this method.
    """
    stack_double = MagicMock(spec=Stack)
    stack_double.connection_manager = MagicMock(spec=ConnectionManager)
    stack_double.external_name = "external_name"

    self.stack = stack_double
    self.asg_scaling_processes = ASGScalingProcesses(None, stack_double)
def test_resolve_with_args(self, mock_get_output_value):
    """Check how a resolver argument with region, profile and role is split.

    The argument "another/account-vpc::VpcId region::profile::iam_role" must
    lead to exactly one call of the mocked output lookup with the five
    separate values, and the stack's dependency list must stay empty.

    :param mock_get_output_value: Mock whose single call is asserted on.
    """
    stack_double = MagicMock(spec=Stack)
    stack_double.dependencies = []
    stack_double._connection_manager = MagicMock(spec=ConnectionManager)

    resolver = StackOutputExternal(
        "another/account-vpc::VpcId region::profile::iam_role",
        stack_double,
    )
    mock_get_output_value.return_value = "output_value"

    resolver.resolve()

    expected_call_args = (
        "another/account-vpc", "VpcId", "region", "profile", "iam_role"
    )
    mock_get_output_value.assert_called_once_with(*expected_call_args)
    assert stack_double.dependencies == []
def __init__(self, stack):
    """Store the stack and create the logger and ConnectionManager for it.

    :param stack: Object providing ``name``, ``region``, ``profile``,
        ``external_name`` and ``iam_role`` attributes.
    """
    self.stack = stack
    self.name = self.stack.name
    self.logger = logging.getLogger(__name__)

    # Positional order is kept exactly: region, profile, external name,
    # IAM role.
    manager_args = (
        self.stack.region,
        self.stack.profile,
        self.stack.external_name,
        self.stack.iam_role,
    )
    self.connection_manager = ConnectionManager(*manager_args)
def connection_manager(self):
    """
    Returns the ConnectionManager held in ``self._connection_manager``,
    creating and storing one first when that attribute is None.

    :returns: ConnectionManager.
    :rtype: ConnectionManager
    """
    # Reuse the stored manager when one already exists.
    if self._connection_manager is not None:
        return self._connection_manager

    self._connection_manager = ConnectionManager(
        self.region, self.profile, self.external_name, self.iam_role
    )
    return self._connection_manager