scheme = dict(
name=dict(
type='string',
required=True,
),
mail=dict(
type='string',
required=True,
# http://docs.python-cerberus.org/en/stable/validation-rules.html#regex
regex=r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$',
),
count=dict(
type='integer',
required=True,
coerce=int,
),
)
PreCerberusConstructed = djburger.validators.constructors.Cerberus(scheme, purge_unknown=True)
PreCerberusWrapped = djburger.validators.wrappers.Cerberus(cerberus.Validator(scheme, purge_unknown=True))
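# A minimal usage sketch (not part of the original snippet; sample values are made up):
# the same Cerberus scheme, used with a plain cerberus.Validator, coerces `count` to an
# int and purges unknown keys.
plain_validator = cerberus.Validator(scheme, purge_unknown=True)
assert plain_validator.validate({'name': 'Max', 'mail': 'max@example.com', 'count': '3', 'extra': 1})
assert plain_validator.document == {'name': 'Max', 'mail': 'max@example.com', 'count': 3}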
email_re = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')
PreDjBurgerConstructed = djburger.validators.constructors.DictMixed(
dict(
name=djburger.validators.constructors.IsStr,
mail=djburger.validators.constructors.Chain(
djburger.validators.constructors.IsStr,
djburger.validators.constructors.Lambda(email_re.match),
),
count=djburger.validators.constructors.Or(
# int
djburger.validators.constructors.IsInt,
# str
djburger.validators.constructors.Chain(
djburger.validators.constructors.Lambda(lambda x: x.isdigit()),
contributors=[
dict(name="Doe Sr., ن یک تست", affiliation="Atlantis",
type="Other"),
dict(name="SmЭтith Sr., Marco", affiliation="Atlantis",
type="DataCurator")
],
title="Đây là một thử nghiệm",
upload_type="publication",
)
)
response = self.post(
'depositionlistresource', data=test_data, code=201,
)
res_id = response.json['id']
v = Validator()
if not v.validate(response.json, self.resource_schema):
print(v.errors)
raise AssertionError("Output does not validate according to schema")
if not v.validate(response.json['metadata'], self.metadata_schema):
print(v.errors)
raise AssertionError("Output does not validate according to schema")
# Upload 3 files
for i in range(3):
response = self.post(
'depositionfilelistresource',
urlargs=dict(resource_id=res_id),
is_json=False,
data={
'file': make_pdf_fixture('test%s.pdf' % i),
'name': 'test-%s.pdf' % i,
def a(I):
I.before
I.x
I.after
class ParentWithSame(object):
@story
def a(I):
I.before
I.x
I.after
ParentWithSame.a.contract(
Validator(
{
"foo": {"type": "integer", "coerce": int},
"bar": {"type": "list", "schema": {"type": "integer", "coerce": int}},
"baz": {"type": "integer", "coerce": int},
}
)
)
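# The same contract, exercised standalone (a sketch with made-up values), shows what the
# Cerberus coercion rules in this contract do to string input:
_contract = Validator(
{
"foo": {"type": "integer", "coerce": int},
"bar": {"type": "list", "schema": {"type": "integer", "coerce": int}},
"baz": {"type": "integer", "coerce": int},
}
)
assert _contract.validate({"foo": "1", "bar": ["2"], "baz": "3"})
assert _contract.document == {"foo": 1, "bar": [2], "baz": 3}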
class SequentialParent(object):
@story
def a(I):
I.before
I.x
I.y
I.after
def validate_url(field, value, error):
if not value.startswith('http'):
error(field, 'Not a valid HTTP URL')
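# A hedged illustration (not from the original source, assuming `import cerberus`) of
# wiring this check function into a schema. Cerberus 1.2 and earlier use the `validator`
# rule; Cerberus 1.3 renamed it to `check_with`.
url_schema = {'homepage': {'type': 'string', 'validator': validate_url}}
url_validator = cerberus.Validator(url_schema)
url_validator.validate({'homepage': 'ftp://example.com'})  # False
url_validator.errors  # {'homepage': ['Not a valid HTTP URL']}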
def load_ssh_private_key(doc):
if doc.get('key_helper') == 'true':
return 'unset'
if 'ssh_private_key_filename' not in doc:
return util.NO_TEST_FLAG
return util.read_file(doc['ssh_private_key_filename'])
class LaunchValidator(cerberus.Validator):
""" Needs to use unintuitive pattern so that child validator can be created
for validated the nested dcos_config. See:
http://docs.python-cerberus.org/en/latest/customize.html#instantiating-custom-validators
"""
def __init__(self, *args, **kwargs):
super(LaunchValidator, self).__init__(*args, **kwargs)
assert 'config_dir' in kwargs, 'This class must be supplied with the config_dir kwarg'
self.config_dir = kwargs['config_dir']
def _normalize_coerce_expand_local_path(self, value):
if not value:
return value
return expand_path(value, self.config_dir)
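# A sketch of how this validator might be wired up (assumed, not shown in the source):
# the `_normalize_coerce_expand_local_path` method is addressed by name through the
# `coerce` rule, and `config_dir` travels in as an extra keyword argument.
_example_schema = {'ssh_private_key_filename': {'type': 'string', 'coerce': 'expand_local_path'}}
_validator = LaunchValidator(_example_schema, config_dir='/tmp/launch')  # path is hypothetical
_validator.validate({'ssh_private_key_filename': 'id_rsa'})
_validator.document  # 'id_rsa' expanded relative to config_dir by expand_path()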
def _expand_error_dict(errors: dict) -> str:
...
# Load the legacy `settings.json` file. Return immediately if it is not found.
try:
with open(LEGACY_PATH, "r") as f:
config = json.load(f)
except IOError:
return None
# Convert database settings to a single connection string.
convert_db(config)
db = pymongo.MongoClient(config["db_connection_string"])[config["db_name"]]
# Move settings that should be in database to database.
v = cerberus.Validator(virtool.settings.schema.SCHEMA, purge_unknown=True)
v.validate(config)
db.settings.update_one({"_id": "settings"}, {
"$set": v.document
}, upsert=True)
# Rewrite settings file without DB-stored settings.
v = cerberus.Validator(schema=SCHEMA, purge_unknown=True)
v.validate(config)
convert_http(config)
convert_job_limits(config)
convert_proxy(config)
remove_defaults(config)
config = dict(v.document)
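# Illustration (with hypothetical keys) of the purge_unknown behaviour relied on above:
# keys absent from the schema are silently dropped from `v.document` rather than reported
# as errors, which is what separates DB-stored settings from the ones kept in the file.
demo = cerberus.Validator({"db_name": {"type": "string"}}, purge_unknown=True)
demo.validate({"db_name": "virtool", "proxy_enable": True})
assert demo.document == {"db_name": "virtool"}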
"required": True,
},
},
}
for s in SCHEMAS:
SCHEMAS[s]["files"] = {
"description": "Files associated with the document",
# TODO: fix this since this is currently coming out as a CommentedMap
# "type": "list",
# "schema": {"type": "string"},
"required": False,
}
class NoDescriptionValidator(Validator):
def _validate_description(self, description, field, value):
"""Don't validate descriptions
The rule's arguments are validated against this schema:
{'type': 'string'}"""
if False:
pass
def _validate_eallowed(self, eallowed, field, value):
"""Test if value is in list
The rule's arguments are validated against this schema:
{'type': 'list'}
"""
if value not in eallowed:
warn(
'"{}" is not in the preferred entries for "{}", please '
def __validate(cls, schema, param):
v = cerberus.Validator(schema)
if v.validate(param):
return
raise Exception(json.dumps(v.errors))
pepa_schemas = []
for fn in glob.glob(valdir + '/*.yaml'):
log.info("Loading schema: {0}".format(fn))
with salt.utils.fopen(fn) as fhr:
template = jinja2.Template(fhr.read())
data = output
data['grains'] = __grains__.copy()
data['pillar'] = __pillar__.copy()
schema = yaml.load(
template.render(data),
Loader=SaltYamlSafeLoader
)
all_schemas.update(schema)
pepa_schemas.append(fn)
val = cerberus.Validator()
if not val.validate(output['pepa_keys'], all_schemas):
for ekey, error in six.iteritems(val.errors):
log.warning('Validation failed for key {0}: {1}'.format(ekey, error))
output['pepa_schema_keys'] = all_schemas
output['pepa_schemas'] = pepa_schemas
def validate_info():
v = Validator()
schema = {
"title": {"required": True, "type": "string"},
"version": {"required": True, "type": "string"},
"description": {"type": "string"},
"termsOfService": {"type": "string"},
"contact": {
"type": "dict",
"schema": {
"name": {"type": "string"},
"url": {"type": "string", "validator": _validate_url},
"email": {
"type": "string",
"regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
},
},
},