Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def on_completed(_):
self.status = "successfully completed"
self._localization_storage.save_pldata_to_disk()
logger.info("camera localization completed")
self.on_localization_ended()
def on_canceled_or_killed():
self._localization_storage.save_pldata_to_disk()
logger.info("camera localization canceled")
self.on_localization_ended()
self._task = self._create_task()
self._task.add_observer("on_yield", on_yield)
self._task.add_observer("on_completed", on_completed)
self._task.add_observer("on_canceled_or_killed", on_canceled_or_killed)
self._task.add_observer("on_exception", tasklib.raise_exception)
self._task.add_observer("on_started", self.on_localization_started)
logger.info("Start camera localization")
self.status = "0% completed"
def validate_gaze_mapper(self, gaze_mapper):
def validation_completed(accuracy_and_precision):
accuracy, precision = accuracy_and_precision
gaze_mapper.accuracy_result = "{:.1f}° from {} / {} samples".format(
accuracy.result, accuracy.num_used, accuracy.num_total
)
gaze_mapper.precision_result = "{:.1f}° from {} / {} samples".format(
precision.result, precision.num_used, precision.num_total
)
task = worker.validate_gaze.create_bg_task(
gaze_mapper, self._reference_location_storage
)
task.add_observer("on_completed", validation_completed)
task.add_observer("on_exception", tasklib.raise_exception)
self._task_manager.add_task(task)
def calculate(self, calibration):
def on_calibration_completed(status_and_result):
calibration.status, calibration.result = status_and_result
self._calibration_storage.save_to_disk()
self.on_calibration_computed(calibration)
task = worker.create_calibration.create_task(
calibration, all_reference_locations=self._reference_location_storage
)
task.add_observer("on_completed", on_calibration_completed)
task.add_observer("on_exception", tasklib.raise_exception)
self._task_manager.add_task(task)
return task
def on_completed(_):
self._detection_storage.save_pldata_to_disk()
logger.info("marker detection completed")
self.on_detection_ended()
def on_canceled_or_killed():
self._detection_storage.save_pldata_to_disk()
logger.info("marker detection canceled")
self.on_detection_ended()
self._task = self._create_task()
self._task.add_observer("on_yield", on_yield)
self._task.add_observer("on_completed", on_completed)
self._task.add_observer("on_canceled_or_killed", on_canceled_or_killed)
self._task.add_observer("on_exception", tasklib.raise_exception)
self._task.add_observer("on_started", self.on_detection_started)
logger.info("Start marker detection")
def start_detection(self):
def on_detection_yields(detection):
self._reference_location_storage.add(detection)
def on_detection_completed(_):
self._reference_location_storage.save_to_disk()
self._detection_task = worker.detect_circle_markers.CircleMarkerDetectionTask()
self._detection_task.add_observer("on_exception", tasklib.raise_exception)
self._detection_task.add_observer(
"on_started", lambda: self.on_detection_started(self._detection_task)
)
self._detection_task.add_observer("on_yield", on_detection_yields)
self._detection_task.add_observer("on_completed", on_detection_completed)
self._task_manager.add_task(self._detection_task)
return self._detection_task
reason = (
"not enough markers with the defined origin marker id "
"were collected"
)
else:
reason = "not enough markers were collected"
self.status = "failed: " + reason
logger.info("markers 3d model optimization '{}' ".format(self.status))
self._optimization_storage.save_plmodel_to_disk()
self._task = self._create_task()
self._task.add_observer("on_yield", on_yield)
self._task.add_observer("on_completed", on_completed)
self._task.add_observer("on_exception", tasklib.raise_exception)
self._task.add_observer("on_started", self.on_optimization_started)
logger.info("Start markers 3d model optimization")
self.status = "0% completed"