Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, samplerate: int = 0, samplewidth: int = 0, nchannels: int = 0, frames_per_chunk: int = 0) -> None:
    """Open a miniaudio playback device and start streaming to it.

    The arguments are forwarded to the base class initializer (with a
    trailing ``0``). Afterwards ``self.samplewidth``, ``self.nchannels``,
    ``self.samplerate`` and ``self.frames_per_chunk`` are read back to
    configure the device.

    Raises:
        KeyError: if ``self.samplewidth`` is not 1, 2, 3 or 4 (bytes).
    """
    super().__init__(samplerate, samplewidth, nchannels, frames_per_chunk, 0)
    # Stream of chunks from the mixer; one call is enough, so it is obtained only once.
    self.mixed_chunks = self.mixer.chunks()
    # Map the sample width (in bytes) onto the matching miniaudio sample format.
    output_format = {
        1: miniaudio.SampleFormat.UNSIGNED8,
        2: miniaudio.SampleFormat.SIGNED16,
        3: miniaudio.SampleFormat.SIGNED24,
        4: miniaudio.SampleFormat.SIGNED32
    }[self.samplewidth]
    # Device buffer length in milliseconds, derived from the chunk size settings.
    buffersize_msec = self.nchannels * 1000 * self.frames_per_chunk // self.samplerate
    self.device = miniaudio.PlaybackDevice(output_format, self.nchannels, self.samplerate, buffersize_msec)
    stream = self.generator()
    next(stream)  # start generator
    self.device.start(stream)
