Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_non_field_param_to_dependency(
    *, param: inspect.Parameter, dependant: Dependant
) -> Optional[bool]:
    """Register ``param`` on ``dependant`` if its annotation is a special type.

    The annotation is tested, in order, against ``Request``, ``WebSocket``,
    ``Response``, ``BackgroundTasks`` and ``SecurityScopes``. On the first
    match, the parameter's name is stored in the corresponding
    ``*_param_name`` attribute of ``dependant``.

    :param param: the signature parameter being inspected
    :param dependant: the object that receives the parameter name
    :return: ``True`` when the annotation matched one of the special types,
        ``None`` otherwise
    """
    # Order matters: the first matching type wins, exactly as in an
    # if/elif chain.
    special_slots = (
        (Request, "request_param_name"),
        (WebSocket, "websocket_param_name"),
        (Response, "response_param_name"),
        (BackgroundTasks, "background_tasks_param_name"),
        (SecurityScopes, "security_scopes_param_name"),
    )
    annotation = param.annotation
    for special_type, slot_name in special_slots:
        if lenient_issubclass(annotation, special_type):
            setattr(dependant, slot_name, param.name)
            return True
    return None
# Don't return immediately, to allow adding specific types
for type_, t_schema in field_class_to_schema:
if issubclass(field_type, type_):
f_schema.update(t_schema)
break
modify_schema = getattr(field_type, '__modify_schema__', None)
if modify_schema:
modify_schema(f_schema)
if f_schema:
return f_schema, definitions, nested_models
# Handle dataclass-based models
if lenient_issubclass(getattr(field_type, '__pydantic_model__', None), BaseModel):
field_type = field_type.__pydantic_model__
if issubclass(field_type, BaseModel):
model_name = model_name_map[field_type]
if field_type not in known_models:
sub_schema, sub_definitions, sub_nested_models = model_process_schema(
field_type,
by_alias=by_alias,
model_name_map=model_name_map,
ref_prefix=ref_prefix,
known_models=known_models,
)
definitions.update(sub_definitions)
definitions[model_name] = sub_schema
nested_models.update(sub_nested_models)
else:
def class_validator(v: Any) -> Type[T]:
    """Return ``v`` unchanged if it passes the lenient subclass check.

    ``type_`` is a free variable taken from the surrounding scope (not
    visible in this snippet).

    :param v: the value to validate
    :return: ``v`` itself when ``lenient_issubclass(v, type_)`` is truthy
    :raises errors.SubclassError: when the check fails
    """
    # Guard clause: reject first, then fall through to the accepted value.
    if not lenient_issubclass(v, type_):
        raise errors.SubclassError(expected_class=type_)
    return v
