Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def build_ping_frame(self, ping_data, flags=None):
    """
    Construct one PING frame on stream 0 carrying ``ping_data``.

    :param ping_data: Value stored as the frame's ``opaque_data``.
    :param flags: Optional iterable of flag names. When it is ``None`` or
        empty, the frame's flags are left as ``PingFrame`` initialised them.
    :returns: The newly built ``PingFrame``.
    """
    ping = PingFrame(0)
    ping.opaque_data = ping_data

    # Nothing to override: keep whatever flags the constructor set up.
    if not flags:
        return ping

    ping.flags = set(flags)
    return ping
def test_ping_frame_serializes_properly(self):
f = PingFrame()
f.parse_flags(0xFF)
f.opaque_data = b'\x01\x02'
s = f.serialize()
assert s == (
b'\x00\x00\x08\x06\x01\x00\x00\x00\x00\x01\x02\x00\x00\x00\x00\x00'
b'\x00'
# error.
self._closed_streams = {}
# The flow control window manager for the connection.
self._inbound_flow_control_window_manager = WindowManager(
max_window_size=self.local_settings.initial_window_size
)
# When in doubt use dict-dispatch.
self._frame_dispatch_table = {
HeadersFrame: self._receive_headers_frame,
PushPromiseFrame: self._receive_push_promise_frame,
SettingsFrame: self._receive_settings_frame,
DataFrame: self._receive_data_frame,
WindowUpdateFrame: self._receive_window_update_frame,
PingFrame: self._receive_ping_frame,
RstStreamFrame: self._receive_rst_stream_frame,
PriorityFrame: self._receive_priority_frame,
GoAwayFrame: self._receive_goaway_frame,
ContinuationFrame: self._receive_naked_continuation,
AltSvcFrame: self._receive_alt_svc_frame,
ExtensionFrame: self._receive_unknown_frame
}
def test_ping_frame_parses_properly(self):
    """
    Decode a serialized PING frame and check every parsed field.
    """
    wire = (
        # 9-byte frame header: length=8, type=0x06, flags=0x01, stream id=0.
        b'\x00\x00\x08\x06\x01\x00\x00\x00\x00'
        # 8-byte opaque payload.
        b'\x01\x02\x00\x00\x00\x00\x00\x00'
    )

    frame = decode_frame(wire)

    assert isinstance(frame, PingFrame)
    assert frame.flags == {'ACK'}
    assert frame.opaque_data == b'\x01\x02\x00\x00\x00\x00\x00\x00'
    assert frame.body_len == 8
def test_ping_frame_has_only_one_flag(self):
    """
    Parsing a flag byte with all eight bits set must yield only ``ACK``.
    """
    parsed = PingFrame().parse_flags(0xFF)
    assert parsed == {'ACK'}
self.type,
flags,
self.stream_id & 0x7FFFFFFF # Stream ID is 32 bits.
)
return header + self.body
# Frame classes registered in FRAMES below; each class's ``type`` attribute is
# used as its lookup key.
_FRAME_CLASSES = [
    DataFrame,
    HeadersFrame,
    PriorityFrame,
    RstStreamFrame,
    SettingsFrame,
    PushPromiseFrame,
    PingFrame,
    GoAwayFrame,
    WindowUpdateFrame,
    ContinuationFrame,
    AltSvcFrame,
]
#: FRAMES maps the type byte for each frame to the class used to represent that
#: frame.
FRAMES = {cls.type: cls for cls in _FRAME_CLASSES}
def read_frame(self, hide=False):
    """
    Read frames from the peer until one should be handed to the caller.

    PING frames are consumed here and never returned. A PING without the
    ``ACK`` flag is answered with a PING ``ACK``; a PING that already carries
    ``ACK`` is dropped silently, because RFC 7540 section 6.7 forbids
    responding to an acknowledgement. Non-ACK SETTINGS frames are applied and
    DATA frames with a positive flow-controlled length update the flow
    control window before the frame is returned.

    :param hide: When true, the frame dump is not printed even if
        ``self.dump_frames`` is set; the value is also forwarded to
        ``self._apply_settings``.
    :returns: The first frame read that is not a PING frame.
    """
    while True:
        frm = framereader.http2_read_frame(self.tcp_handler.rfile)
        if not hide and self.dump_frames:  # pragma no cover
            print(frm.human_readable("<<"))

        if isinstance(frm, hyperframe.frame.PingFrame):
            # Only answer genuine pings. Replying to an ACK would violate
            # RFC 7540 section 6.7 and can cause an endless ACK exchange.
            if 'ACK' not in frm.flags:
                # NOTE(review): other code handling PingFrame reads the
                # payload via ``opaque_data``; confirm ``payload`` is valid
                # for the hyperframe version in use.
                raw_bytes = hyperframe.frame.PingFrame(flags=['ACK'], payload=frm.payload).serialize()
                self.tcp_handler.wfile.write(raw_bytes)
                self.tcp_handler.wfile.flush()
            continue

        if isinstance(frm, hyperframe.frame.SettingsFrame) and 'ACK' not in frm.flags:
            self._apply_settings(frm.settings, hide)

        if isinstance(frm, hyperframe.frame.DataFrame) and frm.flow_controlled_length > 0:
            self._update_flow_control_window(frm.stream_id, frm.flow_controlled_length)

        return frm
