Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_delete_not_ok(self):
    """
    Constructing a PostgresDB item with 'delete' set to the integers
    0 or 1 must raise BundleError.
    """
    # NOTE(review): presumably only real booleans are accepted for
    # 'delete' -- confirm against the item's attribute validation.
    with self.assertRaises(BundleError):
        postgres_dbs.PostgresDB(MagicMock(), "foo", {'delete': 0})
    with self.assertRaises(BundleError):
        postgres_dbs.PostgresDB(MagicMock(), "foo", {'delete': 1})
def test_installed_not_ok(self):
    """
    Constructing a YumPkg item with 'installed' set to the integers
    0 or 1 must raise BundleError.
    """
    # NOTE(review): presumably only real booleans are accepted for
    # 'installed' -- confirm against the item's attribute validation.
    with self.assertRaises(BundleError):
        pkg_yum.YumPkg(MagicMock(), "foo", {'installed': 0})
    with self.assertRaises(BundleError):
        pkg_yum.YumPkg(MagicMock(), "foo", {'installed': 1})
def test_installed_not_ok(self):
    """
    Constructing a PipPkg item must raise BundleError when 'installed'
    is the integer 0 or 1, and when 'installed' is False while a
    'version' is also given.
    """
    # NOTE(review): presumably only real booleans are accepted for
    # 'installed' -- confirm against the item's attribute validation.
    with self.assertRaises(BundleError):
        pkg_pip.PipPkg(MagicMock(), "foo", {'installed': 0})
    with self.assertRaises(BundleError):
        pkg_pip.PipPkg(MagicMock(), "foo", {'installed': 1})
    # a version together with installed=False is rejected as well
    with self.assertRaises(BundleError):
        pkg_pip.PipPkg(MagicMock(), "foo", {'installed': False, 'version': "1.0"})
def get_auto_deps(self, items):
    """
    Return the IDs of the group items this item implicitly depends on.

    A group is a dependency when its name is listed in this item's
    'groups' attribute, or when this item's 'gid' attribute is not None
    and equals either the group's 'gid' attribute or the group's name.

    items -- iterable of items to inspect; every item whose
             ITEM_TYPE_NAME is not "group" is ignored

    Returns a list of item IDs in the order the groups were encountered.

    Raises BundleError if a group this item depends on has a truthy
    'delete' attribute.
    """
    deps = []
    # 'groups' may be None (or otherwise falsy); treat that as "no groups"
    groups = self.attributes['groups'] or []
    for item in items:
        if item.ITEM_TYPE_NAME != "group":
            continue
        if item.name not in groups:
            gid = self.attributes['gid']
            # An unset gid must never match; otherwise None would match a
            # group whose own 'gid' attribute is None as well.
            if gid is None or gid not in (item.attributes['gid'], item.name):
                # we don't need to depend on this group
                continue
        if item.attributes['delete']:
            raise BundleError(_(
                "{item1} (from bundle '{bundle1}') depends on item "
                "{item2} (from bundle '{bundle2}') which is set to be deleted"
            ).format(
                item1=self.id,
                bundle1=self.bundle.name,
                item2=item.id,
                bundle2=item.bundle.name,
            ))
        deps.append(item.id)
    return deps
"""
with tempfile() as tmp_file:
if self.attributes['content_type'] == 'binary':
local_path = self.template
else:
local_path = tmp_file
with open(local_path, 'wb') as f:
f.write(self.content)
if self.attributes['verify_with']:
cmd = self.attributes['verify_with'].format(quote(local_path))
io.debug("calling local verify command for {i}: {c}".format(c=cmd, i=self.id))
if call(cmd, shell=True) == 0:
io.debug("{i} passed local validation".format(i=self.id))
else:
raise BundleError(_(
"{i} failed local validation using: {c}"
).format(c=cmd, i=self.id))
yield local_path
def validate_attributes(cls, bundle, item_id, attributes):
if attributes.get('delete', False):
for attr in attributes.keys():
if attr not in ['delete'] + list(BUILTIN_ITEM_ATTRIBUTES.keys()):
raise BundleError(_(
"{item} from bundle '{bundle}' cannot have other "
"attributes besides 'delete'"
).format(item=item_id, bundle=bundle.name))
if 'content' in attributes and 'source' in attributes:
raise BundleError(_(
"{item} from bundle '{bundle}' cannot have both 'content' and 'source'"
).format(item=item_id, bundle=bundle.name))
if 'content' in attributes and attributes.get('content_type') == 'binary':
raise BundleError(_(
"{item} from bundle '{bundle}' cannot have binary inline content "
"(use content_type 'base64' instead)"
).format(item=item_id, bundle=bundle.name))
if 'encoding' in attributes and attributes.get('content_type') in (
'any',
)):
raise BundleError(_(
"{item1} (from bundle '{bundle1}') blocking path to "
"{item2} (from bundle '{bundle2}')"
).format(
item1=item.id,
bundle1=item.bundle.name,
item2=self.id,
bundle2=self.bundle.name,
))
if (
item.ITEM_TYPE_NAME == "directory" and
item.name == self.name
):
if item.attributes['purge']:
raise BundleError(_(
"cannot git_deploy into purged directory {}"
).format(item.name))
else:
deps.add(item.id)
return deps
def _validate_required_attributes(cls, bundle, item_id, attributes):
    """
    Ensure every attribute named in cls.REQUIRED_ATTRIBUTES is present
    in the given attributes.

    bundle     -- bundle the item belongs to; only its name is used,
                  for the error message
    item_id    -- ID of the item being validated (for the error message)
    attributes -- container of the attributes defined for the item;
                  only membership is checked, not the values

    Returns None when nothing is missing.

    Raises BundleError naming all missing attributes at once, in the
    order they appear in cls.REQUIRED_ATTRIBUTES.
    """
    missing = [
        attrname for attrname in cls.REQUIRED_ATTRIBUTES
        if attrname not in attributes
    ]
    if missing:
        raise BundleError(_(
            "{item} in bundle '{bundle}' missing required attribute(s): {attrs}"
        ).format(
            item=item_id,
            bundle=bundle.name,
            attrs=", ".join(missing),
        ))
preceded_by items and attaches triggering items to preceding items.
"""
for item in items.values():
if item.preceded_by and item.triggered:
raise BundleError(_(
"triggered item '{item}' in bundle '{bundle}' must not use "
"'preceded_by' (use chained triggers instead)".format(
bundle=item.bundle.name,
item=item.id,
),
))
for triggered_item_id in item.preceded_by:
try:
triggered_item = items[triggered_item_id]
except KeyError:
raise BundleError(_(
"unable to find definition of '{item1}' preceding "
"'{item2}' in bundle '{bundle}'"
).format(
bundle=item.bundle.name,
item1=triggered_item_id,
item2=item.id,
))
if not triggered_item.triggered:
raise BundleError(_(
"'{item1}' in bundle '{bundle1}' precedes "
"'{item2}' in bundle '{bundle2}', "
"but missing 'triggered' attribute"
).format(
item1=triggered_item.id,
bundle1=triggered_item.bundle.name,
item2=item.id,