Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
container = av.open(fate_suite('audio-reference/chorusnoise_2ch_44kHz_s16.wav'))
audio_stream = container.streams.audio[0]
path = self.sandboxed('encoder.%s' % codec)
samples = 0
packet_sizes = []
test_bad = False
with open(path, 'w') as f:
for frame in iter_frames(container, audio_stream):
if test_bad:
bad_resampler = AudioResampler(sample_fmt, "mono", sample_rate)
bad_frame = bad_resampler.resample(frame)
with self.assertRaises(ValueError):
next(encoder.encode(bad_frame))
bad_resampler = AudioResampler(sample_fmt, channel_layout, 3000)
bad_frame = bad_resampler.resample(frame)
with self.assertRaises(ValueError):
next(encoder.encode(bad_frame))
bad_resampler = AudioResampler('u8', channel_layout, 3000)
bad_frame = bad_resampler.resample(frame)
with self.assertRaises(ValueError):
next(encoder.encode(bad_frame))
raise SkipTest()
sample_fmt = ctx.codec.audio_formats[-1].name
sample_rate = 48000
channel_layout = "stereo"
channels = 2
ctx.time_base = Fraction(1) / sample_rate
ctx.sample_rate = sample_rate
ctx.format = sample_fmt
ctx.layout = channel_layout
ctx.channels = channels
ctx.open()
resampler = AudioResampler(sample_fmt, channel_layout, sample_rate)
container = av.open(fate_suite('audio-reference/chorusnoise_2ch_44kHz_s16.wav'))
audio_stream = container.streams.audio[0]
path = self.sandboxed('encoder.%s' % codec)
samples = 0
packet_sizes = []
test_bad = False
with open(path, 'w') as f:
for frame in iter_frames(container, audio_stream):
if test_bad:
samples = 0
packet_sizes = []
test_bad = False
with open(path, 'w') as f:
for frame in iter_frames(container, audio_stream):
if test_bad:
bad_resampler = AudioResampler(sample_fmt, "mono", sample_rate)
bad_frame = bad_resampler.resample(frame)
with self.assertRaises(ValueError):
next(encoder.encode(bad_frame))
bad_resampler = AudioResampler(sample_fmt, channel_layout, 3000)
bad_frame = bad_resampler.resample(frame)
with self.assertRaises(ValueError):
next(encoder.encode(bad_frame))
bad_resampler = AudioResampler('u8', channel_layout, 3000)
bad_frame = bad_resampler.resample(frame)
with self.assertRaises(ValueError):
next(encoder.encode(bad_frame))
test_bad = False
resampled_frame = resampler.resample(frame)
samples += resampled_frame.samples
def __init__(self, user_connection, kaldi_server):
self.__resampler = AudioResampler(format='s16', layout='mono', rate=kaldi_server.samplerate)
self.__pc = user_connection
self.__audio_task = None
self.__text_task = None
self.__ks = kaldi_server
self.__kaldi_reader = None
self.__kaldi_writer = None
self.__channel = None
# args = "f=10"
# print("args = {}".format(args))
## lp_graph_list.append(lp_graph.add("lowpass", args))
## "attacks=.1|.1:decays=.2|.2:points=.-900/-900|-50.1/-900|-50/-50:soft-knee=.01:gain=0:volume=-90:delay=.1")
# self.lp_graph_list.append(self.lp_graph.add("compand", ".1|.1:.2|.2:-900/-900|-50.1/-900|-50/-50:.01:0:-90:.1"))
# self.lp_graph_list[-2].link_to(self.lp_graph_list[-1])
## lp_graph_list.append(lp_graph.add("aresample", "osr=30"))
## lp_graph_list[-2].link_to(lp_graph_list[-1])
# self.lp_graph_list.append(self.lp_graph.add("abuffersink"))
# self.lp_graph_list[-2].link_to(self.lp_graph_list[-1])
# self.lp_graph.configure()
# audio_resampler1 = av.audio.resampler.AudioResampler(format=av.AudioFormat('dblp'),
# layout=audio_stream.layout,
# rate=audio_stream.rate)
self.audio_resampler = av.audio.resampler.AudioResampler(
format=self.audio_stream.format, layout=self.audio_stream.layout, rate=60
)
self.next_audio_frame = self._next_audio_frame()
self.all_abs_samples = None
self.finished = False
self.a_levels = None
self.a_levels_log = None
self.final_rescale = True
self.log_scaling = False
logger.debug("loaded audiostream: %s" % self.audio_stream)
except StopIteration:
self.audio_stream = None
logger.debug("No audiostream found in media container")
else:
return
if self.audio_stream is not None:
self.audio_bytes_fifo = []
audiots_path = os.path.splitext(audio_file)[0] + "_timestamps.npy"
try:
self.audio_timestamps = np.load(audiots_path)
except IOError:
self.audio_timestamps = None
logger.warning("Could not load audio timestamps")
self.next_audio_frame = self._next_audio_frame()
self.audio_resampler = av.audio.resampler.AudioResampler(
format=self.audio_stream.format.packed,
layout=self.audio_stream.layout,
rate=self.audio_stream.rate,
)
self.audio_paused = False
af0, af1 = next(self.next_audio_frame), next(self.next_audio_frame)
# Check pts
self.audio_pts_rate = af0.samples # af1.pts - af0.pts
self.audio_start_pts = 0
logger.debug(
"audio_pts_rate = {} start_pts = {}".format(
self.audio_pts_rate, self.audio_start_pts
)
)
with captureStdErr:
container = av.open(path)
a_stream = container.streams.get(audio=0)[0]
format_name = a_stream.format.name
if (format_name == 'fltp' and
a_stream.codec_context.name in ('mp3float', 'aac')):
format_name = 's16'
else:
format_name = 's' + str(a_stream.format.bits)
audio_format = av.audio.format.AudioFormat(format_name)
audio_layout = av.audio.layout.AudioLayout(a_stream.channels)
sample_rate = a_stream.codec_context.sample_rate
resampler = av.audio.resampler.AudioResampler(audio_format,
audio_layout,
sample_rate)
# iprop = InputAudioPropertiesTuple(codec=a_stream.codec.name,
# format_name=a_stream.format.name,
# duration=float(a_stream.duration *
# a_stream.time_base),
# stream_bitrate=a_stream.bit_rate,
# container_bitrate=container.bit_rate,
# is_valid=True,
# decode_errors=None,
# decode_messages=None)
# prop = AudioPropertiesTuple(channels=a_stream.channels,
# sample_rate=a_stream.sample_rate,
# bits_per_sample=audio_format.bits,