Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def get_events(self) -> AsyncGenerator[h2.events.Event, None]:
    """Yield the h2 events decoded from data that arrives on the transport.

    Each pass waits on ``self.transport.updated``, feeds
    ``self.transport.data`` to ``self.connection.receive_data`` and calls
    ``self.transport.clear()`` before yielding the decoded events one by
    one. The generator finishes after a pass in which
    ``self.transport.closed`` is set.
    """
    while True:
        await self.transport.updated.wait()
        events = self.connection.receive_data(self.transport.data)
        self.transport.clear()
        for event in events:
            if isinstance(event, h2.events.ConnectionTerminated):
                self.transport.close()
            elif isinstance(event, h2.events.DataReceived):
                # Acknowledge the received bytes, then pass whatever the
                # connection wants to send on to the server.
                self.connection.acknowledge_received_data(
                    event.flow_controlled_length, event.stream_id,
                )
                self.server.data_received(self.connection.data_to_send())
            yield event
        # Checked once per pass, after every event of that pass was yielded.
        if self.transport.closed.is_set():
            break
# Server-side handler for a single h2 event.
# Returns False for ConnectionTerminated, and True for a RequestReceived on
# any stream other than 1. For a request on stream 1 it sends a 200 response
# header block and then starts a push promise.
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the push_stream() header list; the rest of the function is not visible.
def handle_server_event(cls, event, h2_conn, rfile, wfile):
if isinstance(event, h2.events.ConnectionTerminated):
return False
elif isinstance(event, h2.events.RequestReceived):
if event.stream_id != 1:
# ignore requests initiated by push promises
return True
# Respond on stream 1 and flush the frames produced by the connection.
h2_conn.send_headers(1, [(':status', '200')])
wfile.write(h2_conn.data_to_send())
wfile.flush()
# Promise stream 2 on top of stream 1; the authority uses cls.port.
h2_conn.push_stream(1, 2, [
(':authority', "127.0.0.1:{}".format(cls.port)),
(':method', 'GET'),
(':scheme', 'https'),
(':path', '/pushed_stream_foo'),
('foo', 'bar')
def test_connectionterminated_repr(self, additional_data, data_repr):
    """
    ConnectionTerminated has a useful debug representation.

    ``additional_data`` is stored on the event and ``data_repr`` is the text
    expected for it inside the repr.
    """
    e = h2.events.ConnectionTerminated()
    e.error_code = h2.errors.ErrorCodes.INADEQUATE_SECURITY
    e.last_stream_id = 33
    e.additional_data = additional_data

    # An empty format string cannot absorb data_repr ("" % "x" raises
    # TypeError), so the full expected text is spelled out here.
    # NOTE(review): layout follows h2's ConnectionTerminated.__repr__;
    # confirm against the installed h2 version.
    assert repr(e) == (
        "<ConnectionTerminated error_code:ErrorCodes.INADEQUATE_SECURITY, "
        "last_stream_id:33, additional_data:%s>" % data_repr
    )
# NOTE(review): excerpt of an event-routing method; it starts at an `else:`
# whose matching `if` is not visible, and the indentation has been lost.
# Routes an event to the matching self._handle_* method according to its
# type and returns that method's result.
else:
eid = event.stream_id
if isinstance(event, events.RequestReceived):
return self._handle_request_received(eid, event)
elif isinstance(event, events.ResponseReceived):
return self._handle_response_received(eid, event)
elif isinstance(event, events.DataReceived):
return self._handle_data_received(eid, event, source_conn)
elif isinstance(event, events.StreamEnded):
return self._handle_stream_ended(eid)
elif isinstance(event, events.StreamReset):
return self._handle_stream_reset(eid, event, is_server, other_conn)
elif isinstance(event, events.RemoteSettingsChanged):
return self._handle_remote_settings_changed(event, other_conn)
elif isinstance(event, events.ConnectionTerminated):
return self._handle_connection_terminated(event, is_server)
elif isinstance(event, events.PushedStreamReceived):
return self._handle_pushed_stream_received(event)
elif isinstance(event, events.PriorityUpdated):
return self._handle_priority_updated(eid, event)
elif isinstance(event, events.TrailersReceived):
# Trailers are rejected explicitly rather than ignored.
raise NotImplementedError('TrailersReceived not implemented')
# fail-safe for unhandled events
return True
# NOTE(review): excerpt of a receive handler; the enclosing definition is not
# visible, indentation has been lost, and the final `elif` has no body here.
# Raises ConnectionError when `data` is empty; otherwise feeds it to
# self.protocol and processes each resulting event.
if not data:
raise ConnectionError("Server closed the connection")
for event in self.protocol.receive_data(data):
logger.debug("APN: %s", event)
# Events lacking these attributes fall back to stream id 0 and no error.
stream_id = getattr(event, "stream_id", 0)
error = getattr(event, "error_code", None)
channel = self.channels.get(stream_id)
if isinstance(event, h2.events.RemoteSettingsChanged):
# Record the new MAX_CONCURRENT_STREAMS value when that setting changed.
m = event.changed_settings.get(
h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS
)
if m:
self.max_concurrent_streams = m.new_value
elif isinstance(event, h2.events.ConnectionTerminated):
# When Apple is not happy with the whole connection,
# it sends smth like {"reason": "BadCertificateEnvironment"}
# Catch it here, so that connection pool can be invalidated.
self.closing = True
if not self.outcome:
if event.additional_data:
# Prefer the "reason" field of a UTF-8 JSON payload; on any failure
# fall back to str() of the first 100 bytes of the payload.
try:
self.outcome = json.loads(
event.additional_data.decode("utf-8")
)["reason"]
except Exception:
self.outcome = str(event.additional_data[:100])
else:
# No payload at all: use the error code as the outcome.
self.outcome = str(event.error_code)
logger.info("Closing with %s", self.outcome)
elif not stream_id and error is not None:
# NOTE(review): excerpt of a data-received handler; the enclosing definition
# is not visible and indentation has been lost.
# Resets the timeout, passes each event to the handler method for its type,
# and finally calls self._tryToWriteControlData().
self.resetTimeout()
for event in events:
if isinstance(event, h2.events.RequestReceived):
self._requestReceived(event)
elif isinstance(event, h2.events.DataReceived):
self._requestDataReceived(event)
elif isinstance(event, h2.events.StreamEnded):
self._requestEnded(event)
elif isinstance(event, h2.events.StreamReset):
self._requestAborted(event)
elif isinstance(event, h2.events.WindowUpdated):
self._handleWindowUpdate(event)
elif isinstance(event, h2.events.PriorityUpdated):
self._handlePriorityUpdate(event)
elif isinstance(event, h2.events.ConnectionTerminated):
# Drop the transport and report the connection as lost, passing
# _cancelTimeouts=False to connectionLost.
self.transport.loseConnection()
self.connectionLost(
ConnectionLost("Remote peer sent GOAWAY"),
_cancelTimeouts=False,
)
self._tryToWriteControlData()
# Dispatch table: maps each event class to the bound method registered for it.
# NOTE(review): how the table is consulted is not visible in this excerpt.
self._event_handlers = {
events.RequestReceived: self._request_received,
events.ResponseReceived: self._response_received,
events.TrailersReceived: self._trailers_received,
events.DataReceived: self._data_received,
events.WindowUpdated: self._window_updated,
events.RemoteSettingsChanged: self._remote_settings_changed,
events.PingAcknowledged: self._ping_acknowledged,
events.StreamEnded: self._stream_ended,
events.StreamReset: self._stream_reset,
events.PushedStreamReceived: self._pushed_stream_received,
events.SettingsAcknowledged: self._settings_acknowledged,
events.PriorityUpdated: self._priority_updated,
events.ConnectionTerminated: self._connection_terminated,
}
# NOTE(review): excerpt that starts inside an earlier branch of an if/elif
# chain over h2 events; the enclosing coroutine is not visible and the
# indentation has been lost.
self.connection.acknowledge_received_data(
event.flow_controlled_length, event.stream_id
)
elif isinstance(event, h2.events.StreamEnded):
# Deliver an EndBody message to the stream registered under this id.
await self.streams[event.stream_id].handle(EndBody(stream_id=event.stream_id))
elif isinstance(event, h2.events.StreamReset):
# Close the reset stream, then run the window-update step for its id.
await self._close_stream(event.stream_id)
await self._window_updated(event.stream_id)
elif isinstance(event, h2.events.WindowUpdated):
await self._window_updated(event.stream_id)
elif isinstance(event, h2.events.PriorityUpdated):
await self._priority_updated(event)
elif isinstance(event, h2.events.RemoteSettingsChanged):
# Only a changed INITIAL_WINDOW_SIZE triggers the window-update step,
# called with None instead of a stream id.
if h2.settings.SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
await self._window_updated(None)
elif isinstance(event, h2.events.ConnectionTerminated):
await self.send(Closed())
await self._flush()
# NOTE(review): excerpt of an event-handling method; the enclosing definition
# (and any loop around this chain) is not visible, and indentation is lost.
if isinstance(event, h2.events.RequestReceived):
# Copy the request headers into a CIMultiDict under title-cased names, then
# hand the stream id, upper-cased :method, :path and the full header
# mapping to handle_request.
headers = CIMultiDict()
for name, value in event.headers:
headers.add(name.title(), value)
self.handle_request(
event.stream_id, headers[':method'].upper(), headers[':path'], headers,
)
elif isinstance(event, h2.events.DataReceived):
self.streams[event.stream_id].append(event.data)
elif isinstance(event, h2.events.StreamReset):
# A reset cancels the task attached to that stream.
self.streams[event.stream_id].task.cancel()
elif isinstance(event, h2.events.StreamEnded):
self.streams[event.stream_id].complete()
elif isinstance(event, h2.events.WindowUpdated):
self._window_updated(event.stream_id)
elif isinstance(event, h2.events.ConnectionTerminated):
# Termination closes this object and returns immediately.
self.close()
return
# Write out whatever the connection has queued to send.
self.write(self.connection.data_to_send()) # type: ignore
# NOTE(review): excerpt of a client-side event loop; the enclosing definition
# is not visible and indentation has been lost.
# Passes each event to the handler for its stream, then calls send_data().
for event in events:
if isinstance(event, ResponseReceived):
self.handle_response (event.stream_id, event.headers)
elif isinstance(event, TrailersReceived):
self.handle_trailers (event.stream_id, event.headers)
elif isinstance(event, StreamReset):
# Only resets flagged remote_reset are acted on: a FailedResponse with
# code 721 is attached to the stream's handler when one exists.
if event.remote_reset:
h = self.get_handler (event.stream_id)
if h:
h.response = http_response.FailedResponse (721, respcodes.get (721), h.request)
h.handle_callback ()
self.remove_handler (event.stream_id)
elif isinstance(event, ConnectionTerminated):
# The same 721 code is reported when the whole connection is terminated.
self.asyncon.handle_close (721, "HTTP2 Connection Terminated")
elif isinstance (event, DataReceived):
h = self.get_handler (event.stream_id)
if h:
h.collect_incoming_data (event.data)
self.adjust_flow_control_window (event.stream_id)
elif isinstance(event, StreamEnded):
h = self.get_handler (event.stream_id)
if h:
self.remove_handler (event.stream_id)
h.found_terminator ()
self.send_data ()