How to use the av.VideoFrame class in av

To help you get started, we’ve selected a few av examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aiortc / aiortc / tests / codecs.py View on Github external
def create_video_frame(
        self, width, height, pts, format="yuv420p", time_base=VIDEO_TIME_BASE
    ):
        """
        Build one video frame whose planes are all zero-filled.

        The frame is allocated with the requested size and pixel format,
        every plane is overwritten with zero bytes, and the supplied
        presentation timestamp and time base are set on it before it is
        returned.
        """
        blank = VideoFrame(width=width, height=height, format=format)
        for plane in blank.planes:
            # bytes(n) yields n zero bytes, i.e. a fully zeroed plane buffer.
            zeros = bytes(plane.buffer_size)
            plane.update(zeros)
        blank.pts = pts
        blank.time_base = time_base
        return blank
github mikeboers / PyAV / tests / test_filters.py View on Github external
def test_haldclut_graph(self):

        # This test is unconditionally skipped: the raise below fires first,
        # so the graph-building code after it is currently never executed.
        raise SkipTest()

        graph = Graph()

        # First input: the picture the filter is applied to.
        picture = Image.open(fate_suite('png1/lena-rgb24.png'))
        picture_source = graph.add_buffer(VideoFrame.from_image(picture))

        # Second input: the Hald image handed to the filter's second pad.
        hald_picture = Image.open('hald_7.png')
        hald_source = graph.add_buffer(VideoFrame.from_image(hald_picture))

        try:
            haldclut = graph.add('haldclut')
        except ValueError:
            # Not in Libav.
            raise SkipTest()

        output_sink = graph.add('buffersink')

        # Wire both buffers into the filter, and the filter into the sink.
        picture_source.link(0, haldclut, 0)
        hald_source.link(0, haldclut, 1)
        haldclut.link(0, output_sink, 0)
        graph.config()

        first_output = picture_source.outputs[0]
        self.assertIs(first_output.linked_to, haldclut.inputs[0])
github mikeboers / PyAV / tests / test_encode.py View on Github external
def test_encoding_with_pts(self):

        # Encode DURATION frames whose timestamps are expressed in a 1/48000
        # time base and check that every packet handed back by the stream
        # reports a 1/24 time base.
        path = self.sandboxed('video_with_pts.mov')
        output = av.open(path, 'w')

        # Close the container even when an assertion or an encode call fails
        # part-way through, so a failing run does not leave the file open.
        try:
            stream = output.add_stream('libx264', 24)
            stream.width = WIDTH
            stream.height = HEIGHT
            stream.pix_fmt = "yuv420p"

            for i in range(DURATION):
                frame = VideoFrame(WIDTH, HEIGHT, 'rgb24')
                frame.pts = i * 2000
                frame.time_base = Fraction(1, 48000)

                for packet in stream.encode(frame):
                    self.assertEqual(packet.time_base, Fraction(1, 24))
                    output.mux(packet)

            # Encoding None drains whatever the encoder is still holding.
            for packet in stream.encode(None):
                self.assertEqual(packet.time_base, Fraction(1, 24))
                output.mux(packet)
        finally:
            output.close()
github mikeboers / PyAV / tests / test_videoframe.py View on Github external
def test_roundtrip(self):
        # Image -> VideoFrame -> image: the converted result is written to
        # the sandbox and must be almost equal to the image we started from.
        original = Image.open(fate_png())
        restored = VideoFrame.from_image(original).to_image()
        target = self.sandboxed('roundtrip-high.jpg')
        restored.save(target)
        self.assertImagesAlmostEqual(original, restored)
github aiortc / aiortc / src / aiortc / codecs / h264.py View on Github external
def encode(
        self, frame: Frame, force_keyframe: bool = False
    ) -> Tuple[List[bytes], int]:
        """
        Encode one video frame and return its payloads with a timestamp.

        The frame is passed to `_encode_frame`, its `pts` is converted from
        the frame's own time base to `VIDEO_TIME_BASE`, and the encoder
        output is run through `_packetize`.

        Returns a `(payloads, timestamp)` tuple.
        """
        assert isinstance(frame, av.VideoFrame)
        encoded = self._encode_frame(frame, force_keyframe)
        # Convert the timestamp before packetizing, as the original did.
        rebased_pts = convert_timebase(frame.pts, frame.time_base, VIDEO_TIME_BASE)
        payloads = self._packetize(encoded)
        return payloads, rebased_pts
