Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
from ..base import Property
from .groundtruth import GroundTruthPath
from .state import State, GaussianState, StateVector
from ..models.measurement import MeasurementModel
class Detection(State):
    """Detection type

    A :class:`State` subclass that additionally records the measurement
    model used to generate it (optional) and a dictionary of metadata.
    """

    # Implicit string concatenation (rather than a backslash continuation
    # inside the literal) keeps the rendered doc text independent of source
    # indentation and keeps the words separated by a single space.
    measurement_model = Property(
        MeasurementModel, default=None,
        doc="The measurement model used to generate the detection "
            "(the default is ``None``)")
    metadata = Property(
        dict, default=None,
        doc='Dictionary of metadata items for Detections.')

    def __init__(self, state_vector, *args, **kwargs):
        """Initialise the detection.

        All arguments are forwarded unchanged to the parent initialiser.
        """
        super().__init__(state_vector, *args, **kwargs)
        # The declared default is ``None``; replace it with a fresh dict so
        # each instance owns its own metadata container.
        if self.metadata is None:
            self.metadata = {}
class GaussianDetection(Detection, GaussianState):
    """GaussianDetection type

    Combines :class:`Detection` and :class:`GaussianState` through multiple
    inheritance; it declares no members of its own.
    """
# -*- coding: utf-8 -*-
from abc import abstractmethod
from ..base import Base, Property
from ..models.measurement import MeasurementModel
class Sensor(Base):
    """Sensor base class

    A sensor object that operates according to a given
    :class:`~.MeasurementModel`.
    """

    measurement_model = Property(
        MeasurementModel, default=None, doc="Measurement model")

    @abstractmethod
    def gen_measurement(self, **kwargs):
        """Generate a measurement

        Concrete sensors are expected to override this method; the base
        implementation provides no behaviour.

        Parameters
        ----------
        **kwargs
            Implementation-specific keyword arguments.

        Raises
        ------
        NotImplementedError
            Always, when the base implementation is invoked.
        """
        # ``self`` is required: without it a bound call passes the instance
        # positionally and fails with ``TypeError`` before reaching the body.
        raise NotImplementedError
