Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _request_mock(self, orig_self: ClientSession,
method: str, url: 'Union[URL, str]',
*args: Tuple,
**kwargs: Dict) -> 'ClientResponse':
"""Return mocked response object or raise connection error."""
if orig_self.closed:
raise RuntimeError('Session is closed')
url = normalize_url(merge_params(url, kwargs.get('params')))
url_str = str(url)
for prefix in self._passthrough:
if url_str.startswith(prefix):
return (await self.patcher.temp_original(
orig_self, method, url, *args, **kwargs
))
response = await self.match(method, url, **kwargs)
key = (method, url)
self.requests.setdefault(key, [])
self.requests[key].append(RequestCall(args, copy.deepcopy(kwargs)))
if response is None:
raise ClientConnectionError(
'Connection refused: {} {}'.format(method, url)