How to use the oscpy.parser.read_message function in oscpy

To help you get started, we’ve selected a few oscpy examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github kivy / oscpy / tests / performances.py View on Github external
while time() < timeout:
        n += 1
        p, s = format_message(b'/address', pattern)

    size = len(p) / 1000
    print(
        f"formated message {n} times ({n / DURATION}/s) "
        f"({n * size / DURATION:.2f}MB/s)"
    )

    n = 0
    timeout = time() + DURATION

    while time() < timeout:
        n += 1
        read_message(p)

    print(
        f"parsed message {n} times ({n / DURATION}/s) "
        f"({n * size / DURATION:.2f}MB/s)"
    )


    n = 0
    timeout = time() + DURATION

    while time() < timeout:
        n += 1
        read_message(format_message(b'/address', pattern)[0])

    print(
        f"round-trip {n} times ({n / DURATION}/s) "
github kivy / oscpy / tests / test_parser.py View on Github external
def test_format_encoding():
    s = u'éééààà'
    with raises(TypeError):
        read_message(format_message('/test', [s])[0])

    assert read_message(format_message('/test', [s], encoding='utf8')[0])[2][0] == s.encode('utf8')  # noqa
    assert read_message(format_message('/test', [s], encoding='utf8')[0], encoding='utf8')[2][0] == s  # noqa

    with raises(UnicodeEncodeError):
        format_message('/test', [s], encoding='ascii')  # noqa

    with raises(UnicodeDecodeError):
        read_message(format_message('/test', [s], encoding='utf8')[0], encoding='ascii')  # noqa

    assert read_message(
        format_message('/test', [s], encoding='utf8')[0],
        encoding='ascii', encoding_errors='ignore'
    )[2][0] == ''

    assert read_message(
        format_message('/test', [s], encoding='utf8')[0],
        encoding='ascii', encoding_errors='replace'
    )[2][0] == u'������������'
github kivy / oscpy / tests / test_parser.py View on Github external
def test_format_encoding():
    s = u'éééààà'
    with raises(TypeError):
        read_message(format_message('/test', [s])[0])

    assert read_message(format_message('/test', [s], encoding='utf8')[0])[2][0] == s.encode('utf8')  # noqa
    assert read_message(format_message('/test', [s], encoding='utf8')[0], encoding='utf8')[2][0] == s  # noqa

    with raises(UnicodeEncodeError):
        format_message('/test', [s], encoding='ascii')  # noqa

    with raises(UnicodeDecodeError):
        read_message(format_message('/test', [s], encoding='utf8')[0], encoding='ascii')  # noqa

    assert read_message(
        format_message('/test', [s], encoding='utf8')[0],
        encoding='ascii', encoding_errors='ignore'
    )[2][0] == ''

    assert read_message(
        format_message('/test', [s], encoding='utf8')[0],
        encoding='ascii', encoding_errors='replace'
    )[2][0] == u'������������'
github kivy / oscpy / tests / test_parser.py View on Github external
def test_read_message_wrong_address():
    msg, stat = format_message(b'test', [])
    with raises(ValueError, match="doesn't start with a '/'") as e:
        address, tags, values, size = read_message(msg)
github kivy / oscpy / tests / test_parser.py View on Github external
def test_format_encoding():
    s = u'éééààà'
    with raises(TypeError):
        read_message(format_message('/test', [s])[0])

    assert read_message(format_message('/test', [s], encoding='utf8')[0])[2][0] == s.encode('utf8')  # noqa
    assert read_message(format_message('/test', [s], encoding='utf8')[0], encoding='utf8')[2][0] == s  # noqa

    with raises(UnicodeEncodeError):
        format_message('/test', [s], encoding='ascii')  # noqa

    with raises(UnicodeDecodeError):
        read_message(format_message('/test', [s], encoding='utf8')[0], encoding='ascii')  # noqa

    assert read_message(
        format_message('/test', [s], encoding='utf8')[0],
        encoding='ascii', encoding_errors='ignore'
    )[2][0] == ''

    assert read_message(
        format_message('/test', [s], encoding='utf8')[0],
        encoding='ascii', encoding_errors='replace'
github kivy / oscpy / tests / test_parser.py View on Github external
def test_read_broken_message():
    # a message where ',' starting the list of tags has been replaced
    # with \x00
    s = b'/tmp\x00\x00\x00\x00\x00i\x00\x00\x00\x00\x00\x01'
    with raises(ValueError):
        read_message(s)
github kivy / oscpy / tests / test_parser.py View on Github external
def test_read_message():
    source, msg, result = message_1
    msg = struct.pack('>%iB' % len(msg), *msg)
    address, tags, values, size = read_message(msg)
    assert address == result[0]
    assert values == result[1]

    source, msg, result = message_2
    msg = struct.pack('>%iB' % len(msg), *msg)
    address, tags, values, size = read_message(msg)
    assert address == result[0]
    assert tags == b'iisff'
    assert values == result[1]
github kivy / oscpy / tests / test_parser.py View on Github external
def test_read_bundle():
    pad = padded(len('#bundle'))
    data = struct.pack('>%isQ' % pad, b'#bundle', 1)

    tests = (
        message_1,
        message_2,
        message_1,
        message_2,
    )

    for source, msg, result in tests:
        msg = struct.pack('>%iB' % len(msg), *msg)
        assert read_message(msg)[::2] == result
        data += struct.pack('>i', len(msg)) + msg

    timetag, messages = read_bundle(data)
    for test, r in zip(tests, messages):
        assert (r[0], r[2]) == test[2]