Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_stream_seek(self, use_deprecated_api=False):
    """Locate the timestamp of the middle frame of the interlaced H.264 sample.

    The sample is decoded once in full to count its frames; the halfway
    frame index is then converted to seconds using the stream's average
    rate, and finally to a timestamp in stream time-base units, offset by
    the stream's start time.

    ``use_deprecated_api`` is accepted but is not read by the code visible
    in this block.
    """
    container = av.open(fate_suite('h264/interlaced_crop.mp4'))
    video_stream = next(s for s in container.streams if s.type == 'video')

    # One full decode pass, used only to learn how many frames exist.
    total_frame_count = sum(
        1
        for packet in container.demux(video_stream)
        for _frame in packet.decode()
    )

    # Aim for the frame halfway through the video.
    target_frame = int(total_frame_count / 2.0)

    seconds_per_tick = float(video_stream.time_base)
    frames_per_second = float(video_stream.average_rate)

    # Frame index -> seconds -> time-base ticks (shifted by the start time).
    target_sec = target_frame * 1 / frames_per_second
    target_timestamp = int(target_sec / seconds_per_tick) + video_stream.start_time
def test_buffer_read_write(self):
    """Write the RGB-rotate sample into an in-memory buffer and read it back.

    The buffer is wrapped in a ``MethodLogger`` so the test can confirm
    that ``write`` was really called on it during muxing, before the
    written data is reopened and checked with ``assert_rgb_rotate``.
    """
    sink = StringIO()
    logged_sink = MethodLogger(sink)

    write_rgb_rotate(av.open(logged_sink, 'w', 'mp4'))

    # The muxer must have called write() and moved the buffer position.
    self.assertTrue(logged_sink._filter('write'))
    self.assertTrue(sink.tell())

    # Rewind, then run the shared assertions against the written data.
    sink.seek(0)
    assert_rgb_rotate(self, av.open(sink))
def main(args):
    """Open the input and output containers and configure the output streams.

    Everything is published through module-level globals (``input``,
    ``output`` and the four stream handles) rather than returned.

    Reads ``args.input_file``, ``args.output_file``, ``args.codec`` and
    ``args.seekstart``.
    """
    global input, output, invstream, outvstream, inastream, outastream

    # Source side: first video stream and first audio stream of the input.
    input = av.open(args.input_file)
    invstream, inastream = input.streams.video[0], input.streams.audio[0]

    # Decode a single video frame before the output is set up; the frame
    # itself is not used afterwards in this block.
    _first_frame = next(input.decode(video=0))

    # Destination side: the video stream copies the source geometry and
    # pixel format, the audio stream is created from the source audio stream.
    output = av.open(args.output_file, 'w')
    outvstream = output.add_stream(args.codec, invstream.rate)
    for attr in ('pix_fmt', 'height', 'width'):
        setattr(outvstream, attr, getattr(invstream, attr))
    outvstream.options = {
        "preset": "medium",
        "tune": "film",
        "crf": "22",
    }
    outastream = output.add_stream(template=inastream)
    outastream.options = {}

    # NOTE(review): presumably seekstart is in seconds and the container
    # seeks in microseconds -- confirm against the PyAV seek documentation.
    seek_scale = 1000000
    input.seek(args.seekstart * seek_scale)
# Load the alignment data from ``alignments.json`` inside the directory
# named by ``args.alignment_dir``.
alignment_dir = Path(args.alignment_dir)
with (alignment_dir / 'alignments.json').open() as f:
    alignments = json.load(f)
# alignments.sort()
def main(args):
    """Open the input and output containers and configure the output streams.

    Everything is published through module-level globals (``input``,
    ``output`` and the four stream handles) rather than returned.

    Reads ``args.input_file``, ``args.output_file``, ``args.codec`` and
    ``args.seekstart``.
    """
    global input, output, invstream, outvstream, inastream, outastream

    # Source side: first video stream and first audio stream of the input.
    input = av.open(args.input_file)
    invstream, inastream = input.streams.video[0], input.streams.audio[0]

    # Decode a single video frame before the output is set up; the frame
    # itself is not used afterwards in this block.
    _first_frame = next(input.decode(video=0))

    # Destination side: the video stream copies the source geometry and
    # pixel format, the audio stream is created from the source audio stream.
    output = av.open(args.output_file, 'w')
    outvstream = output.add_stream(args.codec, invstream.rate)
    for attr in ('pix_fmt', 'height', 'width'):
        setattr(outvstream, attr, getattr(invstream, attr))
    outvstream.options = {
        "preset": "medium",
        "tune": "film",
        "crf": "22",
    }
    outastream = output.add_stream(template=inastream)
    outastream.options = {}

    # NOTE(review): presumably seekstart is in seconds and the container
    # seeks in microseconds -- confirm against the PyAV seek documentation.
    seek_scale = 1000000
    input.seek(args.seekstart * seek_scale)
