Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_classes_not_abstract() -> None:
    """Smoke test: build each multidict class and perform one lookup on it.

    Covers the two mutable containers (case-sensitive and case-insensitive)
    and the read-only proxy wrapped around each of them.
    """
    plain = multidict.MultiDict({"a": "b"})  # type: multidict.MultiDict[str]
    folded = multidict.CIMultiDict({"a": "b"})  # type: multidict.CIMultiDict[str]
    plain_view = multidict.MultiDictProxy(plain)
    folded_view = multidict.CIMultiDictProxy(folded)

    # Mutable containers first, then the proxies that wrap them.
    plain.getone("a")
    folded.getall("a")
    plain_view.getone("a")
    folded_view.getall("a")
def func(data_dict=None):
    """Return a MultiDictProxy over a MultiDict built from *data_dict*.

    A falsy *data_dict* (``None`` or an empty mapping) is replaced by an
    empty dict, so the resulting proxy wraps an empty MultiDict.
    """
    source = data_dict if data_dict else {}
    return multidict.MultiDictProxy(multidict.MultiDict(source))
def test_query_empty():
    """A URL with no query string exposes an empty MultiDictProxy."""
    query = URL("http://example.com").query
    assert isinstance(query, MultiDictProxy)
    assert query == MultiDict()
"""Parses a MIME type into its components.
mimetype is a MIME type string.
Returns a MimeType object.
Example:
>>> parse_mimetype('text/html; charset=utf-8')
MimeType(type='text', subtype='html', suffix='',
parameters={'charset': 'utf-8'})
"""
if not mimetype:
return MimeType(type='', subtype='', suffix='',
parameters=MultiDictProxy(MultiDict()))
parts = mimetype.split(';')
params = MultiDict() # type: MultiDict[str]
for item in parts[1:]:
if not item:
continue
key, value = cast(Tuple[str, str],
item.split('=', 1) if '=' in item else (item, ''))
params.add(key.lower().strip(), value.strip(' "'))
fulltype = parts[0].strip().lower()
if fulltype == '*':
fulltype = '*/*'
mtype, stype = (cast(Tuple[str, str], fulltype.split('/', 1))
if '/' in fulltype else (fulltype, ''))
continue
key, value = cast(Tuple[str, str],
item.split('=', 1) if '=' in item else (item, ''))
params.add(key.lower().strip(), value.strip(' "'))
fulltype = parts[0].strip().lower()
if fulltype == '*':
fulltype = '*/*'
mtype, stype = (cast(Tuple[str, str], fulltype.split('/', 1))
if '/' in fulltype else (fulltype, ''))
stype, suffix = (cast(Tuple[str, str], stype.split('+', 1))
if '+' in stype else (stype, ''))
return MimeType(type=mtype, subtype=stype, suffix=suffix,
parameters=MultiDictProxy(params))
continue
key, value = cast(Tuple[str, str],
item.split('=', 1) if '=' in item else (item, ''))
params.add(key.lower().strip(), value.strip(' "'))
fulltype = parts[0].strip().lower()
if fulltype == '*':
fulltype = '*/*'
mtype, stype = (cast(Tuple[str, str], fulltype.split('/', 1))
if '/' in fulltype else (fulltype, ''))
stype, suffix = (cast(Tuple[str, str], stype.split('+', 1))
if '+' in stype else (stype, ''))
return MimeType(type=mtype, subtype=stype, suffix=suffix,
parameters=MultiDictProxy(params))
match = re.match(
r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$",
param, re.M
)
if match is None: # pragma: no cover
# the check exists to suppress mypy error
continue
key, _, value, _ = match.groups()
link.add(key, value)
key = link.get("rel", url) # type: ignore
link.add("url", self.url.join(URL(url)))
links.add(key, MultiDictProxy(link))
return MultiDictProxy(links)
def update_headers(self, headers: Optional[LooseHeaders]) -> None:
    """Reset ``self.headers`` and fill it from the URL and *headers*.

    A Host header is always derived from ``self.url`` first.  Entries from
    *headers* are then added; an entry whose name is ``host`` (compared
    case-insensitively) overwrites the derived value instead of being
    appended next to it.
    """
    self.headers = CIMultiDict()  # type: CIMultiDict[str]

    # Derive Host from the URL: IPv6 literals are wrapped in brackets and
    # the port is appended only when it is not the default one.
    host = cast(str, self.url.raw_host)
    if helpers.is_ipv6_address(host):
        host = '[{}]'.format(host)
    if not self.url.is_default_port():
        host = host + ':' + str(self.url.port)
    self.headers[hdrs.HOST] = host

    if not headers:
        return

    # Mapping-like inputs are flattened to (key, value) pairs.
    if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
        headers = headers.items()  # type: ignore

    for name, val in headers:
        if name.lower() != 'host':
            self.headers.add(name, val)
        else:
            # Special case: Host is assigned, replacing the derived value.
            self.headers[name] = val
def isasyncgenfunction(obj: Any) -> bool:
    """Report whether *obj* is an asynchronous generator function.

    Delegates to ``inspect.isasyncgenfunction`` when the running
    interpreter's ``inspect`` module provides it; otherwise returns False.
    """
    checker = getattr(inspect, 'isasyncgenfunction', None)
    if checker is None:
        return False
    return checker(obj)
@attr.s(slots=True, frozen=True)
class MimeType:
    """Frozen, slotted attrs record for the components of a MIME type."""

    # Field order defines the generated __init__ signature:
    # type, subtype, suffix, parameters.
    type = attr.ib(type=str)
    subtype = attr.ib(type=str)
    suffix = attr.ib(type=str)
    parameters = attr.ib(type=MultiDictProxy)  # type: MultiDictProxy[str]
@functools.lru_cache(maxsize=56)
def parse_mimetype(mimetype: str) -> MimeType:
"""Parses a MIME type into its components.
mimetype is a MIME type string.
Returns a MimeType object.
Example:
>>> parse_mimetype('text/html; charset=utf-8')
MimeType(type='text', subtype='html', suffix='',
parameters={'charset': 'utf-8'})
def get_input_validation_error(self, data_to_validate: typing.Any) -> ProcessValidationError:
"""
Return an ProcessValidationError containing validation
detail error for input data
:param data_to_validate: data to use to generate validation error
:return:
"""
# Prevent a serpyco error when the request context gives us a MultiDictProxy
if isinstance(data_to_validate, (MultiDictProxy, MultiDict)):
data_to_validate = dict(data_to_validate)
try:
self.serializer.load(data_to_validate)
raise WorkflowException("Serializer should raise an exception here")
except ValidationError as exc:
return ProcessValidationError(
message='Validation error of input data: "{}"'.format(exc.args[0]),
details=exc.args[1],
original_exception=exc,
)
except Exception as exc:
self._logger.exception(
'Unknown error during serpyco load: "{}": "{}"'.format(type(exc).__name__, str(exc))
)
return ProcessValidationError(