Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _refresh_port_list():
if _state['port_count'] == 0:
# If no ports are open we can reboot PortMidi
# to refresh the port list. This is a hack, but it's
# the only way to get an up-to-date list.
pm.lib.Pm_Terminate()
pm.lib.Pm_Initialize()
def _pending(self):
# I get hanging notes if MAX_EVENTS > 1, so I'll have to
# resort to calling Pm_Read() in a loop until there are no
# more pending events.
max_events = 1
# Todo: this should be allocated once
BufferType = pm.PmEvent * max_events
read_buffer = BufferType()
while pm.lib.Pm_Poll(self._stream):
# Read one message. Should return 1.
# If num_events < 0, an error occured.
length = 1 # Buffer length
num_events = pm.lib.Pm_Read(self._stream, read_buffer, length)
_check_error(num_events)
# Get the event
event = read_buffer[0]
# print('Received: {:x}'.format(event.message))
# The bytes of the message are stored like this:
# 0x00201090 -> (0x90, 0x10, 0x10)
if opening_input:
_check_error(pm.lib.Pm_OpenInput(
pm.byref(self._stream),
device['id'], # Input device
pm.null, # Input driver info
1000, # Buffer size
pm.NullTimeProcPtr, # Time callback
pm.null)) # Time info
else:
_check_error(pm.lib.Pm_OpenOutput(
pm.byref(self._stream),
device['id'], # Output device
pm.null, # Output diver info
0, # Buffer size
# (ignored when latency == 0?)
pm.NullTimeProcPtr, # Default to internal clock
pm.null, # Time info
0)) # Latency
self._device_type = 'PortMidi/{}'.format(device['interface'])
def _receive(self, block=True):
# Since there is no blocking read in PortMidi, the block
# flag is ignored and the enclosing receive() takes care
# of blocking.
# Allocate buffer.
# I get hanging notes if MAX_EVENTS > 1, so I'll have to
# resort to calling Pm_Read() in a loop until there are no
# more pending events.
max_events = 1
BufferType = pm.PmEvent * max_events
read_buffer = BufferType()
# Read available data from the stream and feed it to the parser.
while pm.lib.Pm_Poll(self._stream):
# TODO: this should be allocated once
# Read one message. Should return 1.
# If num_events < 0, an error occured.
length = 1 # Buffer length
num_events = pm.lib.Pm_Read(self._stream, read_buffer, length)
_check_error(num_events)
# Get the event
event = read_buffer[0]
# print('Received: {:x}'.format(event.message))
# The bytes of the message are stored like this:
def _open(self, **kwargs):
self._stream = pm.PortMidiStreamPtr()
opening_input = hasattr(self, 'receive')
if self.name is None:
device = _get_default_device(opening_input)
self.name = device['name']
else:
device = _get_named_device(self.name, opening_input)
if device['opened']:
if opening_input:
devtype = 'input'
else:
devtype = 'output'
raise IOError('{} port {!r} is already open'.format(devtype,
self.name))
def _send(self, message):
if message.type == 'sysex':
# Sysex messages are written as a string.
string = pm.c_char_p(bytes(message.bin()))
timestamp = 0 # Ignored when latency = 0
_check_error(pm.lib.Pm_WriteSysEx(self._stream, timestamp, string))
elif message.type == 'sysex_end':
# This crashes PortMidi, so it's best to ignore it.
pass
else:
# The bytes of a message as packed into a 32 bit integer.
packed_message = 0
for byte in reversed(message.bytes()):
packed_message <<= 8
packed_message |= byte
timestamp = 0 # Ignored when latency = 0
_check_error(pm.lib.Pm_WriteShort(self._stream,
timestamp,
packed_message))
def _check_error(return_value):
"""Raise IOError if return_value < 0.
The exception will be raised with the error message from PortMidi.
"""
if return_value == pm.pmHostError:
raise IOError('PortMidi Host Error: '
'{}'.format(pm.get_host_error_message()))
elif return_value < 0:
raise IOError(pm.lib.Pm_GetErrorText(return_value))
else:
devtype = 'output'
raise IOError('{} port {!r} is already open'.format(devtype,
self.name))
if opening_input:
_check_error(pm.lib.Pm_OpenInput(
pm.byref(self._stream),
device['id'], # Input device
pm.null, # Input driver info
1000, # Buffer size
pm.NullTimeProcPtr, # Time callback
pm.null)) # Time info
else:
_check_error(pm.lib.Pm_OpenOutput(
pm.byref(self._stream),
device['id'], # Output device
pm.null, # Output diver info
0, # Buffer size
# (ignored when latency == 0?)
pm.NullTimeProcPtr, # Default to internal clock
pm.null, # Time info
0)) # Latency
self._device_type = 'PortMidi/{}'.format(device['interface'])
def _get_default_device(get_input):
if get_input:
device_id = pm.lib.Pm_GetDefaultInputDeviceID()
else:
device_id = pm.lib.Pm_GetDefaultOutputDeviceID()
if device_id < 0:
raise IOError('no default port found')
return _get_device(device_id)
if device['opened']:
if self.is_input:
devtype = 'input'
else:
devtype = 'output'
raise IOError('{} port {!r} is already open'.format(devtype,
self.name))
if self.is_input:
_check_error(pm.lib.Pm_OpenInput(
pm.byref(self._stream),
device['id'], # Input device
pm.null, # Input driver info
1000, # Buffer size
pm.NullTimeProcPtr, # Time callback
pm.null)) # Time info
else:
_check_error(pm.lib.Pm_OpenOutput(
pm.byref(self._stream),
device['id'], # Output device
pm.null, # Output diver info
0, # Buffer size
# (ignored when latency == 0?)
pm.NullTimeProcPtr, # Default to internal clock
pm.null, # Time info
0)) # Latency
# This is set when we return, but the callback thread needs
# it to be False now (or it will just return right away.)
self.closed = False
_state['port_count'] += 1