def _receive_ping_frame(self, frame):
    """
    Handle a PING frame received on the connection.

    The state machine is always fed ``RECV_PING`` first. If the frame carries
    the ``ACK`` flag, a ``PingAcknowledged`` event holding the frame's opaque
    data is appended to the events and no frame is queued. Otherwise a PING
    ``ACK`` frame echoing the same opaque data is queued for sending.

    :param frame: The received PING frame.
    :returns: A ``(frames_to_send, events)`` tuple.
    """
    events = self.state_machine.process_input(ConnectionInputs.RECV_PING)

    if 'ACK' in frame.flags:
        # The peer answered one of our pings: surface it as an event.
        ack_event = PingAcknowledged()
        ack_event.ping_data = frame.opaque_data
        events.append(ack_event)
        return [], events

    # The peer pinged us: echo its payload back with ACK set.
    reply = PingFrame(0)
    reply.flags = {'ACK'}
    reply.opaque_data = frame.opaque_data
    return [reply], events
size_limit=self.MAX_CLOSED_STREAMS
)
# The flow control window manager for the connection.
self._inbound_flow_control_window_manager = WindowManager(
max_window_size=self.local_settings.initial_window_size
)
# When in doubt use dict-dispatch.
self._frame_dispatch_table = {
HeadersFrame: self._receive_headers_frame,
PushPromiseFrame: self._receive_push_promise_frame,
SettingsFrame: self._receive_settings_frame,
DataFrame: self._receive_data_frame,
WindowUpdateFrame: self._receive_window_update_frame,
PingFrame: self._receive_ping_frame,
RstStreamFrame: self._receive_rst_stream_frame,
PriorityFrame: self._receive_priority_frame,
GoAwayFrame: self._receive_goaway_frame,
ContinuationFrame: self._receive_naked_continuation,
AltSvcFrame: self._receive_alt_svc_frame,
ExtensionFrame: self._receive_unknown_frame
}
def read_frame(self, hide=False):
    """
    Read frames from the peer until one should be handed to the caller.

    PING frames are consumed here and never returned. A PING without the
    ``ACK`` flag is answered with a PING ``ACK``; a PING that already carries
    ``ACK`` is dropped silently, because RFC 7540 section 6.7 forbids
    responding to an acknowledgement. Non-ACK SETTINGS frames are applied and
    DATA frames with a positive flow-controlled length update the flow
    control window before the frame is returned.

    :param hide: When true, the frame dump is not printed even if
        ``self.dump_frames`` is set; the value is also forwarded to
        ``self._apply_settings``.
    :returns: The first frame read that is not a PING frame.
    """
    while True:
        frm = http2.parse_frame(*http2.read_raw_frame(self.tcp_handler.rfile))
        if not hide and self.dump_frames:  # pragma: no cover
            print("<< " + repr(frm))

        if isinstance(frm, hyperframe.frame.PingFrame):
            # Only answer genuine pings. Replying to an ACK would violate
            # RFC 7540 section 6.7 and can cause an endless ACK exchange.
            if 'ACK' not in frm.flags:
                # NOTE(review): other code handling PingFrame reads the
                # payload via ``opaque_data``; confirm ``payload`` is valid
                # for the hyperframe version in use.
                raw_bytes = hyperframe.frame.PingFrame(flags=['ACK'], payload=frm.payload).serialize()
                self.tcp_handler.wfile.write(raw_bytes)
                self.tcp_handler.wfile.flush()
            continue

        if isinstance(frm, hyperframe.frame.SettingsFrame) and 'ACK' not in frm.flags:
            self._apply_settings(frm.settings, hide)

        if isinstance(frm, hyperframe.frame.DataFrame) and frm.flow_controlled_length > 0:
            self._update_flow_control_window(frm.stream_id, frm.flow_controlled_length)

        return frm