Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Only the default stanza. In this case, modinput should exit.
self.log_warning("No stanza found for input type: " + self.input_type)
sys.exit(0)
account_fields = self.get_account_fields()
checkbox_fields = self.get_checkbox_fields()
self.input_stanzas = {}
for stanza in all_stanzas:
full_stanza_name = '{}://{}'.format(self.input_type, stanza.get('name'))
if full_stanza_name in inputs.inputs:
if stanza.get('disabled', False):
raise RuntimeError("Running disabled data input!")
stanza_params = {}
for k, v in stanza.iteritems():
if k in checkbox_fields:
stanza_params[k] = sutils.is_true(v)
elif k in account_fields:
stanza_params[k] = copy.deepcopy(v)
else:
stanza_params[k] = v
self.input_stanzas[stanza.get('name')] = stanza_params
def handleEdit(self, confInfo):
logger.info("start edit setup configure.")
scheme, host, port = utils.extract_http_scheme_host_port(scc.getMgmtUri())
conf_mgr = conf.ConfManager(self.getSessionKey(), self.appName, scheme=scheme, host=host, port=port)
ta_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_conf)
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
all_origin_settings = ta_conf_file.get_all()
all_settings = utils.escape_json_control_chars(
self.callerArgs.data[setup_const.all_settings][0])
all_settings = json.loads(all_settings)
# write global and proxy settings
self._updateGlobalSettings(setup_const.global_settings, all_settings,
all_origin_settings, ta_conf_file)
# write customized settings
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
self._updateConfStanzas(all_settings.get(setup_const.myta_customized_settings, {}), customized_conf_file, self.encrypt_fields_customized)
# write account credential settings
for cred, conf_file in self.cred_confs:
cred_conf_file = get_or_create_conf_file(conf_mgr, conf_file)
def handleEdit(self, confInfo):
logger.info("start edit setup configure.")
scheme, host, port = utils.extract_http_scheme_host_port(scc.getMgmtUri())
conf_mgr = conf.ConfManager(self.getSessionKey(), self.appName, scheme=scheme, host=host, port=port)
ta_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_conf)
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
all_origin_settings = ta_conf_file.get_all()
all_settings = utils.escape_json_control_chars(
self.callerArgs.data[setup_const.all_settings][0])
all_settings = json.loads(all_settings)
# write global and proxy settings
self._updateGlobalSettings(setup_const.global_settings, all_settings,
all_origin_settings, ta_conf_file)
# write customized settings
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
self._updateConfStanzas(all_settings.get(setup_const.myta_customized_settings, {}), customized_conf_file, self.encrypt_fields_customized)
# write account credential settings
for cred, conf_file in self.cred_confs:
cred_conf_file = get_or_create_conf_file(conf_mgr, conf_file)
creds = all_settings.get(cred, {})
def handleEdit(self, confInfo):
logger.info("start edit setup configure.")
scheme, host, port = utils.extract_http_scheme_host_port(scc.getMgmtUri())
conf_mgr = conf.ConfManager(self.getSessionKey(), self.appName, scheme=scheme, host=host, port=port)
ta_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_conf)
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
all_origin_settings = ta_conf_file.get_all()
all_settings = utils.escape_json_control_chars(
self.callerArgs.data[setup_const.all_settings][0])
all_settings = json.loads(all_settings)
# write global and proxy settings
self._updateGlobalSettings(setup_const.global_settings, all_settings,
all_origin_settings, ta_conf_file)
# write customized settings
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
self._updateConfStanzas(all_settings.get(setup_const.myta_customized_settings, {}), customized_conf_file, self.encrypt_fields_customized)
# write account credential settings
for cred, conf_file in self.cred_confs:
cred_conf_file = get_or_create_conf_file(conf_mgr, conf_file)
creds = all_settings.get(cred, {})
def handleEdit(self, confInfo):
logger.info("start edit setup configure.")
scheme, host, port = utils.extract_http_scheme_host_port(scc.getMgmtUri())
conf_mgr = conf.ConfManager(self.getSessionKey(), self.appName, scheme=scheme, host=host, port=port)
ta_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_conf)
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
all_origin_settings = ta_conf_file.get_all()
all_settings = utils.escape_json_control_chars(
self.callerArgs.data[setup_const.all_settings][0])
all_settings = json.loads(all_settings)
# write global and proxy settings
self._updateGlobalSettings(setup_const.global_settings, all_settings,
all_origin_settings, ta_conf_file)
# write customized settings
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
self._updateConfStanzas(all_settings.get(setup_const.myta_customized_settings, {}), customized_conf_file, self.encrypt_fields_customized)
# write account credential settings
for cred, conf_file in self.cred_confs:
cred_conf_file = get_or_create_conf_file(conf_mgr, conf_file)
creds = all_settings.get(cred, {})
if creds == setup_const.ignore_backend_req:
logger.info("Ignore backend rest request")
continue
if creds:
def handleEdit(self, confInfo):
logger.info("start edit setup configure.")
scheme, host, port = utils.extract_http_scheme_host_port(scc.getMgmtUri())
conf_mgr = conf.ConfManager(self.getSessionKey(), self.appName, scheme=scheme, host=host, port=port)
ta_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_conf)
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
all_origin_settings = ta_conf_file.get_all()
all_settings = utils.escape_json_control_chars(
self.callerArgs.data[setup_const.all_settings][0])
all_settings = json.loads(all_settings)
# write global and proxy settings
self._updateGlobalSettings(setup_const.global_settings, all_settings,
all_origin_settings, ta_conf_file)
# write customized settings
customized_conf_file = get_or_create_conf_file(conf_mgr, setup_const.myta_customized_conf)
self._updateConfStanzas(all_settings.get(setup_const.myta_customized_settings, {}), customized_conf_file, self.encrypt_fields_customized)
# write account credential settings
for cred, conf_file in self.cred_confs:
cred_conf_file = get_or_create_conf_file(conf_mgr, conf_file)
creds = all_settings.get(cred, {})
if creds == setup_const.ignore_backend_req:
logger.info("Ignore backend rest request")
continue
if creds:
@retry(exceptions=[binding.HTTPError])
def register_capabilities(self, capabilities):
'''Register app capabilities.
:param capabilities: App capabilities, example: {
'object_type1': {
'read': 'read_app_object_type1',
'write': 'write_app_object_type1',
'delete': 'delete_app_object_type1'},
'object_type2': {
'read': 'read_app_object_type2',
'write': 'write_app_object_type2',
'delete': 'delete_app_object_type2'},
...}
:type capabilities: ``dict``
'''
@retry(exceptions=[binding.HTTPError])
def _get_hec_config(self, hec_input_name, session_key,
scheme, host, port, **context):
hc = HECConfig(
session_key, scheme=scheme, host=host, port=port, **context)
settings = hc.get_settings()
if utils.is_true(settings.get('disabled')):
# Enable HEC input
logging.info('Enabling HEC')
settings['disabled'] = '0'
settings['enableSSL'] = context.get('hec_enablessl', '1')
settings['port'] = context.get('hec_port', '8088')
hc.update_settings(settings)
hec_input = hc.get_input(hec_input_name)
if not hec_input:
# Create HEC input
@retry(exceptions=[binding.HTTPError])
def delete_password(self, user):
'''Delete password.
:param user: User name.
:type user: ``string``
:raises CredentialNotExistException: If password of realm:user
doesn't exist.
Usage::
>>> from solnlib import credentials
>>> cm = credentials.CredentialManager(session_key,
'Splunk_TA_test',
realm='realm_test')
>>> cm.delete_password('testuser1')
@retry(exceptions=[binding.HTTPError])
def get_session_key(username, password,
scheme=None, host=None, port=None, **context):
'''Get splunkd access token.
:param username: The Splunk account username, which is used to
authenticate the Splunk instance.
:type username: ``string``
:param password: The Splunk account password.
:type password: ``string``
:param scheme: (optional) The access scheme, default is None.
:type scheme: ``string``
:param host: (optional) The host name, default is None.
:type host: ``string``
:param port: (optional) The port number, default is None.
:type port: ``integer``
:returns: Splunk session key.