Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:class:`CASTable`
'''
for key, value in list(kwargs.items()):
if importoptions is None and key.lower() == 'importoptions':
importoptions = value
del kwargs[key]
elif casout is None and key.lower() == 'casout':
casout = value
del kwargs[key]
out = self.upload(data, importoptions=importoptions,
casout=casout, **kwargs)
if out.severity > 1:
raise SWATError(out.status)
return out['casTable']
:class:`CASTable`
'''
for key, value in list(kwargs.items()):
if importoptions is None and key.lower() == 'importoptions':
importoptions = value
del kwargs[key]
elif casout is None and key.lower() == 'casout':
casout = value
del kwargs[key]
out = self.upload(data, importoptions=importoptions,
casout=casout, **kwargs)
if out.severity > 1:
raise SWATError(out.status)
return out['casTable']
if get_option('cas.debug.requests'):
_print_request('DELETE', url, self._req_sess.headers, data)
res = self._req_sess.put(url, data=data)
if get_option('cas.debug.responses'):
_print_response(res.text)
res = res.text
break
except requests.ConnectionError:
self._set_next_connection()
except Exception as exc:
raise SWATError(str(exc))
finally:
del self._req_sess.headers['JSON-Parameters']
try:
txt = a2u(res, 'utf-8')
out = json.loads(txt, strict=False)
except Exception:
sys.stderr.write(txt)
sys.stderr.write('\n')
raise
try:
if out.get('disposition', None) is None:
if out.get('error'):
raise SWATError(self._results['error'])
def _set_next_connection(self):
    '''
    Advance to the next controller in the configured lists

    The host index is incremented, then the current hostname, base URL
    and port are taken from the matching position of the hostname,
    base URL and port lists.

    Raises
    ------
    SWATError
        If the new index is past the end of one of those lists.  The
        current hostname and base URL are reset to empty strings and
        the current port to -1 before the error is raised.

    '''
    self._host_index += 1

    # (attribute to set, list attribute to read) -- processed in order
    selections = (
        ('_current_hostname', '_hostname'),
        ('_current_baseurl', '_baseurl'),
        ('_current_port', '_port'),
    )

    try:
        for current_attr, list_attr in selections:
            setattr(self, current_attr,
                    getattr(self, list_attr)[self._host_index])
    except IndexError:
        # Ran off the end of the lists: clear the current selection
        # so no stale controller remains, then report the failure.
        self._current_hostname = self._current_baseurl = ''
        self._current_port = -1
        message = 'Unable to connect to any URL: %s' % ', '.join(self._baseurl)
        raise SWATError(message)
SWAT library exceptions
'''
from __future__ import print_function, division, absolute_import, unicode_literals
class SWATError(Exception):
    '''
    Root of the SWAT exception hierarchy

    The other SWAT exception classes derive from this one, so catching
    it also catches them.

    '''
class SWATOptionError(SWATError):
    '''
    Error related to a SWAT configuration option

    '''
class SWATCASActionError(SWATError):
'''
CAS action error exception
Parameters
----------
message : string
The error message
response : CASResponse
except Exception as exc:
raise SWATError(str(exc))
try:
txt = a2u(res, 'utf-8')
self._results = json.loads(txt, strict=False)
except Exception:
sys.stderr.write(txt)
sys.stderr.write('\n')
raise
try:
if self._results.get('disposition', None) is None:
if self._results.get('error'):
raise SWATError(self._results['error'])
else:
raise SWATError('Unknown error')
except ValueError as exc:
raise SWATError(str(exc))
return self
try:
txt = a2u(res, 'utf-8')
self._results = json.loads(txt, strict=False)
except Exception:
sys.stderr.write(txt)
sys.stderr.write('\n')
raise
try:
if self._results.get('disposition', None) is None:
if self._results.get('error'):
raise SWATError(self._results['error'])
else:
raise SWATError('Unknown error')
except ValueError as exc:
raise SWATError(str(exc))
return self
True
If all options were set successfully
'''
for name, value in six.iteritems(kwargs):
name = str(name)
typ = errorcheck(self._sw_connection.getOptionType(name),
self._sw_connection)
try:
if typ == 'boolean':
if value in [True, False, 1, 0]:
errorcheck(self._sw_connection.setBooleanOption(name,
value and 1 or 0),
self._sw_connection)
else:
raise SWATError('%s is not a valid boolean value' % value)
elif typ == 'string':
if isinstance(value, (binary_types, text_types)):
errorcheck(self._sw_connection.setStringOption(name, a2n(value)),
self._sw_connection)
else:
errorcheck(self._sw_connection.setStringOption(name, value),
self._sw_connection)
elif typ == 'int32':
errorcheck(self._sw_connection.setInt32Option(name, int32(value)),
self._sw_connection)
elif typ == 'int64':
errorcheck(self._sw_connection.setInt64Option(name, int64(value)),
self._sw_connection)
elif typ == 'double':
errorcheck(self._sw_connection.setDoubleOption(name, float64(value)),
self._sw_connection)
if get_option('cas.debug.responses'):
_print_response(res.text)
try:
txt = a2u(res.text, 'utf-8')
out = json.loads(txt, strict=False)
except Exception:
sys.stderr.write(txt)
sys.stderr.write('\n')
raise
if out.get('error', None):
if out.get('details', None):
raise SWATError('%s (%s)' % (out['error'], out['details']))
raise SWATError(out['error'])
self._session = out['uuid']
break
else:
url = urllib.parse.urljoin(self._current_baseurl,
'cas/sessions')
if get_option('cas.debug.requests'):
_print_request('PUT', url, self._req_sess.headers)
res = self._req_sess.put(url, data=b'')
if get_option('cas.debug.responses'):
_print_response(res.text)