Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_video_frame(
    self, width, height, pts, format="yuv420p", time_base=VIDEO_TIME_BASE
):
    """
    Build one video frame whose planes are all zero-filled.

    The frame is constructed with the requested ``width``, ``height`` and
    pixel ``format``; ``pts`` and ``time_base`` are then stored on it and
    the frame is returned.
    """
    blank = VideoFrame(width=width, height=height, format=format)
    for plane in blank.planes:
        # bytes(n) produces n zero bytes, so each plane is overwritten with zeros.
        zeros = bytes(plane.buffer_size)
        plane.update(zeros)
    blank.pts = pts
    blank.time_base = time_base
    return blank
def test_haldclut_graph(self):
    """
    Link an image source and a Hald CLUT source through a ``haldclut`` filter.

    The first statement raises ``SkipTest`` unconditionally, so none of the
    graph construction below it currently runs.
    """
    raise SkipTest()

    filter_graph = Graph()

    source_image = Image.open(fate_suite('png1/lena-rgb24.png'))
    source_buffer = filter_graph.add_buffer(VideoFrame.from_image(source_image))

    clut_image = Image.open('hald_7.png')
    clut_buffer = filter_graph.add_buffer(VideoFrame.from_image(clut_image))

    try:
        clut_filter = filter_graph.add('haldclut')
    except ValueError:
        # Not in Libav.
        raise SkipTest()

    output_sink = filter_graph.add('buffersink')

    # Input 0 of the filter receives the picture, input 1 the Hald image.
    source_buffer.link(0, clut_filter, 0)
    clut_buffer.link(0, clut_filter, 1)
    clut_filter.link(0, output_sink, 0)
    filter_graph.config()

    self.assertIs(source_buffer.outputs[0].linked_to, clut_filter.inputs[0])
def test_encoding_with_pts(self):
    """
    Encode frames with explicit timestamps and check the packet time base.

    Each frame gets a ``pts`` expressed in a 1/48000 time base, advancing
    2000 ticks (1/24 s) per frame, while the stream is added with a rate
    of 24. Every packet returned by the encoder, including those returned
    by the final ``encode(None)`` call, must report a time base of 1/24.
    """
    path = self.sandboxed('video_with_pts.mov')
    output = av.open(path, 'w')

    # Close the container even when an assertion or the encoder fails, so a
    # failing test does not leave the output file open.
    try:
        stream = output.add_stream('libx264', 24)
        stream.width = WIDTH
        stream.height = HEIGHT
        stream.pix_fmt = "yuv420p"

        frame_time_base = Fraction(1, 48000)
        expected_time_base = Fraction(1, 24)

        for i in range(DURATION):
            frame = VideoFrame(WIDTH, HEIGHT, 'rgb24')
            frame.pts = i * 2000
            frame.time_base = frame_time_base

            for packet in stream.encode(frame):
                self.assertEqual(packet.time_base, expected_time_base)
                output.mux(packet)

        # Encoding None drains whatever packets the encoder still holds.
        for packet in stream.encode(None):
            self.assertEqual(packet.time_base, expected_time_base)
            output.mux(packet)
    finally:
        output.close()
def test_roundtrip(self):
    """
    Convert an image to a ``VideoFrame`` and back, then compare the two.

    The converted image is also saved into the sandbox as
    ``roundtrip-high.jpg`` before the comparison is made.
    """
    original = Image.open(fate_png())
    restored = VideoFrame.from_image(original).to_image()
    restored.save(self.sandboxed('roundtrip-high.jpg'))
    self.assertImagesAlmostEqual(original, restored)
def encode(
    self, frame: Frame, force_keyframe: bool = False
) -> Tuple[List[bytes], int]:
    """
    Encode one video frame and return its packetized payloads and timestamp.

    The frame is handed to ``_encode_frame`` together with
    ``force_keyframe``; the result is passed through ``_packetize``. The
    returned timestamp is ``frame.pts`` run through ``convert_timebase``
    from ``frame.time_base`` to ``VIDEO_TIME_BASE``.
    """
    assert isinstance(frame, av.VideoFrame)

    encoded = self._encode_frame(frame, force_keyframe)
    rebased_pts = convert_timebase(frame.pts, frame.time_base, VIDEO_TIME_BASE)
    payloads = self._packetize(encoded)
    return payloads, rebased_pts
