Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_error_attributes(self):
    """Opening a missing path must raise AVError with errno, strerror and filename set."""
    missing = 'does not exist'
    try:
        av.open(missing)
    except AVError as err:
        self.assertEqual(err.errno, 2)
        posix_message = 'No such file or directory'
        if not is_windows:
            self.assertEqual(err.strerror, posix_message)
        else:
            # On Windows either of the two messages is accepted.
            self.assertTrue(err.strerror in ['Error number -2 occurred',
                                             posix_message])
        self.assertEqual(err.filename, missing)
    else:
        self.fail('no exception raised')
def test_audio_buffer_sink(self):
    """Pull from an abuffer -> abuffersink graph that has not been fed any input."""
    rate = 48000
    graph = Graph()
    source = graph.add_abuffer(
        format='fltp',
        sample_rate=rate,
        layout='5.0(side)',
        time_base=Fraction(1, rate),
    )
    sink = graph.add('abuffersink')
    source.link_to(sink)
    graph.configure()

    try:
        graph.pull()
    except AVError as ex:
        # we haven't pushed any input so expect no frames / EAGAIN
        if ex.errno == errno.EAGAIN:
            return
        raise ex
def seek_to_frame(self, seek_pos):
    """Seek the video stream to frame index ``seek_pos`` (frame accurate seeking).

    The index is converted with ``self.idx_to_pts`` and passed to
    ``self.video_stream.seek`` in ``'time'`` mode.

    Raises:
        FileSeekError: if the seek raises ``av.AVError``. The original
            error is attached as ``__cause__`` so it stays visible in
            tracebacks.
    """
    try:
        self.video_stream.seek(self.idx_to_pts(seek_pos), mode='time')
    except av.AVError as err:
        raise FileSeekError() from err
    else:
        # Playback state is only reset once the seek has succeeded.
        self.next_frame = self._next_frame()
        self.display_time = 0
        self.target_frame_idx = seek_pos
# prop = AudioPropertiesTuple(channels=a_stream.channels,
#                             sample_rate=a_stream.sample_rate,
#                             bits_per_sample=audio_format.bits,
#                             bytes_per_sample=audio_format.bytes)
# Channel count times audio_format.bytes (presumably bytes per sample,
# going by the commented-out tuple above -- confirm).
bytespersample = a_stream.channels * audio_format.bytes
outputbytes = bytearray(b'')

# frame_generator = container.decode(audio=0)
# Demux and decode by hand so that a failing packet can be recorded in
# decode_errors and skipped instead of aborting the whole loop.
demuxer = container.demux(a_stream)
while True:
    try:
        packet = next(demuxer)
    except av.AVError as exc:
        decode_errors.append(str(exc))
        print('error demuxing', exc)
        continue
    except StopIteration:
        # Demuxer exhausted: no more packets.
        break
    try:
        frames = packet.decode()
    except av.AVError as exc:
        # Record the failure and move on to the next packet; no interactive
        # debugger breakpoint here, so unattended runs are not blocked.
        decode_errors.append(str(exc))
        print('error decoding', exc)
        continue
    for frame in frames:
        # Clear the presentation timestamp of every decoded frame
        # (the reason is not visible in this excerpt).
        frame.pts = None
def _try_patch_world_instrinsics_file(rec_dir: str, videos: T.Sequence[Path]) -> None:
    """Best-effort creation of the "world" intrinsics file for a recording.

    The first video in ``videos`` that can be opened supplies the frame size
    and, if its file name contains a known pre-recorded calibration name, the
    camera hint. If no video can be opened, the defaults below are used.
    Nothing is done when ``videos`` is empty.
    """
    if not videos:
        return

    # TODO: Due to the naming conventions for multipart-recordings, we can't
    # easily lookup 'any' video name in the pre_recorded_calibrations, since it
    # might be a multipart recording. Therefore we need to compute a hint here
    # for the lookup. This could be improved.
    camera_hint = ""
    # Make sure the default value always correlates to the frame size of BrokenStream
    frame_size = (1280, 720)

    for video_path in videos:
        try:
            opened = av.open(str(video_path))
        except av.AVError:
            # Unreadable video: try the next one.
            continue

        # First known calibration name contained in the file name, if any.
        camera_hint = next(
            (
                name
                for name in cm.pre_recorded_calibrations
                if name in video_path.name
            ),
            camera_hint,
        )
        video_format = opened.streams.video[0].format
        frame_size = (video_format.width, video_format.height)
        break

    cm.load_intrinsics(rec_dir, camera_hint, frame_size).save(rec_dir, "world")
