Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def getType(self):
    '''
    Return the type name that corresponds to the current value

    Returns
    -------
    string
        One of 'double', 'string', 'int32', 'int64', 'list',
        'table', or 'nil'

    Raises
    ------
    TypeError
        If the value does not match any of the recognized types

    '''
    current = self._value

    # The first matching entry wins, so the order of this table is
    # part of the behavior and must not be rearranged.
    ordered_checks = (
        (float64_types, 'double'),
        (text_types, 'string'),
        (int32_types, 'int32'),
        (int64_types, 'int64'),
        (binary_types, 'string'),
        (items_types, 'list'),
    )
    for candidates, label in ordered_checks:
        if isinstance(current, candidates):
            return label

    if isinstance(current, dict):
        # A truthy '_ctb' entry selects 'table'; any other dict is a 'list'
        return 'table' if current.get('_ctb') else 'list'

    if current is None:
        return 'nil'

    raise TypeError('%s: %s' % (self._key, current))
def __doc__(self):
    '''
    Build the documentation string

    The string contains the name, the type description, the
    documentation text, the default value, and the current value.

    Returns
    -------
    string

    '''
    current = self._value

    # String / binary values longer than 40 characters are pushed
    # onto their own line rather than following the default value.
    is_long_text = (isinstance(current, (text_types, binary_types))
                    and len(current) > 40)
    separator = '\n ' if is_long_text else ' '

    name = self._name
    typedesc = self._typedesc
    # Re-indent continuation lines of the documentation text
    doc_text = self._doc.rstrip().replace('\n', '\n ')
    default = self._default

    template = '''%s : %s\n %s\n [default: %s]%s[currently: %s]\n'''
    return template % (name, typedesc, doc_text, default, separator, current)
def __iter__(self):
    '''
    Iterate over all results in the response

    Yields
    ------
    (key, value)
        The key of each result and the output of ``cas2py`` for it.
        A key of None is reported as 0, and a binary key is passed
        through ``a2u`` with 'utf-8'.

    '''
    while True:
        result = errorcheck(self._sw_response.getNextResult(),
                            self._sw_response)
        # A falsy result ends the iteration
        if not result:
            return

        result_key = errorcheck(result.getKey(), result)
        if result_key is None:
            result_key = 0
        elif isinstance(result_key, binary_types):
            result_key = a2u(result_key, 'utf-8')

        yield result_key, cas2py(result, self.soptions,
                                 connection=self._connection)
def dtype_from_var(value):
    '''
    Guess the CAS data type from the value

    Parameters
    ----------
    value : any
        The value to inspect

    Returns
    -------
    string
        One of 'int64', 'int32', 'double', 'varchar', 'varbinary',
        'datetime', 'date', or 'time'

    Raises
    ------
    TypeError
        If the value is not an instance of any recognized type

    '''
    if isinstance(value, int64_types):
        return 'int64'
    if isinstance(value, int32_types):
        return 'int32'
    if isinstance(value, float64_types):
        return 'double'
    if isinstance(value, text_types):
        return 'varchar'
    if isinstance(value, binary_types):
        return 'varbinary'
    # datetime.datetime is a subclass of datetime.date, so it has to be
    # tested first or every datetime would be reported as a 'date'.
    if isinstance(value, datetime.datetime):
        return 'datetime'
    if isinstance(value, datetime.date):
        return 'date'
    if isinstance(value, datetime.time):
        return 'time'
    # Wrap the value in a one-element tuple: a bare tuple on the right of
    # '%' is unpacked as the format arguments, which garbles the message
    # (or replaces it with a formatting error) when value is a tuple.
    raise TypeError('Unrecognized type for value: %s' % (value,))
'''
for name, value in six.iteritems(kwargs):
name = str(name)
typ = errorcheck(self._sw_connection.getOptionType(name),
self._sw_connection)
try:
if typ == 'boolean':
if value in [True, False, 1, 0]:
errorcheck(self._sw_connection.setBooleanOption(name,
value and 1 or 0),
self._sw_connection)
else:
raise SWATError('%s is not a valid boolean value' % value)
elif typ == 'string':
if isinstance(value, (binary_types, text_types)):
errorcheck(self._sw_connection.setStringOption(name, a2n(value)),
self._sw_connection)
else:
errorcheck(self._sw_connection.setStringOption(name, value),
self._sw_connection)
elif typ == 'int32':
errorcheck(self._sw_connection.setInt32Option(name, int32(value)),
self._sw_connection)
elif typ == 'int64':
errorcheck(self._sw_connection.setInt64Option(name, int64(value)),
self._sw_connection)
elif typ == 'double':
errorcheck(self._sw_connection.setDoubleOption(name, float64(value)),
self._sw_connection)
except TypeError:
raise SWATError('%s is not the correct type' % value)
_sw_values : SWIG CASValueList object
List to set the value in
i : int or long
Index in the list to set
key : string
Key for the list item (None if no key is desired)
item : any
Value to set at the list index
Returns
-------
int
The next position in the list to set
'''
if isinstance(key, (binary_types, text_types)):
key = keywordify(a2n(key, 'utf-8'))
if item is True or item is False:
errorcheck(_sw_values.setBoolean(i, key, item and 1 or 0),
_sw_values)
i = i + 1
elif isinstance(item, blob):
errorcheck(_sw_values.setBlob(i, key, item), _sw_values)
i = i + 1
elif isinstance(item, text_types):
errorcheck(_sw_values.setString(i, key, a2n(item, 'utf-8')),
_sw_values)
i = i + 1
elif isinstance(item, binary_types):
errorcheck(_sw_values.setString(i, key, a2n(item, 'utf-8')),
_sw_values)
out = self._format_numeric(int32(value), sasfmt, width=width)
elif sasfmt and re.match(r'^(nl)(comma|mny|dollar|euro)\d*\.\d*$', sasfmt, flags=re.I):
out = self._format_numeric(value, sasfmt, width=width, commas=True)
else:
out = a2u(str(int32(value)))
except OverflowError:
if sasfmt and re.match(r'^[df]?\d*\.\d*', sasfmt, flags=re.I):
out = self._format_numeric(int64(value), sasfmt, width=width)
elif sasfmt and re.match(r'^(nl)?(comma|mny|dollar|euro)\d*\.\d*$', sasfmt, flags=re.I):
out = self._format_numeric(value, sasfmt, width=width, commas=True)
else:
out = a2u(str(int64(value)))
elif isinstance(value, text_types):
out = a2u(value)
# TODO: Should binary types ever get here?
elif isinstance(value, binary_types):
out = a2u(value)
elif isinstance(value, bool_types):
out = a2u(str(value))
elif isinstance(value, (datetime.date, datetime.time, datetime.datetime,
Timestamp)):
out = a2u(str(value))
elif value is None:
out = a2u('')
# For CASTable columns in dataframes
elif isinstance(value, CASTable):
return a2u(str(value))
if out is None:
raise TypeError(type(value))
'''
if isinstance(key, (binary_types, text_types)):
key = keywordify(a2n(key, 'utf-8'))
if item is True or item is False:
errorcheck(_sw_values.setBoolean(i, key, item and 1 or 0),
_sw_values)
i = i + 1
elif isinstance(item, blob):
errorcheck(_sw_values.setBlob(i, key, item), _sw_values)
i = i + 1
elif isinstance(item, text_types):
errorcheck(_sw_values.setString(i, key, a2n(item, 'utf-8')),
_sw_values)
i = i + 1
elif isinstance(item, binary_types):
errorcheck(_sw_values.setString(i, key, a2n(item, 'utf-8')),
_sw_values)
i = i + 1
elif isinstance(item, int64_types):
errorcheck(_sw_values.setInt64(i, key, int64(item)), _sw_values)
i = i + 1
elif isinstance(item, int32_types):
if item > MAX_INT32 or item < MIN_INT32:
errorcheck(_sw_values.setInt64(i, key, int64(item)), _sw_values)
else:
errorcheck(_sw_values.setInt32(i, key, int32(item)), _sw_values)
i = i + 1
elif isinstance(item, float64_types):
errorcheck(_sw_values.setDouble(i, key, float64(item)), _sw_values)
i = i + 1
elif item is nil:
if isinstance(kwargs['table'], dict):
if caslib and 'caslib' not in kwargs and kwargs['table'].get('caslib'):
kwargs['caslib'] = kwargs['table']['caslib']
kwargs['table'] = kwargs['table']['name']
# Add current value fields in the signature
for param in parmlist:
if param['name'] in kwargs:
if 'parmList' in param:
self._merge_param_args(param['parmList'], kwargs[param['name']],
action=action)
else:
if isinstance(kwargs[param['name']], text_types):
param['value'] = kwargs[param['name']].replace('"', '\\u0022')
# TODO: This should only happen for binary inputs (i.e., never)
elif isinstance(kwargs[param['name']], binary_types):
# param['value'] = kwargs[param['name']].replace('"', '\\u0022')
pass
else:
param['value'] = kwargs[param['name']]
def __init__(self, module, cursor, nrecs=1000, transformers=None):
self.cursor = cursor
self.cursor.arraysize = nrecs
# array of functions to transform data types that don't match SAS types
if transformers is None:
transformers = {}
DATETIME = getattr(module, 'DATETIME', datetime.datetime)
STRING = getattr(module, 'STRING', DBAPITypeObject(*text_types))
BINARY = getattr(module, 'BINARY', DBAPITypeObject(*binary_types))
def typemap(typ):
'''
Map database type to CAS type
Parameters
----------
typ : DBAPI data type
Returns
-------
list
[ column name, SAS data type string, CAS data type string, width ]
'''
output = [typ[0], 'NUMERIC', 'SAS', 8]