# Prepare a vpx codec for the given video frame.
# NOTE(review): only the set-up part of this method is visible in this excerpt; the code
# that produces the declared return value is cut off after the thread-count assignment.
def encode(
self, frame: Frame, force_keyframe: bool = False
) -> Tuple[List[bytes], int]:
assert isinstance(frame, VideoFrame)
# Convert any frame that is not already in the yuv420p pixel format.
if frame.format.name != "yuv420p":
frame = frame.reformat(format="yuv420p")
# An existing codec is destroyed when the frame size differs from the configured
# g_w / g_h, so that the block below builds a fresh one.
if self.codec and (frame.width != self.cfg.g_w or frame.height != self.cfg.g_h):
lib.vpx_codec_destroy(self.codec)
self.codec = None
if not self.codec:
# create codec
self.codec = ffi.new("vpx_codec_ctx_t *")
# The codec time base is set to 1 / VIDEO_CLOCK_RATE.
self.cfg.g_timebase.num = 1
self.cfg.g_timebase.den = VIDEO_CLOCK_RATE
# NOTE(review): presumably 0 disables lagged encoding — confirm against the libvpx docs.
self.cfg.g_lag_in_frames = 0
# The thread count is derived from the frame's pixel count and the CPU count.
self.cfg.g_threads = number_of_threads(
frame.width * frame.height, multiprocessing.cpu_count()
)
# Recording loop: pull frames from self.input_queue, encode them into an 'mpeg4' stream
# and mux them into an .mp4 file until a reset or finished signal is seen.
# NOTE(review): this excerpt has no enclosing `def` and its indentation was lost, so the
# comments below hedge wherever the nesting cannot be determined.
# Open the output file for writing; `timestamp` is defined outside this excerpt.
out_container = av.open(self.folder+timestamp+".mp4", mode='w')
print("Recorder running, saving to ", self.folder+timestamp+".mp4")
# The stream and the reusable frame are created lazily, once the first frame arrives.
out_stream = None
video_frame = None
while True:
# Leave the loop when either signal is set, clearing the reset signal on the way out.
if self.reset_signal.is_set() or self.finished_signal.is_set():
self.reset_signal.clear()
break
try:
if out_stream is None:
# First frame: wait up to 1 second for it, then configure the stream from it.
current_frame = self.input_queue.get(timeout=1)
out_stream = out_container.add_stream('mpeg4', rate=50)
# Reversing `shape` to obtain (width, height) assumes a 2-D (rows, cols) array — TODO confirm.
out_stream.width, out_stream.height = current_frame.shape[::-1]
out_stream.pix_fmt = 'yuv420p'
# self.kbit_rate is multiplied by 1000 before being assigned as the stream bit rate.
out_stream.bit_rate = self.kbit_rate*1000
# A "gray" frame is allocated once and its first plane is filled from the array.
video_frame = av.VideoFrame(current_frame.shape[1], current_frame.shape[0], "gray")
video_frame.planes[0].update(current_frame)
else:
# Later frames: overwrite plane 0 of the existing frame with the next queued item.
video_frame.planes[0].update(self.input_queue.get(timeout=1))
print("Got and written frame")
# NOTE(review): the value returned by encode() is passed straight to mux(); whether it is
# a single packet, a list or None depends on the PyAV version — confirm.
packet = out_stream.encode(video_frame)
out_container.mux(packet)
self.update_framerate()
# Presumably raised by input_queue.get() when the 1 second timeout elapses; it is ignored.
except Empty:
pass
# NOTE(review): with indentation lost it is unclear which loop this `break` belongs to.
if self.finished_signal.is_set():
break
# The container is closed only if a stream was created.
# NOTE(review): no encoder flush is visible in this excerpt before close() — confirm.
if out_stream is not None:
out_container.close()
def __init__(self, frame, fix_missing_offset=0):
    """Wrap an ``av.VideoFrame`` together with a frame id offset.

    Parameters
    ----------
    frame : av.VideoFrame
        The frame object to keep a reference to.
    fix_missing_offset : int, optional
        Frame id offset used to fix a missing frame; defaults to 0.
    """
    assert isinstance(frame, VideoFrame)

    self.frame = frame
    self.fix_missing_offset = fix_missing_offset