Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
plot_column = QVBoxLayout()
plot_column.addWidget(self.plot_tabs)
self.interact_column = QVBoxLayout()
self.open_options = ["Open file", "dump1090"]
if "decoders" in config:
self.open_options += list(config["decoders"])
self.open_dropdown = QComboBox()
for option in self.open_options:
self.open_dropdown.addItem(option)
self.interact_column.addWidget(self.open_dropdown)
self.projections = ["EuroPP", "Lambert93", "Mercator"]
self.projection_dropdown = QComboBox()
more_projs = config.get("projections", "extra", fallback="")
if more_projs != "":
proj_list = more_projs.split(";")
self.projections += list(x.strip() for x in proj_list)
for proj in self.projections:
self.projection_dropdown.addItem(proj)
self.interact_column.addWidget(self.projection_dropdown)
button_grid = QGridLayout()
self.extent_button = QPushButton("Extent")
button_grid.addWidget(self.extent_button, 0, 0)
self.plot_button = QPushButton("Plot")
button_grid.addWidget(self.plot_button, 0, 1)
self.airport_button = QPushButton("Airport")
button_grid.addWidget(self.airport_button, 1, 0)
self.reset_button = QPushButton("Reset")
args = parser.parse_args(args)
logger = logging.getLogger()
if args.verbose == 1:
logger.setLevel(logging.INFO)
elif args.verbose >= 2:
logger.setLevel(logging.DEBUG)
from traffic import config
from traffic.data import ModeS_Decoder
now = datetime.now(timezone.utc)
filename = Path(now.strftime(args.output.as_posix()))
try:
address = config.get("decoders", args.reference)
host_port, reference = address.split("/")
host, port = host_port.split(":")
decoder = ModeS_Decoder.from_address(
host, int(port), reference, filename.with_suffix(".csv").as_posix()
)
except Exception:
logging.info("fallback to dump1090")
decoder = ModeS_Decoder.from_dump1090(
args.reference, filename.with_suffix(".csv").as_posix()
)
def signal_handler(sig, frame):
logging.info("Interruption signal caught")
t = decoder.traffic
if t is not None:
pkl_file = filename.with_suffix(".pkl")
def on_open(self, index: int, *args, **kwargs) -> None:
if self.decoder is not None and self.updateTraffic is not None:
self.updateTraffic.terminate()
self.decoder.stop()
if index == 0:
self.openFile()
elif index == 1:
self.openDump1090()
else:
address = config.get("decoders", self.open_options[index])
host_port, reference = address.split("/")
host, port = host_port.split(":")
self.decoder = ModeS_Decoder.from_address(
host, int(port), reference
)
refresh_time = config.getint(
"decoders", "refresh_time", fallback=30
)
self.updateTraffic = UpdateTraffic(self, refresh_time)
self.updateTraffic.start()
)
session = Session()
if len(proxy_values) > 0:
session.proxies.update(proxy_values)
session.trust_env = False
carto_session.proxies.update(proxy_values)
carto_session.trust_env = False
pkcs12_filename = config.get("nmb2b", "pkcs12_filename", fallback="")
pkcs12_password = config.get("nmb2b", "pkcs12_password", fallback="")
nmb2b_mode = config.get("nmb2b", "mode", fallback="PREOPS")
if nmb2b_mode not in ["OPS", "PREOPS"]:
raise RuntimeError("mode must be one of OPS or PREOPS")
nmb2b_version = config.get("nmb2b", "version", fallback="23.0.0")
if sys.version_info < (3, 7, 0):
aircraft = Aircraft()
airports = Airports()
airways = Airways()
navaids = Navaids()
runways = Runways()
aixm_airspaces = AIXMAirspaceParser(config_file)
nm_airspaces = NMAirspaceParser(config_file)
nm_navaids = NMNavaids.from_file(nm_path_str)
nm_airways = NMRoutes()
opensky = OpenSky(
opensky_username,
"aixm_airspaces",
"nm_airspaces",
"nm_airways",
"nm_navaids",
"eurofirs",
"opensky",
]
Aircraft.cache_dir = cache_dir
Airports.cache_dir = cache_dir
Airways.cache_dir = cache_dir
Navaids.cache_dir = cache_dir
Runways.cache_dir = cache_dir
AIXMAirspaceParser.cache_dir = cache_dir
aixm_path_str = config.get("global", "aixm_path", fallback="")
if aixm_path_str != "": # coverage: ignore
AIXMAirspaceParser.aixm_path = Path(aixm_path_str)
nm_path_str = config.get("global", "nm_path", fallback="")
if nm_path_str != "": # coverage: ignore
NMAirspaceParser.nm_path = Path(nm_path_str)
NMRoutes.nm_path = Path(nm_path_str)
# -- Part to be deprecated --
# vvvvvvvvvvvvvvvvvvvvvvvvvvv
opensky_username = config.get("global", "opensky_username", fallback="")
opensky_password = config.get("global", "opensky_password", fallback="")
if opensky_password != "" and opensky_username != "": # coverage: ignore
warnings.warn(
Runways.cache_dir = cache_dir
AIXMAirspaceParser.cache_dir = cache_dir
aixm_path_str = config.get("global", "aixm_path", fallback="")
if aixm_path_str != "": # coverage: ignore
AIXMAirspaceParser.aixm_path = Path(aixm_path_str)
nm_path_str = config.get("global", "nm_path", fallback="")
if nm_path_str != "": # coverage: ignore
NMAirspaceParser.nm_path = Path(nm_path_str)
NMRoutes.nm_path = Path(nm_path_str)
# -- Part to be deprecated --
# vvvvvvvvvvvvvvvvvvvvvvvvvvv
opensky_username = config.get("global", "opensky_username", fallback="")
opensky_password = config.get("global", "opensky_password", fallback="")
if opensky_password != "" and opensky_username != "": # coverage: ignore
warnings.warn(
"""Please edit your configuration file:
# Old style, will soon no longer be supported
[global]
opensky_username =
opensky_password =
# New style
[opensky]
username =
password =
opensky_password =
# New style
[opensky]
username =
password =
""",
DeprecationWarning,
)
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^
opensky_username = config.get("opensky", "username", fallback=opensky_username)
opensky_password = config.get("opensky", "password", fallback=opensky_password)
# We keep "" for forcing to no proxy
http_proxy = config.get("network", "http.proxy", fallback="<>")
https_proxy = config.get("network", "https.proxy", fallback="<>")
paramiko_proxy = config.get("network", "ssh.proxycommand", fallback="")
proxy_values = dict(
(key, value)
for key, value in [("http", http_proxy), ("https", https_proxy)]
if value != "<>"
)
session = Session()
if len(proxy_values) > 0:
session.proxies.update(proxy_values)
(key, value)
for key, value in [("http", http_proxy), ("https", https_proxy)]
if value != "<>"
)
session = Session()
if len(proxy_values) > 0:
session.proxies.update(proxy_values)
session.trust_env = False
carto_session.proxies.update(proxy_values)
carto_session.trust_env = False
pkcs12_filename = config.get("nmb2b", "pkcs12_filename", fallback="")
pkcs12_password = config.get("nmb2b", "pkcs12_password", fallback="")
nmb2b_mode = config.get("nmb2b", "mode", fallback="PREOPS")
if nmb2b_mode not in ["OPS", "PREOPS"]:
raise RuntimeError("mode must be one of OPS or PREOPS")
nmb2b_version = config.get("nmb2b", "version", fallback="23.0.0")
if sys.version_info < (3, 7, 0):
aircraft = Aircraft()
airports = Airports()
airways = Airways()
navaids = Navaids()
runways = Runways()
aixm_airspaces = AIXMAirspaceParser(config_file)
nm_airspaces = NMAirspaceParser(config_file)
nm_navaids = NMNavaids.from_file(nm_path_str)
nm_airways = NMRoutes()