Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
frame_count,
codec_name if codec_name != 'mjpeg' else 'jpg',
))
path_list.append(path)
with open(path, 'wb') as f:
f.write(new_packet)
frame_count += 1
if frame_count > 5:
break
ctx = av.Codec(codec_name, 'r').create()
for path in path_list:
with open(path, 'rb') as f:
size = os.fstat(f.fileno()).st_size
packet = Packet(size)
size = f.readinto(packet)
frame = ctx.decode(packet)[0]
self.assertEqual(frame.width, width)
self.assertEqual(frame.height, height)
self.assertEqual(frame.format.name, pix_fmt)
def iter_raw_frames(path, packet_sizes, ctx):
    """Yield frames decoded from consecutive packets stored in ``path``.

    The file is opened in binary mode and, for each entry of
    ``packet_sizes``, a ``Packet`` of that size is filled via ``readinto``
    and handed to ``ctx.decode``.  Once every size has been consumed,
    ``ctx.decode(None)`` is called repeatedly and its frames are yielded
    until it either raises ``EOFError`` or returns an empty result.

    Args:
        path: Location of the file holding the packets back to back.
        packet_sizes: Iterable with the byte length of each packet, in
            the order the packets appear in the file.
        ctx: Object exposing ``decode(packet)``; it must return an
            iterable of frames and accept ``None``
            (presumably to flush buffered frames -- confirm against the
            decoder's documentation).

    Yields:
        Every frame returned by ``ctx.decode``, in decoding order.
    """
    with open(path, 'rb') as stream:
        for expected in packet_sizes:
            chunk = Packet(expected)
            filled = stream.readinto(chunk)
            # Sanity checks: a packet must be non-empty and fully read.
            assert expected
            assert filled == expected
            # Only reachable when assertions are disabled (python -O).
            if not filled:
                break
            yield from ctx.decode(chunk)

        # Keep asking the decoder with ``None`` until it has nothing left.
        while True:
            try:
                pending = ctx.decode(None)
            except EOFError:
                break
            yield from pending
            if not pending:
                break
def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
    """Decode one encoded frame with ``self.codec``.

    A packet is built from ``encoded_frame.data``, stamped with
    ``encoded_frame.timestamp`` as its ``pts`` and ``VIDEO_TIME_BASE`` as
    its ``time_base``, then passed to ``self.codec.decode``.

    Args:
        encoded_frame: Source of the payload (``data``) and the
            presentation timestamp (``timestamp``).

    Returns:
        Whatever ``self.codec.decode`` returns for the packet, or an empty
        list when an ``av.AVError`` is raised; in that case a warning is
        logged and the frame is skipped.
    """
    try:
        pkt = av.Packet(encoded_frame.data)
        pkt.pts = encoded_frame.timestamp
        pkt.time_base = VIDEO_TIME_BASE
        decoded = self.codec.decode(pkt)
    except av.AVError as exc:
        # Best effort: report the failure and carry on without this frame.
        message = "failed to decode, skipping package: " + str(exc)
        logger.warning(message)
        decoded = []
    return decoded