Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def __call__(self):
# We need to get the upload as async IO and look for an adapter
# for the field to save there by chunks
kwargs = {}
if "filename" in self.request.matchdict:
kwargs["filename"] = self.request.matchdict["filename"]
try:
adapter = get_multi_adapter((self.context, self.request, self.field), IFileManager)
return await self.handle(adapter, kwargs)
except AttributeError:
# file does not exist
return HTTPNotFound(content={"reason": "File does not exist"})
result[attribute_name] = text
if result["type"] == "object":
if IJSONField.providedBy(field):
result.update(field.json_schema)
if IDict.providedBy(field):
if "properties" not in result:
result["properties"] = {}
if field.value_type:
field_serializer = get_multi_adapter(
(field.value_type, self.schema, self.request), ISchemaFieldSerializeToJson
)
result["additionalProperties"] = field_serializer.serialize()
else:
result["additionalProperties"] = True
elif IObject.providedBy(field):
schema_serializer = get_multi_adapter((field.schema, self.request), ISchemaSerializeToJson)
result["properties"] = schema_serializer.serialize()
if field.extra_values is not None:
result.update(field.extra_values)
return result
async def __call__(self):
if self.behavior is not None and IAsyncBehavior.implementedBy(self.behavior.__class__):
# providedBy not working here?
await self.behavior.load(create=True)
# We need to get the upload as async IO and look for an adapter
# for the field to save there by chunks
adapter = get_multi_adapter((self.context, self.request, self.field), IFileManager)
return await adapter.tus_create()
async def __call__(self):
factory = self.factory
result = {
"title": factory.type_name,
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"required": [],
"definitions": {},
"properties": {},
}
# Base object class serialized
for name, field in get_fields_in_order(factory.schema):
if field.required:
result["required"].append(name)
serializer = get_multi_adapter((field, factory.schema, self.request), ISchemaFieldSerializeToJson)
result["properties"][name] = await serializer()
invariants = []
for i in factory.schema.queryTaggedValue("invariants", []):
invariants.append("%s.%s" % (i.__module__, i.__name__))
result["invariants"] = invariants
# Behavior serialization
for schema in factory.behaviors or ():
schema_serializer = get_multi_adapter((schema, self.request), ISchemaSerializeToJson)
serialization = await schema_serializer()
result["properties"][schema_serializer.name] = (
{"$ref": "#/components/schemas/" + schema_serializer.name},
)
result["definitions"][schema_serializer.name] = serialization
async def __call__(self):
if self.behavior is not None and IAsyncBehavior.implementedBy(self.behavior.__class__):
# providedBy not working here?
await self.behavior.load(create=True)
# We need to get the upload as async IO and look for an adapter
# for the field to save there by chunks
adapter = get_multi_adapter((self.context, self.request, self.field), IFileManager)
return await adapter.upload()
field = field.bind(behavior)
field_context = behavior
else:
# main object field
factory = get_cached_factory(context.type_name)
schema = factory.schema
try:
field = schema[field_name]
except KeyError:
return HTTPNotFound(content={"reason": f"No field: {field_name}"})
field = field.bind(context)
field_context = context
# check permission
read_permissions = merged_tagged_value_dict(schema, read_permission.key)
serializer = get_multi_adapter((context, request), IResourceSerializeToJson)
if not serializer.check_permission(read_permissions.get(field_name)):
return HTTPUnauthorized(content={"reason": "You are not authorized to render this field"})
field_renderer = query_multi_adapter((context, request, field), IFieldValueRenderer)
if field_renderer is None:
return await serializer.serialize_field(field_context, field)
else:
return await field_renderer()
async def __call__(self):
group: Group = await self.get_group()
serializer = get_multi_adapter((group, self.request), IResourceSerializeToJsonSummary)
return await serializer()
text = text["properties"]
elif value is not None and (force or value != field.missing_value):
text = json_compatible(value)
if text:
if attribute_name == "value_type":
attribute_name = "items"
result[attribute_name] = text
if result["type"] == "object":
if IJSONField.providedBy(field):
result.update(field.json_schema)
if IDict.providedBy(field):
if "properties" not in result:
result["properties"] = {}
if field.value_type:
field_serializer = get_multi_adapter(
(field.value_type, self.schema, self.request), ISchemaFieldSerializeToJson
)
result["additionalProperties"] = field_serializer.serialize()
else:
result["additionalProperties"] = True
elif IObject.providedBy(field):
schema_serializer = get_multi_adapter((field.schema, self.request), ISchemaSerializeToJson)
result["properties"] = schema_serializer.serialize()
if field.extra_values is not None:
result.update(field.extra_values)
return result
if (length > MAX_ALLOWED or length == 0) and not fullobjects:
result["items"] = []
else:
result["items"] = []
async for ident, member in self.context.async_items(suppress_events=True):
if not ident.startswith("_") and bool(
security.check_permission("guillotina.AccessContent", member)
):
if fullobjects:
result["items"].append(
await get_multi_adapter((member, self.request), IResourceSerializeToJson)()
)
else:
result["items"].append(
await get_multi_adapter((member, self.request), IResourceSerializeToJsonSummary)()
)
result["length"] = length
return result
async def render(self):
# We need to get the upload as async IO and look for an adapter
# for the field to save there by chunks
adapter = get_multi_adapter((self.context, self.request, self.field), IFileManager)
return await adapter.tus_options()