Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def create(self, payload: dict, in_: IResource = None) -> IResource:
await self.get_transaction()
if in_ is None:
in_ = self.db
view = get_multi_adapter((in_, self.request), app_settings["http_methods"]["POST"], name="")
async def json():
return payload
self.request.json = json
resp = await view()
await self.commit()
path = resp.headers["Location"]
if path.startswith("http://") or path.startswith("https://"):
# strip off container prefix
container_url = get_object_url(in_, self.request) # type: ignore
path = path[len(container_url or "") :]
return await navigate_to(in_, path.strip("/")) # type: ignore
def __init__(self, context, request, field):
self.context = context
self.request = request
self.field = field
iface = resolve_dotted_name(app_settings["cloud_storage"])
alsoProvides(field, iface)
self.file_storage_manager = get_multi_adapter((context, request, field), IFileStorageManager)
self.dm = get_adapter(
self.file_storage_manager, IUploadDataManager, name=app_settings.get("cloud_datamanager") or "db"
)
configure.scan("guillotina.exc_resp")
configure.scan("guillotina.fields")
configure.scan("guillotina.migrations")
# always load guillotina
app_configurator.configure_application("guillotina")
app_configurator.configure_all_applications()
apply_concrete_behaviors()
# update *after* plugins loaded
update_app_settings(settings)
if "logging" in app_settings:
try:
logging.config.dictConfig(app_settings["logging"])
except Exception:
app_logger.error("Could not setup logging configuration", exc_info=True)
# Make and initialize asgi app
root.app = server_app
server_app.root = root
server_app.config = config
server_app.settings = app_settings
for k, v in _moved.items():
# for b/w compatibility, convert these
if k in app_settings:
app_settings[v] = app_settings[k]
del app_settings[k]
optimize_settings(app_settings)
def load_service(_context, service):
# prevent circular import
from guillotina.security.utils import protect_view
service_conf = service['config']
factory = resolve_dotted_name(service['klass'])
permission = service_conf.get(
'permission', app_settings.get('default_permission', None))
protect_view(factory, permission)
method = service_conf.get('method', 'GET')
default_layer = resolve_dotted_name(
app_settings.get('default_layer', IDefaultLayer))
layer = service_conf.get('layer', default_layer)
name = service_conf.get('name', '')
content = service_conf.get('context', Interface)
logger.debug('Defining adapter for ' # noqa
'{0:s} {1:s} {2:s} to {3:s} name {4:s}'.format(
content.__identifier__,
app_settings['http_methods'][method].__identifier__,
layer.__identifier__,
str(factory),
name))
def load_commands(module_name, commands):
module = resolve_dotted_name(module_name)
if hasattr(module, "app_settings") and app_settings != module.app_settings:
commands.update(module.app_settings.get("commands", {}))
for dependency in module.app_settings.get("applications") or []:
load_commands(dependency, commands)
async def __call__(self):
for possible_default_file in app_settings["default_static_filenames"]:
if possible_default_file in self.context:
return await self.serve_file(self.context[possible_default_file])
def get_jwk_key(settings=None):
if settings is None:
settings = app_settings
if settings.get("jwk") is None:
if not settings.get("debug"):
logger.warning(
"You are utilizing JWK keys but you have not provided "
"a jwk key setting in your application settings. "
"A key has been dynamically generated for you; however, "
"if you are running more than one guillotina process, "
"the key will NOT be shared between them and your "
"application will not function properly"
)
key = jwk.JWK.generate(kty="oct", size=256)
settings["jwk"] = key
return settings["jwk"]
def run(self, arguments, settings, app):
app_settings["root_user"]["password"] = TESTING_SETTINGS["root_user"]["password"]
root = get_utility(IApplication, name="root")
request = get_mocked_request()
login()
helpers = ShellHelpers(app, root, request)
task_vars.request.set(request)
use_db = helpers.use_db # noqa
use_container = helpers.use_container # noqa
commit = helpers.commit # noqa
abort = helpers.abort # noqa
setup = helpers.setup_context # noqa
try:
from IPython.terminal.embed import InteractiveShellEmbed # type: ignore
from traitlets.config.loader import Config # type: ignore
except ImportError:
sys.stderr.write(
async def validate(self, token):
if token.get("type") not in ("bearer", "wstoken", "cookie"):
return
if "." not in token.get("token", ""):
# quick way to check if actually might be jwt
return
try:
validated_jwt = jwt.decode(
token["token"], app_settings["jwt"]["secret"], algorithms=[app_settings["jwt"]["algorithm"]]
)
token["id"] = validated_jwt["id"]
token["decoded"] = validated_jwt
user = await find_user(token)
if user is not None and user.id == token["id"]:
return user
except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError, KeyError):
pass
return