# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_resample_unwrapped() -> None:
    """Resampling a track crossing the 360°/0° discontinuity must
    interpolate through the wrap-around, never through mid-range values.

    Regression test for https://github.com/xoolive/traffic/issues/41
    """
    samples = pd.date_range("2019-01-01 12:00:00Z", periods=4, freq="30s")
    df = pd.DataFrame(
        {"timestamp": samples, "track": [345, 355, 5, 15]},
        columns=["timestamp", "track"],
    )
    # 1-second resampling: no interpolated track may land in (50, 300)
    resampled = Flight(df).resample("1s")
    assert resampled.query("50 < track < 300") is None
    # Integer argument: resample to exactly that many points
    resampled_10 = Flight(df).resample(10)
    assert len(resampled_10) == 10
            # NOTE(review): fragment — the head of this list of position
            # records (and the first record's opening brace) lies outside
            # the visible chunk.
            "altitude": 36000,
            "callsign": "WZZ1066",
            "flight_id": "231619151",
            "icao24": "471f52",
        },
        {
            # Second sample for the same aircraft (icao24 471f52)
            "timestamp": pd.Timestamp("2019-07-02 15:15:52+0000", tz="UTC"),
            "longitude": 0.5097222166666667,
            "latitude": 47.71388888333333,
            "altitude": 36000,
            "callsign": "WZZ1066",
            "flight_id": "231619151",
            "icao24": "471f52",
        },
    ]
    # A trajectory lying entirely outside the LFBB FIR clips to None
    flight = Flight(pd.DataFrame.from_records(records))
    assert flight.clip(eurofirs["LFBB"]) is None
        # NOTE(review): fragment — the enclosing function's signature and
        # the start of the chained expression building t_xyt lie outside
        # the visible chunk.
        .compute_xy(projection)
        # Bucket timestamps into slots so pairs are only compared within
        # the same time window
        .assign(round_t=lambda df: df.timestamp.dt.round(round_t))
    )
    cumul = list()
    # Multiprocessing is implemented on each timerange slot only.
    # TODO: it would probably be more efficient to multiprocess over each
    # t_chunk rather than multiprocess the distance computation.
    for _, t_chunk in tqdm(
        t_xyt.groupby("round_t"), total=len(set(t_xyt.data.round_t))
    ):
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # One future per aircraft pair; keyed by the two icao24 codes
            tasks = {
                executor.submit(Flight.distance, first, second): (
                    first.icao24,
                    second.icao24,
                )
                for (first, second) in yield_pairs(traffic.__class__(t_chunk))
            }
            for future in as_completed(tasks):
                result = future.result()
                # Pairs with no computable distance yield None
                if result is not None:
                    cumul.append(result)
    # Nothing collected anywhere: signal with None rather than an empty CPA
    if len(cumul) == 0:
        return None
    return CPA(pd.concat(cumul, sort=False))
