Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
while time() < timeout:
n += 1
p, s = format_message(b'/address', pattern)
size = len(p) / 1000
print(
f"formated message {n} times ({n / DURATION}/s) "
f"({n * size / DURATION:.2f}MB/s)"
)
n = 0
timeout = time() + DURATION
while time() < timeout:
n += 1
read_message(p)
print(
f"parsed message {n} times ({n / DURATION}/s) "
f"({n * size / DURATION:.2f}MB/s)"
)
n = 0
timeout = time() + DURATION
while time() < timeout:
n += 1
read_message(format_message(b'/address', pattern)[0])
print(
f"round-trip {n} times ({n / DURATION}/s) "
def test_format_encoding():
s = u'éééààà'
with raises(TypeError):
read_message(format_message('/test', [s])[0])
assert read_message(format_message('/test', [s], encoding='utf8')[0])[2][0] == s.encode('utf8') # noqa
assert read_message(format_message('/test', [s], encoding='utf8')[0], encoding='utf8')[2][0] == s # noqa
with raises(UnicodeEncodeError):
format_message('/test', [s], encoding='ascii') # noqa
with raises(UnicodeDecodeError):
read_message(format_message('/test', [s], encoding='utf8')[0], encoding='ascii') # noqa
assert read_message(
format_message('/test', [s], encoding='utf8')[0],
encoding='ascii', encoding_errors='ignore'
)[2][0] == ''
assert read_message(
format_message('/test', [s], encoding='utf8')[0],
encoding='ascii', encoding_errors='replace'
)[2][0] == u'������������'
def test_format_encoding():
s = u'éééààà'
with raises(TypeError):
read_message(format_message('/test', [s])[0])
assert read_message(format_message('/test', [s], encoding='utf8')[0])[2][0] == s.encode('utf8') # noqa
assert read_message(format_message('/test', [s], encoding='utf8')[0], encoding='utf8')[2][0] == s # noqa
with raises(UnicodeEncodeError):
format_message('/test', [s], encoding='ascii') # noqa
with raises(UnicodeDecodeError):
read_message(format_message('/test', [s], encoding='utf8')[0], encoding='ascii') # noqa
assert read_message(
format_message('/test', [s], encoding='utf8')[0],
encoding='ascii', encoding_errors='ignore'
)[2][0] == ''
assert read_message(
format_message('/test', [s], encoding='utf8')[0],
encoding='ascii', encoding_errors='replace'
)[2][0] == u'������������'
def test_read_message_wrong_address():
msg, stat = format_message(b'test', [])
with raises(ValueError, match="doesn't start with a '/'") as e:
address, tags, values, size = read_message(msg)
def test_format_encoding():
s = u'éééààà'
with raises(TypeError):
read_message(format_message('/test', [s])[0])
assert read_message(format_message('/test', [s], encoding='utf8')[0])[2][0] == s.encode('utf8') # noqa
assert read_message(format_message('/test', [s], encoding='utf8')[0], encoding='utf8')[2][0] == s # noqa
with raises(UnicodeEncodeError):
format_message('/test', [s], encoding='ascii') # noqa
with raises(UnicodeDecodeError):
read_message(format_message('/test', [s], encoding='utf8')[0], encoding='ascii') # noqa
assert read_message(
format_message('/test', [s], encoding='utf8')[0],
encoding='ascii', encoding_errors='ignore'
)[2][0] == ''
assert read_message(
format_message('/test', [s], encoding='utf8')[0],
encoding='ascii', encoding_errors='replace'
def test_read_broken_message():
# a message where ',' starting the list of tags has been replaced
# with \x00
s = b'/tmp\x00\x00\x00\x00\x00i\x00\x00\x00\x00\x00\x01'
with raises(ValueError):
read_message(s)
def test_read_message():
source, msg, result = message_1
msg = struct.pack('>%iB' % len(msg), *msg)
address, tags, values, size = read_message(msg)
assert address == result[0]
assert values == result[1]
source, msg, result = message_2
msg = struct.pack('>%iB' % len(msg), *msg)
address, tags, values, size = read_message(msg)
assert address == result[0]
assert tags == b'iisff'
assert values == result[1]
def test_read_bundle():
pad = padded(len('#bundle'))
data = struct.pack('>%isQ' % pad, b'#bundle', 1)
tests = (
message_1,
message_2,
message_1,
message_2,
)
for source, msg, result in tests:
msg = struct.pack('>%iB' % len(msg), *msg)
assert read_message(msg)[::2] == result
data += struct.pack('>i', len(msg)) + msg
timetag, messages = read_bundle(data)
for test, r in zip(tests, messages):
assert (r[0], r[2]) == test[2]