Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def iter_raw_frames(path, packet_sizes, ctx):
with open(path, 'rb') as f:
for i, size in enumerate(packet_sizes):
packet = Packet(size)
read_size = f.readinto(packet)
assert size
assert read_size == size
print i + 1, size, read_size
if not read_size:
break
for frame in ctx.decode(packet):
print ' ', frame
yield frame
print 'flushing...'
while True:
frames = ctx.decode(None)
for frame in frames:
print ' ', frame
yield frame
if not frames:
frame_count,
codec_name if codec_name != 'mjpeg' else 'jpg',
))
path_list.append(path)
with open(path, 'wb') as f:
f.write(new_packet)
frame_count += 1
if frame_count > 5:
break
ctx = av.Codec(codec_name, 'r').create()
for path in path_list:
with open(path, 'rb') as f:
size = os.fstat(f.fileno()).st_size
packet = Packet(size)
size = f.readinto(packet)
frame = ctx.decode(packet)[0]
self.assertEqual(frame.width, width)
self.assertEqual(frame.height, height)
self.assertEqual(frame.format.name, pix_fmt)
def decode(self, data):
packet = av.packet.Packet(data)
return self._decoder.decode(packet)