Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def schema_type():
    """Return a ``colander.Mapping`` type built with ``unknown='preserve'``."""
    mapping_type = colander.Mapping(unknown='preserve')
    return mapping_type
lint.check_config_file(charm_path)
return lint.lint, lint.exit_code
def metadata(self):
    """Load ``metadata.yaml`` from ``self.charm_path`` and return its content.

    The file is opened in binary mode and parsed with ``yaml.safe_load``;
    whatever that call produces is returned unchanged.
    """
    metadata_path = os.path.join(self.charm_path, 'metadata.yaml')
    with open(metadata_path, "rb") as stream:
        return yaml.safe_load(stream.read())
def promulgate(self):
    """No-op: performs no work and returns ``None``."""
    return None
class StrictMapping(colander.Mapping):
    """A ``colander.Mapping`` that is always constructed with ``unknown='raise'``."""

    def __init__(self):
        # Forward the fixed ``unknown`` policy to the base mapping type.
        parent = super(StrictMapping, self)
        parent.__init__(unknown='raise')
class Boolean(object):
    """Colander-style type accepting ``bool`` values or the strings 'true'/'false'."""

    def deserialize(self, node, cstruct):
        """Validate *cstruct* and convert it to a Python ``bool``.

        ``colander.null`` is passed through unchanged.  A ``bool`` input is
        first normalised to its lowercase string form so both spellings go
        through the same check.

        Returns:
            ``colander.null`` for a null input, otherwise ``True`` for
            'true' and ``False`` for 'false'.

        Raises:
            colander.Invalid: if the value is neither 'true' nor 'false'.
        """
        if cstruct is colander.null:
            return colander.null
        if isinstance(cstruct, bool):
            cstruct = str(cstruct).lower()
        if cstruct not in ('true', 'false'):
            raise colander.Invalid(
                node, '"%s" is not one of true, false' % cstruct)
        # The visible original fell off the end here and handed ``None`` back
        # for valid input; a type's deserialize must return the value.
        return cstruct == 'true'
class StringOrEmpty(colander.String):
def _addMetabolizeSchema(self, schema):
    """Build a mapping node with a validated ``type`` and a ``steps`` string.

    ``type`` must be one of the transformation names listed below; ``steps``
    is an unvalidated string.
    """
    # NOTE(review): nothing visible here attaches ``scenario`` to ``schema``;
    # the definition may continue beyond this excerpt -- confirm.
    scenario = colander.SchemaNode(colander.Mapping())
    transformation_types = colander.OneOf([
        'phase1',
        'phase1_selected',
        'phase2',
        'phase2_selected',
        'glycosidase',
        'mass_filter',
        'gut',
    ])
    type_node = colander.SchemaNode(
        colander.String(),
        name='type',
        validator=transformation_types,
    )
    scenario.add(type_node)
    # TODO add validator for steps, but what are valid options for steps?
    steps_node = colander.SchemaNode(colander.String(), name='steps')
    scenario.add(steps_node)
import json
import os
import colander
from pyramid.security import NO_PERMISSION_REQUIRED
from kinto.core import Service
# Directory part of this module's ``__file__`` path.
HERE = os.path.split(__file__)[0]
# Directory part of ``HERE``, i.e. one level above it.
ORIGIN = os.path.split(HERE)[0]
class VersionResponseSchema(colander.MappingSchema):
    """Mapping schema with a single ``body`` node built with ``unknown="preserve"``."""

    body = colander.SchemaNode(
        colander.Mapping(unknown="preserve"),
    )
# Maps the "200" key to the schema instance describing that response.
version_response_schemas = {
    "200": VersionResponseSchema(
        description="Return the running Instance version information."
    ),
}

# Service registered under the ``/__version__`` path.
version = Service(
    name="version",
    path="/__version__",
    description="Version",
)
@version.get(
permission=NO_PERMISSION_REQUIRED,
tags=["Utilities"],
operation_id="__version__",
response_schemas=version_response_schemas,
)
def __init__(self, **kwargs):
    """Initialise the parent with a ``Mapping(unknown='raise')`` type.

    Every keyword argument is forwarded to the parent initialiser unchanged.
    """
    strict_mapping = colander.Mapping(unknown='raise')
    parent = super(StrictAboutExtraKeysColanderMappingSchema, self)
    parent.__init__(strict_mapping, **kwargs)
import colander
from cornice import Service
from pyramid.security import NO_PERMISSION_REQUIRED
# Service registered under the ``/contribute.json`` path.
contribute = Service(
    name="contribute.json",
    description="Open-source information",
    path="/contribute.json",
)
class ContributeResponseSchema(colander.MappingSchema):
    """Mapping schema with a single ``body`` node built with ``unknown="preserve"``."""

    body = colander.SchemaNode(
        colander.Mapping(unknown="preserve"),
    )
