Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# We haven't exposed bitstream filters yet, so we're gonna use the `ffmpeg` CLI.
h264_path = 'night-sky.h264'
if not os.path.exists(h264_path):
    # Remux the curated sample into a raw Annex B H.264 elementary stream:
    # copy the video stream as-is, drop audio, apply the bitstream filter.
    subprocess.check_call([
        'ffmpeg',
        '-i', av.datasets.curated('pexels/time-lapse-video-of-night-sky-857195.mp4'),
        '-vcodec', 'copy',
        '-an',
        '-bsf:v', 'h264_mp4toannexb',
        h264_path,
    ])

codec = av.CodecContext.create('h264', 'r')

# The context manager guarantees the file is closed, even if parsing or
# decoding raises part-way through.
with open(h264_path, 'rb') as fh:
    while True:
        # Read the stream in 64 KiB chunks.
        chunk = fh.read(1 << 16)

        # At EOF `chunk` is empty. It is still handed to the parser once:
        # per the PyAV parsing example, an empty buffer flushes the parser.
        packets = codec.parse(chunk)
        print("Parsed {} packets from {} bytes:".format(len(packets), len(chunk)))

        for packet in packets:
            print(' ', packet)
            frames = codec.decode(packet)
            for frame in frames:
                print(' ', frame)

        # Stop after the flush pass; without this the loop would spin forever
        # on empty reads once the end of the file is reached.
        if not chunk:
            break
def __init__(self) -> None:
    """Create the codec context and store it on the instance as ``codec``."""
    # Arguments are the codec name ("h264") and the mode string ("r").
    h264_context = av.CodecContext.create("h264", "r")
    self.codec = h264_context
def _encode_frame(
    self, frame: av.VideoFrame, force_keyframe: bool
) -> Iterator[bytes]:
    """Encode ``frame`` with libx264 and yield the split bitstream pieces.

    The codec context held in ``self.codec`` is dropped and rebuilt whenever
    the incoming frame's width or height differs from the context's, and is
    created on first use when ``self.codec`` is ``None``.

    Yields whatever ``self._split_bitstream`` produces for the concatenated
    bytes of all packets returned by encoding this frame.
    """
    # NOTE(review): `force_keyframe` is accepted but never read in this
    # method -- confirm whether keyframe forcing is handled elsewhere.

    # Discard the existing context when the frame dimensions have changed.
    existing = self.codec
    if existing:
        width_changed = frame.width != existing.width
        if width_changed or frame.height != existing.height:
            self.codec = None

    # Build a fresh encoder context sized to the current frame.
    if self.codec is None:
        encoder = self.codec = av.CodecContext.create("libx264", "w")
        encoder.width = frame.width
        encoder.height = frame.height
        encoder.pix_fmt = "yuv420p"
        encoder.time_base = fractions.Fraction(1, MAX_FRAME_RATE)
        x264_options = {
            "profile": "baseline",
            "level": "31",
            "tune": "zerolatency",
        }
        encoder.options = x264_options

    # Join the bytes of every packet from this encode call, then let the
    # splitter break the result up.
    encoded_packets = self.codec.encode(frame)
    payload = b"".join(packet.to_bytes() for packet in encoded_packets)
    yield from self._split_bitstream(payload)