Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
stream.width = width
stream.height = (height or
width * img.shape[0] // img.shape[1])
if bitrate is not None:
stream.bit_rate = int(bitrate)
elif quality is not None and codec == str('wmv2'):
bitrate = quality * _estimate_bitrate([stream.height,
stream.width],
export_rate)
stream.bit_rate = int(bitrate)
# Ensure correct memory layout
img = img.astype(img.dtype, order='C', copy=False)
frame = av.VideoFrame.from_ndarray(img, format=str('rgb24'))
packet = stream.encode(frame)
if packet is not None:
output.mux(packet)
# Finish encoding the stream
while True:
try:
packet = stream.encode()
except av.AVError: # End of file raises AVError since after av 0.4
break
if packet is None:
break
output.mux(packet)
output.close()
def test_ndarray_rgb(self):
array = numpy.random.randint(0, 256, size=(480, 640, 3), dtype=numpy.uint8)
for format in ['rgb24', 'bgr24']:
frame = VideoFrame.from_ndarray(array, format=format)
self.assertEqual(frame.width, 640)
self.assertEqual(frame.height, 480)
self.assertEqual(frame.format.name, format)
self.assertTrue((frame.to_ndarray() == array).all())
def test_ndarray_rgba_align(self):
array = numpy.random.randint(0, 256, size=(238, 318, 4), dtype=numpy.uint8)
for format in ['argb', 'rgba', 'abgr', 'bgra']:
frame = VideoFrame.from_ndarray(array, format=format)
self.assertEqual(frame.width, 318)
self.assertEqual(frame.height, 238)
self.assertEqual(frame.format.name, format)
self.assertTrue((frame.to_ndarray() == array).all())
def test_ndarray_yuyv422_align(self):
array = numpy.random.randint(0, 256, size=(238, 318, 2), dtype=numpy.uint8)
frame = VideoFrame.from_ndarray(array, format='yuyv422')
self.assertEqual(frame.width, 318)
self.assertEqual(frame.height, 238)
self.assertEqual(frame.format.name, 'yuyv422')
self.assertTrue((frame.to_ndarray() == array).all())
def test_ndarray_rgba(self):
array = numpy.random.randint(0, 256, size=(480, 640, 4), dtype=numpy.uint8)
for format in ['argb', 'rgba', 'abgr', 'bgra']:
frame = VideoFrame.from_ndarray(array, format=format)
self.assertEqual(frame.width, 640)
self.assertEqual(frame.height, 480)
self.assertEqual(frame.format.name, format)
self.assertTrue((frame.to_ndarray() == array).all())
def _stop(self):
for thread in self.threads:
thread.stop()
thread.wait()
#test in any pending data in streams
for i, (name, input) in enumerate(self.inputs.items()):
ev = input.poll(timeout=0.2)
if ev>0:
pos, data = input.recv(return_data=True)
# TODO take format from stream params need change WebCamAV
frame = av.VideoFrame.from_ndarray(data, format='rgb24')
packet = self.av_streams[i].encode(frame)
if packet is not None:
self.av_containers[i].mux(packet)
# flush stream = encode empty frame until empty packet
for i, av_stream in enumerate(self.av_streams):
for packet in av_stream.encode():
self.av_containers[i].mux(packet)
# Close files
for i, av_container in enumerate(self.av_containers):
av_container.close()
def process_data(self, pos, data):
if self._start_index is None:
self._start_index = int(pos - 1)
print('_start_index video', self._start_index)
self.recv_start_index.emit(self.name, self._start_index)
frame = av.VideoFrame.from_ndarray(data, format='rgb24')
for packet in self.av_stream.encode(frame):
self.av_container.mux(packet)
async def recv(self):
pts, time_base = await self.next_timestamp()
# rotate image
rows, cols, _ = self.img.shape
M = cv2.getRotationMatrix2D((cols / 2, rows / 2), int(pts * time_base * 45), 1)
img = cv2.warpAffine(self.img, M, (cols, rows))
# create video frame
frame = VideoFrame.from_ndarray(img, format="bgr24")
frame.pts = pts
frame.time_base = time_base
return frame
output = av.open(args.output[0], 'w')
stream = output.add_stream(args.codec, args.rate)
stream.bit_rate = args.bitrate
stream.pix_fmt = args.format
for i, path in enumerate(args.inputs):
print(os.path.basename(path))
img = cv2.imread(path)
if not i:
stream.height = args.height or (args.width * img.shape[0] / img.shape[1]) or img.shape[0]
stream.width = args.width or img.shape[1]
frame = av.VideoFrame.from_ndarray(img, format='bgr24')
packet = stream.encode(frame)
output.mux(packet)
output.close()
async def recv(self):
self.data_bgr = await self.camera_device.get_latest_frame()
frame = VideoFrame.from_ndarray(self.data_bgr, format='bgr24')
pts, time_base = await self.next_timestamp()
frame.pts = pts
frame.time_base = time_base
return frame