# Maps the "200" key to the schema instance describing that response.
contribute_responses = {
    "200": ContributeResponseSchema(
        description="Return open source contributing information."
    ),
}
@contribute.get(
permission=NO_PERMISSION_REQUIRED,
tags=["Utilities"],
operation_id="contribute",
response_schemas=contribute_responses,
)
def contribute_get(request):
return {
"name": "kinto",
import colander
from pyramid.security import NO_PERMISSION_REQUIRED, Authenticated
from kinto.core import Service
# Service registered under the root path ``/``.
hello = Service(
    name="hello",
    path="/",
    description="Welcome",
)
class HelloResponseSchema(colander.MappingSchema):
    """Mapping schema with a single ``body`` node built with ``unknown="preserve"``."""

    body = colander.SchemaNode(
        colander.Mapping(unknown="preserve"),
    )
# Maps the "200" key to the schema instance describing that response.
hello_response_schemas = {
    "200": HelloResponseSchema(
        description="Return information about the running Instance."
    ),
}
@hello.get(
permission=NO_PERMISSION_REQUIRED,
tags=["Utilities"],
operation_id="server_info",
response_schemas=hello_response_schemas,
)
def get_hello(request):
"""Return information regarding the current instance."""
settings = request.registry.settings
if isinstance(request, dict):
merge_dicts(request, defaults)
return super().deserialize(cstruct)
class BatchRequest(colander.MappingSchema):
    """Mapping schema whose ``body`` is a ``BatchPayloadSchema`` instance."""

    body = BatchPayloadSchema()
class BatchResponseSchema(colander.MappingSchema):
    """Mapping schema with ``status``, ``path``, ``headers`` and ``body`` nodes.

    ``headers`` and ``body`` are declared with ``missing=colander.drop``;
    ``headers`` is additionally checked by the ``string_values`` validator.
    """

    status = colander.SchemaNode(colander.Integer())
    path = colander.SchemaNode(colander.String())
    headers = colander.SchemaNode(
        colander.Mapping(unknown="preserve"),
        validator=string_values,
        missing=colander.drop,
    )
    body = colander.SchemaNode(
        colander.Mapping(unknown="preserve"),
        missing=colander.drop,
    )
class BatchResponseBodySchema(colander.MappingSchema):
    """Mapping schema holding a ``responses`` sequence of ``BatchResponseSchema``."""

    responses = colander.SequenceSchema(
        BatchResponseSchema(missing=colander.drop),
    )
class BatchResponse(colander.MappingSchema):
    """Mapping schema whose ``body`` is a ``BatchResponseBodySchema`` instance."""

    body = BatchResponseBodySchema()
class ErrorResponseSchema(colander.MappingSchema):
    """Mapping schema whose ``body`` is an ``ErrorSchema`` instance."""

    body = ErrorSchema()
batch_responses = {
"200": BatchResponse(description="Return a list of operation responses."),
def _get_msdata_schema(self):
def textarea_or_file(node, value):
    """Validate that ``ms_data`` or ``ms_data_file`` carries a value.

    When both entries of *value* are ``colander.null``, raise a
    ``colander.Invalid`` for *node* that has one child error per field
    schema; otherwise return without doing anything.
    """
    both_missing = (value['ms_data'] is colander.null
                    and value['ms_data_file'] is colander.null)
    if not both_missing:
        return
    message = 'Either ms_data or ms_data_file must be set'
    failure = colander.Invalid(node)
    # Attach the same message to both fields.
    for field_schema in (msdata_stringSchema, msdata_fileSchema):
        failure.add(colander.Invalid(field_schema, message))
    raise failure
schema = colander.SchemaNode(colander.Mapping(),
validator=textarea_or_file)
valid_formats = colander.OneOf(['mzxml',
'mass_tree',
'form_tree',
'mgf',
])
schema.add(colander.SchemaNode(colander.String(),
validator=valid_formats,
name='ms_data_format'
))
filled = colander.Length(min=1)
msdata_stringSchema = colander.SchemaNode(colander.String(),
validator=filled,
missing=colander.null,
else:
params = {}
# The name of SchemaNode must be the same of SQLA class attribute.
params['name'] = name
if 'type' in params:
type_ = params.pop('type')
elif uselist:
# xToMany relationships.
type_ = colander.Sequence()
else:
# xToOne relationships.
type_ = colander.Mapping()
if 'children' in params:
children = params.pop('children')
else:
# Map columns according to ColanderAlchemy configuration
mapping = SQLAlchemyMapping(cls)
children = uselist and (mapping,) or mapping.children
if nullable == False:
params['missing'] = colander.required
elif 'type' not in params and uselist:
params['missing'] = []
elif 'type' not in params: