Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@configure.subscriber(for_=(IResource, IObjectAddedEvent), priority=-1000)
def sync_foobar_sub(ob, evt):
    """Count invocations of this subscriber on the event itself.

    The counter lives in ``evt.called``: it starts at 0 when the attribute
    is missing and is incremented by one on every call.
    """
    evt.called = getattr(evt, "called", 0) + 1
""" Creates an index
"""
async def remove_catalog(self, container: IContainer):
    """Delete the index of ``container``.

    This implementation has no body beyond the docstring, so awaiting it
    performs no work.
    """
async def get_data(self, content, indexes=None, schemas=None):
    """Collect the catalog data for ``content``.

    An ``ICatalogDataAdapter`` is looked up for the object. When one is
    found it is awaited with ``indexes`` and ``schemas`` and its result is
    merged into the returned dict; otherwise the dict stays empty.
    """
    collected = {}
    catalog_adapter = query_adapter(content, ICatalogDataAdapter)
    if not catalog_adapter:
        # No adapter registered for this object: nothing to collect.
        return collected
    collected.update(await catalog_adapter(indexes, schemas))
    return collected
@configure.adapter(for_=IResource, provides=ISecurityInfo)
class DefaultSecurityInfoAdapter(object):
    """``ISecurityInfo`` adapter registered for ``IResource`` objects."""

    def __init__(self, content):
        # The adapted object; every value produced by __call__ is read from it.
        self.content = content

    def __call__(self):
        """Return a dict describing the adapted content.

        Keys: ``path``, ``depth``, ``parent_uuid``, ``access_users``,
        ``access_roles``, ``type_name``, ``tid`` and ``modification_date``.
        """
        ob = self.content
        info = {
            "path": get_content_path(ob),
            "depth": get_content_depth(ob),
        }
        # Both lookups default to None, so content without a parent (or a
        # parent without a uuid) yields ``parent_uuid: None``.
        parent = getattr(ob, "__parent__", None)
        info["parent_uuid"] = getattr(parent, "uuid", None)
        info["access_users"] = get_principals_with_access_content(ob)
        info["access_roles"] = get_roles_with_access_content(ob)
        info["type_name"] = ob.type_name
        info["tid"] = ob.__serial__
        info["modification_date"] = json_compatible(ob.modification_date)
        return info
