Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
.setBinaryFromBase64(row, offset,
a2n(base64.b64encode(
a2b(transformer(value))))),
self._sw_databuffer)
else:
errorcheck(self._sw_databuffer.setBinaryFromBase64(row,
offset,
a2n('')),
self._sw_databuffer)
else:
if isinstance(value, (text_types, binary_types)):
errorcheck(self._sw_databuffer.setString(row, offset,
a2n(transformer(value))),
self._sw_databuffer)
else:
errorcheck(self._sw_databuffer.setString(row, offset, a2n('')),
self._sw_databuffer)
elif vrtype == 'NUMERIC' and vtype in ['INT32', 'DATE']:
if pd.isnull(value):
value = get_option('cas.missing.%s' % vtype.lower())
warnings.warn(('Missing value found in 32-bit '
+ 'integer-based column \'%s\'.\n' % v['name'])
+ ('Substituting cas.missing.%s option value (%s).' %
(vtype.lower(), value)),
RuntimeWarning)
if length > 4:
for i in range(int64(length / 4)):
errorcheck(self._sw_databuffer.setInt32(row, offset + (i * 4),
int32(transformer(get(value, i, 0)))),
self._sw_databuffer)
else:
errorcheck(self._sw_databuffer.setInt32(row, offset,
out = errorcheck(a2u(self._sw_formatter.formatInt64(
int64(value), a2n('best12.'), int32(width)),
a2n('utf-8')),
self._sw_formatter)
elif isinstance(value, text_types):
try:
out = errorcheck(a2u(self._sw_formatter.formatString(
a2n(value), a2n(sasfmt),
int32(width)), a2n('utf-8')),
self._sw_formatter)
except SWATError:
out = value
# TODO: Should binary types ever get here?
elif isinstance(value, binary_types):
out = errorcheck(a2u(self._sw_formatter.formatString(
a2n(value), a2n(sasfmt), int32(width)), a2n('utf-8')),
self._sw_formatter)
elif isinstance(value, bool_types):
out = errorcheck(a2u(self._sw_formatter.formatInt32(
int32(value), a2n(sasfmt), int32(width)), a2n('utf-8')),
self._sw_formatter)
elif isinstance(value, (datetime.datetime, Timestamp)):
out = errorcheck(a2u(self._sw_formatter.formatDouble(
utils.datetime.python2sas_datetime(value),
a2n(sasfmt), int32(width)),
a2n('utf-8')),
self._sw_formatter)
elif isinstance(value, datetime.date):
out = errorcheck(a2u(self._sw_formatter.formatDouble(
utils.datetime.python2sas_date(value),
a2n(sasfmt), int32(width)),
a2n('utf-8')),
if param['parmType'] == 'boolean':
output.append('%sDefault: %s' % (nextindent,
param['default'] and True or False))
else:
output.append('%sDefault: %s' % (nextindent, param['default']))
if selector_values:
output.append('')
output.append('%s%s.%s : %s' % (nextindent, path,
param['selector'].lower(), selector_type))
output.append('%sDefault: %s' % (nextnextindent, selector_values[0]))
value = a2n('Values: %s' %
(', '.join([('%s' % x) for x in selector_values])), 'utf-8')
if wraptext:
output.append(''.join(wraptext(value, width,
a2n(nextnextindent, 'utf-8'),
a2n(nextnextindent + ' ' * 8, 'utf-8'))))
else:
output.append(textwrap.fill('Values: %s' %
(', '.join([('%s' % x)
for x in selector_values])),
width, initial_indent=nextnextindent,
subsequent_indent=nextnextindent + ' ' * 8))
elif 'allowedValues' in param:
value = a2n('Values: %s' % (', '.join([('%s' % x)
for x in param['allowedValues']])), 'utf-8')
if wraptext:
output.append(''.join(wraptext(value, width,
a2n(nextindent, 'utf-8'),
a2n(nextindent + ' ' * 8, 'utf-8'))))
else:
output.append(textwrap.fill('Values: %s' %
elif typ == 'int64-array':
nitems = errorcheck(_sw_table.getColumnAttributeNItems(), _sw_table)
attrs[key] = []
for i in range(nitems):
attrs[key].append(errorcheck(
_sw_table.getColumnInt64ArrayAttributeItem(col,
a2n(key, 'utf-8'),
i),
_sw_table))
elif typ == 'double-array':
nitems = errorcheck(_sw_table.getColumnAttributeNItems(), _sw_table)
attrs[key] = []
for i in range(nitems):
attrs[key].append(errorcheck(
_sw_table.getColumnDoubleArrayAttributeItem(col,
a2n(key, 'utf-8'),
i),
_sw_table))
return cls(name=name, label=label, dtype=dtype, width=width, format=format,
size=size, attrs=attrs)
intmiss[col.name] = {-2147483648: np.nan}
elif dtype == 'int64-array':
for elem in range(col.size[1]):
col = SASColumnSpec.fromtable(_sw_table, i, elem=elem)
dtypes.append((col.name, 'i8'))
colinfo[col.name] = col
intmiss[col.name] = {-9223372036854775808: np.nan}
elif dtype == 'double-array':
for elem in range(col.size[1]):
col = SASColumnSpec.fromtable(_sw_table, i, elem=elem)
dtypes.append((col.name, 'f8'))
colinfo[col.name] = col
kwargs['colinfo'] = colinfo
# Numpy doesn't like unicode column names in Python 2, so map them to utf-8
dtypes = [(a2n(x[0], 'utf-8'), x[1]) for x in dtypes]
# Create a np.array and fill it
kwargs['data'] = np.array(_sw_table.toTuples(a2n(
get_option('encoding_errors'), 'utf-8'),
casdt.cas2python_datetime, casdt.cas2python_date,
casdt.cas2python_time),
dtype=dtypes)
# Short circuit for numpy arrays
# if tformat == 'numpy_array':
# return kwargs['data']
cdf = SASDataFrame(**kwargs)
# Map column names back to unicode in pandas
cdf.columns = [a2u(x[0], 'utf-8') for x in dtypes]
datetime.date)):
value = python2cas_date(value)
elif vtype == 'TIME' and isinstance(value, (datetime.datetime,
datetime.time)):
value = python2cas_time(value)
elif vtype == 'DATETIME' and isinstance(value, (datetime.date,
datetime.time,
datetime.datetime)):
value = python2cas_datetime(value)
if vrtype == 'CHAR' or vtype in ['VARCHAR', 'CHAR', 'BINARY', 'VARBINARY']:
if vtype in ['BINARY', 'VARBINARY'] \
and hasattr(self._sw_databuffer, 'setBinaryFromBase64'):
if isinstance(value, (binary_types, text_types)):
errorcheck(self._sw_databuffer
.setBinaryFromBase64(row, offset,
a2n(base64.b64encode(
a2b(transformer(value))))),
self._sw_databuffer)
else:
errorcheck(self._sw_databuffer.setBinaryFromBase64(row,
offset,
a2n('')),
self._sw_databuffer)
else:
if isinstance(value, (text_types, binary_types)):
errorcheck(self._sw_databuffer.setString(row, offset,
a2n(transformer(value))),
self._sw_databuffer)
else:
errorcheck(self._sw_databuffer.setString(row, offset, a2n('')),
self._sw_databuffer)
elif vrtype == 'NUMERIC' and vtype in ['INT32', 'DATE']:
elif typ == 'int32-array':
nitems = errorcheck(_sw_table.getColumnAttributeNItems(), _sw_table)
attrs[key] = []
for i in range(nitems):
attrs[key].append(errorcheck(
_sw_table.getColumnInt32ArrayAttributeItem(col,
a2n(key, 'utf-8'),
i),
_sw_table))
elif typ == 'int64-array':
nitems = errorcheck(_sw_table.getColumnAttributeNItems(), _sw_table)
attrs[key] = []
for i in range(nitems):
attrs[key].append(errorcheck(
_sw_table.getColumnInt64ArrayAttributeItem(col,
a2n(key, 'utf-8'),
i),
_sw_table))
elif typ == 'double-array':
nitems = errorcheck(_sw_table.getColumnAttributeNItems(), _sw_table)
attrs[key] = []
for i in range(nitems):
attrs[key].append(errorcheck(
_sw_table.getColumnDoubleArrayAttributeItem(col,
a2n(key, 'utf-8'),
i),
_sw_table))
return cls(name=name, label=label, dtype=dtype, width=width, format=format,
size=size, attrs=attrs)
self._sw_formatter)
elif isinstance(value, datetime.date):
out = errorcheck(a2u(self._sw_formatter.formatDouble(
utils.datetime.python2sas_date(value),
a2n(sasfmt), int32(width)),
a2n('utf-8')),
self._sw_formatter)
elif isinstance(value, datetime.time):
out = errorcheck(a2u(self._sw_formatter.formatDouble(
utils.datetime.python2sas_time(value),
a2n(sasfmt), int32(width)),
a2n('utf-8')),
self._sw_formatter)
elif value is None:
out = errorcheck(a2u(self._sw_formatter.formatString(
a2n(''), a2n(sasfmt), int32(width)), a2n('utf-8')),
self._sw_formatter)
# For CASTable columns in dataframes
elif isinstance(value, CASTable):
return a2u(str(value))
if out is None:
raise TypeError(type(value))
return out
# Compute reclen
if reclen is None:
reclen = sum([v['length'] for v in self.vars])
self.reclen = reclen
# Compute offsets
next_offset = 0
for v in self.vars:
if v.get('offset', None) is None:
v['offset'] = next_offset
next_offset = v['offset'] + (v['length'] * v.get('nvalues', 1))
_sw_error = clib.SW_CASError(a2n(soptions))
self._sw_databuffer = errorcheck(clib.SW_CASDataBuffer(int64(self.reclen),
int64(self.nrecs),
a2n(soptions), _sw_error),
_sw_error)
and hasattr(self._sw_databuffer, 'setBinaryFromBase64'):
if isinstance(value, (binary_types, text_types)):
errorcheck(self._sw_databuffer
.setBinaryFromBase64(row, offset,
a2n(base64.b64encode(
a2b(transformer(value))))),
self._sw_databuffer)
else:
errorcheck(self._sw_databuffer.setBinaryFromBase64(row,
offset,
a2n('')),
self._sw_databuffer)
else:
if isinstance(value, (text_types, binary_types)):
errorcheck(self._sw_databuffer.setString(row, offset,
a2n(transformer(value))),
self._sw_databuffer)
else:
errorcheck(self._sw_databuffer.setString(row, offset, a2n('')),
self._sw_databuffer)
elif vrtype == 'NUMERIC' and vtype in ['INT32', 'DATE']:
if pd.isnull(value):
value = get_option('cas.missing.%s' % vtype.lower())
warnings.warn(('Missing value found in 32-bit '
+ 'integer-based column \'%s\'.\n' % v['name'])
+ ('Substituting cas.missing.%s option value (%s).' %
(vtype.lower(), value)),
RuntimeWarning)
if length > 4:
for i in range(int64(length / 4)):
errorcheck(self._sw_databuffer.setInt32(row, offset + (i * 4),
int32(transformer(get(value, i, 0)))),