Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def actnodedataChanged(self, nodeid, nodedata, changed_elems):
''' Update buffers when a different node is selected, or when
the data of the current node is updated. '''
self.makeCurrent()
# Shape data change
if 'SHAPE' in changed_elems:
if nodedata.polys:
contours, fills, colors = zip(*nodedata.polys.values())
# Create contour buffer with color
buf = np.concatenate(contours)
update_buffer(self.allpolysbuf, buf)
buf = np.concatenate(colors)
update_buffer(self.allpolysclrbuf, buf)
self.allpolys.set_vertex_count(len(buf) // 2)
# Create fill buffer
buf = np.concatenate(fills)
update_buffer(self.allpfillbuf, buf)
self.allpfill.set_vertex_count(len(buf) // 2)
else:
self.allpolys.set_vertex_count(0)
self.allpfill.set_vertex_count(0)
# Trail data change
if 'TRAILS' in changed_elems:
if len(nodedata.traillat0):
update_buffer(self.trailbuf, np.array(
self.win.blit(self.editwin.bmp, (self.editwin.winx, self.editwin.winy))
# Draw frames
pg.draw.rect(self.win, white, self.editwin.rect, 1)
pg.draw.rect(self.win, white, pg.Rect(1, 1, self.width - 1, self.height - 1), 1)
# Add debug line
self.fontsys.printat(self.win, 10, 2, str(bs.sim.utc.replace(microsecond=0)))
self.fontsys.printat(self.win, 10, 18, tim2txt(bs.sim.simt))
self.fontsys.printat(self.win, 10+80, 2, \
"ntraf = " + str(bs.traf.ntraf))
self.fontsys.printat(self.win, 10+160, 2, \
"Freq=" + str(int(len(self.dts) / max(0.001, sum(self.dts)))))
self.fontsys.printat(self.win, 10+240, 2, \
"#LOS = " + str(len(bs.traf.asas.lospairs_unique)))
self.fontsys.printat(self.win, 10+240, 18, \
"Total LOS = " + str(len(bs.traf.asas.lospairs_all)))
self.fontsys.printat(self.win, 10+240, 34, \
"#Con = " + str(len(bs.traf.asas.confpairs_unique)))
self.fontsys.printat(self.win, 10+240, 50, \
"Total Con = " + str(len(bs.traf.asas.confpairs_all)))
# Frame ready, flip to screen
pg.display.flip()
# If needed, take a screenshot
if self.screenshot:
pg.image.save(self.win,self.screenshotname)
self.screenshot=False
return
pg.draw.circle(self.win,green,(int(trafx[i]),int(trafy[i])),pixelrad,1)
else:
self.win.blit(self.ambacsymbol[isymb], pos)
if self.swsep and not toosmall:
pg.draw.circle(self.win,amber,(int(trafx[i]),int(trafy[i])),pixelrad,1)
# Draw last trail part
if bs.traf.trails.active:
pg.draw.line(self.win, tuple(bs.traf.trails.accolor[i]),
(ltx[i], lty[i]), (trafx[i], trafy[i]))
# Label text
label = []
if self.swlabel > 0:
label.append(bs.traf.id[i]) # Line 1 of label: id
else:
label.append(" ")
if self.swlabel > 1:
if bs.traf.alt[i]>bs.traf.translvl:
label.append("FL"+str(int(round(bs.traf.alt[i] / (100.*ft))))) # Line 2 of label: altitude
else:
label.append(str(int(round(bs.traf.alt[i] / ft)))) # Line 2 of label: altitude
else:
label.append(" ")
if self.swlabel > 2:
cas = bs.traf.cas[i] / kts
label.append(str(int(round(cas)))) # line 3 of label: speed
else:
label.append(" ")
def navdata_load_rwythresholds():
rwythresholds = dict()
curthresholds = None
zfile = ZipFile(os.path.join(settings.navdata_path, 'apt.zip'))
print("Reading apt.dat from apt.zip")
with zfile.open('apt.dat', 'r') as f:
for line in f:
elems = line.decode(encoding="ascii", errors="ignore").strip().split()
if len(elems) == 0:
continue
# 1: AIRPORT
if elems[0] == '1':
# Add airport to runway threshold database
curthresholds = dict()
rwythresholds[elems[4]] = curthresholds
continue
if elems[0] == '100':
# Only asphalt and concrete runways
def init():
    ''' Load the colour palette file selected in the settings.

    The palette file is looked up as
    <settings.gfx_path>/palettes/<settings.colour_palette>. When it exists,
    its contents are compiled and executed as Python source with this
    module's globals as namespace, so every name the palette file defines
    becomes a global of this module.

    Returns:
        True when the palette file was found and executed, False when the
        file does not exist.
    '''
    pfile = path.join(settings.gfx_path, 'palettes', settings.colour_palette)

    # Guard clause: nothing to load when the palette file is missing
    if not path.isfile(pfile):
        print('Palette file not found ' + pfile)
        return False

    print('Loading palette ' + settings.colour_palette)

    # Read through a context manager so the file handle is closed right away,
    # instead of being left open until garbage collection.
    with open(pfile) as fpalette:
        source = fpalette.read()

    # NOTE(review): exec runs whatever code the palette file contains, with
    # full access to this module's globals. Only load palette files from a
    # trusted location.
    exec(compile(source, pfile, 'exec'), globals())
    return True
def main():
    """
        Start BlueSky: initialise the bluesky modules, then start either the
        gui or the simulation, depending on the settings flags.

        An ImportError raised along the way is reported on the console
        together with a pip install hint; it is not re-raised.
    """
    # Some modules are imported under a different name than the package that
    # pip needs to install them; map the import name to the install advice.
    pip_advice = {"OpenGL": "pyopengl and pyopengl-accelerate", "PyQt4": "pyqt5"}

    try:
        # Bring up the bluesky modules first
        bs.init()

        if bs.settings.is_gui:
            # The gui module is imported only when a gui is actually started
            from bluesky.ui import qtgl
            qtgl.start()
        elif bs.settings.is_sim:
            bs.sim.start()

    except ImportError as error:
        # Fall back to the reported module name when no advice is known
        modulename = pip_advice.get(error.name) or error.name
        print("Bluesky needs", modulename)
        print("Install using e.g. pip install", modulename)

    print('BlueSky normal end.')
# lat,lon type ?
if name.count(",")>0: #lat,lon or apt,rwy type
txt1,txt2 = name.split(",")
if islat(txt1):
self.lat = txt2lat(txt1)
self.lon = txt2lon(txt2)
self.name = ""
self.type ="latlon"
# runway type ? "EHAM/RW06","EHGG/RWY27"
elif name.count("/RW")>0:
try:
aptname,rwytxt = name.split("/RW")
rwyname = rwytxt.lstrip("Y").upper() # remove Y and spaces
self.lat,self.lon = bs.navdb.rwythresholds[aptname][rwyname][:2] # raises error if not found
except:
self.error = True
self.type = "rwy"
# airport?
elif bs.navdb.aptid.count(name)>0:
idx = bs.navdb.aptid.index(name.upper())
self.lat = bs.navdb.aptlat[idx]
self.lon = bs.navdb.aptlon[idx]
self.type ="apt"
# fix or navaid?
elif bs.navdb.wpid.count(name)>0:
idx = bs.navdb.getwpidx(name,reflat,reflon)
self.lat = bs.navdb.wplat[idx]
""" Implementation of BlueSky's plugin system. """
import ast
from os import path
import sys
from glob import glob
import imp
import bluesky as bs
from bluesky import settings
from bluesky.tools import varexplorer as ve
from bluesky.tools.simtime import timed_function
# Default values for the plugin-related settings
settings.set_variable_defaults(
    plugin_path='plugins',
    enabled_plugins=['datafeed'],
)

# Descriptions of the plugins found for this instance of bluesky
plugin_descriptions = {}

# Plugins that have been loaded for this instance of bluesky
active_plugins = {}
class Plugin:
    ''' Record for one plugin file: the location and import name derived
        from its file name, plus initially empty description fields. '''

    def __init__(self, fname):
        ''' Derive the module location from fname and reset the plugin fields.

            fname: file name of the plugin. Its extension is dropped and
                   backslashes are treated as path separators.
        '''
        # Strip the extension, unify separators to '/', then normalise
        stem, _extension = path.splitext(fname)
        fname = path.normpath(stem.replace('\\', '/'))

        # Directory part and bare module name
        self.module_path, self.module_name = path.split(fname)
        # Dotted form of the path
        self.module_imp = fname.replace('/', '.')

        # Description fields start out empty
        self.plugin_doc = self.plugin_name = self.plugin_type = ''
        self.plugin_stack = []
try:
from PyQt5.QtCore import QObject, QEvent, QTimer, pyqtSignal, \
QCoreApplication as qapp
except ImportError:
from PyQt4.QtCore import QObject, QEvent, QTimer, pyqtSignal, \
QCoreApplication as qapp
# Local imports
from bluesky import settings
from .simevents import SimStateEventType, SimQuitEventType, BatchEventType, \
BatchEvent, StackTextEvent, StackTextEventType, SimQuitEvent, SetNodeIdType, \
SetActiveNodeType, AddNodeType
# Add a fileno() method to Listener that returns the file descriptor of the
# listener's underlying socket. This reaches into the private _listener and
# _socket attributes.
# NOTE(review): presumably needed so a Listener can be handed to
# descriptor-based polling - confirm against the code that uses it.
Listener.fileno = lambda self: self._listener._socket.fileno()
# Register settings defaults: max_nnodes defaults to the result of cpu_count()
settings.set_variable_defaults(max_nnodes=cpu_count())
def split_scenarios(scentime, scencmd):
    ''' Split parallel lists of command times and commands into scenarios.

        A new scenario starts at every command (after the first) whose first
        four characters are 'SCEN'. The scenario name is the second
        whitespace-separated word of the scenario's first command.

        Yields one tuple (name, times, commands) per scenario, where times
        and commands are the matching slices of scentime and scencmd.
        Nothing is yielded for an empty command list. An IndexError is raised
        when the first command of a scenario has fewer than two words.
    '''
    begin = 0
    for end in range(1, len(scencmd) + 1):
        # Keep scanning until the next scenario header or the end of the list
        if end != len(scencmd) and scencmd[end][:4] != 'SCEN':
            continue

        header_words = scencmd[begin].split()
        scenname = header_words[1].strip()
        yield (scenname, scentime[begin:end], scencmd[begin:end])

        # The next scenario starts where this one ended
        begin = end
class MainManager(QObject):
instance = None
# Signals
nodes_changed = pyqtSignal(str, tuple, int)
activenode_changed = pyqtSignal(tuple, int)
import os
from multiprocessing import cpu_count
from threading import Thread
import sys
from subprocess import Popen
import zmq
import msgpack
# Local imports
import bluesky as bs
from .discovery import Discovery
# Register settings defaults for the server: max_nnodes defaults to the
# result of cpu_count(), the event/stream and simevent/simstream settings
# get fixed default port numbers, and enable_discovery defaults to False.
bs.settings.set_variable_defaults(max_nnodes=cpu_count(),
event_port=9000, stream_port=9001,
simevent_port=10000, simstream_port=10001,
enable_discovery=False)
def split_scenarios(scentime, scencmd):
    ''' Split the contents of a batch file into individual scenarios.

        scentime and scencmd are parallel lists of command times and command
        strings. A new scenario starts at every command (after the first)
        whose first four characters are 'SCEN'; its name is the second
        whitespace-separated word of that scenario's first command.

        Yields one dict per scenario with the keys 'name', 'scentime' and
        'scencmd'. Nothing is yielded for an empty command list. An
        IndexError is raised when the first command of a scenario has fewer
        than two words.
    '''
    first = 0
    for stop in range(1, len(scencmd) + 1):
        reached_end = stop == len(scencmd)
        if reached_end or scencmd[stop][:4] == 'SCEN':
            # Commands first..stop-1 form one complete scenario
            opening_cmd = scencmd[first]
            scenname = opening_cmd.split()[1].strip()
            yield {
                'name': scenname,
                'scentime': scentime[first:stop],
                'scencmd': scencmd[first:stop],
            }
            first = stop
class Server(Thread):
''' Implementation of the BlueSky simulation server. '''