Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
app_name: str = "") -> ffi.CData:
context_config = lib.ma_context_config_init()
context_config.threadPriority = thread_prio.value
context = ffi.new("ma_context*")
if app_name:
self._context_app_name = app_name.encode()
context_config.pulse.pApplicationName = ffi.from_buffer(self._context_app_name)
context_config.jack.pClientName = ffi.from_buffer(self._context_app_name)
if backends:
# use a context to supply a preferred backend list
backends_mem = ffi.new("ma_backend[]", len(backends))
for i, b in enumerate(backends):
backends_mem[i] = b.value
result = lib.ma_context_init(backends_mem, len(backends), ffi.addressof(context_config), context)
if result != lib.MA_SUCCESS:
raise MiniaudioError("cannot init context", result)
else:
result = lib.ma_context_init(ffi.NULL, 0, ffi.addressof(context_config), context)
if result != lib.MA_SUCCESS:
raise MiniaudioError("cannot init context", result)
return context
self._devconfig = lib.ma_device_config_init(lib.ma_device_type_playback)
self._devconfig.sampleRate = self.sample_rate
self._devconfig.playback.channels = self.nchannels
self._devconfig.playback.format = self.format.value
self._devconfig.playback.pDeviceID = device_id or ffi.NULL
self._devconfig.bufferSizeInMilliseconds = self.buffersize_msec
self._devconfig.pUserData = self.userdata_ptr
self._devconfig.dataCallback = lib._internal_data_callback
self._devconfig.stopCallback = lib._internal_stop_callback
self._devconfig.periods = callback_periods
self.callback_generator = None # type: Optional[PlaybackCallbackGeneratorType]
self._context = self._make_context(backends or [], thread_prio, app_name)
result = lib.ma_device_init(self._context, ffi.addressof(self._devconfig), self._device)
if result != lib.MA_SUCCESS:
raise MiniaudioError("failed to init device", result)
if self._device.pContext.backend == lib.ma_backend_null:
raise MiniaudioError("no suitable audio backend found")
self.backend = ffi.string(lib.ma_get_backend_name(self._device.pContext.backend)).decode()
def _samples_stream_generator(frames_to_read: int, nchannels: int, output_format: SampleFormat,
                              decoder: ffi.CData, data: Any,
                              on_close: Optional[Callable] = None) -> Generator[array.array, int, None]:
    """Generator that reads PCM frames from ``decoder`` chunk by chunk.

    The first value yielded is the array prototype obtained from
    ``_array_proto_from_format(output_format)``. Every following yield is a
    new ``array.array`` (same typecode as the prototype) filled with the bytes
    of the frames that were just decoded.

    The value sent into the generator is the number of frames wanted for the
    next read; a falsy value (``None`` or ``0``) means ``frames_to_read``.

    :param frames_to_read: default number of frames to read per iteration
    :param nchannels: number of channels, used to size the decode buffer
    :param output_format: sample format, determines sample width and array type
    :param decoder: the decoder handle passed to the miniaudio library calls
    :param data: object that is only kept referenced for the generator's lifetime
    :param on_close: optional callable invoked when the generator finishes
    :raises MiniaudioError: when more frames are requested than the buffer
        (``max(frames_to_read, 16384)`` frames) can hold

    The generator ends when the decoder returns no more frames. When it
    finishes for any reason, ``on_close`` (if given) is called and the decoder
    is uninitialized.
    """
    _reference = data    # make sure any data passed in is not garbage collected
    sample_width = _width_from_format(output_format)
    samples_proto = _array_proto_from_format(output_format)
    allocated_buffer_frames = max(frames_to_read, 16384)
    try:
        decodebuffer = ffi.new("int8_t[]", allocated_buffer_frames * nchannels * sample_width)
        buf_ptr = ffi.cast("void *", decodebuffer)
        want_frames = (yield samples_proto) or frames_to_read
        while True:
            if want_frames > allocated_buffer_frames:
                raise MiniaudioError("wanted to read more frames than storage was allocated for ({} vs {})"
                                     .format(want_frames, allocated_buffer_frames))
            num_frames = lib.ma_decoder_read_pcm_frames(decoder, buf_ptr, want_frames)
            if num_frames <= 0:
                break
            # Only expose the part of the buffer that was actually filled by this read.
            buffer = ffi.buffer(decodebuffer, num_frames * sample_width * nchannels)
            samples = array.array(samples_proto.typecode)
            samples.frombytes(buffer)
            want_frames = (yield samples) or frames_to_read
    finally:
        try:
            if on_close:
                on_close()
        finally:
            # Release the decoder even when on_close() raises, otherwise it would leak.
            lib.ma_decoder_uninit(decoder)
