Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ndarray_s16p_align_8(self):
frame = AudioFrame(format='s16p', layout='stereo', samples=159, align=8)
array = frame.to_ndarray()
self.assertEqual(array.dtype, '
def test_pts_simple(self):
fifo = av.AudioFifo()
iframe = av.AudioFrame(samples=1024)
iframe.pts = 0
iframe.sample_rate = 48000
iframe.time_base = '1/48000'
fifo.write(iframe)
oframe = fifo.read(512)
self.assertTrue(oframe is not None)
self.assertEqual(oframe.pts, 0)
self.assertEqual(oframe.time_base, iframe.time_base)
self.assertEqual(fifo.samples_written, 1024)
self.assertEqual(fifo.samples_read, 512)
self.assertEqual(fifo.pts_per_sample, 1.0)
iframe.pts = 1024
def test_null_constructor(self):
frame = AudioFrame()
self.assertEqual(frame.format.name, 's16')
self.assertEqual(frame.layout.name, 'stereo')
self.assertEqual(len(frame.planes), 0)
self.assertEqual(frame.samples, 0)
def test_missing_time_base(self):
fifo = av.AudioFifo()
iframe = av.AudioFrame(samples=1024)
iframe.pts = 0
iframe.sample_rate = 48000
fifo.write(iframe)
oframe = fifo.read(512)
self.assertTrue(oframe is not None)
self.assertIsNone(oframe.pts)
self.assertIsNone(oframe.time_base)
self.assertEqual(oframe.sample_rate, iframe.sample_rate)
def test_basic_to_nd_array(self):
frame = AudioFrame(format='s16p', layout='stereo', samples=160)
with warnings.catch_warnings(record=True) as recorded:
array = frame.to_nd_array()
self.assertEqual(array.shape, (2, 160))
# check deprecation warning
self.assertEqual(len(recorded), 1)
self.assertEqual(recorded[0].category, AttributeRenamedWarning)
self.assertEqual(
str(recorded[0].message),
'AudioFrame.to_nd_array is deprecated; please use AudioFrame.to_ndarray.')
def generate_audio_frame():
"""Generate a blank audio frame."""
audio_frame = av.AudioFrame(format="dbl", layout="mono", samples=1024)
# audio_bytes = b''.join(b'\x00\x00\x00\x00\x00\x00\x00\x00'
# for i in range(0, 1024))
audio_bytes = b"\x00\x00\x00\x00\x00\x00\x00\x00" * 1024
audio_frame.planes[0].update(audio_bytes)
audio_frame.sample_rate = AUDIO_SAMPLE_RATE
audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE)
return audio_frame
try:
frame = next(container.decode(*streams))
except (av.AVError, StopIteration):
if audio_track:
asyncio.run_coroutine_threadsafe(audio_track._queue.put(None), loop)
if video_track:
asyncio.run_coroutine_threadsafe(video_track._queue.put(None), loop)
break
# read up to 1 second ahead
if throttle_playback:
elapsed_time = time.time() - start_time
if frame_time and frame_time > elapsed_time + 1:
time.sleep(0.1)
if isinstance(frame, AudioFrame) and audio_track:
if (
frame.format.name != audio_format_name
or frame.layout.name != audio_layout_name
or frame.sample_rate != audio_sample_rate
):
frame.pts = None
frame = audio_resampler.resample(frame)
# fix timestamps
frame.pts = audio_samples
frame.time_base = fractions.Fraction(1, audio_sample_rate)
audio_samples += frame.samples
audio_fifo.write(frame)
while True:
frame = audio_fifo.read(audio_samples_per_frame)
def encode(
self, frame: Frame, force_keyframe: bool = False
) -> Tuple[List[bytes], int]:
assert isinstance(frame, AudioFrame)
assert frame.format.name == "s16"
assert frame.layout.name in ["mono", "stereo"]
channels = len(frame.layout.channels)
data = bytes(frame.planes[0])
timestamp = frame.pts
# resample at 8 kHz
if frame.sample_rate != SAMPLE_RATE:
data, self.rate_state = audioop.ratecv(
data,
SAMPLE_WIDTH,
channels,
frame.sample_rate,
SAMPLE_RATE,
self.rate_state,
def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
frame = AudioFrame(format="s16", layout="mono", samples=SAMPLES_PER_FRAME)
frame.planes[0].update(self._convert(encoded_frame.data, SAMPLE_WIDTH))
frame.pts = encoded_frame.timestamp
frame.sample_rate = SAMPLE_RATE
frame.time_base = TIME_BASE
return [frame]