Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, name, config, manager):
    """Initialise the plugin's configuration, identity and spine handle.

    Args:
        name: Value stored as the plugin's name.
        config: Optional user configuration. When truthy, its ``as_dict()``
            result is handed to the private configuration as ``config_user``;
            otherwise an empty dict is used.
        manager: Value stored as the plugin's manager.
    """
    # Imports stay function-local, as they were written.
    from kervi.config.configuration import _Configuration
    from kervi.config import Configuration

    self._config = _Configuration()
    self._global_config = Configuration

    user_settings = config.as_dict() if config else {}
    self._config._load(
        config_user=user_settings,
        config_base=self.get_default_config(),
    )

    self._name = name
    self._manager = manager

    from kervi.spine import Spine
    self.spine = Spine()
# Entry point that builds one process object and drives its step loop:
# loads the configuration from ``config_data``, initialises process logging,
# instantiates ``process_class`` and calls ``process_step()`` until the
# process sets ``do_terminate``.
# NOTE(review): indentation was lost in this excerpt and the handler of the
# final bare ``except:`` is not visible here.
def _launch(scope, name, process_class, config_data, ipc_port, root_close, log_queue, **kwargs):
try:
from kervi.config import Configuration
# Populate the Configuration object from the JSON data handed in by the caller.
Configuration._load(json_data=config_data)
process_config = Configuration
#if "log" in process_config:
k_logging.init_process_logging(name, process_config.log, log_queue=log_queue)
log = k_logging.KerviLog(name)
log.debug('create process:{0} ipc port:{1}:', process_class.__name__, ipc_port)
process = process_class(scope, name, process_config, ipc_port, root_close, log_queue, **kwargs)
try:
while not process.do_terminate:
# Fire "processReady" only once: the first time the spine reports a connection.
if not process._is_connected and process.spine.is_connected:
process._is_connected = True
process.spine.trigger_event("processReady", scope, name, process._pid)
process.process_step()
# Ctrl-C ends the step loop without an error.
except KeyboardInterrupt:
pass
except:
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import uuid
import http.cookies as Cookie
from kervi.spine import Spine
from kervi.config import Configuration
from kervi.plugin.plugin_manager import PluginManager
# Module-level session registry; starts out empty.
# NOTE(review): presumably keyed by session id - confirm against the code that fills it.
SESSIONS = {}

# Plugin manager for the "authentication" configuration section, created at
# import time. It is bound directly; no placeholder assignment is needed first.
_PLUGIN_MANAGER = PluginManager(Configuration, "authentication")
def active():
    """Return True when the authentication plugin manager holds at least one plugin."""
    plugin_count = len(_PLUGIN_MANAGER.plugins)
    return plugin_count > 0
def allow_anonymous():
    """Return the anonymous user's ``enabled`` setting, or False if no such user is configured."""
    # NOTE(review): ``keys`` is read as an attribute (not called) - presumably a
    # property of the configuration object; confirm.
    if "anonymous" not in Configuration.authentication.users.keys:
        return False
    return Configuration.authentication.users.anonymous.enabled
# Inspect the request headers for a 'kervisession' cookie when authentication
# is active.
# NOTE(review): this excerpt is truncated after the cookie test, so the
# function's return values are not visible here.
def is_session_valid(headers):
if active():
# NOTE(review): headers["Cookie"] is indexed directly - presumably raises for a
# plain dict without that key; confirm the type of ``headers``.
if headers and headers["Cookie"]:
cookie = Cookie.SimpleCookie()
# Parse the raw Cookie header value into the SimpleCookie container.
cookie.load(headers["Cookie"])
if 'kervisession' in cookie:
def enabled():
    """Return the ``use_ssl`` value from the encryption configuration section."""
    from kervi.config import Configuration

    encryption_settings = Configuration.encryption
    return encryption_settings.use_ssl
def load(self):
    """Index every loaded plugin by its message type and cache config sections.

    Stores ``Configuration.messaging`` as ``self._config`` and
    ``Configuration.log.levels`` as ``self._levels``.
    """
    # NOTE(review): assumes the two assignments below follow the loop rather
    # than sit inside it - the excerpt's indentation was lost; confirm.
    for channel_plugin in self._plugin_manager.plugins:
        message_type = channel_plugin.message_type
        self._channels[message_type] = channel_plugin

    self._config = Configuration.messaging
    self._levels = Configuration.log.levels