p=self.model_probs[self.index])
return self.transition_models[self.index]
class SimpleDetectionSimulator(DetectionSimulator):
    """A simple detection simulator.

    Parameters
    ----------
    groundtruth : GroundTruthReader
        Source of ground truth tracks used to generate detections for.
    measurement_model : MeasurementModel
        Measurement model used in generating detections.
    meas_range : np.ndarray
        Bounds of the measurement space. Differences are taken along the
        last axis (presumably one ``[min, max]`` row per dimension --
        TODO confirm against callers).
    detection_probability : Probability
        Defaults to 0.9.
    clutter_rate : float
        Defaults to 2.0. Numerator of :attr:`clutter_spatial_density`.
    """

    groundtruth = Property(GroundTruthReader)
    measurement_model = Property(MeasurementModel)
    meas_range = Property(np.ndarray)
    detection_probability = Property(Probability, default=0.9)
    clutter_rate = Property(float, default=2.0)

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent, then reset working state."""
        super().__init__(*args, **kwargs)
        # Per-instance containers and counter, all starting out empty/zero.
        self.real_detections = set()
        self.clutter_detections = set()
        self.index = 0

    @property
    def clutter_spatial_density(self):
        """Clutter spatial density of the measurement space.

        Computed as ``clutter_rate`` divided by the product of the
        differences of ``meas_range``, i.e. the number of clutter
        detections per unit volume per timestep.
        """
        extents = np.diff(self.meas_range)
        volume = np.prod(extents)
        return self.clutter_rate / volume
association_threshold = Property(
float, default=10,
doc="Threshold distance measure which states must be within for an "
"association to be recorded.Default is 10")
consec_pairs_confirm = Property(
int, default=3,
doc="Number of consecutive time instances which track pairs are "
"required to be within a specified threshold in order for an "
"association to be formed. Default is 3")
consec_misses_end = Property(
int, default=2,
doc="Number of consecutive time instances which track pairs are "
"required to exceed a specified threshold in order for an "
"association to be ended. Default is 2")
measurement_model_track1 = Property(
MeasurementModel,
doc="Measurement model which specifies which elements within the "
"track state are to be used to calculate distance over")
measurement_model_track2 = Property(
MeasurementModel,
doc="Measurement model which specifies which elements within the "
"track state are to be used to calculate distance over")
def associate_tracks(self, tracks_set_1, tracks_set_2):
"""Associate two sets of tracks together.
Parameters
----------
tracks_set_1 : list of :class:`~.Track` objects
Tracks to associate to track set 2
tracks_set_2 : list of :class:`~.Track` objects
Tracks to associate to track set 1
"""Initiator that maps measurement space to state space
Works for both linear and non-linear co-ordinate input
This initiator utilises the :class:`~.MeasurementModel` matrix to convert
:class:`~.Detection` state vector and model covariance into state space.
Utilises the ReversibleModel inverse function to convert
non-linear spherical co-ordinates into Cartesian x/y co-ordinates
for use in predictions and mapping.
This then replaces mapped values in the :attr:`prior_state` to form the
initial :class:`~.GaussianState` of the :class:`~.Track`.
"""
prior_state = Property(GaussianState, doc="Prior state information")
measurement_model = Property(MeasurementModel, doc="Measurement model")
def initiate(self, detections, **kwargs):
tracks = set()
for detection in detections:
if detection.measurement_model is not None:
measurement_model = detection.measurement_model
else:
measurement_model = self.measurement_model
if isinstance(measurement_model, NonLinearModel):
if isinstance(measurement_model, ReversibleModel):
state_vector = measurement_model.inverse_function(
detection.state_vector)
model_matrix = measurement_model.jacobian(state_vector)
inv_model_matrix = np.linalg.pinv(model_matrix)
consec_pairs_confirm = Property(
int, default=3,
doc="Number of consecutive time instances which track pairs are "
"required to be within a specified threshold in order for an "
"association to be formed. Default is 3")
consec_misses_end = Property(
int, default=2,
doc="Number of consecutive time instances which track pairs are "
"required to exceed a specified threshold in order for an "
"association to be ended. Default is 2")
measurement_model_track1 = Property(
MeasurementModel,
doc="Measurement model which specifies which elements within the "
"track state are to be used to calculate distance over")
measurement_model_track2 = Property(
MeasurementModel,
doc="Measurement model which specifies which elements within the "
"track state are to be used to calculate distance over")
def associate_tracks(self, tracks_set_1, tracks_set_2):
"""Associate two sets of tracks together.
Parameters
----------
tracks_set_1 : list of :class:`~.Track` objects
Tracks to associate to track set 2
tracks_set_2 : list of :class:`~.Track` objects
Tracks to associate to track set 1
Returns
-------
AssociationSet
# -*- coding: utf-8 -*-
from abc import abstractmethod
from ..base import Base, Property
from ..models.measurement import MeasurementModel
class Updater(Base):
"""Updater base class
An updater is used to update the state, utilising a
:class:`~.MeasurementModel`.
"""
measurement_model = Property(MeasurementModel, doc="measurement model")
@abstractmethod
def predict_measurement(
self, state_prediction, measurement_model=None, **kwargs):
"""Get measurement prediction from state prediction
Parameters
----------
state_prediction : :class:`~.StatePrediction`
The state prediction
measurement_model: :class:`~.MeasurementModel`, optional
The measurement model used to generate the measurement prediction.
Should be used in cases where the measurement model is dependent
on the received measurement. The default is `None`, in which case
the updater will use the measurement model specified on
initialisation
Computes the Generalized Optimal SubPattern Assignment (GOSPA) metric
for two sets of :class:`~.Track` objects. This implementation of GOSPA
is based on the auction algorithm.
The GOSPA metric is calculated at each time step in which a
:class:`~.Track` object is present
Reference:
[1] A. S. Rahmathullah, A. F. García-Fernández, L. Svensson,
Generalized optimal sub-pattern assignment metric, 2016,
[online] Available: http://arxiv.org/abs/1601.05585.
"""
p = Property(float, doc="1<=p
The GOSPA metric is calculated at each time step in which a
:class:`~.Track` object is present
Reference:
[1] A. S. Rahmathullah, A. F. García-Fernández, L. Svensson,
Generalized optimal sub-pattern assignment metric, 2016,
[online] Available: http://arxiv.org/abs/1601.05585.
"""
p = Property(float, doc="1<=p