Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wav_get_file_info(filename: str) -> SoundFileInfo:
    """Open a WAV file with dr_wav and report its basic properties.

    The returned SoundFileInfo carries the channel count, sample rate, sample
    format, duration (total PCM frame count divided by the sample rate) and
    the total PCM frame count as reported by the opened decoder.

    Raises DecodeError when the file cannot be opened or decoded.
    """
    encoded_name = _get_filename_bytes(filename)
    handle = ffi.new("drwav*")
    opened = lib.drwav_init_file(handle, encoded_name, ffi.NULL)
    if not opened:
        raise DecodeError("could not open/decode file")
    try:
        total_frames = handle.totalPCMFrameCount
        rate = handle.sampleRate
        seconds = total_frames / rate
        bytes_per_sample = handle.bitsPerSample // 8
        # float samples are told apart from integer samples by the format tag
        uses_float = handle.translatedFormatTag == lib.DR_WAVE_FORMAT_IEEE_FLOAT
        sample_format = _format_from_width(bytes_per_sample, uses_float)
        return SoundFileInfo(filename, FileFormat.WAV, handle.channels, rate,
                             sample_format, seconds, total_frames)
    finally:
        # the decoder is released on every path, also when building the result fails
        lib.drwav_uninit(handle)
def flac_get_file_info(filename: str) -> SoundFileInfo:
    """Open a FLAC file with dr_flac and report its basic properties.

    The returned SoundFileInfo carries the channel count, sample rate, sample
    format (derived from the bits per sample), duration (total PCM frame count
    divided by the sample rate) and the total PCM frame count.

    Raises DecodeError when the file cannot be opened or decoded.
    """
    encoded_name = _get_filename_bytes(filename)
    handle = lib.drflac_open_file(encoded_name, ffi.NULL)
    if not handle:
        raise DecodeError("could not open/decode file")
    try:
        total_frames = handle.totalPCMFrameCount
        rate = handle.sampleRate
        seconds = total_frames / rate
        bytes_per_sample = handle.bitsPerSample // 8
        sample_format = _format_from_width(bytes_per_sample)
        return SoundFileInfo(filename, FileFormat.FLAC, handle.channels, rate,
                             sample_format, seconds, total_frames)
    finally:
        # the decoder is closed on every path, also when building the result fails
        lib.drflac_close(handle)
