Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param codec: (Optional) Writing codec to use.
:param bitrate: (Optional) Bitrate of the written audio file.
:raise IOError: If any error occurs while using FFMPEG to write data.
"""
directory = os.path.split(path)[0]
if not os.path.exists(directory):
os.makedirs(directory)
get_logger().debug('Writing file %s', path)
input_kwargs = {'ar': sample_rate, 'ac': data.shape[1]}
output_kwargs = {'ar': sample_rate, 'strict': '-2'}
if bitrate:
output_kwargs['audio_bitrate'] = bitrate
if codec is not None and codec != 'wav':
output_kwargs['codec'] = codec
process = (
ffmpeg
.input('pipe:', format='f32le', **input_kwargs)
.output(path, **output_kwargs)
.overwrite_output()
.run_async(pipe_stdin=True, quiet=True))
try:
process.stdin.write(data.astype('
if 'streams' not in probe or len(probe['streams']) == 0:
raise IOError('No stream was found with ffprobe')
metadata = next(
stream
for stream in probe['streams']
if stream['codec_type'] == 'audio')
n_channels = metadata['channels']
if sample_rate is None:
sample_rate = metadata['sample_rate']
output_kwargs = {'format': 'f32le', 'ar': sample_rate}
if duration is not None:
output_kwargs['t'] = _to_ffmpeg_time(duration)
if offset is not None:
output_kwargs['ss'] = _to_ffmpeg_time(offset)
process = (
ffmpeg
.input(path)
.output('pipe:', **output_kwargs)
.run_async(pipe_stdout=True, pipe_stderr=True))
buffer, _ = process.communicate()
waveform = np.frombuffer(buffer, dtype='