Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_change_list(self):
    """patch() must apply 'change' operations that address list items
    (and dicts nested inside lists) via dotted index paths like 'a.0'."""
    cases = [
        ({'a': ['b']},
         {'a': ['c']},
         [('change', 'a.0', ('b', 'c'))]),
        ({'a': {'b': {'c': ['d']}}},
         {'a': {'b': {'c': ['e']}}},
         [('change', 'a.b.c.0', ('d', 'e'))]),
        ({'a': {'b': {'c': [{'d': 'e'}]}}},
         {'a': {'b': {'c': [{'d': 'f'}]}}},
         [('change', 'a.b.c.0.d', ('e', 'f'))]),
    ]
    for before, after, operations in cases:
        assert after == patch(operations, before)
def test_add_list(self):
    """'add' patches targeting list paths append (index, value) pairs.

    The tail of this test exercises three-way merging with both conflict
    resolutions ('f' = take first side, 's' = take second side).
    """
    first = {'a': [1]}
    second = {'a': [1, 2]}
    assert second == patch(
        [('add', 'a', [(1, 2)])], first)
    first = {'a': {'b': [1]}}
    second = {'a': {'b': [1, 2]}}
    assert second == patch(
        [('add', 'a.b', [(1, 2)])], first)
    # NOTE(review): the remainder references `lca`, which is not defined
    # anywhere in this view — presumably a module-level fixture (the merge
    # ancestor); confirm it belongs in this test and is in scope.
    first = {'foo': [{'x': 1}, {'y': 2}, {'z': 4}]}
    second = {'bar': 'baz'}
    expected = {
        'f': {'foo': [{'x': 1}, {'y': 2}, {'z': 4}],
              'bar': 'baz'},
        's': {'bar': 'baz'}}
    for resolution, expected_value in expected.items():
        m = Merger(lca, first, second, {})
        try:
            m.run()
        except UnresolvedConflictsException as e:
            # Answer every reported conflict with the same resolution.
            m.continue_run([resolution for _ in e.content])
        self.assertEqual(patch(m.unified_patches, lca),
                         expected_value)
# Roll `patched` back toward `target_version` by undoing newer history
# entries.  NOTE(review): the enclosing def is outside this view and the
# original indentation was lost; the nesting below is reconstructed from
# the only valid parse (and mirrors the parallel revert loops later in
# this file) — confirm against the original source.
if patched["_version"] != target_version:
    for history_document in history_documents:
        entry_version = history_document["entry_version"]
        if entry_version == "removed" or entry_version >= target_version:
            if history_document["method_name"] == "add":
                # Undoing an add leaves a removed marker.
                patched = "remove"
            elif history_document["method_name"] == "remove":
                # Undoing a remove restores the stored document.
                patched = history_document["changes"]
                break
            else:
                # Undo an edit by applying the swapped (inverted) diff.
                diff = dictdiffer.swap(history_document["changes"])
                patched = dictdiffer.patch(diff, patched)
        else:
            # Entries appear newest-first; older than target means done.
            break
return patched
# NOTE(review): enclosing def is outside this view; this fragment rebuilds
# a historical revision of an OTU document by reverting newer diffs.
patched = deepcopy(current)
# Sort the changes by descending timestamp.
# (The sort key is actually the OTU version, newest first.)
for change in db.history.find({"otu.id": otu_id}, sort=[("otu.version", -1)]):
    if change["otu"]["version"] == "removed" or change["otu"]["version"] > version:
        reverted_history_ids.append(change["_id"])
        if change["method_name"] == "remove":
            # A removal's diff stores the full document; restore it.
            patched = change["diff"]
        elif change["method_name"] == "create":
            # Before creation the document did not exist.
            patched = None
        else:
            # Undo an edit by applying the inverse (swapped) diff.
            diff = dictdiffer.swap(change["diff"])
            patched = dictdiffer.patch(diff, patched)
    else:
        # Changes are newest-first; the rest are at or below `version`.
        break
if current == {}:
    current = None
return current, patched, reverted_history_ids
# Compute which top-level keys changed between the production record and
# the incoming record, then push only those fields via robotupload.
# NOTE(review): fragment of a larger function — `prod_data`, `new_data`,
# `updated_keys`, `recid`, `obj`, `url`, `callback_url` come from outside
# this view.
#
# BUG FIX: dictdiffer.diff() returns a generator; the `for` loop below
# exhausted it, so dictdiffer.patch(diff, ...) afterwards received an
# empty diff.  Materialize it once so it can be iterated AND applied.
diff = list(dictdiffer.diff(prod_data, new_data))
for diff_type, new_key, content in diff:
    if diff_type == 'add':
        if new_key:
            if isinstance(new_key, list):
                # ['subject_terms', 0] — record the top-level field name.
                updated_keys.append(new_key[0])
            else:
                # 'subject_terms'
                updated_keys.append(new_key)
        else:
            # content must be list of new adds
            for key in content:
                updated_keys.append(key)
