Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update():
stack.stack('ECHO MY_PLUGIN update: creating a random aircraft')
stack.stack('MCRE 1')
def stop(self): # Quit mode
self.mode = self.end
datalog.reset()
bs.stack.saveclose() # Save close configuration
# datalog.save()
return
# Check condition types
# Get relevant actual value
self.actual = (self.condtype == alttype) * bs.traf.alt[self.idx] + \
(self.condtype == spdtype) * bs.traf.cas[self.idx]
# Compare sign of actual difference with sign of last difference
actdif = self.target - self.actual
idxtrue = np.where(actdif*self.lastdif <= 0.0)[0] # Sign changed
self.lastdif = actdif
if len(idxtrue)==0:
return
# Execute commands found to have true condition
for i in idxtrue:
stack.stack(self.cmd[i])
self.delcondition(idxtrue) # Remove when executed
def send_event(self, eventname, data=None, target=None):
# On the sim side, target is obtained from the currently-parsed stack command
target = target or stack.routetosender() or [b'*']
pydata = msgpack.packb(data, default=encode_ndarray, use_bin_type=True)
self.event_io.send_multipart(target + [eventname, pydata])
def makeLogfileName(logname):
timestamp = datetime.now().strftime('%Y%m%d_%H-%M-%S')
fname = "%s_%s_%s.log" % (logname, stack.get_scenname(), timestamp)
return settings.log_path + '/' + fname
if prefun:
if hasattr(prefun, '__istimed'):
preupdate_funs[name] = prefun
else:
preupdate_funs[name] = timed_function(f'{name}.{prefun.__name__}', dt)(prefun)
if updfun:
if hasattr(updfun, '__istimed'):
update_funs[name] = updfun
else:
update_funs[name] = timed_function(f'{name}.{updfun.__name__}', dt)(updfun)
if rstfun:
reset_funs[name] = rstfun
# Add the plugin's stack functions to the stack
bs.stack.append_commands(stackfuns)
# Add the plugin as data parent to the variable explorer
ve.register_data_parent(plugin, name.lower())
return True, 'Successfully loaded plugin %s' % name
except ImportError as e:
print('BlueSky plugin system failed to load', name, ':', e)
return False, 'Failed to load %s' % name
acalt = np.array(n * [acalt])
if acspd is None:
acspd = np.random.randint(250, 450, n) * kts
elif isinstance(acspd,(float, int)):
acspd = np.array(n * [acspd])
actype = n * [actype] if isinstance(actype, str) else actype
dest = n * [dest] if isinstance(dest, str) else dest
# SAVEIC: save cre command when filled in
# Special provision in case SAVEIC is on: then save individual CRE commands
# Names of aircraft (acid) need to be recorded for saved future commands
# And positions need to be the same in case of *MCRE"
for i in range(n):
bs.stack.savecmd(" ".join([ "CRE", acid[i], actype[i],
str(aclat[i]), str(aclon[i]), str(int(round(achdg[i]))),
str(int(round(acalt[i]/ft))),
str(int(round(acspd[i]/kts)))]))
# Aircraft Info
self.id[-n:] = acid
self.type[-n:] = actype
# Positions
self.lat[-n:] = aclat
self.lon[-n:] = aclon
self.alt[-n:] = acalt
self.hdg[-n:] = achdg
self.trk[-n:] = achdg
def getviewctr(self):
return self.client_pan.get(stack.sender()) or self.def_pan
def remove(name):
''' Remove a loaded plugin. '''
if name not in active_plugins:
return False, 'Plugin %s not loaded' % name
preset = reset_funs.pop(name, None)
if preset:
# Call module reset first to clear plugin state just in case.
preset()
descr = plugin_descriptions.get(name)
cmds, _ = list(zip(*descr.plugin_stack))
bs.stack.remove_commands(cmds)
active_plugins.pop(name)
preupdate_funs.pop(name)
update_funs.pop(name)
# Update UTC time
self.utc += datetime.timedelta(seconds=self.simdt)
# Datalog pre-update (communicate current sim time to loggers)
datalog.preupdate(self.simt)
# Plugins pre-update
plugin.preupdate(self.simt)
# For measuring game loop frequency
self.dts.append(self.simdt)
if len(self.dts) > 20:
del self.dts[0]
stack.checkscen()
# Always process stack
stack.process()
if self.mode == Simulation.op:
bs.traf.update(self.simt, self.simdt)
# Update metrics
self.metric.update()
# Update plugins
plugin.update(self.simt)
# Update loggers
datalog.postupdate()