Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, *args, **kwargs):
super(cls, self).__init__(*args, **kwargs)
namespace['__init__'] = __init__
cls = super().__new__(mcls, name, bases, namespace)
cls._subclasses = set()
cls._properties = OrderedDict()
# Update subclass lists, and update properties (in reverse order)
for bcls in reversed(cls.mro()[1:]):
if type(bcls) is mcls:
bcls._subclasses.add(cls)
cls._properties.update(bcls._properties)
cls._properties.update(
(key, value) for key, value in namespace.items()
if isinstance(value, Property))
for name in list(cls._properties):
# Remove items which are no longer properties
if name in namespace and not isinstance(namespace[name], Property):
del cls._properties[name]
continue
# Optional arguments must follow mandatory
if cls._properties[name].default is not Property.empty:
cls._properties.move_to_end(name)
if sys.version_info <= (3, 6): # pragma: no cover
for name, property_ in cls._properties.items():
property_.__set_name__(cls, name)
cls._validate_init()
cls._generate_signature()
# -*- coding: utf-8 -*-
import uuid
from ..base import Property
from .multihypothesis import MultipleHypothesis
from .state import State, StateMutableSequence
from .update import Update
class Track(StateMutableSequence):
"""Track type
A :class:`~.StateMutableSequence` representing a track.
"""
states = Property(
[State],
default=None,
doc="The initial states of the track. Default `None` which initialises"
"with empty list.")
id = Property(
str,
default=None,
doc="The unique track ID")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialise metadata
self._metadata = {}
for state in self.states:
self._update_metadata_from_state(state)
Generate track predictions at detection times and calculate probabilities
for all prediction-detection pairs for single prediction and multiple
detections.
"""
predictor = Property(
Predictor,
doc="Predict tracks to detection times")
updater = Property(
Updater,
doc="Updater used to get measurement prediction")
clutter_spatial_density = Property(
float,
doc="Spatial density of clutter - tied to probability of false "
"detection")
prob_detect = Property(
Probability,
default=Probability(0.85),
doc="Target Detection Probability")
prob_gate = Property(
Probability,
default=Probability(0.95),
doc="Gate Probability - prob. gate contains true measurement "
"if detected")
def hypothesise(self, track, detections, timestamp):
r"""Evaluate and return all track association hypotheses.
For a given track and a set of N detections, return a
MultipleHypothesis with N+1 detections (first detection is
a 'MissedDetection'), each with an associated probability.
Probabilities are assumed to be exhaustive (sum to 1) and mutually
where :math:`i` the inclination (radian), :math:`\Omega` is the
longitude of the ascending node (radian), :math:`e` is the orbital
eccentricity (unitless), :math:`\omega` the argument of perigee
(radian), :math:`M_0` the mean anomaly (radian) and :math:`n` the
mean motion (radian/[time])
For sampling, the :attr:`transition_noise` parameter,
:math:`\epsilon`, should be used to draw from
:math:`\mathcal{N}(M_{t_1},\epsilon)`
TODO: test the efficiency of this method
"""
transition_noise = Property(
float, default=0.0, doc=r"Transition noise :math:`\epsilon`")
def matrix(self):
pass
def ndim_state(self):
"""The transition operates on the 6-dimensional orbital state vector
Returns
-------
: int
6
"""
return 6
def function(self, orbital_state, noise=0,
If, at a specific time step, the :class:`~.State` of one of the
:class:`~.tracks` is assessed as close to more than one track then an
:class:`~.Association` object will be return for all possible association
combinations
"""
association_threshold = Property(
float, default=10,
doc="Threshold distance measure which states must be within for an "
"association to be recorded.Default is 10")
consec_pairs_confirm = Property(
int, default=3,
doc="Number of consecutive time instances which track pairs are "
"required to be within a specified threshold in order for an "
"association to be formed. Default is 3")
consec_misses_end = Property(
int, default=2,
doc="Number of consecutive time instances which track pairs are "
"required to exceed a specified threshold in order for an "
"association to be ended. Default is 2")
measurement_model_track1 = Property(
MeasurementModel,
doc="Measurement model which specifies which elements within the "
"track state are to be used to calculate distance over")
measurement_model_track2 = Property(
MeasurementModel,
doc="Measurement model which specifies which elements within the "
"track state are to be used to calculate distance over")
def associate_tracks(self, tracks_set_1, tracks_set_2):
"""Associate two sets of tracks together.
:math:`X_{t_{0}}` at epoch :attr:`State.timestamp` :math:`t_0` and
by way of keywords, via:
coordinates = "Cartesian" (the orbital state vector),
coordinates = "Keplerian" (Keplarian elements),
coordinates = "TLE" (Two-Line elements) or
coordinates = "Equinoctial" (equinoctial elements).
The gravitational parameter :math:`GM` can be defined. If left
undefined it defaults to that of the Earth, :math:`3.986004418
(\pm 0.000000008) \\times 10^{14} \mathrm{m}^3 \mathrm{s}^{−2}`.
:reference: Curtis, H.D. 2010, Orbital Mechanics for Engineering
Students (3rd Ed), Elsevier Aerospace Engineering Series
"""
coordinates = Property(
strg, default="cartesian",
doc="The parameterisation used on initiation. Acceptable values "
"are 'Cartesian', 'Keplerian', 'TLE', or 'Equinoctial'. All"
"other inputs will return errors"
)
grav_parameter = Property(
float, default=3.986004418e14,
doc=r"Standard gravitational parameter :math:`\mu = G M` in units of "
r":math:`\mathrm{m}^3 \mathrm{s}^{-2}`")
covar = Property(
CovarianceMatrix, default=None,
doc="The covariance matrix. Care should be exercised in that its coordinate"
"frame isn't defined, and output will be highly dependant on which"
"parameterisation is chosen."
CSV file must have headers, as these are used to determine which fields
to use to generate the detection.
Parameters
----------
"""
state_vector_fields = Property(
[str], doc='List of columns names to be used in state vector')
time_field = Property(
str, doc='Name of column to be used as time field')
time_field_format = Property(
str, default=None, doc='Optional datetime format')
timestamp = Property(
bool, default=False, doc='Treat time field as a timestamp from epoch')
metadata_fields = Property(
[str], default=None, doc='List of columns to be saved as metadata, '
'default all')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._detections = set()
@property
def detections(self):
return self._detections.copy()
def detections_gen(self):
with self.path.open(encoding=self.encoding, newline='') as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
if self.time_field_format is not None:
Gaussian State object with an associated weight. Used as components
for a GaussianMixtureState.
"""
weight = Property(float, default=0, doc="Weight of the Gaussian State.")
class ParticleState(Type):
"""Particle State type
This is a particle state object which describes the state as a
distribution of particles"""
timestamp = Property(datetime.datetime, default=None,
doc="Timestamp of the state. Default None.")
particles = Property([Particle],
doc='List of particles representing state')
@property
def mean(self):
"""The state mean, equivalent to state vector"""
result = np.average([p.state_vector for p in self.particles], axis=0,
weights=[p.weight for p in self.particles])
# Convert type as may have type of weights
return result.astype(np.float, copy=False)
@property
def state_vector(self):
"""The mean value of the particle states"""
return self.mean
@property
# -*- coding: utf-8 -*-
import bisect
import datetime
from warnings import warn
from ..base import Property
from .base import Feeder
class TimeBufferedFeeder(Feeder):
"""Buffer detections so they can be yielded in time order.
Any "old" detections (where the time is earlier than the head of the
buffer) shall be dropped, producing a :class:`UserWarning`.
"""
buffer_size = Property(int, default=1000, doc="Max size of buffer")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._detections = set()
@property
def detections(self):
return self._detections
def detections_gen(self):
detections_iter = iter(self.detector.detections_gen())
time_detections_buffer = [next(detections_iter)]
for time_detections in detections_iter:
# Drop "old" detections
if len(time_detections_buffer) >= self.buffer_size and \