Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except ValueError as e:
errors.append({"message": "Value error", "field": name, "error": e})
except ValidationError as e:
errors.append(e.json())
except ValueDeserializationError as e:
errors.append({"message": e.message, "field": name, "error": e})
except Invalid as e:
errors.append({"message": e.args[0], "field": name, "error": e})
else:
# record object changes for potential future conflict resolution
try:
await apply_coroutine(field.set, obj, value)
changed = True
except ValidationError as e:
errors.append(e.json())
except ValueDeserializationError as e:
errors.append({"message": e.message, "field": name, "error": e})
except AttributeError:
logger.warning(f"AttributeError setting data on field {name}", exc_info=True)
except Exception:
logger.warning(
f"Unhandled error setting data on field, {schema} {name}", exc_info=True
)
errors.append(
{
"message": "Unhandled exception",
"field": name,
"error": ValueDeserializationError(field, value, "Unhandled error"),
}
)
else:
if validate_all and field.required and getattr(obj, name, None) is None:
def field_converter(field, value, context):
field.field.__name__ = field.__name__
if isinstance(value, dict) and "op" in value:
if not isinstance(value, dict):
raise ValueDeserializationError(field, value, "Not an object")
operation_name = value.get("op", "undefined")
if operation_name == "multi":
operation = query_adapter(field, field.operation_type, name=operation_name)
if operation is None:
raise ValueDeserializationError(field, value, f'"{operation_name}" not a valid operation')
value = operation(context, value.get("value"))
else:
bound_field = field.field.bind(context)
operation = query_adapter(bound_field, field.operation_type, name=operation_name)
if operation is None:
raise ValueDeserializationError(field, value, f'"{operation_name}" not a valid operation')
value = operation(context, value.get("value"))
elif isinstance(value, (dict, list)):
value = get_adapter(field.field, IJSONToValue, args=[value, context])
return value
def dict_converter(field, value, context=None):
if value == {}:
return {}
if not isinstance(value, dict):
raise ValueDeserializationError(field, value, "Not an object")
try:
keys, values = zip(*value.items())
keys = [schema_compatible(keys[idx], field.key_type, context) for idx in range(len(keys))]
values = [schema_compatible(values[idx], field.value_type, context) for idx in range(len(values))]
return dict(zip(keys, values))
except ValidationError as error:
raise ValueDeserializationError(field, value, "Wrong contained type", errors=[error])
def __call__(self, context, value):
if not isinstance(value, list):
raise ValueDeserializationError(
self.field, value, f"Invalid type patch data, must be list of updates"
)
for item in value:
_validate_field(self.field, context, item)
return super().__call__(context, value)
def do_operation(self, existing, value):
try:
existing.remove(value)
except ValueError:
raise ValueDeserializationError(self.field, value, "{} not in value".format(value))
return existing
def set_converter(field, value, context=None):
if not isinstance(value, list):
raise ValueDeserializationError(field, value, "Not an array")
return set(list_converter(field, value, context))
def _validate_field(field, context, value):
if "key" not in value or "value" not in value:
raise ValueDeserializationError(field, value, f"Invalid data")
from guillotina.behaviors.dynamic import find_field
field = find_field(context, value["key"])
# now, verify value...
if not field:
raise ValueDeserializationError(field, value, f"Dynamic field not found")
field_type = field.get("type", "unknown")
try:
valid_type = namedtuple("temp_assign_type", [field_type])
ob = valid_type({field_type: None})
bound_field = IDynamicType[field_type].bind(ob)
# validate and convert
real_value = get_adapter(bound_field, IJSONToValue, args=[value["value"], ob])
bound_field.validate(real_value)
value["value"] = real_value
def list_converter(field, value, context=None):
if not isinstance(value, list):
raise ValueDeserializationError(field, value, "Not an array")
try:
return [schema_compatible(item, field.value_type, context) for item in value]
except ValidationError as error:
raise ValueDeserializationError(field, value, "Wrong contained type", errors=[error])
def union_converter(field, value, context=None):
for f in field.fields:
try:
val = schema_compatible(value, f)
if f.__implemented__(IObject) and value and not val:
continue # IObject doesn't match
return val
except Exception:
pass
raise ValueDeserializationError(field, value, "Doesn't match any field")
async def __call__(self, field_context, context, value):
existing = self.get_existing_value(field_context)
if "bucket_index" not in value or "item_index" not in value:
raise ValueDeserializationError(self.field, value, "Not valid remove request")
try:
await existing.remove(context, value["bucket_index"], value["item_index"])
except IndexError:
raise ValueDeserializationError(self.field, value, "Not valid index value")