Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_patch_equality(self):
patch1 = jsonpatch.JsonPatch([{ "op": "add", "path": "/a/b/c", "value": "foo" }])
patch2 = jsonpatch.JsonPatch([{ "path": "/a/b/c", "op": "add", "value": "foo" }])
self.assertEqual(patch1, patch2)
:returns: The result of the patch operation.
:raises: PatchError if the patch fails to apply.
:raises: exception.ClientSideError if the patch adds a new root attribute.
"""
# Prevent removal of root attributes.
for p in patch:
if p['op'] == 'add' and p['path'].count('/') == 1:
if p['path'].lstrip('/') not in doc:
msg = _('Adding a new attribute (%s) to the root of '
'the resource is not allowed')
raise exception.ClientSideError(msg % p['path'])
# Apply operations one at a time, to improve error reporting.
for patch_op in patch:
try:
doc = jsonpatch.apply_patch(doc, jsonpatch.JsonPatch([patch_op]))
except _JSONPATCH_EXCEPTIONS as e:
raise exception.PatchError(patch=patch_op, reason=e)
return doc
def apply_jsonpatch(doc, patch):
for p in patch:
if p['op'] == 'add' and p['path'].count('/') == 1:
if p['path'].lstrip('/') not in doc:
msg = _('Adding a new attribute (%s) to the root of '
' the resource is not allowed')
raise wsme.exc.ClientSideError(msg % p['path'])
return jsonpatch.apply_patch(doc, jsonpatch.JsonPatch(patch))
def _apply_patches(self, resources):
for _, resource in resources.iteritems():
if self.namespace:
if 'namespace' in resource['value']['metadata']:
op = 'replace'
else:
op = 'add'
resource['patch'].append({"op": op, "path": "/metadata/namespace", "value": self.namespace})
if len(resource['patch']):
patch = jsonpatch.JsonPatch(resource['patch'])
result = patch.apply(resource['value'])
resource['value'] = result
return resources
# TODO(gilbert.pilz@oracle.com) check if there are any assemblies that
# refer to this plan and raise an PlanStillReferenced exception if
# there are.
if not pecan.request.body or len(pecan.request.body) < 1:
raise exception.BadRequest(reason='empty request body')
# check to make sure the request has the right Content-Type
if (pecan.request.content_type is None or
pecan.request.content_type != 'application/json-patch+json'):
raise exception.UnsupportedMediaType(
name=pecan.request.content_type,
method='PATCH')
try:
patch = jsonpatch.JsonPatch.from_string(pecan.request.body)
patched_obj = patch.apply(plan_obj.refined_content())
db_obj = handler.update(uuid, patched_obj)
except KeyError:
# a key error indicates one of the patch operations is missing a
# component
raise exception.BadRequest(reason=MAL_PATCH_ERR)
except jsonpatch.JsonPatchConflict:
raise exception.Unprocessable
except jsonpatch.JsonPatchException as jpe:
raise JsonPatchProcessingException(reason=str(jpe))
return fluff_plan(db_obj.refined_content(), db_obj.uuid)
def patch_event(event):
""" :type event: dart.model.event.Event """
p = JsonPatch(request.get_json())
return update_event(event, Event.from_dict(p.apply(event.to_dict())))
def take_action(self, parsed_args):
LOG.debug("take_action(%s)", parsed_args)
client = self.app.client_manager.application_catalog
jp_obj = None
if not parsed_args.filename:
jp_obj = json.load(sys.stdin)
else:
with open(parsed_args.filename) as fpatch:
jp_obj = json.load(fpatch)
jpatch = jsonpatch.JsonPatch(jp_obj)
environment_id = parsed_args.id
session_id = parsed_args.session_id
environment = client.environments.get(environment_id, session_id)
object_model = jpatch.apply(environment.services)
murano_utils.traverse_and_replace(object_model)
client.services.put(
environment_id,
path='/',
data=jpatch.apply(environment.services),
session_id=session_id)