Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def set_kvp(cls, name, value, scope='system', secret=False):
kvp = models.KeyValuePair(id=name, name=name, value=value, scope=scope, secret=secret)
cls.st2client.keys.update(kvp)
r = requests.get(url)
except requests.exceptions.RequestException as e:
return (False, str(e))
request_out = r.text
# TODO Implement a generic parsing algorithm
drop_list = [p.split(";")[0].strip()
for p in request_out.split("\n")[4:-1]]
ret += drop_list
api_base_url = self.config.get('st2_base_url')
st_client = Client(base_url=api_base_url)
drop_list_key = "drop_list_prefixes_{}".format(ip_version)
drop_list_value = json.dumps(list(set(ret)))
drop_list_pair = KeyValuePair(name=drop_list_key,
value=drop_list_value)
try:
st_client.keys.update(drop_list_pair)
except Exception as e:
return (False, str(e))
return (True, drop_list_key)
r = requests.get(url)
except requests.exceptions.RequestException as e:
return (False, str(e))
request_out = r.text
# TODO Implement a generic parsing algorithm
fullbogons_list = request_out.split("\n")[1:-1]
ret += fullbogons_list
api_base_url = self.config.get('st2_base_url')
st_client = Client(base_url=api_base_url)
fullbogons_key = "fullbogons_{}".format(ip_version)
fullbogons_value = json.dumps(list(set(ret)))
fullbogons_pair = KeyValuePair(name=fullbogons_key,
value=fullbogons_value)
try:
st_client.keys.update(fullbogons_pair)
except Exception as e:
return (False, str(e))
return (True, fullbogons_key)
:type: local: ``bool``
:return: ``True`` on success, ``False`` otherwise.
:rtype: ``bool``
"""
if scope != SYSTEM_SCOPE:
raise ValueError('Scope %s is unsupported.' % scope)
name = self._get_full_key_name(name=name, local=local)
value = str(value)
client = self.get_api_client()
self._logger.debug('Setting value in the datastore (name=%s)', name)
instance = KeyValuePair()
instance.id = name
instance.name = name
instance.value = value
instance.scope = scope
if encrypt:
instance.secret = True
if ttl:
instance.ttl = ttl
client.keys.update(instance=instance)
return True
listen_web_port,
api_token,
ip_version).execute()
except BIRDProxyError as e:
return (False, str(e))
response = resp.get("outcome")
data = resp.get("message")
if response and store_results:
api_base_url = self.config.get('st2_base_url')
st_client = Client(base_url=api_base_url)
key = "sessions_info_{}_{}".format(
router_id, ip_version)
value = json.dumps(data)
pair = KeyValuePair(name=key,
value=value)
if ttl is not None:
pair.ttl = ttl
try:
st_client.keys.update(pair)
except Exception as e:
return (False, str(e))
return(response, key)
else:
return (response, data)
:type: local: ``bool``
:param scope: Scope under which item is saved. Defaults to system scope.
:type: local: ``str``
:return: ``True`` on success, ``False`` otherwise.
:rtype: ``bool``
"""
if scope != SYSTEM_SCOPE:
raise ValueError('Scope %s is unsupported.' % scope)
name = self._get_full_key_name(name=name, local=local)
client = self.get_api_client()
instance = KeyValuePair()
instance.id = name
instance.name = name
self._logger.debug('Deleting value from the datastore (name=%s)', name)
try:
params = {'scope': scope}
client.keys.delete(instance=instance, params=params)
except Exception as e:
self._logger.exception('Exception deleting value from datastore (name=%s): %s', name, e)
return False
return True
self.managers['Policy'] = ResourceManager(
models.Policy, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['PolicyType'] = ResourceManager(
models.PolicyType, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['Rule'] = ResourceManager(
models.Rule, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['Sensor'] = ResourceManager(
models.Sensor, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['TriggerType'] = ResourceManager(
models.TriggerType, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['Trigger'] = ResourceManager(
models.Trigger, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['TriggerInstance'] = TriggerInstanceResourceManager(
models.TriggerInstance, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['KeyValuePair'] = ResourceManager(
models.KeyValuePair, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['Webhook'] = WebhookManager(
models.Webhook, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['Timer'] = ResourceManager(
models.Timer, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['Trace'] = ResourceManager(
models.Trace, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['RuleEnforcement'] = ResourceManager(
models.RuleEnforcement, self.endpoints['api'], cacert=self.cacert, debug=self.debug)
self.managers['Stream'] = StreamManager(
self.endpoints['stream'], cacert=self.cacert, debug=self.debug)
self.managers['Workflow'] = WorkflowManager(
self.endpoints['api'], cacert=self.cacert, debug=self.debug)
# Service Registry
self.managers['ServiceRegistryGroups'] = ServiceRegistryGroupsManager(
models.ServiceRegistryGroup, self.endpoints['api'], cacert=self.cacert,