Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _pending(self):
# I get hanging notes if MAX_EVENTS > 1, so I'll have to
# resort to calling Pm_Read() in a loop until there are no
# more pending events.
max_events = 1
# Todo: this should be allocated once
BufferType = pm.PmEvent * max_events
read_buffer = BufferType()
while pm.lib.Pm_Poll(self._stream):
# Read one message. Should return 1.
# If num_events < 0, an error occured.
length = 1 # Buffer length
num_events = pm.lib.Pm_Read(self._stream, read_buffer, length)
_check_error(num_events)
# Get the event
event = read_buffer[0]
# print('Received: {:x}'.format(event.message))
# The bytes of the message are stored like this:
# 0x00201090 -> (0x90, 0x10, 0x10)
# (Todo: not sure if this is correct.)
packed_message = event.message & 0xffffffff
for i in range(4):
byte = packed_message & 0xff
self._parser.feed_byte(byte)
packed_message >>= 8
def _get_default_device(get_input):
if get_input:
device_id = pm.lib.Pm_GetDefaultInputDeviceID()
else:
device_id = pm.lib.Pm_GetDefaultOutputDeviceID()
if device_id < 0:
raise IOError('no default port found')
return _get_device(device_id)
def _close(self):
self.callback = None
_check_error(pm.lib.Pm_Close(self._stream))
_state['port_count'] -= 1
def _receive(self, block=True):
# Since there is no blocking read in PortMidi, the block
# flag is ignored and the enclosing receive() takes care
# of blocking.
# Allocate buffer.
# I get hanging notes if MAX_EVENTS > 1, so I'll have to
# resort to calling Pm_Read() in a loop until there are no
# more pending events.
max_events = 1
BufferType = pm.PmEvent * max_events
read_buffer = BufferType()
# Read available data from the stream and feed it to the parser.
while pm.lib.Pm_Poll(self._stream):
# TODO: this should be allocated once
# Read one message. Should return 1.
# If num_events < 0, an error occured.
length = 1 # Buffer length
num_events = pm.lib.Pm_Read(self._stream, read_buffer, length)
_check_error(num_events)
# Get the event
event = read_buffer[0]
# print('Received: {:x}'.format(event.message))
# The bytes of the message are stored like this:
# 0x00201090 -> (0x90, 0x10, 0x10)
# (TODO: not sure if this is correct.)
packed_message = event.message & 0xffffffff
def _get_device(device_id):
info_pointer = pm.lib.Pm_GetDeviceInfo(device_id)
if not info_pointer:
raise IOError('PortMidi device with id={} not found'.format(
device_id))
info = info_pointer.contents
return {
'id': device_id,
'interface': info.interface.decode('utf-8'),
'name': info.name.decode('utf-8'),
'is_input': info.is_input,
'is_output': info.is_output,
'opened': bool(info.opened),
}
def _close(self):
_check_error(pm.lib.Pm_Close(self._stream))
def get_devices(**kwargs):
"""Return a list of devices as dictionaries."""
_refresh_port_list()
return [_get_device(i) for i in range(pm.lib.Pm_CountDevices())]