Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
await db.async_set(self.arguments.container, container)
await container.install()
# Local Roles assign owner as the creator user
roleperm = IPrincipalRoleManager(container)
roleperm.assign_role_to_principal("guillotina.Owner", "root")
await notify(ObjectAddedEvent(container, db, container.__name__))
try:
await tm.commit(txn=txn)
except ConflictIdOnContainer:
# ignore id conflicts
await tm.abort(txn=txn)
txn = await tm.begin()
task_vars.registry.set(None)
task_vars.container.set(container)
api = WikipediaAPI()
folder_count = 0
async for page_data in api.iter_pages():
await self.import_folder(api, tm, txn, container, page_data)
folder_count += 1
if folder_count >= self.arguments.per_node:
break
try:
await tm.commit(txn=txn)
except ConflictIdOnContainer:
# ignore id conflicts
await tm.abort(txn=txn)
api.close()
tm = db.get_transaction_manager()
task_vars.db.set(db)
tm = db.get_transaction_manager()
txn = await tm.begin()
container = await db.async_get(self.arguments.container)
if container is None:
container = await create_content("Container", id=self.arguments.container, title="Test Data")
container.__name__ = self.arguments.container
await db.async_set(self.arguments.container, container)
await container.install()
# Local Roles assign owner as the creator user
roleperm = IPrincipalRoleManager(container)
roleperm.assign_role_to_principal("guillotina.Owner", "root")
await notify(ObjectAddedEvent(container, db, container.__name__))
try:
await tm.commit(txn=txn)
except ConflictIdOnContainer:
# ignore id conflicts
await tm.abort(txn=txn)
txn = await tm.begin()
task_vars.registry.set(None)
task_vars.container.set(container)
api = WikipediaAPI()
folder_count = 0
async for page_data in api.iter_pages():
await self.import_folder(api, tm, txn, container, page_data)
folder_count += 1
if folder_count >= self.arguments.per_node:
class IExample(IResource):
metadata("categories")
index_field("boolean_field", type="boolean")
boolean_field = schema.Bool(required=False)
index_field("categories", field_mapping=CATEGORIES_MAPPING)
categories = schema.List(
title="categories", default=[], value_type=schema.JSONField(title="term", schema=TERM_SCHEMA)
)
textline_field = schema.TextLine(title="kk", widget="testing", required=False)
text_field = schema.Text(required=False)
dict_value = schema.Dict(key_type=schema.TextLine(), value_type=schema.TextLine(), required=False)
datetime = schema.Datetime(required=False)
write_permission(write_protected="example.MyPermission")
write_protected = schema.TextLine(title="Write protected field", required=False)
default_factory_test = schema.Text(defaultFactory=lambda: "foobar")
context_default_factory_test = schema.Text(defaultFactory=ContextDefaultFactory())
@index_field.with_accessor(IExample, "categories_accessor", field="categories")
def categories_index_accessor(ob):
if not ob.categories:
raise NoIndexField
else:
return [c["label"] for c in ob.categories]
bucket_len=10, required=False, default=None, key_type=schema.Text(), value_type=schema.Text()
)
read_permission(no_read_field="example.MyPermission")
no_read_field = schema.TextLine(required=False, default="")
@configure.behavior(
title="", provides=ITestBehavior, marker=IMarkerBehavior, for_="guillotina.interfaces.IResource"
)
class GTestBehavior(AnnotationBehavior):
foobar_context = ContextProperty("foobar_context")
class ITestContextBehavior(Interface):
foobar = schema.TextLine()
class IMarkerTestContextBehavior(Interface):
pass
@configure.behavior(
title="",
provides=ITestContextBehavior,
marker=IMarkerTestContextBehavior,
for_="guillotina.interfaces.IResource",
)
class GContextTestBehavior(ContextBehavior):
pass
class IExample(IResource):
metadata("categories")
index_field("boolean_field", type="boolean")
boolean_field = schema.Bool(required=False)
index_field("categories", field_mapping=CATEGORIES_MAPPING)
categories = schema.List(
title="categories", default=[], value_type=schema.JSONField(title="term", schema=TERM_SCHEMA)
)
textline_field = schema.TextLine(title="kk", widget="testing", required=False)
text_field = schema.Text(required=False)
dict_value = schema.Dict(key_type=schema.TextLine(), value_type=schema.TextLine(), required=False)
datetime = schema.Datetime(required=False)
write_permission(write_protected="example.MyPermission")
write_protected = schema.TextLine(title="Write protected field", required=False)
default_factory_test = schema.Text(defaultFactory=lambda: "foobar")
context_default_factory_test = schema.Text(defaultFactory=ContextDefaultFactory())
@index_field.with_accessor(IExample, "categories_accessor", field="categories")
def categories_index_accessor(ob):
if not ob.categories:
raise NoIndexField
else:
"contenttype",
)
@configure.service(
context=IApplication, method="GET", permission="guillotina.AccessContent", name="@raise-http-exception"
)
@configure.service(
context=IApplication, method="POST", permission="guillotina.AccessContent", name="@raise-http-exception"
)
async def raise_http_exception(context, request):
raise HTTPUnprocessableEntity()
# Create a new permission and grant it to authenticated users only
configure.permission("example.EndpointPermission", "example permission")
configure.grant(permission="example.EndpointPermission", role="guillotina.Authenticated")
@configure.service(
context=IApplication, method="GET", permission="example.EndpointPermission", name="@myEndpoint"
)
async def my_endpoint(context, request):
return {"foo": "bar"}
class ITestAsyncUtility(IAsyncUtility):
pass
@configure.utility(provides=ITestAsyncUtility)
class AsyncUtility:
@configure.subscriber(for_=(IResource, IObjectAddedEvent), priority=-1000)
def sync_foobar_sub(ob, evt):
if not hasattr(evt, "called"):
evt.called = 0
evt.called += 1
async def create(self, payload: dict, in_: IResource = None) -> IResource:
await self.get_transaction()
if in_ is None:
in_ = self.db
view = get_multi_adapter((in_, self.request), app_settings["http_methods"]["POST"], name="")
async def json():
return payload
self.request.json = json
resp = await view()
await self.commit()
path = resp.headers["Location"]
if path.startswith("http://") or path.startswith("https://"):
# strip off container prefix
container_url = get_object_url(in_, self.request) # type: ignore
path = path[len(container_url or "") :]
return await navigate_to(in_, path.strip("/")) # type: ignore
async def __call__(self):
serializer = get_multi_adapter((self.context, self.request), IResourceSerializeToJson)
include = omit = []
if self.request.query.get("include"):
include = self.request.query.get("include").split(",")
if self.request.query.get("omit"):
omit = self.request.query.get("omit").split(",")
try:
result = await serializer(include=include, omit=omit)
except TypeError:
result = await serializer()
await notify(ObjectVisitedEvent(self.context))
return result
async def default_head(context, request):
return {}
@configure.service(
context=IResource,
method="GET",
permission="guillotina.ViewContent",
summary="Retrieves serialization of resource",
responses=get_content_json_schema_responses,
parameters=[
{"name": "include", "in": "query", "required": True, "schema": {"type": "string"}},
{"name": "omit", "in": "query", "required": True, "schema": {"type": "string"}},
],
)
class DefaultGET(Service):
@profilable
async def __call__(self):
serializer = get_multi_adapter((self.context, self.request), IResourceSerializeToJson)
include = omit = []
if self.request.query.get("include"):
include = self.request.query.get("include").split(",")
if self.request.query.get("omit"):
omit = self.request.query.get("omit").split(",")
try:
result = await serializer(include=include, omit=omit)
except TypeError:
result = await serializer()
await notify(ObjectVisitedEvent(self.context))
return result