Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_read_packet():
with raises(ValueError):
read_packet(struct.pack('>%is' % len('test'), b'test'))
"""
match = self._match_address
advanced_matching = self.advanced_matching
addresses = self.addresses
while True:
drop_late = self.drop_late_bundles
if not self.sockets:
sleep(.01)
continue
else:
read, write, error = select(self.sockets, [], [], self.timeout)
for sender_socket in read:
data, sender = sender_socket.recvfrom(65535)
for address, types, values, offset in read_packet(
data, drop_late=drop_late, encoding=self.encoding,
encoding_errors=self.encoding_errors
):
matched = False
if advanced_matching:
for sock, addr in addresses:
if sock == sender_socket and match(addr, address):
matched = True
for cb, get_address in addresses[(sock, addr)]:
if get_address:
cb(address, *values)
else:
cb(*values)
else:
if (sender_socket, address) in addresses: