Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
----------
hostname : string or list
The CAS host to connect to.
port : int
The CAS port to connect to.
protocol : string, optional
The protocol override value.
Returns
-------
string
'cas' or 'http'
'''
if protocol is None:
protocol = cf.get_option('cas.protocol')
if isinstance(hostname, six.string_types):
hostname = re.split(r'\s+', hostname.strip())
# Try to detect the proper protocol
if protocol == 'auto':
import socket
# for ptype in ['http', 'https']:
for host in hostname:
for ptype in ['http']:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
sock.connect((host, port))
else:
raise OSError('The specified authinfo file does not '
'exist: %s' % authinfo)
# If a prototype exists, use it for the connection config
prototype = kwargs.get('prototype')
if prototype is not None:
soptions = prototype._soptions
protocol = prototype._protocol
else:
# Get connection parameters from config
if hostname is None:
hostname = cf.get_option('cas.hostname')
if port is None:
port = cf.get_option('cas.port')
# Detect protocol
if (isinstance(hostname, items_types)
and (hostname[0].startswith('http:')
or hostname[0].startswith('https:'))):
protocol = hostname[0].split(':', 1)[0]
elif (isinstance(hostname, six.string_types)
and (hostname.startswith('http:')
or hostname.startswith('https:'))):
protocol = hostname.split(':', 1)[0]
else:
protocol = self._detect_protocol(hostname, port, protocol=protocol)
soptions = getsoptions(session=session, locale=locale,
index = get_option('cas.dataset.index_name')
if index:
if not isinstance(index, (list, tuple, set)):
index = [index]
for idx in index:
if idx in cdf.columns:
if cdf.attrs.get('ByVar1'):
cdf.set_index([idx], append=True, inplace=True)
else:
cdf.set_index([idx], inplace=True)
adjust = get_option('cas.dataset.index_adjustment')
if adjust != 0 and str(cdf.index.dtype).startswith('int'):
names = cdf.index.names
cdf.index = cdf.index.values + adjust
cdf.index.names = names
if get_option('cas.dataset.drop_index_name'):
names = list(cdf.index.names)
names[-1] = None
cdf.index.names = names
# Only set one index
break
# Detect casout tables
if not(tablename) and unknownname and columnscol and rowscol:
tablename = unknownname
# if we have enough information to build CASTable objects, do it
if caslib and tablename and not castable:
tables = []
for lib, tbl in zip(cdf[caslib], cdf[tablename]):
if connection is not None:
tbl = connection.CASTable(tbl, caslib=lib)
Returns
-------
One of the following depending on the cas.dataset.format option:
SASDataFrame object
SASDataFrame representation of SWIG CASTable
DataFrame object
Pandas DataFrame representation of SWIG CASTable
dict or list
Any variant of the Pandas DataFrame.to_dict() results
tuple
A tuple of tuples of the data values only
'''
tformat = get_option('cas.dataset.format')
needattrs = (tformat == 'dataframe:sas')
# We can short circuit right away if they just want tuples
if tformat.startswith('tuple'):
return _sw_table.toTuples(a2n(get_option('encoding_errors'), 'utf-8'),
casdt.cas2python_datetime,
casdt.cas2python_date,
casdt.cas2python_time)
kwargs = {}
check = errorcheck
if connection is not None:
kwargs['formatter'] = connection.SASFormatter()
else:
kwargs['formatter'] = SASFormatter(soptions=soptions)
except requests.ConnectionError:
self._set_next_connection()
# Get ID of results
action_name = 'session.listresults'
post_data = a2u('').encode('utf-8')
self._req_sess.headers.update({
'Content-Type': 'application/json',
'Content-Length': str(len(post_data)),
})
url = urllib.parse.urljoin(self._current_baseurl,
'cas/sessions/%s/actions/%s' %
(self._session, action_name))
if get_option('cas.debug.requests'):
_print_request('POST', url, self._req_sess.headers, post_data)
res = self._req_sess.post(url, data=post_data)
if get_option('cas.debug.responses'):
_print_response(res.text)
try:
txt = a2u(res.text, 'utf-8')
out = json.loads(txt, strict=False)
except Exception:
sys.stderr.write(txt)
sys.stderr.write('\n')
raise
result_id = out['results']['Queued Results']['rows'][0][0]
attrs[ukey] = []
for i in range(nitems):
attrs[ukey].append(check(_sw_table.getInt64ArrayAttributeItem(key, i),
_sw_table))
elif typ == 'double-array':
nitems = check(_sw_table.getAttributeNItems(), _sw_table)
attrs[ukey] = []
for i in range(nitems):
attrs[ukey].append(check(
_sw_table.getIntDoubleArrayAttributeItem(key, i),
_sw_table))
kwargs['attrs'] = attrs
# Setup date / datetime regexes used to classify column formats.
# Each option may be a single format name (string) or a list of names;
# normalize both to lists before building the alternation pattern.
dt_formats = get_option('cas.dataset.datetime_formats')
if isinstance(dt_formats, six.string_types):
    dt_formats = [dt_formats]