import errno

# Feed every decoded frame through the filter graph and encode whatever the
# graph produces.
for i, packet in enumerate(input_container.demux()):
    # print("packet", i)
    for f in packet.decode():
        # submit the frame for processing
        graph.push(f)
        # pull frames from graph until graph has done processing or is waiting for a new input
        while True:
            try:
                out_frame = graph.pull()
                out_frame.pts = None
                for p in output_stream.encode(out_frame):
                    output_container.mux(p)
            except av.AVError as ex:
                # EAGAIN means the graph needs more input. Compare against
                # errno.EAGAIN rather than the literal 11, because the
                # numeric value differs between platforms.
                if ex.errno != errno.EAGAIN:
                    raise
                else:
                    break
def main():
drone = tellopy.Tello()
try:
drone.connect()
drone.wait_for_connection(60.0)
retry = 3
container = None
while container is None and 0 < retry:
retry -= 1
try:
container = av.open(drone.get_video_stream())
except av.AVError as ave:
print(ave)
print('retry...')
# skip first 300 frames
frame_skip = 300
while True:
for frame in container.decode(video=0):
if 0 < frame_skip:
frame_skip = frame_skip - 1
continue
start_time = time.time()
image = cv2.cvtColor(numpy.array(frame.to_image()), cv2.COLOR_RGB2BGR)
cv2.imshow('Original', image)
cv2.imshow('Canny', cv2.Canny(image, 100, 200))
cv2.waitKey(1)
if frame.time_base < 1.0/60:
def generate():
try:
face_cascade = cv2.CascadeClassifier(
"haarcascade_frontalface_default.xml")
retry = 3
container = None
while container is None and 0 < retry:
retry -= 1
try:
container = av.open(self.drone.get_video_stream())
except av.AVError as ave:
print(ave)
print('retry...')
frame_skip = 300
while True:
for frame in container.decode(video=0):
if 0 < frame_skip:
frame_skip = frame_skip - 1
continue
start_time = time.time()
image = numpy.array(frame.to_image())
color = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
if self.face_detect:
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
faces = face_cascade.detectMultiScale(
gray,
old_ts_idx = (
np.arange(0, len(old_ts) * in_frame_size, in_frame_size)
* out_frame_rate
/ in_frame_rate
)
new_ts_idx = np.arange(0, out_frame_num * out_frame_size, out_frame_size)
interpolate = interp1d(
old_ts_idx, old_ts, bounds_error=False, fill_value="extrapolate"
)
new_ts = interpolate(new_ts_idx)
# raise RuntimeError
np.save(audio_ts_loc, new_ts)
_update_info_version_to("v0.9.13", rec_dir)
except av.AVError as averr:
# Try to catch `libav.aac : Input contains (near) NaN/+-Inf` errors
# Unfortunately, the above error is only logged not raised. Instead
# `averr`, an `Invalid Argument` error with error number 22, is raised.
if retry_on_averror and averr.errno == 22:
# unfortunately
logger.error("Encountered AVError. Retrying to update recording.")
out_container.close()
# Only retry once:
update_recording_v094_to_v0913(rec_dir, retry_on_averror=False)
else:
raise # re-raise exception
def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
    """Decode one encoded frame with ``self.codec``.

    The frame's data is wrapped in an ``av.Packet`` stamped with the frame's
    timestamp and ``VIDEO_TIME_BASE``. If ``av.AVError`` is raised, a warning
    is logged and an empty list is returned.
    """
    try:
        pkt = av.Packet(encoded_frame.data)
        pkt.pts = encoded_frame.timestamp
        pkt.time_base = VIDEO_TIME_BASE
        decoded = self.codec.decode(pkt)
    except av.AVError as exc:
        logger.warning("failed to decode, skipping package: " + str(exc))
        decoded = []
    return decoded