def allFtPointProfile(self, name: str) -> Flight:
    """Return the requested point profile of this flight plan as a Flight.

    Parameters
    ----------
    name: one of "ftfm", "rtfm" or "ctfm" — selects which
        ``<name>AllFtPointProfile`` field of ``self.data`` to parse.

    Raises
    ------
    ValueError: if ``name`` is not one of the three accepted values.

    NOTE(review): fragment — the closing brackets of this return
    statement lie outside the visible chunk.
    """
    if name not in ["ftfm", "rtfm", "ctfm"]:
        raise ValueError(f"{name} must be one of ftfm, rtfm and ctfm.")
    # The field is a whitespace-separated list of colon-separated records;
    # each record becomes one DataFrame row.
    return Flight(
        pd.DataFrame.from_records(
            [
                x.split(":")
                for x in self.data[name + "AllFtPointProfile"].split()
            ],
            columns=[
                "timestamp",
                "point",
                "route",
                "flightLevel",
                "pointDistance",
                "pointType",
                "geoPointId",
                "relDist",
                "isVisible",
            ],
def parsePlan(self, name) -> Optional[Flight]:
    """Parse the flight-plan point list stored under ``name`` in the
    reply into a Flight.

    Returns None (after emitting a warning) when the requested field is
    absent from the reply.

    NOTE(review): fragment — the closing brackets of this return
    statement lie outside the visible chunk.
    """
    assert self.reply is not None
    msg = "No {} found in requested fields"
    if self.reply.find(name) is None:
        warnings.warn(msg.format(name))
        return None
    parser = ParseFields()
    return Flight(
        pd.DataFrame.from_records(
            [
                # Merge the parsed key/value pairs of every child element
                # of each point into a single flat record
                dict(elt for p in point for elt in parser.parse(p).items())
                for point in self.reply.findall(name)
            ]
        )
        .rename(columns={"timeOver": "timestamp"})
        .assign(
            flightPlanPoint=lambda x: x.flightPlanPoint == "true",
            # aircraftAddress is optional in the reply — fall back to None
            icao24=self.aircraftAddress.lower()
            if hasattr(self, "aircraftAddress")
            else None,
            callsign=self.aircraftId,
            origin=self.aerodromeOfDeparture,
            destination=self.aerodromeOfDestination,
            flight_id=self.flight_id,
def _onload():
    """Attach the kepler.gl visualisation helpers to the traffic classes
    when this module is loaded."""
    monkey_patches = (
        (Airspace, "kepler", airspace_kepler),
        (Flight, "kepler", flight_kepler),
        (Traffic, "kepler", traffic_kepler),
        (KeplerGl, "add_data", map_add_data),
    )
    for target, attribute, implementation in monkey_patches:
        setattr(target, attribute, implementation)
def on_filter(self, max_alt: int, max_time: datetime) -> None:
    """Restrict the displayed traffic to the current map extent, a
    maximum altitude and a time cutoff, then refresh the selector.

    NOTE(review): fragment — the method continues past the visible
    chunk (``text`` is assigned but its use is not shown here).
    """
    assert self._traffic is not None
    # Geographic extent of the current map view
    west, east, south, north = self.map_plot.ax.get_extent(
        crs=PlateCarree()
    )
    self._tview = self._traffic.before(max_time).sort_values("timestamp")
    if self._tview is None:
        return
    # Forward/backward fill so every sample carries usable values
    filtered = Traffic.from_flights(
        Flight(f.data.ffill().bfill()) for f in self._tview
    )
    if "altitude" in filtered.data.columns:
        # "altitude != altitude" keeps NaN rows (NaN != NaN is True)
        filtered = filtered.query(
            f"altitude != altitude or altitude <= {max_alt}"
        )
    if "latitude" in self._tview.data.columns:
        # Same NaN-keeping trick, then bound by the visible extent
        filtered = filtered.query(
            "latitude != latitude or "
            f"({west} <= longitude <= {east} and "
            f"{south} <= latitude <= {north})"
        )
    self.identifier_select.clear()
    text = self.identifier_input.text()
# cast is necessary because of the @lru_cache on callsigns which hides
# the type annotation
def get_flight(filename: str, directory: Path) -> Union[Flight, Traffic]:
    """Load ``<filename>.json.gz`` from ``directory`` and normalise its
    time-like columns to timezone-aware UTC timestamps.

    Returns a Flight when the file holds a single aircraft (one icao24),
    a Traffic otherwise.

    Raises RuntimeError when the file cannot be loaded.
    """
    flight: Union[None, Flight, Traffic] = Traffic.from_file(
        # Fix: use the `filename` parameter — it was previously ignored
        # and the path contained a hard-coded placeholder string.
        directory / f"{filename}.json.gz", dtype={"icao24": str}
    )
    if flight is None:
        raise RuntimeError(
            f"File {filename}.json.gz not found in {directory}"
        )
    icao24 = set(flight.data.icao24)
    if len(icao24) == 1:
        # easier way to cast...
        flight = Flight(flight.data)
    # -- Dealing with time-like features --
    if "hour" in flight.data.columns:
        # hour * 1e9 -> nanoseconds; presumably epoch seconds — TODO confirm
        flight = flight.assign(
            hour=lambda df: pd.to_datetime(df.hour * 1e9).dt.tz_localize("utc")
        )
    if "last_position" in flight.data.columns:
        # last_position * 1e6 -> nanoseconds; presumably epoch milliseconds
        flight = flight.assign(
            last_position=lambda df: pd.to_datetime(
                df.last_position * 1e6
            ).dt.tz_localize("utc")
        )
    return flight.assign(
        timestamp=lambda df: df.timestamp.dt.tz_localize("utc")
    )
    # NOTE(review): fragment — the enclosing method's signature and the
    # population of self.cumul lie outside the visible chunk.
    df = pd.DataFrame.from_records(self.cumul)
    self.cumul.clear()
    if self._flight is not None:
        # Append freshly decoded rows to the flight accumulated so far
        df = pd.concat([self._flight.data, df], sort=False)
    if self.version is not None:
        # remove columns added by nuc_p, nuc_r
        if "HPL" in df.columns:
            df = df.drop(columns=["HPL", "RCu", "RCv"])
        if "HVE" in df.columns:
            df = df.drop(columns=["HVE", "VVE"])
    if len(df) == 0:
        return None
    self._flight = Flight(
        df.assign(
            # Empty callsigns become NaN, then are filled from neighbours.
            # NOTE(review): fillna(method=...) is deprecated in recent
            # pandas — consider .ffill()/.bfill() when upgrading.
            callsign=df.callsign.replace("", None)
            .fillna(method="ffill")
            .fillna(method="bfill")
        )
    )
    return self._flight
        # NOTE(review): fragment — the enclosing loop filling `cumul`
        # starts outside the visible chunk.
        if "last_position" in df.columns:
            # Skip chunks whose last_position is entirely NaN
            # ("x == x" is False only for NaN)
            if df.query("last_position == last_position").shape[0] == 0:
                continue
        cumul.append(df)
    if len(cumul) == 0:
        return None
    df = pd.concat(cumul, sort=True).sort_values("timestamp")
    if count is True:
        df = df.assign(count=lambda df: df["count"].astype(int))
    # Return a single Flight when requested, a Traffic collection otherwise
    if return_flight:
        return Flight(df)
    return Traffic(df)