Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pts_complex_time_base(self):
    # The input frame carries a time base (1/96000) that is not simply the
    # reciprocal of its sample rate (48000). The expectation checked here is
    # that the resampled frame comes back without pts or time base, at the
    # requested 44100 Hz output rate.
    converter = AudioResampler(format='s16', layout='mono', rate=44100)

    source = AudioFrame('s16', 'stereo', 1024)
    source.sample_rate = 48000
    source.time_base = '1/96000'
    source.pts = 0

    result = converter.resample(source)

    self.assertIsNone(result.pts)
    self.assertIsNone(result.time_base)
    self.assertEqual(result.sample_rate, 44100)
def test_pts_assertion_same_rate(self):
    # No output rate is requested, so only format/layout conversion is asked
    # for. The checks below expect pts, time base and sample rate to match
    # the input frame's values.
    converter = AudioResampler(format='s16', layout='mono')

    source = AudioFrame('s16', 'stereo', 1024)
    source.sample_rate = 48000
    source.time_base = '1/48000'
    source.pts = 0

    first = converter.resample(source)
    self.assertEqual(first.pts, 0)
    self.assertEqual(first.time_base, source.time_base)
    self.assertEqual(first.sample_rate, source.sample_rate)

    # Feed the same frame again with its pts advanced by one frame's worth
    # of samples; the output pts is expected to follow.
    source.pts = 1024
    second = converter.resample(source)
    self.assertEqual(second.pts, 1024)
def test_pts_missing_time_base(self):
    # The input frame has a pts but its time base is never assigned. The
    # checks expect the resampled frame to carry neither pts nor time base,
    # while still reporting the requested 44100 Hz rate.
    converter = AudioResampler(format='s16', layout='mono', rate=44100)

    source = AudioFrame('s16', 'stereo', 1024)
    source.sample_rate = 48000
    source.pts = 0

    result = converter.resample(source)

    self.assertIsNone(result.pts)
    self.assertIsNone(result.time_base)
    self.assertEqual(result.sample_rate, 44100)
raise SkipTest()
sample_fmt = ctx.codec.audio_formats[-1].name
sample_rate = 48000
channel_layout = "stereo"
channels = 2
ctx.time_base = Fraction(1) / sample_rate
ctx.sample_rate = sample_rate
ctx.format = sample_fmt
ctx.layout = channel_layout
ctx.channels = channels
ctx.open()
resampler = AudioResampler(sample_fmt, channel_layout, sample_rate)
container = av.open(fate_suite('audio-reference/chorusnoise_2ch_44kHz_s16.wav'))
audio_stream = container.streams.audio[0]
path = self.sandboxed('encoder.%s' % codec_name)
samples = 0
packet_sizes = []
with open(path, 'wb') as f:
for frame in iter_frames(container, audio_stream):
# We need to let the encoder retime.
frame.pts = None
"""
def test_identity_passthrough(self):
    # A resampler built with no target format, layout or rate has nothing
    # to convert; the test requires it to return the very same frame object
    # it was given.
    passthrough = AudioResampler()
    source = AudioFrame('s16', 'stereo', 1024)

    result = passthrough.resample(source)

    self.assertIs(source, result)
arg_parser.add_argument('-p', '--play', action='store_true')
arg_parser.add_argument('-d', '--data', action='store_true')
arg_parser.add_argument('-f', '--format')
arg_parser.add_argument('-l', '--layout')
arg_parser.add_argument('-r', '--rate', type=int)
arg_parser.add_argument('-s', '--size', type=int, default=1024)
arg_parser.add_argument('-c', '--count', type=int, default=5)
args = arg_parser.parse_args()
ffplay = None
container = av.open(args.path)
stream = next(s for s in container.streams if s.type == 'audio')
fifo = av.AudioFifo() if args.size else None
resampler = av.AudioResampler(
format=av.AudioFormat(args.format or stream.format.name).packed if args.format else None,
layout=int(args.layout) if args.layout and args.layout.isdigit() else args.layout,
rate=args.rate,
) if (args.format or args.layout or args.rate) else None
read_count = 0
fifo_count = 0
sample_count = 0
for i, packet in enumerate(container.demux(stream)):
for frame in packet.decode():
read_count += 1
print('>>>> %04d' % read_count, frame)
if args.data:
def player_worker(
loop, container, streams, audio_track, video_track, quit_event, throttle_playback
):
audio_fifo = av.AudioFifo()
audio_format_name = "s16"
audio_layout_name = "stereo"
audio_sample_rate = 48000
audio_samples = 0
audio_samples_per_frame = int(audio_sample_rate * AUDIO_PTIME)
audio_resampler = av.AudioResampler(
format=audio_format_name, layout=audio_layout_name, rate=audio_sample_rate
)
video_first_pts = None
frame_time = None
start_time = time.time()
while not quit_event.is_set():
try:
frame = next(container.decode(*streams))
except (av.AVError, StopIteration):
if audio_track:
asyncio.run_coroutine_threadsafe(audio_track._queue.put(None), loop)
if video_track:
asyncio.run_coroutine_threadsafe(video_track._queue.put(None), loop)
import time
from qtproxy import Q
import av
# Script setup: parse the media path from the command line, open it, and
# prepare the resampling and audio-output objects used by the rest of the
# script.
# NOTE(review): `argparse` is used here but its import is not visible in this
# excerpt — confirm it is imported above.
parser = argparse.ArgumentParser()
# Single required positional argument: the path handed to av.open() below.
parser.add_argument('path')
args = parser.parse_args()
container = av.open(args.path)
# Pick the first stream whose type is 'audio'; next() raises StopIteration
# if the container has no such stream.
stream = next(s for s in container.streams if s.type == 'audio')
fifo = av.AudioFifo()
# Target format for resampling: packed s16, stereo, 48000 Hz. These values
# mirror the channel count, sample size and sample rate set on `qformat`
# below.
resampler = av.AudioResampler(
format=av.AudioFormat('s16').packed,
layout='stereo',
rate=48000,
)
# Describe the output audio format: little-endian, 2 channels, 'audio/pcm'
# codec, 48000 Hz, 16-bit signed integer samples.
# NOTE(review): `Q` comes from the `qtproxy` import; presumably a Qt wrapper —
# confirm which Qt classes `Q.AudioFormat` / `Q.AudioOutput` map to.
qformat = Q.AudioFormat()
qformat.setByteOrder(Q.AudioFormat.LittleEndian)
qformat.setChannelCount(2)
qformat.setCodec('audio/pcm')
qformat.setSampleRate(48000)
qformat.setSampleSize(16)
qformat.setSampleType(Q.AudioFormat.SignedInt)
# Create the audio output object configured with the format above.
output = Q.AudioOutput(qformat)