Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_oscclient():
osc = OSCThreadServer()
sock = osc.listen()
port = sock.getsockname()[1]
acc = []
def success(*values):
acc.append(values[0])
osc.bind(b'/success', success, sock)
client = OSCClient('localhost', port)
timeout = time() + 5
while len(acc) < 50:
if time() > timeout:
raise OSError('timeout while waiting for success message.')
client.send_message(b'/success', [1])
while len(acc) < 100:
if time() > timeout:
raise OSError('timeout while waiting for success message.')
client.send_bundle(
[
(b'/success', [i])
for i in range(10)
def test_timetag():
osc = OSCThreadServer(drop_late_bundles=True)
osc.drop_late_bundles = True
sock = osc.listen()
port = sock.getsockname()[1]
acc = []
@osc.address(b'/success', sock)
def success(*values):
acc.append(True)
@osc.address(b'/failure', sock)
def failure(*values):
acc.append(False)
client = OSCClient('localhost', port)
timeout = time() + 5
while len(acc) < 50:
if time() > timeout:
raise OSError('timeout while waiting for success message.')
client.send_message(b'/success', [1])
while len(acc) < 100:
if time() > timeout:
raise OSError('timeout while waiting for success message.')
client.send_bundle(
[
(b'/failure', [i])
for i in range(10)
def build(self):
    """Set up the OSC endpoints and return the root widget.

    Creates an ``OSCThreadServer`` listening on ``localhost:3002`` as its
    default socket, binds the ``/message`` and ``/date`` addresses to
    ``self.display_message`` and ``self.date``, creates an ``OSCClient``
    targeting ``localhost:3000``, and loads the root object from ``KV``.

    Returns:
        The object produced by ``Builder.load_string(KV)``, also stored
        on ``self.root``.
    """
    # No service handle yet; the start call below is left disabled.
    self.service = None
    # self.start_service()

    osc_server = OSCThreadServer()
    self.server = osc_server
    osc_server.listen(address=b'localhost', port=3002, default=True)

    # Route incoming OSC addresses to their handlers on the default socket.
    osc_server.bind(b'/message', self.display_message)
    osc_server.bind(b'/date', self.date)

    # Outgoing channel to the peer on port 3000.
    self.client = OSCClient(b'localhost', 3000)

    root = Builder.load_string(KV)
    self.root = root
    return root
'p4a example service using oscpy to communicate with main application.'
from random import sample, randint
from string import ascii_letters
from time import localtime, asctime, sleep
from oscpy.server import OSCThreadServer
from oscpy.client import OSCClient
# Module-level OSC client targeting localhost:3002; shared by the sender
# functions in this module (ping, send_date).
CLIENT = OSCClient('localhost', 3002)
def ping(*_):
    """Answer a ping by sending a random ASCII-letter string.

    Any positional arguments are accepted and ignored. A string of 10 to
    20 distinct ASCII letters is sampled, UTF-8 encoded, and sent as the
    single value of a ``/message`` OSC message through ``CLIENT``.
    """
    length = randint(10, 20)
    letters = sample(ascii_letters, length)
    payload = ''.join(letters).encode('utf8')
    CLIENT.send_message(b'/message', [payload])
def send_date():
'send date to the application'
CLIENT.send_message(
def __init__(self, address, port):
    """Remember the target endpoint and create an OSC client for it.

    Args:
        address: Destination address, stored as ``self.address`` and
            passed through to ``OSCClient``.
        port: Destination port, stored as ``self.port`` and passed
            through to ``OSCClient``.
    """
    self.address, self.port = address, port
    # Client bound to the same endpoint that was just recorded.
    self.osc = OSCClient(self.address, self.port)