headers = {
'Access-Control-Expose-Headers': 'Location',
'Location': absolute_url()
}
serializer = queryMultiAdapter(
(obj, self.request),
IResourceSerializeToJson
)
response = await serializer()
return Response(response=response, headers=headers, status=201)
@configure.service(
    context=IResource,
    method='PATCH',
    permission='guillotina.ModifyContent',
    summary='Modify the content of this resource',
    parameters=patch_content_json_schema_parameters,
    responses={
        "200": {
            "description": "Resource data",
            "schema": {"$ref": "#/definitions/Resource"},
        },
    },
)
class DefaultPATCH(Service):
    """Service registered for ``PATCH`` requests on ``IResource`` objects."""

    async def __call__(self):
        """Read the request payload and add the behaviors it lists.

        Every entry found under the ``@behaviors`` key of the payload is
        passed to ``self.context.add_behavior``.
        """
        payload = await self.get_data()
        # A missing or empty '@behaviors' entry means there is nothing to add.
        requested = payload.get('@behaviors', None) or ()
        for behavior_name in requested:
            self.context.add_behavior(behavior_name)
{"in": "path", "name": "filename", "required": True, "schema": {"type": "string"}},
],
responses={
"204": {
"description": "Successfully patched data",
"headers": {
"Location": {"schema": {"type": "string"}},
"Tus-Resumable": {"schema": {"type": "string"}},
"Access-Control-Expose-Headers": {"schema": {"type": "string"}},
},
}
},
),
)
@configure.service(
context=IResource,
method="HEAD",
permission="guillotina.ModifyContent",
name="@tusupload/{field_name}",
**_traversed_file_doc(
"TUS endpoint",
parameters=[{"in": "path", "name": "field_name", "required": True, "schema": {"type": "string"}}],
responses={
"200": {
"description": "Successfully patched data",
"headers": {
"Location": {"schema": {"type": "string"}},
"Tus-Resumable": {"schema": {"type": "string"}},
"Access-Control-Expose-Headers": {"schema": {"type": "string"}},
},
}
},
from guillotina.response import ASGIResponse
from guillotina.response import HTTPConflict
from guillotina.response import HTTPNotFound
from guillotina.response import HTTPPreconditionFailed
from guillotina.response import Response
from guillotina.utils import apply_coroutine
from guillotina.utils import get_object_url
from guillotina.utils import resolve_dotted_name
from zope.interface import alsoProvides
import base64
import posixpath
import uuid
@configure.adapter(for_=(IResource, IRequest, ICloudFileField), provides=IFileManager)
class FileManager(object):
def __init__(self, context, request, field):
    """Store the adapted objects and resolve the storage helpers.

    The field is marked with the interface named by the ``cloud_storage``
    setting, then the file storage manager and the upload data manager
    are looked up and kept on the instance.
    """
    self.context, self.request, self.field = context, request, field

    # Mark the field with the configured storage interface before the
    # storage manager adapter is looked up.
    storage_iface = resolve_dotted_name(app_settings["cloud_storage"])
    alsoProvides(field, storage_iface)

    self.file_storage_manager = get_multi_adapter(
        (context, request, field), IFileStorageManager
    )

    # Use the "db" data manager when the setting is missing or empty.
    dm_name = app_settings.get("cloud_datamanager") or "db"
    self.dm = get_adapter(self.file_storage_manager, IUploadDataManager, name=dm_name)
async def prepare_download(
self, disposition=None, filename=None, content_type=None, size=None, extra_headers=None, **kwargs
context=IResource,
method="POST",
permission="guillotina.ManageCatalog",
name="@catalog",
summary="Initialize catalog",
responses={"200": {"description": "Successfully initialized catalog"}},
)
async def catalog_post(context, request):
    """Initialize the catalog for ``context`` and return an empty dict."""
    catalog = query_utility(ICatalogUtility)
    await catalog.initialize_catalog(context)
    return {}
@directives.index_field.with_accessor(IResource, "access_roles", type="keyword", fields=["acl"])
def get_access_roles(ob):
    """Accessor for the ``access_roles`` index field.

    Returns the result of ``get_roles_with_access_content`` for ``ob``.
    """
    return get_roles_with_access_content(ob)
}
}
}
}))
class TusHeadFile(UploadFile):
    """Service that delegates to the ``tus_head`` call of the field's file manager."""

    async def __call__(self):
        """Look up the ``IFileManager`` for the field and return its ``tus_head()`` result."""
        # The file manager is adapted from context, request and field.
        adaptation_target = (self.context, self.request, self.field)
        file_manager = getMultiAdapter(adaptation_target, IFileManager)
        return await file_manager.tus_head()
@configure.service(
context=IResource, method='PATCH', permission='guillotina.ModifyContent',
name='@tusupload',
**_traversed_file_doc('TUS endpoint', parameters=[{
"name": "Upload-Offset",
"in": "headers",
"type": "integer",
"required": True
}, {
"name": "CONTENT-LENGTH",
"in": "headers",
"type": "integer",
"required": True
}], responses={
'204': {
'description': 'Successfully patched data',
'headers': {
'Upload-Offset': {
security.check_permission("guillotina.AccessContent", member)
):
if fullobjects:
result["items"].append(
await get_multi_adapter((member, self.request), IResourceSerializeToJson)()
)
else:
result["items"].append(
await get_multi_adapter((member, self.request), IResourceSerializeToJsonSummary)()
)
result["length"] = length
return result
@configure.adapter(for_=(IResource, Interface), provides=IResourceSerializeToJsonSummary)
class DefaultJSONSummarySerializer(object):
"""Default ISerializeToJsonSummary adapter.
Requires context to be adaptable to IContentListingObject, which is
the case for all content objects providing IResource.
"""
def __init__(self, context, request):
    """Keep the adapted ``context`` and ``request`` on the instance."""
    self.context, self.request = context, request
async def __call__(self):
summary = json_compatible(
{
"@id": get_object_url(self.context, self.request),
@configure.subscriber(for_=(IResource, IObjectModifiedEvent))
def modified_object(obj, event):
    """Stamp ``obj.modification_date`` with the current time.

    The timestamp is timezone-aware, taken in the module-level ``_zone``.
    """
    obj.modification_date = datetime.now(tz=_zone)