Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_config_data_paths_existing(dst):
try:
ConfigData(
src_path="./i_do_not_exist",
extra_paths=["./i_do_not_exist"],
dst_path=dst,
envops=EnvOps(),
)
except ValidationError as e:
assert len(e.errors()) == 2
for i, p in enumerate(("src_path", "extra_paths")):
err = e.errors()[i]
assert err["loc"][0] == p
assert err["msg"] == "Project template not found."
else:
raise AssertionError()
class Child(Parent):
def echo(self):
return 'child'
class Different:
def echo(self):
return 'different'
class Model(BaseModel):
v: Type[Parent] = Parent
assert Model(v=Parent).v().echo() == 'parent'
assert Model().v().echo() == 'parent'
assert Model(v=Child).v().echo() == 'child'
with pytest.raises(ValidationError) as exc_info:
Model(v=Different)
assert exc_info.value.errors() == [
{
'loc': ('v',),
'msg': 'subclass of Parent expected',
'type': 'type_error.subclass',
'ctx': {'expected_class': 'Parent'},
}
def test_constrained_bytes_too_long():
with pytest.raises(ValidationError) as exc_info:
ConBytesModel(v=b'this is too long')
assert exc_info.value.errors() == [
{
'loc': ('v',),
'msg': 'ensure this value has at most 10 characters',
'type': 'value_error.any_str.max_length',
'ctx': {'limit_value': 10},
}
def test_invalid_analysis_date_range(invalid_request_range: JSONDict) -> None:
with pytest.raises(ValidationError) as e:
invalid_request = AnalysisRequest(**invalid_request_range) # noqa: F841
assert e.value.errors() == [
{
"loc": ("endDate",),
"msg": "Period must not exceed 24 hours",
"type": "value_error",
}
def test_url_str_relative_and_custom_schemes():
class Model(BaseModel):
v: urlstr(relative=True)
# By default, ws not allowed
url = 'ws://test.test'
with pytest.raises(ValidationError) as exc_info:
Model(v=url)
assert exc_info.value.errors() == [
{
'loc': ('v',),
'msg': 'url scheme "ws" is not allowed',
'type': 'value_error.url.scheme',
'ctx': {'scheme': 'ws'},
}
]
class Model(BaseModel):
v: urlstr(relative=True, schemes={'http', 'https', 'ws'})
assert Model(v=url).v == url
class Config:
orm_mode = True
class ModelInvalid(BaseModel):
foo: str
bar: int
class Config:
orm_mode = True
foo = FooGetAttr()
model = Model.from_orm(foo)
assert model.foo == 'Foo'
assert model.bar == 1
assert model.dict(exclude_unset=True) == {'foo': 'Foo'}
with pytest.raises(ValidationError):
ModelInvalid.from_orm(foo)
def test_redis_dsns():
class Model(BaseModel):
a: RedisDsn
m = Model(a='redis://user:pass@localhost:5432/app')
assert m.a == 'redis://user:pass@localhost:5432/app'
assert m.a.user == 'user'
assert m.a.password == 'pass'
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'value_error.url.scheme'
with pytest.raises(ValidationError) as exc_info:
Model(a='redis://localhost:5432/app')
error = exc_info.value.errors()[0]
assert error == {'loc': ('a',), 'msg': 'userinfo required in URL but missing', 'type': 'value_error.url.userinfo'}
def validate_request(*args, **kwargs):
try:
# validate query
arg = request.args
if not arg:
arg = {}
json_query = query(**arg) if query else None
# validate data
json_obj = request.get_json()
if json_obj is None:
json_obj = {}
json_data = data(**json_obj) if data else None
except ValidationError as err:
abort(make_response(jsonify(message=str(err)), 422))
except Exception:
raise
request.query = json_query
request.json_data = json_data
response = func(*args, **kwargs)
others = ()
if isinstance(response, tuple) and len(response) > 1:
response, others = response[0], response[1:]
if resp and not isinstance(response, resp):
abort_json(500, 'Wrong response type produced by server.')
if resp:
return make_response(jsonify(**response.dict()), *others)
from pydantic import BaseModel, HttpUrl, ValidationError
class MyModel(BaseModel):
url: HttpUrl
m = MyModel(url='http://www.example.com')
print(m.url)
try:
MyModel(url='ftp://invalid.url')
except ValidationError as e:
print(e)
try:
MyModel(url='not a url')
except ValidationError as e:
print(e)
async def _validate_kwargs(
meta: HandlerMeta, data_generators: DataGenerators, request: web.Request
) -> _HandlerKwargs:
validated: _HandlerKwargs = {}
if meta.request_type is None:
return validated
raw = {}
for raw_data_generator in data_generators.raw:
k, raw_data = await raw_data_generator(request)
raw[k] = raw_data
try:
cleaned = cast(BaseModel, meta.request_type).parse_obj(raw)
except ValidationError as e:
raise HTTPBadRequest(validation_error=e) from e
for kwargs_data_generator in data_generators.kwargs:
for k, param in kwargs_data_generator(meta, cleaned):
validated[k] = param
return validated