source.userdata_ptr = ffi.new("char[]", struct.pack('q', id(source)))
decoder_init = {
FileFormat.UNKNOWN: lib.ma_decoder_init,
FileFormat.VORBIS: lib.ma_decoder_init_vorbis,
FileFormat.WAV: lib.ma_decoder_init_wav,
FileFormat.FLAC: lib.ma_decoder_init_flac,
FileFormat.MP3: lib.ma_decoder_init_mp3
}[source_format]
result = decoder_init(lib._internal_decoder_read_callback, lib._internal_decoder_seek_callback,
source.userdata_ptr, ffi.addressof(decoder_config), decoder)
if result != lib.MA_SUCCESS:
raise DecodeError("failed to init decoder", result)
if seek_frame > 0:
result = lib.ma_decoder_seek_to_pcm_frame(decoder, seek_frame)
if result != lib.MA_SUCCESS:
raise DecodeError("failed to seek to frame", result)
def on_close() -> None:
if id(source) in _callback_decoder_sources:
del _callback_decoder_sources[id(source)]
g = _samples_stream_generator(frames_to_read, nchannels, output_format, decoder, None, on_close)
dummy = next(g)
assert len(dummy) == 0
return g
If you send() a number into the generator rather than just using next() on it,
you'll get that given number of frames, instead of the default configured amount.
This is particularly useful to plug this stream into an audio device callback that
wants a variable number of frames per call.
"""
filenamebytes = _get_filename_bytes(filename)
decoder = ffi.new("ma_decoder *")
decoder_config = lib.ma_decoder_config_init(output_format.value, nchannels, sample_rate)
decoder_config.ditherMode = dither.value
result = lib.ma_decoder_init_file(filenamebytes, ffi.addressof(decoder_config), decoder)
if result != lib.MA_SUCCESS:
raise DecodeError("failed to init decoder", result)
if seek_frame > 0:
result = lib.ma_decoder_seek_to_pcm_frame(decoder, seek_frame)
if result != lib.MA_SUCCESS:
raise DecodeError("failed to seek to frame", result)
g = _samples_stream_generator(frames_to_read, nchannels, output_format, decoder, None)
dummy = next(g)
assert len(dummy) == 0
return g
def decode_file(filename: str, output_format: SampleFormat = SampleFormat.SIGNED16,
                nchannels: int = 2, sample_rate: int = 44100, dither: DitherMode = DitherMode.NONE) -> DecodedSoundFile:
    """Convenience function to decode any supported audio file to raw PCM samples in your chosen format.

    The whole file is decoded in one call by miniaudio's ma_decode_file, using a
    decoder config built from output_format, nchannels and sample_rate, with the
    given dither mode. The decoded bytes are copied into a Python array, after
    which the native buffer is released again.

    Returns a DecodedSoundFile that reports the requested channel count, sample
    rate and sample format together with the decoded samples.

    Raises DecodeError (carrying the miniaudio result code) when decoding fails.
    """
    sample_width = _width_from_format(output_format)
    samples = _array_proto_from_format(output_format)
    filenamebytes = _get_filename_bytes(filename)
    frames = ffi.new("ma_uint64 *")
    data = ffi.new("void **")
    decoder_config = lib.ma_decoder_config_init(output_format.value, nchannels, sample_rate)
    decoder_config.ditherMode = dither.value
    result = lib.ma_decode_file(filenamebytes, ffi.addressof(decoder_config), frames, data)
    if result != lib.MA_SUCCESS:
        raise DecodeError("failed to decode file", result)
    try:
        # frames[0] is a frame count; every frame holds one sample per channel
        buffer = ffi.buffer(data[0], frames[0] * nchannels * sample_width)
        samples.frombytes(buffer)
    finally:
        # The buffer handed back by ma_decode_file is owned by the caller and
        # was previously never released (memory leak on every call). The bytes
        # have been copied into 'samples' by now, so it can be freed safely.
        # NOTE(review): assumes the cffi binding exposes ma_free with the
        # (pointer, allocation callbacks) signature - confirm against the cdef.
        lib.ma_free(data[0], ffi.NULL)
    return DecodedSoundFile(filename, nchannels, sample_rate, output_format, samples)
def wav_stream_file(filename: str, frames_to_read: int = 1024,
                    seek_frame: int = 0) -> Generator[array.array, None, None]:
    """Streams the WAV audio file as interleaved 16 bit signed integer sample arrays segments.
    This uses a fixed chunk size and cannot be used as a generic miniaudio decoder input stream.
    Consider using stream_file() instead.

    Every yielded array holds at most frames_to_read frames; the stream ends
    when the decoder returns no more frames. When seek_frame is larger than
    zero, decoding starts at that PCM frame.

    This is a generator function: the file is only opened (and DecodeError is
    only raised, for an unreadable file or a failed seek) once iteration starts.
    The decoder is released when the generator finishes, is closed, or fails.
    """
    filenamebytes = _get_filename_bytes(filename)
    wav = ffi.new("drwav*")
    if not lib.drwav_init_file(wav, filenamebytes, ffi.NULL):
        raise DecodeError("could not open/decode file")
    try:
        # The seek lives inside the try block so that a failed seek still
        # releases the opened decoder instead of leaking it.
        if seek_frame > 0:
            result = lib.drwav_seek_to_pcm_frame(wav, seek_frame)
            if result <= 0:
                raise DecodeError("can't seek")
        decodebuffer = ffi.new("drwav_int16[]", frames_to_read * wav.channels)
        buf_ptr = ffi.cast("drwav_int16 *", decodebuffer)
        bytes_per_frame = 2 * wav.channels   # 2 bytes per 16 bit sample, one sample per channel
        while True:
            num_frames = lib.drwav_read_pcm_frames_s16(wav, frames_to_read, buf_ptr)
            if num_frames <= 0:
                break
            # only expose the part of the buffer that was filled by this read
            buffer = ffi.buffer(decodebuffer, num_frames * bytes_per_frame)
            samples = _create_int_array(2)
            samples.frombytes(buffer)
            yield samples
    finally:
        lib.drwav_uninit(wav)
def vorbis_get_file_info(filename: str) -> SoundFileInfo:
    """Open an Ogg Vorbis file with stb_vorbis and report its basic properties.

    Channel count and sample rate come from the stream info; duration and
    frame count are the stream length in seconds and in samples as reported
    by stb_vorbis. The sample format is always given as SampleFormat.SIGNED16.

    Raises DecodeError when the file cannot be opened or decoded.
    """
    encoded_name = _get_filename_bytes(filename)
    error_code = ffi.new("int *")   # receives the stb_vorbis error code; not inspected here
    handle = lib.stb_vorbis_open_filename(encoded_name, error_code, ffi.NULL)
    if not handle:
        raise DecodeError("could not open/decode file")
    try:
        stream_info = lib.stb_vorbis_get_info(handle)
        seconds = lib.stb_vorbis_stream_length_in_seconds(handle)
        total_frames = lib.stb_vorbis_stream_length_in_samples(handle)
        return SoundFileInfo(filename, FileFormat.VORBIS, stream_info.channels, stream_info.sample_rate,
                             SampleFormat.SIGNED16, seconds, total_frames)
    finally:
        # the decoder is closed on every path, also when building the result fails
        lib.stb_vorbis_close(handle)
def flac_read_file_s32(filename: str) -> DecodedSoundFile:
    """Decode a complete FLAC file into 32 bit signed integer samples.

    The file is decoded in one call by dr_flac, which reports the channel
    count, sample rate and frame count through output pointers. The decoded
    bytes are copied into a Python array before the native buffer is freed.

    Raises DecodeError when the file cannot be loaded or decoded.
    """
    encoded_name = _get_filename_bytes(filename)
    out_channels = ffi.new("unsigned int *")
    out_rate = ffi.new("unsigned int *")
    out_frames = ffi.new("drflac_uint64 *")
    pcm = lib.drflac_open_file_and_read_pcm_frames_s32(encoded_name, out_channels, out_rate, out_frames, ffi.NULL)
    if not pcm:
        raise DecodeError("cannot load/decode file")
    try:
        decoded = _create_int_array(4)
        # 4 bytes per sample, one sample per channel in every frame
        byte_count = out_frames[0] * out_channels[0] * 4
        decoded.frombytes(ffi.buffer(pcm, byte_count))
        return DecodedSoundFile(filename, out_channels[0], out_rate[0], SampleFormat.SIGNED32, decoded)
    finally:
        # the native sample buffer is released on every path
        lib.drflac_free(pcm, ffi.NULL)
def mp3_read_file_s16(filename: str, want_nchannels: int = 0, want_sample_rate: int = 0) -> DecodedSoundFile:
    """Decode a complete MP3 file into 16 bit signed integer samples.

    want_nchannels and want_sample_rate are stored in the dr_mp3 config that
    is passed to the decoder. The channel count and sample rate of the result
    are read back from that same config after decoding.
    NOTE(review): presumably dr_mp3 fills in the actual values when 0 is
    passed - confirm against the dr_mp3 documentation.

    Raises DecodeError when the file cannot be loaded or decoded.
    """
    encoded_name = _get_filename_bytes(filename)
    mp3_config = ffi.new("drmp3_config *")
    mp3_config.outputChannels = want_nchannels
    mp3_config.outputSampleRate = want_sample_rate
    out_frames = ffi.new("drmp3_uint64 *")
    pcm = lib.drmp3_open_file_and_read_pcm_frames_s16(encoded_name, mp3_config, out_frames, ffi.NULL)
    if not pcm:
        raise DecodeError("cannot load/decode file")
    try:
        decoded = _create_int_array(2)
        # 2 bytes per sample, one sample per channel in every frame
        byte_count = out_frames[0] * mp3_config.outputChannels * 2
        decoded.frombytes(ffi.buffer(pcm, byte_count))
        return DecodedSoundFile(filename, mp3_config.outputChannels, mp3_config.outputSampleRate,
                                SampleFormat.SIGNED16, decoded)
    finally:
        # the native sample buffer is released on every path
        lib.drmp3_free(pcm, ffi.NULL)
def mp3_read_f32(data: bytes, want_nchannels: int = 0, want_sample_rate: int = 0) -> DecodedSoundFile:
    """Decode complete in-memory MP3 data into 32 bit float samples.

    want_nchannels and want_sample_rate are stored in the dr_mp3 config that
    is passed to the decoder. The channel count and sample rate of the result
    are read back from that same config after decoding. The result has an
    empty string as its name because there is no file involved.
    NOTE(review): presumably dr_mp3 fills in the actual values when 0 is
    passed - confirm against the dr_mp3 documentation.

    Raises DecodeError when the data cannot be loaded or decoded.
    """
    mp3_config = ffi.new("drmp3_config *")
    mp3_config.outputChannels = want_nchannels
    mp3_config.outputSampleRate = want_sample_rate
    out_frames = ffi.new("drmp3_uint64 *")
    pcm = lib.drmp3_open_memory_and_read_pcm_frames_f32(data, len(data), mp3_config, out_frames, ffi.NULL)
    if not pcm:
        raise DecodeError("cannot load/decode data")
    try:
        decoded = array.array('f')
        # 4 bytes per sample, one sample per channel in every frame
        byte_count = out_frames[0] * mp3_config.outputChannels * 4
        decoded.frombytes(ffi.buffer(pcm, byte_count))
        return DecodedSoundFile("", mp3_config.outputChannels, mp3_config.outputSampleRate,
                                SampleFormat.FLOAT32, decoded)
    finally:
        # the native sample buffer is released on every path
        lib.drmp3_free(pcm, ffi.NULL)