Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
) -> pd.DataFrame:
so6 = SO6.from_file(input_file.as_posix())
if so6 is None:
raise RuntimeError
total: List[Dict[str, int]] = []
if starting_from is None:
start_time = so6.data.time1.min()
else:
start_time = max(to_datetime(starting_from), so6.data.time1.min())
if ending_at is None:
end_time = so6.data.time2.max()
else:
end_time = min(to_datetime(ending_at), so6.data.time2.max())
if end_time < start_time:
msg = f"End time {end_time} is anterior to start time {start_time}"
raise ValueError(msg)
# First clip
so6 = so6.between(start_time, end_time)
delta = timedelta(minutes=interval)
size_range = int((end_time - start_time) / delta) + 1
time_list = [start_time + i * delta for i in range(size_range)]
all_sectors = [nm_airspaces[airspace] for airspace in sector_list]
so6 = so6.inside_bbox(
cascaded_union([s.flatten() for s in all_sectors if s is not None])
)
def at_airport(self, before: timelike, after: timelike,
airport: Airport) -> Optional[pd.DataFrame]:
before = to_datetime(before)
after = to_datetime(after)
before_hour = round_time(before, how='before')
after_hour = round_time(after, how='after')
request = self.airport_request.format(
before_time=before.timestamp(),
after_time=after.timestamp(),
before_hour=before_hour.timestamp(),
after_hour=after_hour.timestamp(),
airport_latmax=airport.lat + 0.1,
airport_latmin=airport.lat - 0.1,
airport_lonmax=airport.lon + 0.1,
airport_lonmin=airport.lon - 0.1,)
df = self._impala(request)
def between(
self, start: timelike, stop: time_or_delta, strict: bool = True
) -> "Flight":
"""
WARNING: strict: bool = True is not taken into account yet.
"""
start = to_datetime(start)
if isinstance(stop, timedelta):
stop = start + stop
else:
stop = to_datetime(stop)
t: np.ndarray = np.stack(list(self.timestamp))
index = np.where((start < t) & (t < stop))
new_data: np.ndarray = np.stack(list(self.coords))[index]
time1: List[datetime] = [start, *t[index]]
time2: List[datetime] = [*t[index], stop]
if start > t[0]:
new_data = np.vstack([self.at(start), new_data])
else:
time1, time2 = time1[1:], time2[1:]
functions are provided:
.. code:: python
>>> so6['HOP36PP'].at("2018/01/01 18:40")
longitude 1.733156
latitude 44.388586
altitude 26638.857143
dtype: float64
"""
if time is None:
raise NotImplementedError()
time = to_datetime(time)
timearray: np.ndarray[datetime] = np.array([time.timestamp()])
res = self.interpolate(timearray)
return Position(
pd.Series(res[0], index=["longitude", "latitude", "altitude"])
)
def between(
self, start: timelike, stop: time_or_delta, strict: bool = True
) -> "Flight":
"""
WARNING: strict: bool = True is not taken into account yet.
"""
start = to_datetime(start)
if isinstance(stop, timedelta):
stop = start + stop
else:
stop = to_datetime(stop)
t: np.ndarray = np.stack(list(self.timestamp))
index = np.where((start < t) & (t < stop))
new_data: np.ndarray = np.stack(list(self.coords))[index]
time1: List[datetime] = [start, *t[index]]
time2: List[datetime] = [*t[index], stop]
if start > t[0]:
new_data = np.vstack([self.at(start), new_data])
else:
time1, time2 = time1[1:], time2[1:]
if stop < t[-1]:
new_data = np.vstack([new_data, self.at(stop)])
else:
time1, time2 = time1[:-1], time2[:-1]
**Useful options for debug**
- **count** (boolean, default: False): add a column stating how
many sensors received each record;
- **nautical_units** (boolean, default: True): convert data stored
in Impala to standard nautical units (ft, ft/min, knots).
- **cached** (boolean, default: True): switch to False to force a
new request to the database regardless of the cached files;
delete previous cache files;
- **limit** (optional, int): maximum number of records requested
(LIMIT keyword in SQL).
"""
start = to_datetime(start)
if stop is not None:
stop = to_datetime(stop)
else:
stop = start + timedelta(days=1)
# default obvious parameter
where_clause = "where"
if progressbar is True:
if stop - start > date_delta:
progressbar = tqdm
else:
progressbar = iter
if progressbar is False:
progressbar = iter
**See also:**
- `flight_list
<#traffic.data.eurocontrol.b2b.NMB2B.flight_list>`_ which
returns more comprehensive information about flights.
"""
if start is not None:
start = to_datetime(start)
else:
start = datetime.now(timezone.utc)
if stop is not None:
stop = to_datetime(stop)
else:
stop = start + timedelta(hours=1)
data = REQUESTS["FlightPlanListRequest"].format(
send_time=datetime.now(timezone.utc),
aircraftId=callsign if callsign is not None else "*",
origin=origin if origin is not None else "*",
destination=destination if destination is not None else "*",
start=start,
stop=stop,
)
rep = self.post(data) # type: ignore
return FlightPlanList.fromET(rep.reply.find("data"))
"""Returns the position in the trajectory at a given timestamp.
- ``time`` can be passed as a string, an epoch, a Python datetime, or
a Pandas timestamp.
- If no time is passed (default), the last known position is returned.
- If no position is available at the given timestamp, None is returned.
If you expect a position at any price, consider `Flight.resample
<#traffic.core.Flight.resample>`_
"""
if time is None:
return Position(self.data.ffill().iloc[-1])
index = to_datetime(time)
df = self.data.set_index("timestamp")
if index not in df.index:
id_ = getattr(self, "flight_id", self.callsign)
logging.warning(f"No index {index} for flight {id_}")
return None
return Position(df.loc[index])
columns = "mintime, maxtime, rawmsg, msgcount, icao24, hour"
if other_columns is not None:
if isinstance(other_columns, str):
columns += f", {other_columns}"
else:
columns += ", " + ", ".join(other_columns)
parse_columns = columns
# default obvious parameter
where_clause = "where"
airports_params = [airport, departure_airport, arrival_airport]
count_airports_params = sum(x is not None for x in airports_params)
start = to_datetime(start)
if table_name not in self._raw_tables:
raise RuntimeError(f"{table_name} is not a valid table name")
if stop is not None:
stop = to_datetime(stop)
else:
stop = start + timedelta(days=1)
if progressbar is True:
if stop - start > date_delta:
progressbar = tqdm
else:
progressbar = iter
if progressbar is False:
def between(self, start: timelike, stop: time_or_delta) -> "SO6":
    """Keep only the segments overlapping the [``start``, ``stop``] window.

    A segment is retained when it begins no later than ``stop``
    (``time1 <= stop``) and ends no earlier than ``start``
    (``time2 >= start``).

    The ``stop`` argument may also be written as a ``datetime.timedelta``,
    in which case it is added to ``start`` to obtain the end of the window.
    """
    window_start = to_datetime(start)
    # A timedelta is a duration relative to the start of the window;
    # anything else is converted with to_datetime, like ``start``.
    window_stop = (
        window_start + stop
        if isinstance(stop, timedelta)
        else to_datetime(stop)
    )

    segments = self.data
    begins_before_stop = segments.time1 <= window_stop
    ends_after_start = segments.time2 >= window_start
    return SO6(segments[begins_before_stop & ends_after_start])