Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_audio_buffer_sink(self):
    """Pull from an ``abuffer -> abuffersink`` graph that was given no input.

    The graph is configured but nothing is pushed into it, so the pull is
    allowed to fail with ``EAGAIN``.  An ``AVError`` carrying any other
    errno is re-raised and fails the test.
    """
    graph = Graph()
    audio_buffer = graph.add_abuffer(
        format='fltp',
        sample_rate=48000,
        layout='5.0(side)',
        time_base=Fraction(1, 48000),
    )
    audio_buffer.link_to(graph.add('abuffersink'))
    graph.configure()

    try:
        graph.pull()
    except AVError as ex:
        # we haven't pushed any input so expect no frames / EAGAIN
        if ex.errno != errno.EAGAIN:
            # Bare ``raise`` re-raises the active exception unchanged,
            # keeping its original traceback.
            raise
def test_delegate_sink(self):
    """Pull a frame by calling ``pull()`` on the source of ``testsrc -> buffersink``.

    When ``Image`` is truthy the pulled frame is also written to the
    sandbox as ``mandelbrot4.png``.
    """
    graph = Graph()
    source = graph.add('testsrc')
    sink = graph.add('buffersink')
    source.link_to(sink)
    graph.configure()

    # The pull is issued on the source node, not on the sink.
    pulled = source.pull()

    # NOTE(review): ``Image`` is presumably falsy when PIL is unavailable
    # -- confirm against the module's imports.
    if not Image:
        return
    picture = pulled.to_image()
    picture.save(self.sandboxed('mandelbrot4.png'))
def test_generator_graph(self):
    """Check link wiring and frame output of ``testsrc -> lutrgb -> buffersink``.

    After linking, the link on each pad must refer back to the pad on the
    other side, and pulling from the sink must yield a ``VideoFrame``.
    When ``Image`` is truthy the frame is saved as ``mandelbrot2.png``.
    """
    graph = Graph()
    source = graph.add('testsrc')
    invert = graph.add(
        'lutrgb',
        "r=maxval+minval-val:g=maxval+minval-val:b=maxval+minval-val",
        name='invert',
    )
    sink = graph.add('buffersink')

    source.link_to(invert)
    invert.link_to(sink)

    # pads and links: the pads are looked up afresh on every access on
    # purpose, since the assertions check object identity.
    self.assertIs(source.outputs[0].link.output, invert.inputs[0])
    self.assertIs(invert.inputs[0].link.input, source.outputs[0])

    # Note that ``graph.configure()`` is never called explicitly here.
    pulled = sink.pull()
    self.assertIsInstance(pulled, VideoFrame)

    if not Image:
        return
    picture = pulled.to_image()
    picture.save(self.sandboxed('mandelbrot2.png'))
def test_audio_buffer_volume_filter(self):
    """Push one audio frame through ``abuffer -> volume -> abuffersink``.

    The frame pulled from the sink must keep the sample format, channel
    layout and sample rate of the frame that was pushed in; the volume
    filter is only expected to scale the samples.
    """
    graph = Graph()
    self.link_nodes(
        graph.add_abuffer(
            format='fltp',
            sample_rate=48000,
            layout='5.0(side)',
            time_base=Fraction(1, 48000),
        ),
        graph.add('volume', volume='0.5'),
        graph.add('abuffersink'),
    )
    graph.configure()

    input_frame = generate_audio_frame(0, input_format='fltp', layout='5.0(side)', sample_rate=48000)
    graph.push(input_frame)
    out_frame = graph.pull()

    # Without these checks the test could only fail if the graph raised.
    self.assertEqual(out_frame.format.name, 'fltp')
    self.assertEqual(out_frame.layout.name, '5.0(side)')
    self.assertEqual(out_frame.sample_rate, 48000)
def test_haldclut_graph(self):
    """Disabled test for the ``haldclut`` filter; always raises ``SkipTest``.

    The first statement skips unconditionally, so the graph construction
    below it never executes.  It is kept as a record of the intended set-up:
    two image-backed buffer sources, a ``haldclut`` filter and a sink.
    """
    # Unconditional skip: nothing after this line is reachable.
    raise SkipTest()

    graph = Graph()

    source_image = Image.open(fate_suite('png1/lena-rgb24.png'))
    img_source = graph.add_buffer(VideoFrame.from_image(source_image))

    clut_image = Image.open('hald_7.png')
    hald_source = graph.add_buffer(VideoFrame.from_image(clut_image))

    try:
        hald_filter = graph.add('haldclut')
    except ValueError:
        # Not in Libav.
        raise SkipTest()

    sink = graph.add('buffersink')
def init_filter_graph(in_sample_rate=48000, in_fmt='s16', in_layout='stereo', volume=0.10):
    """Build, link and configure an ``abuffer -> volume -> abuffersink`` graph.

    Args:
        in_sample_rate: Sample rate of the audio fed into the graph; also
            used as the denominator of the buffer's time base.
        in_fmt: Sample format name passed to the audio buffer.
        in_layout: Channel layout name passed to the audio buffer.
        volume: Gain handed to the ``volume`` filter (as a string).

    Returns:
        The configured ``av.filter.Graph``.
    """
    graph = av.filter.Graph()

    # initialize filters
    filter_chain = [
        graph.add_abuffer(
            format=in_fmt,
            sample_rate=in_sample_rate,
            layout=in_layout,
            time_base=Fraction(1, in_sample_rate),
        ),
        # initialize filter with keyword parameters
        graph.add('volume', volume=str(volume)),
        # there always must be a sink at the end of the filter chain
        graph.add('abuffersink'),
    ]

    # Link each filter to its successor so the nodes form a single chain;
    # previously the filters were created but left unconnected.
    for upstream, downstream in zip(filter_chain, filter_chain[1:]):
        upstream.link_to(downstream)

    graph.configure()

    # Hand the graph back to the caller instead of discarding it.
    return graph
) and self.audio_delay <= 0.001:
start_stream = True
pbt = self.g_pool.seek_control.current_playback_time
frame_idx = self.g_pool.seek_control.ts_idx_from_playback_time(pbt)
audio_idx = bisect(
self.audio_timestamps, self.g_pool.timestamps[frame_idx]
)
self.seek_to_audio_frame(audio_idx)
if self.filter_graph_list is None:
self.current_audio_volume = self.req_audio_volume
print("Setting volume {} ".format(self.current_audio_volume))
# if self.filter_graph is not None:
# self.filter_graph.close()
# self.filter_graph = None
self.filter_graph = av.filter.Graph()
self.filter_graph_list = []
self.filter_graph_list.append(
self.filter_graph.add_buffer(template=self.audio_stream)
)
args = "volume={}:precision=float".format(self.current_audio_volume)
print("args = {}".format(args))
self.volume_filter = self.filter_graph.add("volume", args)
self.filter_graph_list.append(self.volume_filter)
self.filter_graph_list[-2].link_to(self.filter_graph_list[-1])
self.filter_graph_list.append(
self.filter_graph.add(
"aresample",
"osf={}".format(self.audio_stream.format.packed.name),
)
)
self.filter_graph_list[-2].link_to(self.filter_graph_list[-1])