Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, *args, **kwargs):
super(cls, self).__init__(*args, **kwargs)
namespace['__init__'] = __init__
cls = super().__new__(mcls, name, bases, namespace)
cls._subclasses = set()
cls._properties = OrderedDict()
# Update subclass lists, and update properties (in reverse order)
for bcls in reversed(cls.mro()[1:]):
if type(bcls) is mcls:
bcls._subclasses.add(cls)
cls._properties.update(bcls._properties)
cls._properties.update(
(key, value) for key, value in namespace.items()
if isinstance(value, Property))
for name in list(cls._properties):
# Remove items which are no longer properties
if name in namespace and not isinstance(namespace[name], Property):
del cls._properties[name]
continue
# Optional arguments must follow mandatory
if cls._properties[name].default is not Property.empty:
cls._properties.move_to_end(name)
if sys.version_info <= (3, 6): # pragma: no cover
for name, property_ in cls._properties.items():
property_.__set_name__(cls, name)
cls._validate_init()
cls._generate_signature()
def declarative_class(app, what, name, obj, options, lines):
"""Add declared properties to Parameters list for numpydoc"""
if what == "class" and issubclass(obj, Base):
param_index = _headings("Parameters", lines)
attr_index = _headings("Attributes", lines)
for name, property_ in obj.properties.items():
is_sequence = isinstance(property_.cls, Sequence)
if is_sequence:
cls = property_.cls[0]
else:
cls = property_.cls
class_name = "{}.{}".format(
cls.__module__, cls.__name__)
# To shorten names for builtins and also stonesoup components
tild = class_name.split(".")[0] in ("stonesoup", "builtins")
# To add optional if default value is defined.
is_optional = property_.default is not property_.empty
doc_type = "{}:class:`{}{}`{}".format(
is_sequence and "sequence of " or "",
# -*- coding: utf-8 -*-
import uuid
from ..base import Property
from .multihypothesis import MultipleHypothesis
from .state import State, StateMutableSequence
from .update import Update
class Track(StateMutableSequence):
"""Track type
A :class:`~.StateMutableSequence` representing a track.
"""
# Declared properties of the track. Both are optional and default to ``None``.
states = Property(
    [State],
    default=None,
    # Adjacent string literals are concatenated verbatim, so the first
    # fragment must end with a space to keep the words apart.
    doc="The initial states of the track. Default `None` which initialises "
        "with empty list.")
id = Property(
    str,
    default=None,
    doc="The unique track ID")
def __init__(self, *args, **kwargs):
    """Initialise the track and build its metadata from the held states.

    All positional and keyword arguments are forwarded unchanged to the
    parent class initialiser.
    """
    super().__init__(*args, **kwargs)

    # Start with an empty metadata mapping, then fold in every state the
    # track already holds, in order.
    self._metadata = dict()
    for held_state in self.states:
        self._update_metadata_from_state(held_state)
Generate track predictions at detection times and calculate probabilities
for all prediction-detection pairs for single prediction and multiple
detections.
"""
# Mandatory components (no default is given for these three).
predictor = Property(Predictor, doc="Predict tracks to detection times")
updater = Property(Updater, doc="Updater used to get measurement prediction")
clutter_spatial_density = Property(
    float,
    doc="Spatial density of clutter - tied to probability of false detection",
)

# Optional probabilities, defaulting to 0.85 and 0.95 respectively.
prob_detect = Property(
    Probability,
    default=Probability(0.85),
    doc="Target Detection Probability",
)
prob_gate = Property(
    Probability,
    default=Probability(0.95),
    doc="Gate Probability - prob. gate contains true measurement if detected",
)
def hypothesise(self, track, detections, timestamp):
r"""Evaluate and return all track association hypotheses.
For a given track and a set of N detections, return a
MultipleHypothesis with N+1 detections (first detection is
a 'MissedDetection'), each with an associated probability.
Probabilities are assumed to be exhaustive (sum to 1) and mutually
where :math:`i` the inclination (radian), :math:`\Omega` is the
longitude of the ascending node (radian), :math:`e` is the orbital
eccentricity (unitless), :math:`\omega` the argument of perigee
(radian), :math:`M_0` the mean anomaly (radian) and :math:`n` the
mean motion (radian/[time])
For sampling, the :attr:`transition_noise` parameter,
:math:`\epsilon`, should be used to draw from
:math:`\mathcal{N}(M_{t_1},\epsilon)`
TODO: test the efficiency of this method
"""
# Optional noise term; defaults to 0.0 when not supplied.
transition_noise = Property(
    float,
    default=0.0,
    doc=r"Transition noise :math:`\epsilon`",
)
def matrix(self):
    """Placeholder with no implementation; always returns ``None``."""
    return None
def ndim_state(self):
    """Number of state dimensions handled by the transition.

    The transition operates on the 6-dimensional orbital state vector.

    Returns
    -------
    : int
        Always 6
    """
    state_dimensions = 6
    return state_dimensions
def function(self, orbital_state, noise=0,
If, at a specific time step, the :class:`~.State` of one of the
:class:`~.tracks` is assessed as close to more than one track then an
:class:`~.Association` object will be returned for all possible association
combinations
"""
# Thresholds controlling when an association is recorded, formed and ended.
association_threshold = Property(
    float, default=10,
    # Adjacent string literals are concatenated verbatim, so a space is
    # needed after the full stop to separate the two sentences.
    doc="Threshold distance measure which states must be within for an "
        "association to be recorded. Default is 10")
consec_pairs_confirm = Property(
    int, default=3,
    doc="Number of consecutive time instances which track pairs are "
        "required to be within a specified threshold in order for an "
        "association to be formed. Default is 3")
consec_misses_end = Property(
    int, default=2,
    doc="Number of consecutive time instances which track pairs are "
        "required to exceed a specified threshold in order for an "
        "association to be ended. Default is 2")
# One measurement model per track set; neither has a default.
measurement_model_track1 = Property(
    MeasurementModel,
    doc="Measurement model which specifies which elements within the "
        "track state are to be used to calculate distance over")
measurement_model_track2 = Property(
    MeasurementModel,
    doc="Measurement model which specifies which elements within the "
        "track state are to be used to calculate distance over")
def associate_tracks(self, tracks_set_1, tracks_set_2):
"""Associate two sets of tracks together.
class LinearGaussian(MeasurementModel, LinearModel, GaussianModel):
r"""This is a class implementation of a time-invariant 1D
Linear-Gaussian Measurement Model.
The model is described by the following equations:
.. math::
y_t = H_k*x_t + v_k,\ \ \ \ v(k)\sim \mathcal{N}(0,R)
where ``H_k`` is a (:py:attr:`~ndim_meas`, :py:attr:`~ndim_state`) \
matrix and ``v_k`` is Gaussian distributed.
"""
# Covariance of the measurement noise; mandatory (no default is given).
noise_covar = Property(
    CovarianceMatrix,
    doc="Noise covariance",
)
@property
def ndim_meas(self):
    """Number of measurement dimensions.

    Returns
    -------
    :class:`int`
        The length of :attr:`mapping`
    """
    measured_indices = self.mapping
    return len(measured_indices)
def matrix(self, **kwargs):
"""Model matrix :math:`H(t)`
:math:`X_{t_{0}}` at epoch :attr:`State.timestamp` :math:`t_0` and
by way of keywords, via:
coordinates = "Cartesian" (the orbital state vector),
coordinates = "Keplerian" (Keplerian elements),
coordinates = "TLE" (Two-Line elements) or
coordinates = "Equinoctial" (equinoctial elements).
The gravitational parameter :math:`GM` can be defined. If left
undefined it defaults to that of the Earth, :math:`3.986004418
(\pm 0.000000008) \\times 10^{14} \mathrm{m}^3 \mathrm{s}^{-2}`.
:reference: Curtis, H.D. 2010, Orbital Mechanics for Engineering
Students (3rd Ed), Elsevier Aerospace Engineering Series
"""
# Name of the parameterisation used on initiation. The type is the builtin
# ``str``: the accepted values listed in the doc text are plain strings.
coordinates = Property(
    str, default="cartesian",
    # Adjacent string literals are concatenated verbatim, so each fragment
    # that is followed by another word must end with a space.
    doc="The parameterisation used on initiation. Acceptable values "
        "are 'Cartesian', 'Keplerian', 'TLE', or 'Equinoctial'. All "
        "other inputs will return errors"
)
# Standard gravitational parameter; the default is 3.986004418e14.
grav_parameter = Property(
    float, default=3.986004418e14,
    doc=r"Standard gravitational parameter :math:`\mu = G M` in units of "
        r":math:`\mathrm{m}^3 \mathrm{s}^{-2}`")
covar = Property(
CovarianceMatrix, default=None,
doc="The covariance matrix. Care should be exercised in that its coordinate"
"frame isn't defined, and output will be highly dependant on which"
"parameterisation is chosen."
CSV file must have headers, as these are used to determine which fields
to use to generate the detection.
Parameters
----------
"""
# Mandatory column selections (no default is given for these two).
state_vector_fields = Property(
    [str],
    doc="List of columns names to be used in state vector",
)
time_field = Property(
    str,
    doc="Name of column to be used as time field",
)

# Optional settings.
time_field_format = Property(
    str,
    default=None,
    doc="Optional datetime format",
)
timestamp = Property(
    bool,
    default=False,
    doc="Treat time field as a timestamp from epoch",
)
metadata_fields = Property(
    [str],
    default=None,
    doc="List of columns to be saved as metadata, default all",
)
def __init__(self, *args, **kwargs):
    """Forward all arguments to the parent initialiser and start empty.

    After the parent class has been initialised, the private detection
    store is created as an empty set.
    """
    super().__init__(*args, **kwargs)
    # Backing store for the detections held by this instance.
    self._detections = set()
@property
def detections(self):
    """Copy of the currently stored detections.

    A copy is handed out so that callers changing the returned collection
    do not alter the instance's own store.
    """
    stored = self._detections
    return stored.copy()
def detections_gen(self):
with self.path.open(encoding=self.encoding, newline='') as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
if self.time_field_format is not None:
Gaussian State object with an associated weight. Used as components
for a GaussianMixtureState.
"""
# Optional weight of the component; defaults to 0.
weight = Property(
    float,
    default=0,
    doc="Weight of the Gaussian State.",
)
class ParticleState(Type):
"""Particle State type
This is a particle state object which describes the state as a
distribution of particles"""
# Optional timestamp; defaults to ``None``.
timestamp = Property(
    datetime.datetime,
    default=None,
    doc="Timestamp of the state. Default None.",
)
# Mandatory list of particles (no default is given).
particles = Property(
    [Particle],
    doc="List of particles representing state",
)
@property
def mean(self):
    """The state mean, equivalent to state vector.

    Computed as the weighted average of every particle's ``state_vector``,
    using each particle's ``weight`` as its weighting.

    Returns
    -------
    : numpy.ndarray
        The weighted mean, cast to floating point
    """
    result = np.average([p.state_vector for p in self.particles], axis=0,
                        weights=[p.weight for p in self.particles])
    # Convert type as may have type of weights. The builtin ``float`` is
    # used because the ``np.float`` alias was removed in NumPy 1.24.
    return result.astype(float, copy=False)
@property
def state_vector(self):
    """The mean value of the particle states (same value as :attr:`mean`)."""
    particle_mean = self.mean
    return particle_mean
@property