def _array_proto_from_format(sampleformat: SampleFormat) -> array.array:
    """Return the ``array.array`` prototype that matches ``sampleformat``.

    Integer formats map to the result of ``_create_int_array`` for their byte
    width (1, 2 or 4); ``FLOAT32`` maps to an ``array.array('f')``.

    :raises MiniaudioError: for any sample format that has no direct array type
    """
    integer_widths = (
        (SampleFormat.UNSIGNED8, 1),
        (SampleFormat.SIGNED16, 2),
        (SampleFormat.SIGNED32, 4),
    )
    prototypes = {fmt: _create_int_array(width) for fmt, width in integer_widths}
    prototypes[SampleFormat.FLOAT32] = array.array('f')
    if sampleformat not in prototypes:
        raise MiniaudioError("the requested sample format can not be used directly: "
                             + sampleformat.name + " (convert it first)")
    return prototypes[sampleformat]
def _format_from_width(sample_width: int, is_float: bool = False) -> SampleFormat:
    """Map a sample width in bytes to the corresponding ``SampleFormat``.

    When ``is_float`` is true the result is always ``FLOAT32`` and the width
    is not inspected. Otherwise widths 1, 2, 3 and 4 select the unsigned 8,
    signed 16, signed 24 and signed 32 bit formats respectively.

    :raises MiniaudioError: when the width is not one of the supported values
    """
    if is_float:
        return SampleFormat.FLOAT32
    width_table = (
        (1, SampleFormat.UNSIGNED8),
        (2, SampleFormat.SIGNED16),
        (3, SampleFormat.SIGNED24),
        (4, SampleFormat.SIGNED32),
    )
    for width, sample_format in width_table:
        if sample_width == width:
            return sample_format
    raise MiniaudioError("unsupported sample width", sample_width)
def stream_file(info, filename):
    """Stream ``filename`` to a ``miniaudio.PlaybackDevice`` until Enter is pressed.

    The stream is first requested in the file's own sample format
    (``info.sample_format``) at ``info.sample_rate``. If that raises
    ``miniaudio.MiniaudioError``, a signed 16 bit stream with triangle
    dithering is created instead. A dot is printed for every chunk that is
    handed to the playback device.

    :param info: object providing ``sample_format`` and ``sample_rate``
    :param filename: the audio file to play
    """
    def progress_stream_wrapper(stream) -> miniaudio.PlaybackCallbackGeneratorType:
        """Relay frame requests to ``stream`` and print a progress dot per chunk."""
        framecount = yield b""
        try:
            while True:
                framecount = yield stream.send(framecount)
                print(".", end="", flush=True)
        except StopIteration:
            # the wrapped stream is exhausted; end this generator as well
            return

    output_format = info.sample_format
    try:
        filestream = miniaudio.stream_file(filename, output_format=output_format, sample_rate=info.sample_rate)
    except miniaudio.MiniaudioError as x:
        print("Cannot create optimal stream:", x)
        print("Creating stream with different sample format!")
        output_format = miniaudio.SampleFormat.SIGNED16
        filestream = miniaudio.stream_file(filename, output_format=output_format, sample_rate=info.sample_rate,
                                           dither=miniaudio.DitherMode.TRIANGLE)

    stream = progress_stream_wrapper(filestream)
    next(stream)   # start the generator
    device = miniaudio.PlaybackDevice(output_format=output_format, sample_rate=info.sample_rate)
    try:
        print("playback device backend:", device.backend, device.format.name, device.sample_rate, "hz")
        device.start(stream)
        input("Audio file playing in the background. Enter to stop playback: ")
    finally:
        # Close the device even when starting fails or the prompt is interrupted (Ctrl-C / EOF).
        device.close()
def start(self, callback_generator: GeneratorTypes, stop_callback: Union[Callable, None] = None) -> None:
    """Start playback or capture, using the given callback generator (should already have been started).

    :param callback_generator: generator object that is stored as the device's callback generator
    :param stop_callback: optional callable stored as the device's stop callback
    :raises MiniaudioError: when a callback generator is already set on this
        device, or when the audio device fails to start
    :raises TypeError: when ``callback_generator`` is not a generator object
    """
    if self.callback_generator:
        raise MiniaudioError("can't start an already started device")
    if not inspect.isgenerator(callback_generator):
        raise TypeError("callback must be a generator", type(callback_generator))
    previous_stop_callback = getattr(self, "stop_callback", None)
    # NOTE(review): the callbacks are stored before the device is started, presumably
    # because the audio callbacks may fire as soon as it runs -- confirm.
    self.callback_generator = callback_generator
    self.stop_callback = stop_callback
    result = lib.ma_device_start(self._device)
    if result != lib.MA_SUCCESS:
        # Roll back, so a failed start does not leave the device looking "already started"
        # and a later retry is not rejected.
        self.callback_generator = None
        self.stop_callback = previous_stop_callback
        raise MiniaudioError("failed to start audio device", result)
    self.running = True