Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def response_closed(self) -> None:
    """Recycle or close the connection after a response has been closed.

    Emits a trace log of both sides of the h11 state machine. When both
    sides are ``h11.DONE`` the state machine is advanced to the next
    request/response cycle; in every other state the connection is closed.
    Finally, ``on_release`` is awaited if one is set.
    """
    state = self.h11_state
    logger.trace(
        f"response_closed "
        f"our_state={state.our_state!r} "
        f"their_state={state.their_state}"
    )

    reusable = state.our_state is h11.DONE and state.their_state is h11.DONE
    if not reusable:
        await self.close()
    else:
        # Both sides finished cleanly: prepare for another cycle.
        state.start_next_cycle()

    release = self.on_release
    if release is not None:
        await release()
def _after_request(self, stream_id: int, future: asyncio.Future) -> None:
    """Run the base-class hook, then move on to the next h11 cycle.

    After delegating to ``super()._after_request``, a new h11 cycle is
    started when our side of the connection is ``h11.DONE``, and pending
    events are then processed.
    """
    super()._after_request(stream_id, future)
    connection = self.connection
    if connection.our_state is h11.DONE:
        connection.start_next_cycle()
    # NOTE(review): the nesting of this call was reconstructed from
    # unindented source; it is assumed to run unconditionally -- confirm.
    self._handle_events()
async def recycle_or_close(self) -> None:
    """Reset per-request state for connection reuse, or demand a close.

    When our side of the h11 state machine is ``h11.DONE``, the current
    application channels are closed, a new h11 cycle is started, a fresh
    pair of memory channels is opened, and the per-request attributes
    (``response``, ``scope``, ``state``) are reset.

    Raises:
        MustCloseError: if our side is not ``h11.DONE``, or if h11 refuses
            to start a new cycle (``start_next_cycle`` raises
            ``h11.LocalProtocolError``).
    """
    if self.connection.our_state is not h11.DONE:
        raise MustCloseError()

    await self.app_send_channel.aclose()
    await self.app_receive_channel.aclose()
    try:
        self.connection.start_next_cycle()
    except h11.LocalProtocolError as error:
        # Only our side was checked above; h11 rejects the new cycle when
        # the connection as a whole is not reusable. Report that the same
        # way as the "not DONE" case instead of leaking a protocol error.
        raise MustCloseError() from error
    self.app_send_channel, self.app_receive_channel = trio.open_memory_channel(10)
    self.response = None
    self.scope = None
    self.state = ASGIHTTPState.REQUEST
def on_response_complete(self):
    """Do the bookkeeping that follows a fully sent response.

    Always increments ``server_state.total_requests``. If the transport is
    not closing, it also schedules the keep-alive timeout, resumes reading
    on the flow-control object and, when both sides of the h11 connection
    are ``h11.DONE``, starts the next cycle and handles pending events.
    """
    self.server_state.total_requests += 1

    if not self.transport.is_closing():
        # Arm a short Keep-Alive timeout.
        self.timeout_keep_alive_task = self.loop.call_later(
            self.timeout_keep_alive, self.timeout_keep_alive_handler
        )

        # Reads may have been paused; let data flow again.
        self.flow.resume_reading()

        # Unblock any pipelined events.
        conn = self.conn
        both_done = conn.our_state is h11.DONE and conn.their_state is h11.DONE
        if both_done:
            conn.start_next_cycle()
            self.handle_events()
