Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def actnodedataChanged(self, nodeid, nodedata, changed_elems):
''' Update buffers when a different node is selected, or when
the data of the current node is updated. '''
self.makeCurrent()
# Shape data change
if 'SHAPE' in changed_elems:
if nodedata.polys:
contours, fills, colors = zip(*nodedata.polys.values())
# Create contour buffer with color
buf = np.concatenate(contours)
update_buffer(self.allpolysbuf, buf)
buf = np.concatenate(colors)
update_buffer(self.allpolysclrbuf, buf)
self.allpolys.set_vertex_count(len(buf) // 2)
# Create fill buffer
buf = np.concatenate(fills)
update_buffer(self.allpfillbuf, buf)
self.allpfill.set_vertex_count(len(buf) // 2)
else:
self.allpolys.set_vertex_count(0)
self.allpfill.set_vertex_count(0)
# Trail data change
if 'TRAILS' in changed_elems:
if len(nodedata.traillat0):
update_buffer(self.trailbuf, np.array(
self.route_acid = data.acid
if data.acid != "" and len(data.wplat) > 0:
nsegments = len(data.wplat)
data.iactwp = min(max(0, data.iactwp), nsegments - 1)
self.routelbl.n_instances = nsegments
self.route.set_vertex_count(2 * nsegments)
routedata = np.empty(4 * nsegments, dtype=np.float32)
routedata[0:4] = [data.aclat, data.aclon,
data.wplat[data.iactwp], data.wplon[data.iactwp]]
routedata[4::4] = data.wplat[:-1]
routedata[5::4] = data.wplon[:-1]
routedata[6::4] = data.wplat[1:]
routedata[7::4] = data.wplon[1:]
update_buffer(self.routebuf, routedata)
update_buffer(self.routewplatbuf, np.array(data.wplat, dtype=np.float32))
update_buffer(self.routewplonbuf, np.array(data.wplon, dtype=np.float32))
wpname = ''
for wp, alt, spd in zip(data.wpname, data.wpalt, data.wpspd):
if alt < 0. and spd < 0.:
txt = wp[:12].ljust(24) # No second line
else:
txt = wp[:12].ljust(12) # Two lines
if alt < 0:
txt += "-----/"
elif alt > actdata.translvl:
FL = int(round((alt / (100. * ft))))
txt += "FL%03d/" % FL
else:
txt += "%05d/" % int(round(alt / ft))
data.vs = data.vs[idx]
self.naircraft = len(data.lat)
actdata.translvl = data.translvl
self.asas_vmin = data.vmin
self.asas_vmax = data.vmax
if self.naircraft == 0:
self.cpalines.set_vertex_count(0)
else:
# Update data in GPU buffers
update_buffer(self.aclatbuf, np.array(data.lat[:MAX_NAIRCRAFT], dtype=np.float32))
update_buffer(self.aclonbuf, np.array(data.lon[:MAX_NAIRCRAFT], dtype=np.float32))
update_buffer(self.achdgbuf, np.array(data.trk[:MAX_NAIRCRAFT], dtype=np.float32))
update_buffer(self.acaltbuf, np.array(data.alt[:MAX_NAIRCRAFT], dtype=np.float32))
update_buffer(self.actasbuf, np.array(data.tas[:MAX_NAIRCRAFT], dtype=np.float32))
update_buffer(self.asasnbuf, np.array(data.asasn[:MAX_NAIRCRAFT], dtype=np.float32))
update_buffer(self.asasebuf, np.array(data.asase[:MAX_NAIRCRAFT], dtype=np.float32))
# CPA lines to indicate conflicts
ncpalines = np.count_nonzero(data.inconf)
cpalines = np.zeros(4 * ncpalines, dtype=np.float32)
self.cpalines.set_vertex_count(2 * ncpalines)
# Labels and colors
rawlabel = ''
color = np.empty((min(self.naircraft, MAX_NAIRCRAFT), 4), dtype=np.uint8)
selssd = np.zeros(self.naircraft, dtype=np.uint8)
confidx = 0
zdata = zip(data.id, data.ingroup, data.inconf, data.tcpamax, data.trk, data.gs,
data.cas, data.vs, data.alt, data.lat, data.lon)
for i, (acid, ingroup, inconf, tcpa, trk, gs, cas, vs, alt, lat, lon) in enumerate(zdata):
# Get custom color if available, else default
rgb = palette.aircraft
if ingroup:
for groupmask, groupcolor in actdata.custgrclr.items():
if ingroup & groupmask:
rgb = groupcolor
break
rgb = actdata.custacclr.get(acid, rgb)
color[i, :] = tuple(rgb) + (255,)
# Check if aircraft is selected to show SSD
if actdata.ssd_all or acid in actdata.ssd_ownship:
selssd[i] = 255
if len(actdata.ssd_ownship) > 0 or actdata.ssd_conflicts or actdata.ssd_all:
update_buffer(self.ssd.selssdbuf, selssd[:MAX_NAIRCRAFT])
update_buffer(self.confcpabuf, cpalines[:MAX_NCONFLICTS * 4])
update_buffer(self.accolorbuf, color)
update_buffer(self.aclblbuf, np.array(rawlabel.encode('utf8'), dtype=np.string_))
# If there is a visible route, update the start position
if self.route_acid != "":
if self.route_acid in data.id:
idx = data.id.index(self.route_acid)
update_buffer(self.routebuf,
np.array([data.lat[idx], data.lon[idx]], dtype=np.float32))
# Update trails database with new lines
if data.swtrails:
actdata.traillat0.extend(data.traillat0)
actdata.traillon0.extend(data.traillon0)
update_buffer(self.aclblbuf, np.array(rawlabel.encode('utf8'), dtype=np.string_))
# If there is a visible route, update the start position
if self.route_acid != "":
if self.route_acid in data.id:
idx = data.id.index(self.route_acid)
update_buffer(self.routebuf,
np.array([data.lat[idx], data.lon[idx]], dtype=np.float32))
# Update trails database with new lines
if data.swtrails:
actdata.traillat0.extend(data.traillat0)
actdata.traillon0.extend(data.traillon0)
actdata.traillat1.extend(data.traillat1)
actdata.traillon1.extend(data.traillon1)
update_buffer(self.trailbuf, np.array(
list(zip(actdata.traillat0, actdata.traillon0,
actdata.traillat1, actdata.traillon1)) +
list(zip(data.traillastlat, data.traillastlon,
list(data.lat), list(data.lon))),
dtype=np.float32))
self.traillines.set_vertex_count(2 * len(actdata.traillat0) +
2 * len(data.lat))
else:
actdata.traillat0 = []
actdata.traillon0 = []
actdata.traillat1 = []
actdata.traillon1 = []
self.traillines.set_vertex_count(0)
if ingroup:
for groupmask, groupcolor in actdata.custgrclr.items():
if ingroup & groupmask:
rgb = groupcolor
break
rgb = actdata.custacclr.get(acid, rgb)
color[i, :] = tuple(rgb) + (255,)
# Check if aircraft is selected to show SSD
if actdata.ssd_all or acid in actdata.ssd_ownship:
selssd[i] = 255
if len(actdata.ssd_ownship) > 0 or actdata.ssd_conflicts or actdata.ssd_all:
update_buffer(self.ssd.selssdbuf, selssd[:MAX_NAIRCRAFT])
update_buffer(self.confcpabuf, cpalines[:MAX_NCONFLICTS * 4])
update_buffer(self.accolorbuf, color)
update_buffer(self.aclblbuf, np.array(rawlabel.encode('utf8'), dtype=np.string_))
# If there is a visible route, update the start position
if self.route_acid != "":
if self.route_acid in data.id:
idx = data.id.index(self.route_acid)
update_buffer(self.routebuf,
np.array([data.lat[idx], data.lon[idx]], dtype=np.float32))
# Update trails database with new lines
if data.swtrails:
actdata.traillat0.extend(data.traillat0)
actdata.traillon0.extend(data.traillon0)
actdata.traillat1.extend(data.traillat1)
actdata.traillon1.extend(data.traillon1)
elif alt > actdata.translvl:
FL = int(round((alt / (100. * ft))))
txt += "FL%03d/" % FL
else:
txt += "%05d/" % int(round(alt / ft))
# Speed
if spd < 0:
txt += "--- "
elif spd>2.0:
txt += "%03d" % int(round(spd / kts))
else:
txt += "M{:.2f}".format(spd) # Mach number
wpname += txt.ljust(24) # Fill out with spaces
update_buffer(self.routelblbuf, np.array(
wpname.encode('ascii', 'ignore')))
else:
self.route.set_vertex_count(0)
self.routelbl.n_instances = 0
# Check if aircraft is selected to show SSD
if actdata.ssd_all or acid in actdata.ssd_ownship:
selssd[i] = 255
if len(actdata.ssd_ownship) > 0 or actdata.ssd_conflicts or actdata.ssd_all:
update_buffer(self.ssd.selssdbuf, selssd[:MAX_NAIRCRAFT])
update_buffer(self.confcpabuf, cpalines[:MAX_NCONFLICTS * 4])
update_buffer(self.accolorbuf, color)
update_buffer(self.aclblbuf, np.array(rawlabel.encode('utf8'), dtype=np.string_))
# If there is a visible route, update the start position
if self.route_acid != "":
if self.route_acid in data.id:
idx = data.id.index(self.route_acid)
update_buffer(self.routebuf,
np.array([data.lat[idx], data.lon[idx]], dtype=np.float32))
# Update trails database with new lines
if data.swtrails:
actdata.traillat0.extend(data.traillat0)
actdata.traillon0.extend(data.traillon0)
actdata.traillat1.extend(data.traillat1)
actdata.traillon1.extend(data.traillon1)
update_buffer(self.trailbuf, np.array(
list(zip(actdata.traillat0, actdata.traillon0,
actdata.traillat1, actdata.traillon1)) +
list(zip(data.traillastlat, data.traillastlon,
list(data.lat), list(data.lon))),
dtype=np.float32))
self.traillines.set_vertex_count(2 * len(actdata.traillat0) +