Take a single Pydantic ``ModelField`` (from a model) that could have been declared as a subclass of BaseModel
(so, it could be a submodel), and generate a set with its model and all the sub-models in the tree.
I.e. if you pass a field that was declared to be of type ``Foo`` (subclass of BaseModel) as ``field``, and that
model ``Foo`` has a field of type ``Bar`` (also subclass of ``BaseModel``) and that model ``Bar`` has a field of
type ``Baz`` (also subclass of ``BaseModel``), the return value will be ``set([Foo, Bar, Baz])``.
:param field: a Pydantic ``ModelField``
:param known_models: used to solve circular references
:return: a set with the model used in the declaration for this field, if any, and all its sub-models
"""
from .main import BaseModel # noqa: F811
flat_models: Set[Type[BaseModel]] = set()
# Handle dataclass-based models
field_type = field.type_
if lenient_issubclass(getattr(field_type, '__pydantic_model__', None), BaseModel):
field_type = field_type.__pydantic_model__
if field.sub_fields:
flat_models |= get_flat_models_from_fields(field.sub_fields, known_models=known_models)
elif lenient_issubclass(field_type, BaseModel) and field_type not in known_models:
flat_models |= get_flat_models_from_model(field_type, known_models=known_models)
return flat_models
class_vars = set()
if (namespace.get('__module__'), namespace.get('__qualname__')) != ('pydantic.main', 'BaseModel'):
annotations = resolve_annotations(namespace.get('__annotations__', {}), namespace.get('__module__', None))
untouched_types = UNTOUCHED_TYPES + config.keep_untouched
# annotation only fields need to come first in fields
for ann_name, ann_type in annotations.items():
if is_classvar(ann_type):
class_vars.add(ann_name)
elif is_valid_field(ann_name):
validate_field_name(bases, ann_name)
value = namespace.get(ann_name, Undefined)
if (
isinstance(value, untouched_types)
and ann_type != PyObject
and not lenient_issubclass(getattr(ann_type, '__origin__', None), Type)
):
continue
fields[ann_name] = ModelField.infer(
name=ann_name,
value=value,
annotation=ann_type,
class_validators=vg.get_validators(ann_name),
config=config,
)
for var_name, value in namespace.items():
if (
var_name not in annotations
and is_valid_field(var_name)
and not isinstance(value, untouched_types)
and var_name not in class_vars
def _type_analysis(self) -> None: # noqa: C901 (ignore complexity)
# typing interface is horrible, we have to do some ugly checks
if lenient_issubclass(self.type_, JsonWrapper):
self.type_ = self.type_.inner_type
self.parse_json = True
elif lenient_issubclass(self.type_, Json):
self.type_ = Any
self.parse_json = True
elif isinstance(self.type_, TypeVar): # type: ignore
if self.type_.__bound__:
self.type_ = self.type_.__bound__
elif self.type_.__constraints__:
self.type_ = Union[self.type_.__constraints__]
else:
self.type_ = Any
if self.type_ is Any:
if self.required is Undefined:
self.required = False
self.allow_none = True
return
elif self.type_ is Pattern:
def _type_analysis(self) -> None: # noqa: C901 (ignore complexity)
# typing interface is horrible, we have to do some ugly checks
if lenient_issubclass(self.type_, JsonWrapper):
self.type_ = self.type_.inner_type
self.parse_json = True
elif lenient_issubclass(self.type_, Json):
self.type_ = Any
self.parse_json = True
elif isinstance(self.type_, TypeVar): # type: ignore
if self.type_.__bound__:
self.type_ = self.type_.__bound__
elif self.type_.__constraints__:
self.type_ = Union[self.type_.__constraints__]
else:
self.type_ = Any
if self.type_ is Any:
if self.required is Undefined:
self.required = False
"description", status_text or "Additional Response"
)
status_code_key = str(additional_status_code).upper()
if status_code_key == "DEFAULT":
status_code_key = "default"
operation.setdefault("responses", {})[status_code_key] = response
status_code = str(route.status_code)
operation.setdefault("responses", {}).setdefault(status_code, {})[
"description"
] = route.response_description
if (
route_response_media_type
and route.status_code not in STATUS_CODES_WITH_NO_BODY
):
response_schema = {"type": "string"}
if lenient_issubclass(route.response_class, JSONResponse):
if route.response_field:
response_schema, _, _ = field_schema(
route.response_field,
model_name_map=model_name_map,
ref_prefix=REF_PREFIX,
)
else:
response_schema = {}
operation.setdefault("responses", {}).setdefault(
status_code, {}
).setdefault("content", {}).setdefault(route_response_media_type, {})[
"schema"
] = response_schema
http422 = str(HTTP_422_UNPROCESSABLE_ENTITY)
if (all_route_params or route.body_field) and not any(
def is_scalar_sequence_field(field: ModelField) -> bool:
    """Tell whether ``field`` is a sequence made up only of scalar items.

    Two cases yield ``True``:

    * ``field.shape`` is one of ``sequence_shapes``, ``field.type_`` is not a
      ``BaseModel`` subclass, and every sub-field (if there are any) passes
      ``is_scalar_field``;
    * otherwise, ``field.type_`` itself is a subclass of one of
      ``sequence_types``.

    :param field: the model field to classify
    :return: ``True`` for a scalar sequence field, ``False`` otherwise
    """
    has_sequence_shape = field.shape in sequence_shapes
    if has_sequence_shape and not lenient_issubclass(field.type_, BaseModel):
        children = field.sub_fields
        # With no sub-fields there is nothing to disqualify the field.
        if children is None:
            return True
        return all(is_scalar_field(child) for child in children)
    return bool(lenient_issubclass(field.type_, sequence_types))