Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_inbound_bad_zlib_decoder_end_state(self, monkeypatch: MonkeyPatch) -> None:
compressed_payload = b"x" * 23
ext = wpext.PerMessageDeflate()
ext._enabled = True
proto = fp.FrameProtocol(client=True, extensions=[ext])
result = ext.frame_inbound_header(
proto,
fp.Opcode.BINARY,
fp.RsvBits(True, False, False),
len(compressed_payload),
)
assert isinstance(result, fp.RsvBits)
assert result.rsv1
class FailDecompressor:
def decompress(self, data: bytes) -> bytes:
return b""
def flush(self) -> None:
raise zlib.error()
def test_client_decompress_after_uncompressible_frame(self, client: bool) -> None:
ext = wpext.PerMessageDeflate()
ext._enabled = True
proto = fp.FrameProtocol(client=client, extensions=[ext])
# A PING frame
result = ext.frame_inbound_header(
proto, fp.Opcode.PING, fp.RsvBits(False, False, False), 0
)
result2 = ext.frame_inbound_payload_data(proto, b"")
assert not isinstance(result2, fp.CloseReason)
assert ext.frame_inbound_complete(proto, True) is None
# A compressed TEXT frame
payload = b"x" * 23
compressed_payload = b"\xaa\xa8\xc0\n\x00\x00"
result3 = ext.frame_inbound_header(
proto,
fp.Opcode.TEXT,
def test_client_side_masking_two_byte_frame(self) -> None:
proto = fp.FrameProtocol(client=True, extensions=[])
payload = b"x" * 126
data = proto.send_data(payload, fin=True)
assert data[0] == 0x82
assert data[1] == 0xFE
assert struct.unpack("!H", data[2:4])[0] == len(payload)
masking_key = data[4:8]
maskbytes = itertools.cycle(masking_key)
assert data[8:] == bytearray(b ^ next(maskbytes) for b in bytearray(payload))
def test_inbound_bad_zlib_payload(self) -> None:
compressed_payload = b"x" * 23
ext = wpext.PerMessageDeflate()
ext._enabled = True
proto = fp.FrameProtocol(client=True, extensions=[ext])
result = ext.frame_inbound_header(
proto,
fp.Opcode.BINARY,
fp.RsvBits(True, False, False),
len(compressed_payload),
)
assert isinstance(result, fp.RsvBits)
assert result.rsv1
result2 = ext.frame_inbound_payload_data(proto, compressed_payload)
assert result2 is fp.CloseReason.INVALID_FRAME_PAYLOAD_DATA
def test_outbound_uncompressible_opcode(self) -> None:
ext = wpext.PerMessageDeflate()
ext._enabled = True
proto = fp.FrameProtocol(client=True, extensions=[ext])
rsv = fp.RsvBits(False, False, False)
payload = b"x" * 23
rsv, data = ext.frame_outbound(proto, fp.Opcode.PING, rsv, payload, True)
assert rsv.rsv1 is False
assert data == payload
def test_decompressor_reset(self, client: bool, no_context_takeover: bool) -> None:
if client:
args = {"server_no_context_takeover": no_context_takeover}
else:
args = {"client_no_context_takeover": no_context_takeover}
ext = wpext.PerMessageDeflate(**args)
ext._enabled = True
proto = fp.FrameProtocol(client=client, extensions=[ext])
result = ext.frame_inbound_header(
proto, fp.Opcode.BINARY, fp.RsvBits(True, False, False), 0
)
assert isinstance(result, fp.RsvBits)
assert result.rsv1
assert ext._decompressor is not None
result2 = ext.frame_inbound_complete(proto, True)
assert not isinstance(result2, fp.CloseReason)
if no_context_takeover:
assert ext._decompressor is None
else:
assert ext._decompressor is not None
def test_message_length_max_short(self) -> None:
proto = fp.FrameProtocol(client=False, extensions=[])
payload = b"x" * 125
data = proto.send_data(payload, fin=True)
assert data == b"\x82" + bytearray([len(payload)]) + payload
def test_ping_with_payload(self) -> None:
proto = fp.FrameProtocol(client=False, extensions=[])
payload = r"ÂŻ\_(ă)_/ÂŻ".encode("utf8")
data = proto.ping(payload)
assert data == b"\x89" + bytearray([len(payload)]) + payload
def test_outbound_handling_single_frame(self) -> None:
ext = self.FakeExtension()
proto = fp.FrameProtocol(client=False, extensions=[ext])
payload = "đđđđ"
data = proto.send_data(payload, fin=True)
payload_bytes = (payload + "ÂŽ").encode("utf8")
assert data == b"\x91" + bytearray([len(payload_bytes)]) + payload_bytes
def test_mismatched_data_messages2(self) -> None:
proto = fp.FrameProtocol(client=False, extensions=[])
payload = b"it's all just ascii, right?"
data = proto.send_data(payload, fin=False)
assert data == b"\x02" + bytearray([len(payload)]) + payload
payload_str = "âď¸âď¸â
âď¸â"
with pytest.raises(TypeError):
proto.send_data(payload_str)