github aiortc / aiortc / src / aiortc / codecs / vpx.py View on Github external
def encode(
        self, frame: Frame, force_keyframe: bool = False
    ) -> Tuple[List[bytes], int]:
        # NOTE(review): only the beginning of this method is visible in this
        # excerpt; the comments below cover the visible setup steps only.
        assert isinstance(frame, VideoFrame)
        # Any frame that is not already yuv420p is converted first.
        if frame.format.name != "yuv420p":
            frame = frame.reformat(format="yuv420p")

        # An existing codec whose configured size (cfg.g_w x cfg.g_h) no
        # longer matches the incoming frame is destroyed and cleared, so the
        # block below builds a fresh one.
        if self.codec and (frame.width != self.cfg.g_w or frame.height != self.cfg.g_h):
            lib.vpx_codec_destroy(self.codec)
            self.codec = None

        if not self.codec:
            # create codec
            self.codec = ffi.new("vpx_codec_ctx_t *")
            # Encoder time base is set to 1 / VIDEO_CLOCK_RATE.
            self.cfg.g_timebase.num = 1
            self.cfg.g_timebase.den = VIDEO_CLOCK_RATE
            # presumably turns off encoder look-ahead — confirm against libvpx docs
            self.cfg.g_lag_in_frames = 0
            # Thread count is derived from the frame's pixel count and the
            # CPU count reported by multiprocessing.
            self.cfg.g_threads = number_of_threads(
                frame.width * frame.height, multiprocessing.cpu_count()
            )
github portugueslab / stytra / stytra / hardware / video.py View on Github external
out_container = av.open(self.folder+timestamp+".mp4", mode='w')
            print("Recorder running, saving to ", self.folder+timestamp+".mp4")
            # Both are created lazily below, once the first frame has been
            # pulled from the queue and its shape is known.
            out_stream = None
            video_frame = None
            while True:
                # Leave the recording loop when either signal is set; the
                # reset signal is cleared on the way out.
                if self.reset_signal.is_set() or self.finished_signal.is_set():
                    self.reset_signal.clear()
                    break
                try:
                    if out_stream is None:
                        # First frame: create and configure the stream.
                        current_frame = self.input_queue.get(timeout=1)
                        out_stream = out_container.add_stream('mpeg4', rate=50)
                        # assumes current_frame is a 2-D array, so its reversed
                        # shape unpacks to (width, height) — TODO confirm
                        out_stream.width, out_stream.height = current_frame.shape[::-1]
                        out_stream.pix_fmt = 'yuv420p'
                        # presumably kbit/s -> bit/s; verify the unit of kbit_rate
                        out_stream.bit_rate = self.kbit_rate*1000
                        # One "gray" frame object is allocated here and reused:
                        # later iterations only overwrite its first plane.
                        video_frame = av.VideoFrame(current_frame.shape[1], current_frame.shape[0], "gray")
                        video_frame.planes[0].update(current_frame)
                    else:
                        video_frame.planes[0].update(self.input_queue.get(timeout=1))
                    # NOTE(review): this message is printed before the frame
                    # is actually encoded and muxed.
                    print("Got and written frame")
                    # NOTE(review): depending on the PyAV version, encode() may
                    # return a single packet or a list of packets — confirm
                    # that mux() accepts what is returned here.
                    packet = out_stream.encode(video_frame)
                    out_container.mux(packet)
                    self.update_framerate()

                except Empty:
                    # Nothing arrived within the 1 s timeout; loop again so the
                    # signals are re-checked.
                    pass
            # NOTE(review): this break belongs to an enclosing loop that is
            # not visible in this excerpt.
            if self.finished_signal.is_set():
                break

        # NOTE(review): the container is closed only if a stream was created,
        # and no encoder flush is visible in this excerpt before the close —
        # confirm neither leaks a handle nor drops trailing frames.
        if out_stream is not None:
            out_container.close()
github JunweiLiang / Object_Detection_Tracking / diva_io / video / frame.py View on Github external
def __init__(self, frame, fix_missing_offset= 0):
        """Frame wrapper of av.VideoFrame.

        Parameters
        ----------
        frame : av.VideoFrame
            VideoFrame object to be wrapped.
        fix_missing_offset : int, optional
            Frame id offset to fix a missing frame, by default 0
        """
        assert isinstance(frame, VideoFrame)
        self.frame = frame
        self.fix_missing_offset = fix_missing_offset