Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
service_commands += ["uninstall"]
if opt in ("--start-service"):
service_commands += ["start"]
if opt in ("--stop-service"):
service_commands = ["stop"]
if opt in ("--restart-service"):
service_commands = ["restart"]
if opt in ("--service-status"):
service_commands = ["status"]
if service_commands:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
hal.service_commands(
service_commands,
self.config.application.name,
self.config.application.id,
script_path
)
exit()
if detect_devices:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
devices = hal.detect_devices()
print("devices:")
_pretty_print(devices)
service_commands += ["uninstall"]
if opt in ("--start-service"):
service_commands += ["start"]
if opt in ("--stop-service"):
service_commands = ["stop"]
if opt in ("--restart-service"):
service_commands = ["restart"]
if opt in ("--service-status"):
service_commands = ["status"]
if service_commands:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
hal.service_commands(
service_commands,
self.config.application.name,
self.config.application.id,
script_path
)
exit()
if detect_devices:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
devices = hal.detect_devices()
print("devices:")
_pretty_print(devices)
if service_commands:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
hal.service_commands(
service_commands,
self.config.application.name,
self.config.application.id,
script_path
)
exit()
if detect_devices:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
devices = hal.detect_devices()
print("devices:")
_pretty_print(devices)
exit()
#if settings:
# self.settings = app_helpers._deep_update(self.settings, settings)
#self._validateSettings()
self.started = False
import kervi.spine as spine
from kervi.zmq_spine import _ZMQSpine
self.spine = _ZMQSpine()
self.config.network.ipc_root_port = nethelper.get_free_port([self.config.network.ipc_root_port])
self.spine._init_spine("kervi-main", self.config.network.ipc_root_port, None, self.config.network.ipc_root_address)
if service_commands:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
hal.service_commands(
service_commands,
self.config.application.name,
self.config.application.id,
script_path
)
exit()
if detect_devices:
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
devices = hal.detect_devices()
print("devices:")
_pretty_print(devices)
exit()
self._log_handler = KerviLogHandler(self.config)
self._log_queue = self._log_handler._log_queue
self._logger = KerviLog("module")
self._logger.info("Starting kervi module, please wait")
try:
from kervi.version import VERSION
def init_process(self, **kwargs):
self.spine.log.verbose("load: %s", self.name)
try:
import kervi.hal as hal
hal._load()
__import__(self.name, fromlist=[''])
except ImportError:
self.spine.log.exception("module not found:{0}", self.name)
except:
self.spine.log.exception("error load module:{0}", self.name)
self.spine.send_command("startThreads", local_only=True)
self.spine.trigger_event(
"moduleLoaded",
self.name
)
None,
self.config.network.ipc_root_address
)
from kervi import spine
self.spine = spine.Spine()
self.spine.register_event_handler("processReady", self._process_ready, scope="app-" + self.config.application.id)
self.spine = spine.Spine()
self._module_processes = []
self._process_info = []
self._process_info_lock = threading.Lock()
import kervi.hal as hal
hal_driver = hal._load()
if hal_driver:
self._logger.verbose("Using HAL driver: %s", hal_driver)
self._actions = _ModuleActions(self)
#from kervi.utility.storage import init_db
#init_db(script_name)
from kervi.storage.storage_manager import StorageManager
self._authentication = StorageManager()
from kervi.utility.authorization_manager import AuthorizationManager
self._authentication = AuthorizationManager()
from kervi.messaging.message_manager import MessageManager
self._message_handler = MessageManager()
self._app_actions = _AppActions(self)
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
print("platform driver:", hal_driver)
self._log_handler = KerviLogHandler(self.config)
self._log_queue = self._log_handler._log_queue
self._logger = KerviLog("application")
#self._validateSettings()
self.started = False
self._webserver_port = None
try:
from kervi.version import VERSION
except:
VERSION="0.0.0"
self._logger.verbose("kervi version: %s", VERSION)
import kervi.hal as hal
hal_driver = hal._load(self.config.platform.driver)
if hal_driver:
self._logger.verbose("platform driver: %s", hal_driver)
self._logger.verbose("board: %s", hal.get_board_name())
from kervi.plugin.message_bus.bus_manager import BusManager
self.bus_manager = BusManager(self._log_queue)
self.config.network.ipc_root_port = nethelper.get_free_port([self.config.network.ipc_root_port])
self.bus_manager.load("kervi-main", self.config.network.ipc_root_port, None, self.config.network.ipc_root_address)
from kervi import spine
self.spine = spine.Spine()
self.spine.register_query_handler("GetApplicationInfo", self._get_application_info)
self.spine.register_query_handler("getProcessInfo", self.get_process_info)
self.spine.register_event_handler("modulePing", self._module_ping)
self.spine.register_event_handler("processReady", self._process_ready, scope="app-" + self.config.application.id)
self.spine.register_event_handler("WebAppReady", self._webapp_ready, scope="app-" + self.config.application.id)