def __init__(self):
    """Create the authentication controller and register its spine handlers.

    Loads the managed "authentication" plugins, then registers five
    authorization query handlers and one command handler on ``self.spine``.
    """
    Controller.__init__(self, "authentication", "Authentication")
    self._sessions = {}

    plugin_manager = PluginManager(
        Configuration,
        "authentication",
        [AuthenticationPlugin],
    )
    self._plugin_manager = plugin_manager
    plugin_manager.load_managed_plugins()

    # Query name -> bound handler, registered in this order.
    query_handlers = (
        ("authorizationActive", self._is_active),
        ("authorizationValidSessionHeader", self._is_session_valid),
        ("authorizationAllowAnonymousUser", self.allow_anonymous),
        ("authorizationGetUsers", self.get_users),
        ("authorizationAuthorizeUser", self.authorize),
    )
    for query_name, handler in query_handlers:
        self.spine.register_query_handler(query_name, handler)

    self.spine.register_command_handler("authorizationRemoveSession", self.remove_session)
# NOTE(review): the header of the enclosing definition is not visible in this
# excerpt; these statements obtain a Spine and register handlers for values,
# settings and messages on it.
self._spine = Spine()
# Value handling: store on "valueChanged", answer "getValueData" queries.
self._spine.register_event_handler("valueChanged", self._store_value)
self._spine.register_query_handler("getValueData", self._get_value_data)
# Settings: "storeSetting" command and "retrieveSetting" query.
self._spine.register_command_handler("storeSetting", self._store_setting)
self._spine.register_query_handler("retrieveSetting", self._retrieve_setting)
# Messages: answer "getMessageItems" queries and store on "newMessage".
self._spine.register_query_handler("getMessageItems", self._get_messages)
self._spine.register_event_handler("newMessage", self._store_message)
# Cron-job handlers below are disabled (commented out) in this excerpt.
#SPINE.register_command_handler("createCronJob", create_cron_job)
#SPINE.register_command_handler("deleteCronJob", delete_cron_job)
#SPINE.register_query_handler("queryCronJob", query_cron_job)
# Create the plugin manager for the "storage" section and load its managed
# plugins. NOTE(review): [StoragePlugin] is presumably the list of accepted
# plugin classes - confirm against PluginManager.
self._plugin_manager = None
self._plugin_manager = PluginManager(Configuration, "storage", [StoragePlugin])
self._plugin_manager.load_managed_plugins()
def __init__(self, log_queue=None):
    """Create the bus manager and load the managed "message_bus" plugins.

    Args:
        log_queue: Optional value forwarded to the plugin manager as its
            ``log_queue`` keyword argument.
    """
    # Bind the plugin manager directly; a preceding ``None`` placeholder would
    # be overwritten immediately and is therefore not needed.
    self._plugin_manager = PluginManager(
        Configuration,
        "message_bus",
        [KerviBusPlugin],
        load_silent=True,
        log_queue=log_queue,
    )
    self._plugin_manager.load_managed_plugins()

    self._current_bus = None
    self._log = KerviLog("BusManager")
def load(self):
    """Index every loaded plugin by its message type and cache config sections.

    Stores ``Configuration.messaging`` as ``self._config`` and
    ``Configuration.log.levels`` as ``self._levels``.
    """
    # NOTE(review): assumes the two assignments below follow the loop rather
    # than sit inside it - the excerpt's indentation was lost; confirm.
    for channel_plugin in self._plugin_manager.plugins:
        message_type = channel_plugin.message_type
        self._channels[message_type] = channel_plugin

    self._config = Configuration.messaging
    self._levels = Configuration.log.levels
def get_cert():
    """Return the configured ``(cert_file, key_file)`` pair as a tuple."""
    from kervi.config import Configuration

    cert_path = Configuration.encryption.cert_file
    key_path = Configuration.encryption.key_file
    return cert_path, key_path