def stream_any(source: StreamableSource, source_format: FileFormat = FileFormat.UNKNOWN,
output_format: SampleFormat = SampleFormat.SIGNED16, nchannels: int = 2,
sample_rate: int = 44100, frames_to_read: int = 1024,
dither: DitherMode = DitherMode.NONE, seek_frame: int = 0) -> Generator[array.array, int, None]:
"""
Convenience generator function to decode and stream any source of encoded audio data
(such as a network stream). Stream result is chunks of raw PCM samples in the chosen format.
If you send() a number into the generator rather than just using next() on it,
you'll get that given number of frames, instead of the default configured amount.
This is particularly useful to plug this stream into an audio device callback that
wants a variable number of frames per call.
"""
decoder = ffi.new("ma_decoder *")
decoder_config = lib.ma_decoder_config_init(output_format.value, nchannels, sample_rate)
decoder_config.ditherMode = dither.value
_callback_decoder_sources[id(source)] = source
source.userdata_ptr = ffi.new("char[]", struct.pack('q', id(source)))
decoder_init = {
def __init__(self, playback_format: SampleFormat = SampleFormat.SIGNED16,
             playback_channels: int = 2, capture_format: SampleFormat = SampleFormat.SIGNED16,
             capture_channels: int = 2, sample_rate: int = 44100, buffersize_msec: int = 200,
             playback_device_id: Union[ffi.CData, None] = None, capture_device_id: Union[ffi.CData, None] = None,
             callback_periods: int = 0, backends: Optional[List[Backend]] = None,
             thread_prio: ThreadPriority = ThreadPriority.HIGHEST, app_name: str = "") -> None:
    """Store the duplex stream settings and create the duplex device config.

    Keeps the capture/playback formats and channel counts, the sample rate
    and the buffer size on the instance. ``sample_width`` is derived from
    the capture format. The instance is registered in ``_callback_data``
    under its ``id()``.

    NOTE(review): the device ids, ``callback_periods``, ``backends``,
    ``thread_prio`` and ``app_name`` are not used by the statements visible
    here; this looks like an excerpt of a longer constructor - confirm.
    """
    super().__init__()

    # Stream parameters.
    self.capture_format = capture_format
    self.playback_format = playback_format
    self.sample_width = _width_from_format(capture_format)
    self.capture_channels = capture_channels
    self.playback_channels = playback_channels
    self.sample_rate = sample_rate
    self.buffersize_msec = buffersize_msec

    # Register this instance under its id, and keep that id packed as a
    # 64-bit signed integer in a cffi char[] buffer (presumably the user
    # data handed to the native callbacks - confirm).
    instance_key = id(self)
    _callback_data[instance_key] = self
    self.userdata_ptr = ffi.new("char[]", struct.pack('q', instance_key))

    self._devconfig = lib.ma_device_config_init(lib.ma_device_type_duplex)
def __init__(self, input_format: SampleFormat = SampleFormat.SIGNED16, nchannels: int = 2,
             sample_rate: int = 44100, buffersize_msec: int = 200, device_id: Union[ffi.CData, None] = None,
             callback_periods: int = 0, backends: Optional[List[Backend]] = None,
             thread_prio: ThreadPriority = ThreadPriority.HIGHEST, app_name: str = "") -> None:
    """Store the capture stream settings and fill in the capture device config.

    Keeps the input format, its sample width, the channel count, the sample
    rate and the buffer size on the instance, registers the instance in
    ``_callback_data`` under its ``id()``, and sets the sample rate, channel
    count, format and device id on a capture-type device config.

    NOTE(review): ``callback_periods``, ``backends``, ``thread_prio`` and
    ``app_name`` are not used by the statements visible here; this looks
    like an excerpt of a longer constructor - confirm.
    """
    super().__init__()

    # Stream parameters.
    self.format = input_format
    self.sample_width = _width_from_format(input_format)
    self.nchannels = nchannels
    self.sample_rate = sample_rate
    self.buffersize_msec = buffersize_msec

    # Register this instance under its id, and keep that id packed as a
    # 64-bit signed integer in a cffi char[] buffer (presumably the user
    # data handed to the native callbacks - confirm).
    instance_key = id(self)
    _callback_data[instance_key] = self
    self.userdata_ptr = ffi.new("char[]", struct.pack('q', instance_key))

    # Capture device configuration.
    devconfig = lib.ma_device_config_init(lib.ma_device_type_capture)
    self._devconfig = devconfig
    devconfig.sampleRate = self.sample_rate
    devconfig.capture.channels = self.nchannels
    devconfig.capture.format = self.format.value
    # A falsy device id is replaced by a NULL pointer.
    devconfig.capture.pDeviceID = device_id or ffi.NULL
def mp3_get_info(data: bytes) -> SoundFileInfo:
    """Fetch some information about the audio data (mp3 format).

    Initialises a dr_mp3 decoder on the in-memory ``data``, reads its frame
    count, channel count and sample rate, and always uninitialises the
    decoder again before returning.

    Returns:
        A ``SoundFileInfo`` with an empty name, MP3 file format, SIGNED16
        sample format, and the duration computed as frames / sample rate.

    Raises:
        DecodeError: if the decoder cannot be initialised from ``data``.
    """
    decoder_config = ffi.new("drmp3_config *")
    # Both output settings are left at 0 (presumably "use the source's own
    # values" - confirm against dr_mp3).
    decoder_config.outputChannels = 0
    decoder_config.outputSampleRate = 0
    decoder = ffi.new("drmp3 *")

    opened = lib.drmp3_init_memory(decoder, data, len(data), decoder_config, ffi.NULL)
    if not opened:
        raise DecodeError("could not open/decode data")

    try:
        frame_count = lib.drmp3_get_pcm_frame_count(decoder)
        rate = decoder.sampleRate
        seconds = frame_count / rate
        return SoundFileInfo("", FileFormat.MP3, decoder.channels, rate,
                             SampleFormat.SIGNED16, seconds, frame_count)
    finally:
        lib.drmp3_uninit(decoder)
def __init__(self, playback_format: SampleFormat = SampleFormat.SIGNED16,
             playback_channels: int = 2, capture_format: SampleFormat = SampleFormat.SIGNED16,
             capture_channels: int = 2, sample_rate: int = 44100, buffersize_msec: int = 200,
             playback_device_id: Union[ffi.CData, None] = None, capture_device_id: Union[ffi.CData, None] = None,
             callback_periods: int = 0, backends: Optional[List[Backend]] = None,
             thread_prio: ThreadPriority = ThreadPriority.HIGHEST, app_name: str = "") -> None:
    """Store the duplex stream settings and start the duplex device config.

    Keeps the capture/playback formats and channel counts, the sample rate
    and the buffer size on the instance. ``sample_width`` is derived from
    the capture format. The instance is registered in ``_callback_data``
    under its ``id()``, and the sample rate is set on a duplex-type device
    config.

    NOTE(review): the device ids, ``callback_periods``, ``backends``,
    ``thread_prio`` and ``app_name`` are not used by the statements visible
    here; this looks like an excerpt of a longer constructor - confirm.
    """
    super().__init__()

    # Stream parameters.
    self.capture_format = capture_format
    self.playback_format = playback_format
    self.sample_width = _width_from_format(capture_format)
    self.capture_channels = capture_channels
    self.playback_channels = playback_channels
    self.sample_rate = sample_rate
    self.buffersize_msec = buffersize_msec

    # Register this instance under its id, and keep that id packed as a
    # 64-bit signed integer in a cffi char[] buffer (presumably the user
    # data handed to the native callbacks - confirm).
    instance_key = id(self)
    _callback_data[instance_key] = self
    self.userdata_ptr = ffi.new("char[]", struct.pack('q', instance_key))

    # Duplex device configuration.
    devconfig = lib.ma_device_config_init(lib.ma_device_type_duplex)
    self._devconfig = devconfig
    devconfig.sampleRate = self.sample_rate
return os.path.join(os.path.abspath(os.path.dirname(__file__)), 'samples', filename)
def stream_pcm(source):
    """Generator that serves raw PCM bytes read from ``source`` on demand.

    After the priming ``next()`` (which yields ``b""``), every value sent in
    is taken as a frame count. That many frames' worth of bytes
    (frames * ``channels`` * ``sample_width``, both names taken from the
    surrounding scope) is read from ``source`` and yielded. The generator
    ends when ``source.read`` returns nothing. A dot is printed for every
    chunk served.
    """
    frames_wanted = yield b""  # generator initialization
    chunk = source.read(frames_wanted * channels * sample_width)
    while chunk:
        print(".", end="", flush=True)
        frames_wanted = yield chunk
        chunk = source.read(frames_wanted * channels * sample_width)
# Decode an AAC file to raw 16-bit PCM with an ffmpeg child process and play
# the child's stdout through a miniaudio playback device until Enter is hit.
filename = samples_path("music.m4a")    # AAC encoded file
device = miniaudio.PlaybackDevice(output_format=miniaudio.SampleFormat.SIGNED16,
                                  nchannels=channels, sample_rate=sample_rate)
ffmpeg = None
try:
    ffmpeg = subprocess.Popen(["ffmpeg", "-v", "fatal", "-hide_banner", "-nostdin",
                               "-i", filename, "-f", "s16le", "-acodec", "pcm_s16le",
                               "-ac", str(channels), "-ar", str(sample_rate), "-"],
                              stdin=None, stdout=subprocess.PIPE)
    stream = stream_pcm(ffmpeg.stdout)
    next(stream)  # start the generator
    device.start(stream)
    input("Audio file playing in the background. Enter to stop playback: ")
finally:
    # Always release the device and the child process, also when ffmpeg
    # could not be started or the prompt is interrupted (Ctrl-C / EOF).
    # The device is closed first so nothing reads the pipe any more.
    device.close()
    if ffmpeg is not None:
        ffmpeg.terminate()
        # Close our end of the pipe so the child cannot block on a write,
        # then reap it so it does not linger as a zombie process.
        ffmpeg.stdout.close()
        ffmpeg.wait()
def _format_from_width(sample_width: int, is_float: bool = False) -> SampleFormat:
    """Return the sample format for a sample width given in bytes.

    ``is_float`` takes precedence: when true the result is FLOAT32 whatever
    the width. Otherwise widths 1, 2, 3 and 4 map to UNSIGNED8, SIGNED16,
    SIGNED24 and SIGNED32.

    Raises:
        MiniaudioError: for any other width.
    """
    if is_float:
        return SampleFormat.FLOAT32
    integer_formats = (
        (1, SampleFormat.UNSIGNED8),
        (2, SampleFormat.SIGNED16),
        (3, SampleFormat.SIGNED24),
        (4, SampleFormat.SIGNED32),
    )
    for width, sample_format in integer_formats:
        if sample_width == width:
            return sample_format
    raise MiniaudioError("unsupported sample width", sample_width)
def mp3_read_file_s16(filename: str, want_nchannels: int = 0, want_sample_rate: int = 0) -> DecodedSoundFile:
    """Reads and decodes the whole mp3 audio file. Resulting sample format is 16 bits signed integer.

    ``want_nchannels`` and ``want_sample_rate`` are passed to the decoder
    config. The channel count and sample rate reported in the result are
    read back from that config after decoding. The natively allocated
    sample memory is copied into a Python array and always freed.

    Raises:
        DecodeError: if the decoder returns no sample memory.
    """
    path_bytes = _get_filename_bytes(filename)
    decoder_config = ffi.new("drmp3_config *")
    decoder_config.outputChannels = want_nchannels
    decoder_config.outputSampleRate = want_sample_rate
    frame_count = ffi.new("drmp3_uint64 *")

    pcm = lib.drmp3_open_file_and_read_pcm_frames_s16(path_bytes, decoder_config, frame_count, ffi.NULL)
    if not pcm:
        raise DecodeError("cannot load/decode file")

    try:
        samples = _create_int_array(2)
        # frames * channels * 2 bytes per 16-bit sample
        byte_count = frame_count[0] * decoder_config.outputChannels * 2
        samples.frombytes(ffi.buffer(pcm, byte_count))
        return DecodedSoundFile(filename, decoder_config.outputChannels, decoder_config.outputSampleRate,
                                SampleFormat.SIGNED16, samples)
    finally:
        lib.drmp3_free(pcm, ffi.NULL)
def _array_proto_from_format(sampleformat: SampleFormat) -> array.array:
    """Return an array object suited to hold samples of the given format.

    Supported formats are UNSIGNED8, SIGNED16, SIGNED32 and FLOAT32.

    Raises:
        MiniaudioError: for any other format (the message names it and
            asks to convert it first).
    """
    prototypes = {
        SampleFormat.UNSIGNED8: _create_int_array(1),
        SampleFormat.SIGNED16: _create_int_array(2),
        SampleFormat.SIGNED32: _create_int_array(4),
        SampleFormat.FLOAT32: array.array('f')
    }
    prototype = prototypes.get(sampleformat)
    if prototype is None:
        raise MiniaudioError("the requested sample format can not be used directly: "
                             + sampleformat.name + " (convert it first)")
    return prototype