Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
response.raise_for_status()
data = response.json()
self._detections = set()
for state in data['states']:
if state[8]: # On ground
continue
# Must have position (lon, lat, geo-alt)
if not all(state[index] for index in (5, 6, 13)):
continue
timestamp = datetime.datetime.utcfromtimestamp(state[3])
# Skip old detections
if time is not None and timestamp <= time:
continue
self._detections.add(Detection(
StateVector([[state[5]], [state[6]], [state[13]]]),
timestamp=timestamp,
metadata={
'icao24': state[0],
'callsign': state[1],
'orign_country': state[2],
'sensors': state[12],
'squawk': state[14],
'spi': state[15],
'source': self.sources[state[16]],
}
))
time = datetime.datetime.utcfromtimestamp(data['time'])
yield time, self.detections
while time + self.timestep > datetime.datetime.utcnow():
easting, northing, zone_num, northern = utm.from_latlon(
*detection.state_vector[self.mapping[::-1], 0],
self.zone_number)
if self.zone_number is None:
self.zone_number = zone_num
if self.northern is None:
self.northern = northern >= 'N'
elif (self.northern and northern < 'N') or (
not self.northern and northern >= 'N'):
warnings.warn("Detection cannot be converted to UTM zone")
continue
state_vector = detection.state_vector.copy()
state_vector[self.mapping, 0] = easting, northing
utm_detections.add(
Detection(
state_vector,
timestamp=detection.timestamp,
measurement_model=detection.measurement_model,
metadata=detection.metadata))
yield time, utm_detections
else:
time_field_value = parse(row[self.time_field])
if self.metadata_fields is None:
local_metadata = dict(row)
copy_local_metadata = dict(local_metadata)
for (key, value) in copy_local_metadata.items():
if (key == self.time_field) or\
(key in self.state_vector_fields):
del local_metadata[key]
else:
local_metadata = {field: row[field]
for field in self.metadata_fields
if field in row}
detect = Detection(np.array(
[[row[col_name]] for col_name in self.state_vector_fields],
dtype=np.float32), time_field_value,
metadata=local_metadata)
self._detections = {detect}
yield time_field_value, self.detections
(the default is ``None``)")
metadata = Property(dict, default=None,
doc='Dictionary of metadata items for Detections.')
def __init__(self, state_vector, *args, **kwargs):
super().__init__(state_vector, *args, **kwargs)
if self.metadata is None:
self.metadata = {}
class GaussianDetection(Detection, GaussianState):
"""GaussianDetection type"""
class Clutter(Detection):
"""Clutter type for detections classed as clutter
This is same as :class:`~.Detection`, but can be used to identify clutter
for metrics and analysis purposes.
"""
class TrueDetection(Detection):
"""TrueDetection type for detections that come from ground truth
This is same as :class:`~.Detection`, but can be used to identify true
detections for metrics and analysis purposes.
"""
groundtruth_path = Property(
GroundTruthPath,
"""
class TrueDetection(Detection):
"""TrueDetection type for detections that come from ground truth
This is same as :class:`~.Detection`, but can be used to identify true
detections for metrics and analysis purposes.
"""
groundtruth_path = Property(
GroundTruthPath,
doc="Ground truth path that this detection came from")
class MissedDetection(Detection):
"""Detection type for a missed detection
This is same as :class:`~.Detection`, but it is used in
MultipleHypothesis to indicate the null hypothesis (no
detections are associated with the specified track).
"""
state_vector = Property(
StateVector, default=None,
doc="State vector. Default `None`.")
def __init__(self, state_vector=None, *args, **kwargs):
super().__init__(state_vector, *args, **kwargs)
def __bool__(self):
return False
list of dict{"measurement": Detection, "weight": float or int}
"""
prediction = Property(
Prediction,
doc="Predicted track state")
measurement_prediction = Property(
MeasurementPrediction,
default=None,
doc="Optional track prediction in measurement space")
weighted_measurements = Property(
list,
default=list(),
doc="Weighted measurements used for hypothesis and updating")
selected_measurement = Property(
Detection,
default=None,
doc="The measurement that was selected to associate with a track.")
@property
def measurement(self):
return self.get_selected_measurement()
def add_weighted_detections(self, measurements, weights, normalize=False):
# verify that 'measurements' and 'weights' are the same size and the
# correct data types
if any(not (isinstance(measurement, Detection))
for measurement in measurements):
raise Exception('measurements must all be of type Detection!')
if any(not (isinstance(weight, float) or isinstance(weight, int))
for weight in weights):
ground_truth : :class:`~.State`
A ground-truth state
Returns
-------
:class:`~.Detection`
A measurement generated from the given state. The timestamp of the\
measurement is set equal to that of the provided state.
"""
measurement_vector = self.measurement_model.function(
ground_truth.state_vector, noise=noise, **kwargs)
model_copy = copy.copy(self.measurement_model)
return Detection(measurement_vector,
measurement_model=model_copy,
timestamp=ground_truth.timestamp)
self.metadata = {}
class GaussianDetection(Detection, GaussianState):
"""GaussianDetection type"""
class Clutter(Detection):
"""Clutter type for detections classed as clutter
This is same as :class:`~.Detection`, but can be used to identify clutter
for metrics and analysis purposes.
"""
class TrueDetection(Detection):
"""TrueDetection type for detections that come from ground truth
This is same as :class:`~.Detection`, but can be used to identify true
detections for metrics and analysis purposes.
"""
groundtruth_path = Property(
GroundTruthPath,
doc="Ground truth path that this detection came from")
class MissedDetection(Detection):
"""Detection type for a missed detection
This is same as :class:`~.Detection`, but it is used in
MultipleHypothesis to indicate the null hypothesis (no
# extract latitude and longitude values from JSON record
lat_value = float(record['LATITUDE'])/600000
lon_value = float(record['LONGITUDE'])/600000
# extract timestamp values from JSON record
time_value = datetime.utcfromtimestamp(float(
record['TIME']))
# delete lat, lon, timestamp from JSON record;
# the rest is metadata
del record['LATITUDE']
del record['LONGITUDE']
del record['TIME']
# form Detection object from JSON record
detect = Detection(
np.array([[lon_value], [lat_value]],
dtype=np.float32),
time_value, metadata=record)
yield time_value, {detect}