Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
""" Start the mainloop (and possible other threads) """
splash.show()
bs.init(pygame=True)
bs.sim.operate()
bs.scr.init()
# Main loop for BlueSky
while not bs.sim.mode == bs.sim.end:
bs.sim.update() # Update sim
bs.scr.update() # GUI update
# Restart traffic simulation:
if bs.sim.mode == bs.sim.init:
bs.sim.reset()
bs.scr.objdel() # Delete user defined objects
bs.sim.stop()
pg.quit()
print('BlueSky normal end.')
self.redrawradbg = True
# Draw edit window
self.editwin.update()
if self.redrawradbg or redrawrad or self.editwin.redraw:
self.win.blit(self.menu.bmps[self.menu.ipage], \
(self.menu.x, self.menu.y))
self.win.blit(self.editwin.bmp, (self.editwin.winx, self.editwin.winy))
# Draw frames
pg.draw.rect(self.win, white, self.editwin.rect, 1)
pg.draw.rect(self.win, white, pg.Rect(1, 1, self.width - 1, self.height - 1), 1)
# Add debug line
self.fontsys.printat(self.win, 10, 2, str(bs.sim.utc.replace(microsecond=0)))
self.fontsys.printat(self.win, 10, 18, tim2txt(bs.sim.simt))
self.fontsys.printat(self.win, 10+80, 2, \
"ntraf = " + str(bs.traf.ntraf))
self.fontsys.printat(self.win, 10+160, 2, \
"Freq=" + str(int(len(self.dts) / max(0.001, sum(self.dts)))))
self.fontsys.printat(self.win, 10+240, 2, \
"#LOS = " + str(len(bs.traf.asas.lospairs_unique)))
self.fontsys.printat(self.win, 10+240, 18, \
"Total LOS = " + str(len(bs.traf.asas.lospairs_all)))
self.fontsys.printat(self.win, 10+240, 34, \
"#Con = " + str(len(bs.traf.asas.confpairs_unique)))
self.fontsys.printat(self.win, 10+240, 50, \
"Total Con = " + str(len(bs.traf.asas.confpairs_all)))
# Frame ready, flip to screen
def send_siminfo(self):
t = time.time()
dt = np.maximum(t - self.prevtime, 0.00001) # avoid divide by 0
speed = (self.samplecount - self.prevcount) / dt * bs.sim.simdt
bs.net.send_stream(b'SIMINFO', (speed, bs.sim.simdt, bs.sim.simt,
str(bs.sim.utc.replace(microsecond=0)), bs.traf.ntraf, bs.sim.state, stack.get_scenname()))
self.prevtime = t
self.prevcount = self.samplecount
self.keyb.update()
# Navdisp mode: get center:
if self.swnavdisp:
i = bs.traf.id2idx(self.ndacid)
if i >= 0:
self.ndlat = bs.traf.lat[i]
self.ndlon = bs.traf.lon[i]
self.ndcrs = bs.traf.hdg[i]
else:
self.swnavdisp = False
else:
self.ndcrs = 0.0
# Simulation: keep track of timestep
# For measuring game loop frequency
self.dts.append(bs.sim.simdt)
if len(self.dts) > 20:
del self.dts[0]
# Radar window
# --------------Background--------------
if self.redrawradbg or self.swnavdisp:
if self.swnavdisp or not self.swsat:
self.radbmp.fill(darkgrey)
else:
#--------------Satellite image--------------
navsel = (self.lat0, self.lat1, \
self.lon0, self.lon1)
if not self.satsel == navsel:
# Map cutting and scaling: normal case
self.conflog.log(self.confinside_all)
# Register distance values upon entry of experiment area
newentries = np.logical_not(self.insexp) * insexp
self.dstart2D[newentries] = self.distance2D[newentries]
self.dstart3D[newentries] = self.distance3D[newentries]
self.workstart[newentries] = self.work[newentries]
self.entrytime[newentries] = sim.simt
# Log flight statistics when exiting experiment area
exits = self.insexp * np.logical_not(insexp)
if np.any(exits):
self.flst.log(
np.array(traf.id)[exits],
self.create_time[exits],
sim.simt - self.entrytime[exits],
self.dstart2D[exits] - self.distance2D[exits],
self.dstart3D[exits] - self.distance3D[exits],
self.workstart[exits] - self.work[exits],
traf.lat[exits],
traf.lon[exits],
traf.alt[exits],
traf.tas[exits],
traf.vs[exits],
traf.hdg[exits],
traf.asas.active[exits],
traf.pilot.alt[exits],
traf.pilot.tas[exits],
traf.pilot.vs[exits],
traf.pilot.hdg[exits])
# delete all aicraft in self.delidx
def log(self, *additional_vars):
if self.file and bs.sim.simt >= self.tlog:
# Set the next log timestep
self.tlog += self.dt
# Make the variable reference list
varlist = [bs.sim.simt]
varlist += [v.get() for v in self.selvars]
varlist += additional_vars
# Get the number of rows from the first array/list
nrows = 1
for v in varlist:
if isinstance(v, (list, np.ndarray)):
nrows = len(v)
break
if nrows == 0:
return
# Convert (numeric) arrays to text, leave text arrays untouched
txtdata = [
txtcol for col in varlist for txtcol in col2txt(col, nrows)]
# log the data to file
def plot(self):
# Pause simulation
bs.sim.hold()
# Open a plot window attached to a command?
# plot, showplot and other matplotlib commands
# Continue simulation
bs.sim.op()
return
def ic(filename=""):
""" Function implementing the IC stack command. """
global scenname
# reset sim always
bs.sim.reset()
# Get the filename of new scenario
if not filename:
filename = bs.scr.show_file_dialog()
# Clean up filename
filename = filename.strip()
# Reset sim and open new scenario file
if filename:
try:
for (cmdtime, cmd) in readscn(filename):
scentime.append(cmdtime)
scencmd.append(cmd)
scenname, _ = os.path.splitext(os.path.basename(filename))
def update():
data = dict(
lat=traf.lat,
lon=traf.lon,
alt=traf.alt
)
sim.send_event(b'MLSTATEREPLY', data, myclientrte)
sim.hold()
def setspeedforRTA(self, idx, torta, xtorta):
#debug print("setspeedforRTA called, torta,xtorta =",torta,xtorta/nm)
# Calculate required CAS to meet RTA
# for aircraft nr. idx (scalar)
if torta < -90. : # -999 signals there is no RTA defined in remainder of route
return False
deltime = torta-bs.sim.simt # Remaining time to next RTA [s] in simtime
if deltime>0: # Still possible?
trafax = abs(bs.traf.perf.acceleration()[idx])
gsrta = calcvrta(bs.traf.gs[idx], xtorta, deltime, trafax)
# Subtract tail wind speed vector
tailwind = (bs.traf.windnorth[idx]*bs.traf.gsnorth[idx] + bs.traf.windeast[idx]*bs.traf.gseast[idx]) / \
bs.traf.gs[idx]*bs.traf.gs[idx]
# Convert to CAS
rtacas = tas2cas(gsrta-tailwind,bs.traf.alt[idx])
# Performance limits on speed will be applied in traf.update
if bs.traf.actwp.spdcon[idx]<0. and bs.traf.swvnavspd[idx]:
bs.traf.actwp.spd[idx] = rtacas
#print("setspeedforRTA: xtorta =",xtorta)