Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
- as a string, `by` defines the minimum time range without
data for a flight.
"""
if isinstance(by, pd.DataFrame):
for i, (_, line) in enumerate(by.iterrows()):
if nb_flights is None or i < nb_flights:
flight = self[line]
if flight is not None:
yield flight
return
if "flight_id" in self.data.columns:
for i, (_, df) in enumerate(self.data.groupby("flight_id")):
if nb_flights is None or i < nb_flights:
yield Flight(df)
else:
for i, (_, df) in enumerate(
self.data.groupby(["icao24", "callsign"])
):
if nb_flights is None or i < nb_flights:
yield from Flight(df).split(
by if by is not None else "10 minutes"
)
if nb_flights is None or i < nb_flights:
flight = self[line]
if flight is not None:
yield flight
return
if "flight_id" in self.data.columns:
for i, (_, df) in enumerate(self.data.groupby("flight_id")):
if nb_flights is None or i < nb_flights:
yield Flight(df)
else:
for i, (_, df) in enumerate(
self.data.groupby(["icao24", "callsign"])
):
if nb_flights is None or i < nb_flights:
yield from Flight(df).split(
by if by is not None else "10 minutes"
)
def _repr_svg_(self):
# even 25m should be enough to limit the size of resulting notebooks!
if self.shape is None:
return None
if len(self.shape.coords) < 1000:
return super()._repr_svg_()
return super(
Flight,
# cast should be useless but return type of simplify() is Union
cast(Flight, self.simplify(25)),
)._repr_svg_()
p_icao24 = hasattr(index, "icao24")
if p_callsign or p_icao24:
query = []
if p_callsign:
query.append(f"callsign == '{index.callsign}'")
if p_icao24:
query.append(f"icao24 == '{index.icao24}'")
df = self.data.query(
query[0] if len(query) == 1 else " and ".join(query)
)
if df.shape[0] == 0:
return None
flight: Optional[Flight] = Flight(df)
if flight is not None and hasattr(index, "firstSeen"):
# refers to OpenSky REST API
flight = flight.after(index.firstSeen)
if flight is not None and hasattr(index, "lastSeen"):
# refers to OpenSky REST API
flight = flight.before(index.lastSeen)
if flight is not None and hasattr(index, "start"): # more natural
flight = flight.after(index.start)
if flight is not None and hasattr(index, "stop"): # more natural
flight = flight.before(index.stop)
return flight
return None
# Decorator applied to a Traffic method `f`: the function it eventually
# returns (λf) builds a LazyTraffic instead of evaluating the method
# right away.
def wrapper(
f: Callable[..., "Traffic"]
) -> Callable[..., Union["Traffic", LazyTraffic]]:
# Check parameters passed (esp. filter_if) are not lambda because those
# are not serializable therefore **silently** fail when multiprocessed.
# `msg` is the warning template; {method} is filled in with f.__name__
# through msg.format(...) wherever a lambda argument is detected.
msg = """
{method}(lambda f: ...) will *silently* fail when evaluated on several cores.
It should be safe to create a proper named function and pass it to filter_if.
"""
def is_lambda(f) -> bool:
    """Tell whether `f` is an anonymous (lambda) function.

    ``types.LambdaType`` is an alias of ``types.FunctionType``, so the
    isinstance check alone matches every plain Python function.  What
    singles out a lambda is its ``__name__``, which is ``"<lambda>"``;
    comparing against the empty string never matched anything and kept
    the lambda warning from ever being emitted.
    """
    return isinstance(f, types.LambdaType) and f.__name__ == "<lambda>"
# Check the decorated method is implemented by Flight
if not hasattr(Flight, f.__name__):
raise TypeError(f"Class Flight does not provide {f.__name__}")
def lazy_λf(lazy: LazyTraffic, *args, **kwargs) -> LazyTraffic:
    # Chain one more operation onto an existing LazyTraffic: the call is
    # recorded as a LazyLambda and appended to the already stacked ones.
    stacked = LazyLambda(f.__name__, idx_name, *args, **kwargs)

    # One warning for positional arguments, one for keyword arguments:
    # lambdas are flagged in each group independently.
    for group in (args, kwargs.values()):
        if any(map(is_lambda, group)):
            logging.warning(msg.format(method=f.__name__))

    # Everything but the list of operations is carried over unchanged.
    return LazyTraffic(
        lazy.wrapped_t,
        lazy.stacked_ops + [stacked],
        lazy.iterate_kw,
        lazy.tqdm_kw,
    )
def faulty_flight(exc=None) -> Dict[str, Any]:
    """Return the locals of the first traceback frame that holds a Flight.

    :param exc: a traceback object to inspect. When ``None``, the
        traceback of the last unhandled exception (``sys.last_traceback``)
        is used, if there is one.

    :return: the ``f_locals`` dictionary of the first frame, found after
        skipping the two outermost ones, in which at least one local
        variable is a Flight instance; an empty dictionary when no such
        frame exists, when the traceback has fewer than two frames, or
        when no traceback is available at all.
    """
    if exc is None:
        # sys.last_traceback is only defined once an unhandled exception
        # has been reported by the interpreter; do not fail before that.
        exc = getattr(sys, "last_traceback", None)

    # Skip the two outermost frames, but stop early instead of raising
    # AttributeError on None when the traceback is shorter than that.
    tb = exc
    for _ in range(2):
        if tb is None:
            return {}
        tb = tb.tb_next

    while tb is not None:
        loc = tb.tb_frame.f_locals
        if any(isinstance(x, Flight) for x in loc.values()):
            return loc
        tb = tb.tb_next
    return {}
def _repr_svg_(self):
# even 25m should be enough to limit the size of resulting notebooks!
if self.shape is None:
return None
if len(self.shape.coords) < 1000:
return super()._repr_svg_()
return super(
Flight,
# cast should be useless but return type of simplify() is Union
cast(Flight, self.simplify(25)),
)._repr_svg_()
# Take the method in Flight and create a LazyCollection
def λf(wrapped_t: "Traffic", *args, **kwargs) -> LazyTraffic:
    # Entry point of the lazy chain: record the call as a LazyLambda and
    # wrap it, together with the Traffic it applies to, in a LazyTraffic.
    first_op = LazyLambda(f.__name__, idx_name, *args, **kwargs)

    # One warning for positional arguments, one for keyword arguments:
    # lambdas are flagged in each group independently.
    for group in (args, kwargs.values()):
        if any(map(is_lambda, group)):
            logging.warning(msg.format(method=f.__name__))

    return LazyTraffic(wrapped_t, [first_op])
# Prefer the decorated function's own docstring; otherwise fall back to
# the docstring of the Flight method bearing the same name.
if f.__doc__ is not None:
λf.__doc__ = f.__doc__
else:
λf.__doc__ = getattr(Flight, f.__name__).__doc__
# Expose the annotations of the Flight method, with the return type
# overridden below to LazyTraffic.
λf.__annotations__ = dict( # make a copy!!
getattr(Flight, f.__name__).__annotations__
)
λf.__annotations__["return"] = LazyTraffic
# The docstring can still be None here (neither f nor the Flight method
# has one), in which case there is nothing to append the warning to.
if λf.__doc__ is not None:
λf.__doc__ += """\n .. warning::
This method will be stacked for lazy evaluation. """
# λf is what replaces the decorated method.
return λf
.query('altitude > 10000')
.compute_wind()
.plot_wind(ax, alpha=.5)
)
"""
if "projection" in ax.__dict__ and "transform" not in kwargs:
kwargs["transform"] = PlateCarree()
if any(w not in self.data.columns for w in ["wind_u", "wind_v"]):
raise RuntimeError(
"No wind data in trajectory. Consider Flight.compute_wind()"
)
copy_self: Optional[Flight] = self
if filtered:
copy_self = self.filter(roll=17)
if copy_self is None:
return []
copy_self = copy_self.query("roll.abs() < .5")
if copy_self is None:
return []
copy_self = copy_self.filter(wind_u=17, wind_v=17)
if copy_self is None:
return []
if resolution is not None:
if isinstance(resolution, (int, str)):