Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(self, resource):
""" Test that the argument this asserts for is present in the
resource. """
try:
resource.inner[self.name].resolve()
return True
except errors.NoMatching:
return False
def clean_allowed(self):
from boto.ec2 import connect_to_region
c = connect_to_region('eu-west-1')
allowed = []
try:
groups = self.params.allowed.get_iterable()
except errors.NoMatching:
groups = []
for group in groups:
try:
name = group.as_string()
except errors.TypeError:
name = group.get_key("name").as_string()
try:
groups = c.get_all_security_groups(groupnames=[name])
g = groups[0]
except EC2ResponseError:
# FIXME: Don't raise if simulating
raise TypeError("No such EC2 SecurityGroup '%s'" % name)
allowed.append((g.name, g.owner_id))
def clean_allowed(self):
from boto.ec2 import connect_to_region
c = connect_to_region('eu-west-1')
allowed = []
try:
groups = self.params.allowed.get_iterable()
except errors.NoMatching:
groups = []
for group in groups:
try:
name = group.as_string()
except errors.TypeError:
name = group.get_key("name").as_string()
try:
groups = c.get_all_security_groups(groupnames=[name])
g = groups[0]
except EC2ResponseError:
# FIXME: Don't raise if simulating
raise TypeError("No such EC2 SecurityGroup '%s'" % name)
allowed.append((g.name, g.owner_id))
len_defaults = len(defaults) if defaults else 0
padding = len_args - len_defaults
defaults = itertools.chain(itertools.repeat(_MARKER, padding), defaults)
result = {}
for arg, default in itertools.chain(zip(args, defaults), zip(kwargs, itertools.repeat(_MARKER2, len(kwargs)))):
if arg in ignore:
continue
try:
if not expression:
raise KeyError
node = expression.get_key(arg)
except KeyError:
if default == _MARKER:
raise errors.NoMatching(arg)
elif default == _MARKER2:
continue
result[arg] = default
else:
if default == _MARKER:
result[arg] = node.resolve()
elif isinstance(default, int):
result[arg] = node.as_int()
elif isinstance(default, basestring):
result[arg] = node.as_string()
else:
result[arg] = node.resolve()
return result
def create_parts(self):
c = self.parts = PartCollection()
for k in self.config.node.get('parts').keys():
v = self.config.node.get('parts').get(k)
try:
classname = get_encrypted(v.get("class").resolve())
except NoMatching:
classname = "compute"
r = PartType.types[classname](self, k, v)
r.set_state(self.state.get_state(k))
c.add_part(r)
def resolve(self):
instance = self.resource
try:
value = self.node.resolve()
except errors.NoMatching:
# return PolicyCollection(instance.policies.default())
return None
if type(value) in types.StringTypes:
if value not in instance.policies:
raise error.ParseError(
"'%s' is not a valid policy for %r" % (value, instance))
return PolicyCollection(StandardPolicy(value))
if isinstance(value, dict):
triggers = []
for policy, conditions in value.items():
if policy not in instance.policies:
raise error.ParseError(
"'%s' is not a valid policy for %r" % (policy, instance))
if not isinstance(conditions, list):
def apply(self):
if self.root.readonly:
return
name = self.params.name.as_string()
changed = False
roles = [r for r in self.connection.list_roles()['list_roles_response']['list_roles_result']['roles'] if r['role_name'] == name]
if not roles:
self.create()
changed = True
existing_policy_names = set(p for p in self.connection.list_role_policies(name)['list_role_policies_response']['list_role_policies_result']['policy_names'])
try:
new_policy_names = set(k for k in self.params.policies.keys())
except errors.NoMatching:
new_policy_names = set()
for policy in (existing_policy_names - new_policy_names):
with self.root.ui.throbber("Removing policy '%s'" % policy):
self.connection.delete_role_policy(name, policy)
changed = True
for policy in (new_policy_names - existing_policy_names):
with self.root.ui.throbber("Adding policy '%s'" % policy):
policies = self.params.policies[policy].as_list()
policy_str = json.dumps({"Statement": policies})
self.connection.put_role_policy(name, policy, policy_str)
changed = True
# FIXME: Get intersection of policies and check if updates required
def apply_scaling(self):
try:
after = self.params.dynos.keys()
except errors.NoMatching:
after = []
for dyno in after:
if not self.app or dyno not in self.app.processes:
raise ExecutionError(
"Tried to configure dyno '%s' but it doesn't exist in the app" % dyno)
for dyno in after:
scale = self.params.dynos[dyno].as_int()
current_scale = len(self.app.processes[dyno])
if current_scale != scale:
self.action("Scaling dyno '%s' from %d workers to %d workers" %
(dyno, current_scale, scale))
if self.app and not self.root.simulate:
self.app.processes.scale(scale)
def apply(self):
try:
args = list(self.root.yaybu.options)
except NoMatching:
return
self.schema = {}
for arg in args:
yarg = YaybuArg(
str(arg.name),
arg.type.as_string('string'),
arg.default.as_string(None),
arg.help.as_string(None),
)
self.add(yarg)
self.members.update(self.parse())