state machine and connection or not, and if not, closes the socket and
state machine.
This method is safe to call multiple times.
"""
# The logic here is as follows. Once we've got EndOfMessage, only two
# things can be true. Either a) the connection is suitable for
# connection re-use per RFC 7230, or b) it is not. h11 signals this
# difference by what happens when you call `next_event()`.
#
# If the connection is safe to re-use, when we call `next_event()`
# we'll get back a h11.NEED_DATA and the state machine will be reset to
# (IDLE, IDLE). If it's not, we'll get either ConnectionClosed or we'll
# find that our state is MUST_CLOSE, and then we should close the
# connection accordingly.
continue_states = (h11.IDLE, h11.DONE)
event = self._state_machine.next_event()
our_state = self._state_machine.our_state
their_state = self._state_machine.their_state
must_close = (
event is not h11.NEED_DATA or
our_state not in continue_states or
their_state not in continue_states
)
if must_close:
self.close()
elif our_state is h11.DONE and their_state is h11.DONE:
self._state_machine.start_next_cycle()
def handle(self):
    """Serve one connection, cycle after cycle, until it has to close.

    Each iteration receives an event; an ``h11.Request`` is handed to a
    ``RulesContext`` for one complete request/response cycle. The loop
    continues only while both the client and server sides are
    ``h11.DONE``. Any exception is logged, and a fatal error response is
    sent if our side is still in ``SEND_RESPONSE`` or ``IDLE``.
    """
    log = self._logger
    log.info('new connection from %s', self.client_address[0])
    try:
        while True:
            event = self.receive_event()
            if isinstance(event, h11.Request):    # not `ConnectionClosed`
                # `RulesContext` drives one whole request/response cycle.
                # pylint: disable=protected-access
                RulesContext(self.server.compiled_rules, self)._run(event)
            log.debug('states: %r', self._hconn.states)
            if self._hconn.states != {h11.CLIENT: h11.DONE,
                                      h11.SERVER: h11.DONE}:
                # The connection cannot persist (e.g. HTTP/1.0, or
                # somebody sent "Connection: close").
                break
            # Connection persists, proceed to the next cycle.
            self._hconn.start_next_cycle()
    except Exception as exc:
        log.error('error: %s', exc)
        log.debug('states: %r', self._hconn.states)
        if self._hconn.our_state in (h11.SEND_RESPONSE, h11.IDLE):
            self._send_fatal_error(exc)
def _after_request(self, stream_id: int, future: asyncio.Future) -> None:
    """Finish a request: call the parent hook, recycle h11, handle events.

    The parent implementation runs first. If our side of the h11
    connection is ``h11.DONE`` a new cycle is started, after which
    ``_handle_events`` is invoked.
    """
    super()._after_request(stream_id, future)
    h11_connection = self.connection
    our_side_finished = h11_connection.our_state is h11.DONE
    if our_side_finished:
        h11_connection.start_next_cycle()
    # NOTE(review): nesting reconstructed from unindented source; this
    # call is assumed to be unconditional -- confirm against the project.
    self._handle_events()
async def _release(self) -> None:
    """Recycle the h11 state machine or close, then notify ``on_release``.

    Requires ``_writer`` to be set (asserted). When both sides of the h11
    state machine are ``h11.DONE`` a new cycle is started; otherwise the
    connection is closed. ``on_release`` is then awaited with this object
    as its argument, if it is set.
    """
    assert self._writer is not None

    machine = self._h11_state
    if machine.our_state is not h11.DONE or machine.their_state is not h11.DONE:
        await self.close()
    else:
        machine.start_next_cycle()

    callback = self.on_release
    if callback is not None:
        await callback(self)
async def _maybe_recycle(self) -> None:
    """Close the current stream, then recycle the connection or report it closed.

    Outcomes after ``_close_stream``:

    * our side is not ``h11.DONE``: set ``can_read`` and send ``Closed()``;
    * h11 rejects the new cycle (``LocalProtocolError``): send ``Closed()``;
    * otherwise: clear ``response`` and ``scope``, set ``can_read`` and
      send ``Updated()``.
    """
    await self._close_stream()

    if self.connection.our_state is not h11.DONE:
        await self.can_read.set()
        await self.send(Closed())
        return

    try:
        self.connection.start_next_cycle()
    except h11.LocalProtocolError:
        await self.send(Closed())
        return

    # The state machine was recycled: drop the per-request state.
    self.response = None
    self.scope = None
    await self.can_read.set()
    await self.send(Updated())
def recycle_or_close(self, future: asyncio.Future) -> None:
    """Prepare the connection for another request, or close it.

    If our side of the h11 state machine is ``h11.DONE`` and h11 accepts
    starting a new cycle, reading is resumed on the transport, the
    per-request attributes (``app_queue``, ``response``, ``scope``,
    ``state``) are reset, the keep-alive timeout is started and pending
    events are handled. In every other case the connection is closed.

    Args:
        future: accepted as part of the signature; not read by this method.
    """
    if self.connection.our_state is not h11.DONE:
        # Either reached a good close state, or has errored
        self.close()
        return

    try:
        self.connection.start_next_cycle()
    except h11.LocalProtocolError:
        self.close()
        return

    self.transport.resume_reading()  # type: ignore
    # The ``loop`` argument of asyncio.Queue was deprecated in Python 3.8
    # and removed in 3.10, where passing it raises TypeError; the queue
    # binds to the running event loop on its own.
    self.app_queue = asyncio.Queue()
    self.response = None
    self.scope = None
    self.state = ASGIHTTPState.REQUEST
    self.start_keep_alive_timeout()
    self.handle_events()