updates = dictdiffer.patch(diff, new_data)
# BUG FIX: deleting from a dict while iterating its live key view raises
# RuntimeError in Python 3 — iterate a snapshot of the keys instead.
for key in list(updates.keys()):
    if key not in updated_keys:
        del updates[key]
if updates:
    updates['recid'] = recid
    marcxml = Record(updates).legacy_export_as_marc()
    result = make_robotupload_marcxml(
        url=url,
        marcxml=marcxml,
        callback_url=callback_url,
        mode='correct',
        nonce=obj.id
    )
# NOTE(review): this conditional's body continues beyond this view —
# presumably error handling for the robotupload response.
if "[INFO]" not in result.text:
    if "cannot use the service" in result.text:
        # IP not in the list
# NOTE(review): continuation of a call whose opening line is outside this
# view — left untouched.
column_values={'name': name},
transaction=transaction)
# Three-way merge of a release blob: the common ancestor (from history),
# the current tip, and the incoming blob.  NOTE(review): enclosing method
# is outside this view; the bare `raise` below presumes we are inside an
# active except handler — confirm.
# if we have no historical information about the ancestor blob
if ancestor_change is None:
    self.log.debug("history for data_version %s for release %s absent" % (old_data_version, name))
    raise
ancestor_blob = ancestor_change.get('data')
tip_release = self.getReleases(name=name, transaction=transaction)[0]
tip_blob = tip_release.get('data')
m = dictdiffer.merge.Merger(ancestor_blob, tip_blob, blob, {})
try:
    m.run()
    # Merger merges the patches into a single unified patch,
    # but we need dictdiffer.patch to actually apply the patch
    # to the original blob
    unified_blob = dictdiffer.patch(m.unified_patches, ancestor_blob)
    # converting the resultant dict into a blob and then
    # converting it to JSON
    what['data'] = unified_blob
    # we want the data_version for the dictdiffer.merged blob to be one
    # more than that of the latest blob
    tip_data_version = tip_release['data_version']
    super(Releases, self).update(where={"name": name}, what=what, changed_by=changed_by, old_data_version=tip_data_version,
                                 transaction=transaction, dryrun=dryrun)
    # cache will have a data_version of one plus the tip
    # data_version
    new_data_version = tip_data_version + 1
except dictdiffer.merge.UnresolvedConflictsException:
    self.log.debug("latest version of release %s cannot be merged with new blob" % name)
    # BUG FIX: the handler never bound the exception as `e`, so `raise e`
    # was a NameError; a bare `raise` re-raises the active exception and
    # preserves its traceback.
    raise
if not dryrun:
    cache.put("blob", name, {"data_version": new_data_version, "blob": blob})
# NOTE(review): enclosing async def is outside this view; this fragment
# rebuilds a historical revision of a "kind" document by reverting newer
# diffs.
patched = deepcopy(current)
# Sort the changes by descending timestamp.
async for change in db.history.find({"kind.id": kind_id}, sort=[("created_at", -1)]):
    if change["kind"]["version"] == "removed" or change["kind"]["version"] > version:
        reverted_history_ids.append(change["_id"])
        if change["method_name"] == "remove":
            # A removal's diff stores the full document; restore it.
            patched = change["diff"]
        elif change["method_name"] == "create":
            # Before creation the document did not exist.
            patched = None
        else:
            # Undo an edit by applying the inverse (swapped) diff.
            diff = dictdiffer.swap(change["diff"])
            patched = dictdiffer.patch(diff, patched)
    else:
        # Changes are newest-first; the rest are at or below `version`.
        break
if current == {}:
    current = None
return current, patched, reverted_history_ids
# NOTE(review): enclosing async def is outside this view; this fragment
# rebuilds a historical revision of a virus document by reverting newer
# diffs (same shape as the "kind" variant above it in this file).
patched = deepcopy(current)
# Sort the changes by descending timestamp.
async for change in db.history.find({"virus.id": virus_id}, sort=[("created_at", -1)]):
    if change["virus"]["version"] == "removed" or change["virus"]["version"] > version:
        reverted_history_ids.append(change["_id"])
        if change["method_name"] == "remove":
            # A removal's diff stores the full document; restore it.
            patched = change["diff"]
        elif change["method_name"] == "create":
            # Before creation the document did not exist.
            patched = None
        else:
            # Undo an edit by applying the inverse (swapped) diff.
            diff = dictdiffer.swap(change["diff"])
            patched = dictdiffer.patch(diff, patched)
    else:
        # Changes are newest-first; the rest are at or below `version`.
        break
if current == {}:
    current = None
return current, patched, reverted_history_ids