def hashVid(conn, vidUrl, url):
    """Build a frame-by-frame difference-hash fingerprint for a video.

    Every frame of the first video stream is converted to an image, hashed
    with ``dhash.dhash_int`` and appended to the result followed by a single
    space, so a successful result ends with a trailing space (and is the
    empty string when no frame decodes).

    Args:
        conn: Handle forwarded to ``deleteItem`` when the video cannot be
            opened (presumably a database connection -- confirm against
            ``deleteItem``).
        vidUrl: Mapping whose ``['reddit_video']['fallback_url']`` entry is
            the location passed to ``av.open``.
        url: Forwarded to ``deleteItem`` together with ``conn`` when the
            video cannot be opened.

    Returns:
        str: The space-separated frame hashes, or the literal ``'invalid'``
        when the video could not be opened.
    """
    try:
        container = av.open(vidUrl['reddit_video']['fallback_url'])
    except Exception:
        # Deliberate best effort: a missing key or an unopenable video drops
        # the item instead of failing the caller.  ``Exception`` (rather than
        # a bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
        deleteItem(conn, url)
        print('invalid check so it was ignored')
        return 'invalid'

    try:
        # join() avoids the quadratic cost of repeated string concatenation
        # while keeping the original "<hash> <hash> " layout.
        return ''.join(
            str(dhash.dhash_int(frame.to_image())) + ' '
            for frame in container.decode(video=0)
        )
    finally:
        # Release the demuxer/decoder even if decoding or hashing fails.
        container.close()
order += 1
n //= 1024
return '%d%sB' % (n, ('', 'k', 'M', 'G', 'T', 'P')[order])
# Repeatedly open and decode the input, recording the process resource usage
# before each round so the samples can be compared across rounds.
usage = []
for round_ in xrange(args.count):
    print('Round %d/%d:' % (round_ + 1, args.count))
    # Optionally force a garbage collection before taking the sample.
    if args.gc:
        gc.collect()
    # Sample is taken at the START of the round, before anything is opened.
    usage.append(resource.getrusage(resource.RUSAGE_SELF))
    fh = av.open(args.input)
    # First stream whose type is 'video'.
    vs = next(s for s in fh.streams if s.type == 'video')
    # Number of frames decoded so far in this round.
    fi = 0
    for packet in fh.demux([vs]):
        for frame in packet.decode():
            # Each flag enables one optional per-frame operation.
            if args.print_:
                print(frame)
            if args.to_rgb:
                print(frame.to_rgb())
            if args.to_image:
                print(frame.to_image())
            fi += 1
        # Checked once per packet, so the round may decode slightly more
        # than args.frames frames before stopping.
        if fi > args.frames:
            break
    # Drop every reference created in this round before the next sample.
    frame = packet = fh = vs = None
def write_vid_blk(arr, vpath, rate=30):
    """Encode a stack of grayscale frames into a uniquely named MP4 file.

    Args:
        arr: Array indexed as ``(frame, height, width)``; values are clipped
            to ``[0, 255]`` and cast to ``uint8`` before encoding.
        vpath: Directory in which the video file is created.
        rate: Frame rate given to the output stream. Defaults to 30, the
            value that was previously hard-coded.

    Returns:
        str: Path of the written file, ``<vpath>/<uuid4>.mp4``.
    """
    fpath = os.path.join(vpath, "{}.mp4".format(uuid4()))
    arr = np.clip(arr, 0, 255).astype(np.uint8)

    container = av.open(fpath, mode='w')
    try:
        stream = container.add_stream('mpeg4', rate=rate)
        stream.width = arr.shape[2]
        stream.height = arr.shape[1]
        stream.pix_fmt = 'yuv420p'

        for fm in arr:
            # Expand the single-channel frame to RGB so it can be wrapped
            # as an 'rgb24' video frame.
            fm = cv2.cvtColor(fm, cv2.COLOR_GRAY2RGB)
            fmav = av.VideoFrame.from_ndarray(fm, format='rgb24')
            for p in stream.encode(fmav):
                container.mux(p)

        # Calling encode() with no frame flushes packets still buffered in
        # the encoder.
        for p in stream.encode():
            container.mux(p)
    finally:
        # Always close the container, even when encoding fails part-way,
        # so the file handle is not leaked.
        container.close()
    return fpath
def main():
    """Play ``images/movie.mp4`` (located next to this file) on ``device``.

    Each decoded frame is converted to a PIL image, resized to the device
    resolution when the sizes differ, converted to the device's colour mode
    and displayed.  The frame index is printed for every frame.
    """
    video_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                              'images', 'movie.mp4'))
    print('Loading {}...'.format(video_path))

    clip = av.open(video_path)
    try:
        for frame in clip.decode(video=0):
            print('{} ------'.format(frame.index))

            img = frame.to_image()
            if img.width != device.width or img.height != device.height:
                # resize video to fit device
                size = device.width, device.height
                # LANCZOS is the filter ANTIALIAS was an alias for; the
                # ANTIALIAS name itself was removed in Pillow 10.
                img = img.resize(size, PIL.Image.LANCZOS)

            device.display(img.convert(device.mode))
    finally:
        # Release the demuxer/decoder even if playback is interrupted.
        clip.close()
import time
import av
import av.datasets
# First run: decode the curated sample without changing any threading
# setting ("default (slice)" per the message printed below) and time it.
print("Decoding with default (slice) threading...")
container = av.open(av.datasets.curated('pexels/time-lapse-video-of-night-sky-857195.mp4'))
# Wall-clock timer; the measured span includes demuxing, decoding and the
# print() calls for every packet and frame.
start_time = time.time()
for packet in container.demux():
    print(packet)
    for frame in packet.decode():
        print(frame)
# Elapsed seconds for the first run.
default_time = time.time() - start_time
container.close()

# Second run: the same file is opened again for the "auto threading" pass.
print("Decoding with auto threading...")
container = av.open(av.datasets.curated('pexels/time-lapse-video-of-night-sky-857195.mp4'))
# !!! This is the only difference.
# (The statement this remark refers to follows below.)