Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_write_data():
"""Test send data over MultiplexerChannel."""
output = asyncio.Queue()
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
await channel.write(b"test")
assert not output.empty()
message = output.get_nowait()
assert message.channel_id == channel.uuid
assert message.flow_type == CHANNEL_FLOW_DATA
assert message.data == b"test"
async def test_write_data_peer_error(raise_timeout):
"""Test send data over MultiplexerChannel but peer don't response."""
output = asyncio.Queue(1)
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
# fill peer queue
output.put_nowait(None)
with pytest.raises(MultiplexerTransportError):
await channel.write(b"test")
async def test_init_client_peer_wait(peer_listener, peer_manager, test_endpoint):
"""Test setup of ClientPeer."""
client = ClientPeer("127.0.0.1", "8893")
connector = Connector("127.0.0.1", "8822")
assert not client.is_connected
assert not peer_manager.peer_available("localhost")
valid = datetime.utcnow() + timedelta(days=1)
aes_key = os.urandom(32)
aes_iv = os.urandom(16)
hostname = "localhost"
fernet_token = create_peer_config(valid.timestamp(), hostname, aes_key, aes_iv)
await client.start(connector, fernet_token, aes_key, aes_iv)
await asyncio.sleep(0.1)
assert peer_manager.peer_available("localhost")
assert client.is_connected
await writer_peer.drain()
await asyncio.sleep(0.1)
assert server.peers.peer_available(hostname)
async def mock_new_channel(multiplexer, channel):
"""Mock new channel."""
while True:
message = await channel.read()
peer_messages.append(message)
peer_address.append(channel.ip_address)
_, writer_ssl = await asyncio.open_connection(host="127.0.0.1", port="32000")
multiplexer = Multiplexer(crypto, reader_peer, writer_peer, mock_new_channel)
writer_ssl.write(TLS_1_2)
await writer_ssl.drain()
await asyncio.sleep(0.1)
assert peer_messages
assert peer_messages[0] == TLS_1_2
assert peer_address
assert peer_address[0] == IP_ADDR
peer = server.peers.get_peer(hostname)
assert peer._multiplexer._throttling == 0.002
multiplexer.shutdown()
await multiplexer.wait()
await asyncio.sleep(0.1)
token = hashlib.sha256(crypto.decrypt(token)).digest()
test_client_peer.writer.write(crypto.encrypt(token))
await test_client_peer.writer.drain()
await asyncio.sleep(0.1)
assert peer_manager.peer_available(hostname)
async def mock_new_channel(multiplexer, channel):
"""Mock new channel."""
while True:
message = await channel.read()
peer_messages.append(message)
peer_address.append(channel.ip_address)
multiplexer = Multiplexer(
crypto, test_client_peer.reader, test_client_peer.writer, mock_new_channel
)
test_client_ssl.writer.write(TLS_1_2)
await test_client_ssl.writer.drain()
await asyncio.sleep(0.1)
assert peer_messages
assert peer_messages[0] == TLS_1_2
assert peer_address
assert peer_address[0] == IP_ADDR
multiplexer.shutdown()
await multiplexer.wait()
await asyncio.sleep(0.1)
async def test_write_data():
"""Test send data over MultiplexerChannel."""
output = asyncio.Queue()
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
await channel.write(b"test")
assert not output.empty()
message = output.get_nowait()
assert message.channel_id == channel.uuid
assert message.flow_type == CHANNEL_FLOW_DATA
assert message.data == b"test"
client.writer.write(crypto.encrypt(token))
await client.writer.drain()
await asyncio.sleep(0.1)
assert init_task.exception() is None
assert init_task.done()
assert peer.is_ready
assert peer.is_connected
ping_task = loop.create_task(peer.multiplexer.ping())
await asyncio.sleep(0.1)
ping_data = await client.reader.read(1024)
ping = crypto.decrypt(ping_data)
assert ping[16] == CHANNEL_FLOW_PING
assert int.from_bytes(ping[17:21], "big") == 0
assert ping[21:25] == b"ping"
ping_task.cancel()
client.writer.close()
client.close.set()
await asyncio.sleep(0.1)
assert peer.multiplexer.wait().done()
async def test_read_data_on_close():
"""Test send data over MultiplexerChannel on close."""
output = asyncio.Queue()
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
assert not channel.closing
channel.close()
with pytest.raises(MultiplexerTransportClose):
data = await channel.read()
assert channel.closing
):
"""Test and init a connector with whitelist bad requests."""
assert not test_endpoint
connector = Connector("127.0.0.1", "8822", True)
multiplexer_client._new_connections = connector.handler
connector.whitelist.add(IP_ADDR)
assert IP_ADDR in connector.whitelist
assert BAD_ADDR not in connector.whitelist
channel = await multiplexer_server.create_channel(BAD_ADDR)
await asyncio.sleep(0.1)
assert not test_endpoint
with pytest.raises(MultiplexerTransportClose):
await channel.read()
async def test_write_data_after_close():
"""Test send data over MultiplexerChannel."""
output = asyncio.Queue()
channel = MultiplexerChannel(output, IP_ADDR)
assert isinstance(channel.uuid, UUID)
assert not channel.closing
channel.close()
with pytest.raises(MultiplexerTransportClose):
await channel.write(b"test")
assert channel.closing