Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def creator_error_on_missing(obj, pathcomp):
    """
    Raise a dpath.exceptions.PathNotFound exception due to a missing path
    in an attempt to set a value.

    :param obj: object in which the path component was looked up; it is
        included verbatim in the error message.
    :param pathcomp: the path component that does not exist in obj.
    :raises dpath.exceptions.PathNotFound: always.
    """
    raise dpath.exceptions.PathNotFound(
        "{} does not exist in {}".format(pathcomp, obj)
    )
# don't need them.
del parent[key]
else:
# This key can't be removed completely because it
# would affect the order of items that remain in our
# result.
parent[key] = None
except:
# Attempt to treat parent like a dictionary instead.
del parent[key]
counter[0] += 1
[deleted] = dpath.segments.foldm(obj, f, [0])
if not deleted:
raise dpath.exceptions.PathNotFound("Could not find {0} to delete it".format(glob))
return deleted
def get(obj, segments):
    """
    Return the value at the path indicated by segments.

    get(obj, segments) -> value

    Starting from obj, every segment is used in turn to index into the
    value reached so far. An empty segments sequence therefore returns obj
    itself.

    :param obj: the object to walk into.
    :param segments: iterable of keys/indices to follow, outermost first.
    :return: the value found after following every segment.
    :raises PathNotFound: if leaf() reports the value reached so far as a
        leaf while there are still segments left to follow.

    Errors raised by the indexing operation itself (for example for a
    missing key or index) are not caught here and propagate to the caller.
    """
    current = obj
    for i, segment in enumerate(segments):
        # A leaf cannot be descended into, so the remaining path is invalid.
        if leaf(current):
            raise PathNotFound('Path: {}[{}]'.format(segments, i))
        current = current[segment]
    return current
dur_change_item = None
if not sub_dict(prev_item.config) == sub_dict(curr_item.config):
eph_change_item = ChangeItem.from_items(old_item=prev_item, new_item=curr_item, source_watcher=self)
if self.ephemerals_skipped():
# deepcopy configs before filtering
dur_prev_item = deepcopy(prev_item)
dur_curr_item = deepcopy(curr_item)
# filter-out ephemeral paths in both old and new config dicts
if self.ephemeral_paths:
for path in self.ephemeral_paths:
for cfg in [dur_prev_item.config, dur_curr_item.config]:
try:
dpath.util.delete(cfg, path, separator='$')
except PathNotFound:
pass
# now, compare only non-ephemeral paths
if not sub_dict(dur_prev_item.config) == sub_dict(dur_curr_item.config):
dur_change_item = ChangeItem.from_items(old_item=dur_prev_item, new_item=dur_curr_item,
source_watcher=self)
# store all changes, divided in specific categories
if eph_change_item:
self.ephemeral_items.append(eph_change_item)
app.logger.debug("%s: ephemeral changes in item %s/%s/%s" % (self.i_am_singular, eph_change_item.account, eph_change_item.region, eph_change_item.name))
if dur_change_item:
self.changed_items.append(dur_change_item)
app.logger.debug("%s: durable changes in item %s/%s/%s" % (self.i_am_singular, dur_change_item.account, dur_change_item.region, dur_change_item.name))
elif eph_change_item is not None:
policies = list()
for key in policy_keys:
try:
policy = dpath.util.values(item.config, key, separator='$')
if isinstance(policy, list):
for p in policy:
if not p:
continue
if isinstance(p, list):
policies.extend([Policy(pp) for pp in p])
else:
policies.append(Policy(p))
else:
policies.append(Policy(policy))
except PathNotFound:
continue
return policies
def durable_hash(config, ephemeral_paths):
    """
    Remove all ephemeral paths from a copy of config and hash what remains.

    :param config: configuration structure to hash; it is deep-copied first,
        so the caller's object is never modified.
    :param ephemeral_paths: iterable of '$'-separated dpath paths to delete
        before hashing. May be None or empty, in which case nothing is
        removed.
    :return: the result of hash_config() applied to the filtered copy.
    """
    durable_item = deepcopy(config)
    # `or ()` tolerates a missing (None) path list instead of raising
    # TypeError when iterating.
    for path in ephemeral_paths or ():
        try:
            dpath.util.delete(durable_item, path, separator='$')
        except PathNotFound:
            # The path is absent from this config, so there is nothing
            # to strip for it.
            pass
    return hash_config(durable_item)
def durable_hash(item, ephemeral_paths):
    """
    Remove all ephemeral paths from the item and return the hash of the new structure.

    :param item: dictionary, representing an item tracked in security_monkey
    :param ephemeral_paths: iterable of '$'-separated dpath paths to remove
        before hashing. May be None or empty, in which case nothing is
        removed.
    :return: hash of the sorted json dump of the item with all ephemeral paths removed.
    """
    # Work on a deep copy so the caller's item is left untouched.
    durable_item = deepcopy(item)
    # `or ()` tolerates a missing (None) path list instead of raising
    # TypeError when iterating.
    for path in ephemeral_paths or ():
        try:
            dpath.util.delete(durable_item, path, separator='$')
        except PathNotFound:
            # The path is absent from this item, so there is nothing to
            # strip for it.
            pass
    return hash_config(durable_item)
for (i, segment) in enumerate(segments[:-1]):
try:
# Optimistically try to get the next value. This makes the
# code agnostic to whether current is a list or a dict.
# Unfortunately, for our use, 'x in thing' for lists checks
# values, not keys whereas dicts check keys.
current[segment]
except:
if creator != None:
creator(current, segments, i, hints=hints)
else:
raise
current = current[segment]
if i != length - 1 and leaf(current):
raise PathNotFound('Path: {}[{}]'.format(segments, i))
if isinstance(segments[-1], (int, long)):
extend(current, segments[-1])
current[segments[-1]] = value
return obj
def durable_hash(self, item, ephemeral_paths):
    """
    Remove all ephemeral paths from the item and return the hash of the new structure.

    :param item: dictionary, representing an item tracked in security_monkey
    :param ephemeral_paths: iterable of '$'-separated dpath paths to remove
        before hashing. May be None or empty, in which case nothing is
        removed.
    :return: hash of the sorted json dump of the item with all ephemeral paths removed.
    """
    # Work on a deep copy so the caller's item is left untouched.
    durable_item = deepcopy(item)
    # A falsy path list (None, empty) means there is nothing to strip.
    for path in ephemeral_paths or ():
        try:
            dpath.util.delete(durable_item, path, separator='$')
        except PathNotFound:
            # The path is absent from this item, so there is nothing to
            # strip for it.
            pass
    return self.hash_config(durable_item)