datetime_regex = re.compile(r'^(%s)\d*\.\d*$' % '|'.join(dt_formats), flags=re.I)
d_formats = get_option('cas.dataset.date_formats')
# BUG FIX: this previously tested `dt_formats`, so a string-valued
# `d_formats` was never wrapped in a list and '|'.join(d_formats)
# joined its individual characters, yielding a bogus date regex.
if isinstance(d_formats, six.string_types):
    d_formats = [d_formats]
date_regex = re.compile(r'^(%s)\d*\.\d*$' % '|'.join(d_formats), flags=re.I)
# Construct columns
ncolumns = check(_sw_table.getNColumns(), _sw_table)
caslib = None
tablename = None
castable = None
rowscol = None
columnscol = None
suppress_subparams : list of strings, optional
A list of absolute parameter names that should not have
their sub-parameters documented
param_names : list, optional
A list that is populated with all absolute parameter names
in the resulting docstring
results_format : boolean, optional
Is this description being used for results rather than
parameter formatting?
Returns
-------
string
'''
if not get_option('interactive_mode'):
return ''
output = []
for param in params:
_format_param(param, connection, indent=0, output=output,
suppress_subparams=suppress_subparams,
param_names=param_names, results_format=results_format)
if not(results_format) and output[-1]:
output.append('')
return '\n'.join(output)
Parameters
----------
_sw_value : SWIG CASValue object
Object to convert to Python
soptions : string
soptions of connection object
Returns
-------
any
Python representation of CASValue
'''
return _sw_value.toPython(_sw_value, soptions,
a2n(get_option('encoding_errors'), 'utf-8'),
connection, ctb2tabular,
base64.b64decode, casdt.cas2python_datetime,
casdt.cas2python_date, casdt.cas2python_time)
# return CAS2PY[errorcheck(_sw_value.getType(),
except ImportError:
warnings.warn('The PIL or Pillow package is required '
'to convert bytes to Image objects',
RuntimeWarning)
if Image is None:
continue
cdf[key] = cdf[key].map(lambda x: Image.open(BytesIO(x)))
# Apply date / datetime transformations
for item in dates:
cdf[item] = cdf[item].apply(casdt.sas2python_date)
for item in datetimes:
cdf[item] = cdf[item].apply(casdt.sas2python_datetime)
# Check for By group information
optbycol = get_option('cas.dataset.bygroup_columns')
optbyidx = get_option('cas.dataset.bygroup_as_index')
optbysfx = get_option('cas.dataset.bygroup_formatted_suffix')
optbycolsfx = get_option('cas.dataset.bygroup_collision_suffix')
cdf = cdf.reshape_bygroups(bygroup_columns=optbycol,
bygroup_as_index=optbyidx,
bygroup_formatted_suffix=optbysfx,
bygroup_collision_suffix=optbycolsfx)
# Add an index as needed
index = get_option('cas.dataset.index_name')
if index:
if not isinstance(index, (list, tuple, set)):
index = [index]
for idx in index:
if idx in cdf.columns:
if cdf.attrs.get('ByVar1'):
def close(self):
    '''
    Close the connection.

    Issues an HTTP DELETE for the current CAS session (when one is
    active) and clears the stored session ID.

    Returns
    -------
    int or None
        The HTTP status code of the DELETE request, or None when
        there is no active session / request session to close.
    '''
    # Guard clause: nothing to tear down without a session and a
    # request session object.
    if not self._session or self._req_sess is None:
        return None
    self._req_sess.headers.update({
        'Content-Type': 'application/json',
        'Content-Length': '0',
    })
    endpoint = urllib.parse.urljoin(self._current_baseurl,
                                    'cas/sessions/%s' % self._session)
    if get_option('cas.debug.requests'):
        _print_request('DELETE', endpoint, self._req_sess.headers)
    response = self._req_sess.delete(endpoint, data=b'')
    # Forget the session ID only after the DELETE has been sent.
    self._session = None
    return response.status_code