Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def from_stream(self, r):
size = r.get_byte()
if size not in (4, 8):
raise InterfaceError('Invalid SYBDATETIMN size', size)
return DateTimeN(size)
def get_type_info(self, curcol):
""" Reads TYPE_INFO structure (http://msdn.microsoft.com/en-us/library/dd358284.aspx)
:param curcol: An instance of :class:`Column` that will receive read information
"""
r = self._reader
# User defined data type of the column
curcol.column_usertype = r.get_uint() if IS_TDS72_PLUS(self) else r.get_usmallint()
curcol.flags = r.get_usmallint() # Flags
curcol.column_nullable = curcol.flags & Column.fNullable
curcol.column_writeable = (curcol.flags & Column.fReadWrite) > 0
curcol.column_identity = (curcol.flags & Column.fIdentity) > 0
type_id = r.get_byte()
type_class = self._tds._type_map.get(type_id)
if not type_class:
raise InterfaceError('Invalid type id', type_id)
curcol.type = type_class.from_stream(r)
def read(self, r):
size = r.get_byte()
if not size:
return None
else:
if size == 8:
return r.unpack(_flt8_struct)[0]
elif size == 4:
return r.unpack(_flt4_struct)[0]
else:
raise InterfaceError('Invalid SYBFLTN size', size)
def read(self, r):
size = r.get_byte()
if size == 0:
return None
if size != 16:
raise InterfaceError('Invalid size of UNIQUEIDENTIFIER field')
return self.read_fixed(r, size)
MsUnique.instance = MsUnique()
def bad_stream(self, msg):
""" Called when input stream contains unexpected data.
Will close stream and raise :class:`InterfaceError`
:param msg: Message for InterfaceError exception.
:return: Never returns, always raises exception.
"""
self.close()
raise InterfaceError(msg)
def from_stream(cls, r):
size = r.get_byte()
if size not in (4, 8):
raise InterfaceError('Invalid SYBFLTN size', size)
return cls(size)
def from_stream(cls, r):
size = r.get_byte()
if size not in cls._valid_sizes:
raise InterfaceError('Invalid size of INTN field', size)
return cls(size)
self.state = state
prior_state = self.state
return self.state
if state == prior_state:
return state
if state == TDS_PENDING:
if prior_state in (TDS_READING, TDS_QUERYING):
self.state = TDS_PENDING
else:
raise InterfaceError('logic error: cannot chage query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
elif state == TDS_READING:
# transition to READING are valid only from PENDING
if self.state != TDS_PENDING:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
else:
self.state = state
elif state == TDS_IDLE:
if prior_state == TDS_DEAD:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
self.state = state
elif state == TDS_DEAD:
self.state = state
elif state == TDS_QUERYING:
if self.state == TDS_DEAD:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
elif self.state != TDS_IDLE:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
# transition to READING are valid only from PENDING
if self.state != TDS_PENDING:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
else:
self.state = state
elif state == TDS_IDLE:
if prior_state == TDS_DEAD:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
self.state = state
elif state == TDS_DEAD:
self.state = state
elif state == TDS_QUERYING:
if self.state == TDS_DEAD:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
elif self.state != TDS_IDLE:
raise InterfaceError('logic error: cannot change query state from {0} to {1}'.
format(state_names[prior_state], state_names[state]))
else:
self.rows_affected = TDS_NO_COUNT
self.internal_sp_called = 0
self.state = state
else:
assert False
return self.state