Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class OraIntConverter(dbapiprovider.IntConverter):
    """Integer converter that maps integer sizes to Oracle NUMBER column types."""

    # Column type per integer size key (presumably the size in bits -- TODO confirm).
    signed_types = {
        None: 'NUMBER(38)',
        8: 'NUMBER(3)',
        16: 'NUMBER(5)',
        24: 'NUMBER(7)',
        32: 'NUMBER(10)',
        64: 'NUMBER(19)',
    }
    unsigned_types = {
        None: 'NUMBER(38)',
        8: 'NUMBER(3)',
        16: 'NUMBER(5)',
        24: 'NUMBER(8)',
        32: 'NUMBER(10)',
        64: 'NUMBER(20)',
    }

    def init(self, kwargs):
        """Run the base ``init`` and validate the optional 'sequence_name' option.

        'sequence_name' is popped from ``kwargs``; a TypeError is thrown when it
        is given for an attribute that is not an auto primary key.
        """
        dbapiprovider.IntConverter.init(self, kwargs)
        sequence_name = kwargs.pop('sequence_name', None)
        if sequence_name is None:
            return
        if self.attr.auto and self.attr.is_pk:
            return
        throw(TypeError, "Parameter 'sequence_name' can be used only for PrimaryKey attributes with auto=True")
class OraRealConverter(dbapiprovider.RealConverter):
    """Real-number converter for Oracle."""

    # Note that Oracle has a different representation of float numbers
    def sql_type(converter):
        """Return the column type name, which is always 'NUMBER'."""
        oracle_type = 'NUMBER'
        return oracle_type
class OraDecimalConverter(dbapiprovider.DecimalConverter):
    """Decimal converter that renders an Oracle NUMBER(precision, scale) type."""

    def sql_type(converter):
        """Return 'NUMBER(p, s)' built from the converter's precision and scale."""
        precision, scale = converter.precision, converter.scale
        return 'NUMBER(%d, %d)' % (precision, scale)
class OraBlobConverter(dbapiprovider.BlobConverter):
    """Blob converter that reads the database value through its ``read()`` method."""

    def sql2py(converter, val):
        """Read the whole value and wrap the result in ``buffer``."""
        raw = val.read()
        return buffer(raw)
class OraDateConverter(dbapiprovider.DateConverter):
    """Date converter that accepts both ``date`` and ``datetime`` database values."""

    def sql2py(converter, val):
        """Convert a database value to a ``date``.

        A ``datetime`` is reduced to its date part; a ``date`` is returned
        unchanged. Any other type results in a ValueError being thrown.
        """
        # datetime is checked first because it is a subclass of date.
        if isinstance(val, datetime):
            return val.date()
        if not isinstance(val, date):
            # Interpolate the type into the message; passing it as a separate
            # argument would leave the '%s' placeholder unformatted.
            throw(ValueError,
                  'Value of unexpected type received from database: instead of date got %s'
                  % type(val))
        return val
class OraTimeConverter(dbapiprovider.TimeConverter):
sql_type_name = 'INTERVAL DAY(0) TO SECOND'
return 'LONGBLOB'
class MySQLTimeConverter(dbapiprovider.TimeConverter):
    """Time converter that turns timedelta values from the driver into ``time``."""

    def sql2py(converter, val):
        """Convert a database value to ``time`` where possible.

        A ``timedelta`` in the range [0, 24h) becomes the matching ``time``;
        a ``timedelta`` outside that range is returned unchanged. A ``time``
        is returned as is. Any other type results in a ValueError being thrown.
        """
        if isinstance(val, timedelta):  # MySQLdb returns timedeltas instead of times
            total_seconds = val.days * (24 * 60 * 60) + val.seconds
            # Strictly less than one day: exactly 24h would give hour == 24,
            # which time() rejects with ValueError.
            if 0 <= total_seconds < 24 * 60 * 60:
                minutes, seconds = divmod(total_seconds, 60)
                hours, minutes = divmod(minutes, 60)
                return time(hours, minutes, seconds, val.microseconds)
        elif not isinstance(val, time):
            # Leading space keeps the message readable: "...database for attribute X: ..."
            attr_note = ' for attribute %s' % converter.attr if converter.attr else ''
            throw(ValueError,
                  'Value of unexpected type received from database%s: instead of time or timedelta got %s'
                  % (attr_note, type(val)))
        return val
class MySQLTimedeltaConverter(dbapiprovider.TimedeltaConverter):
    """Timedelta converter whose column type name is 'TIME'."""

    sql_type_name = 'TIME'
class MySQLUuidConverter(dbapiprovider.UuidConverter):
    """UUID converter that uses a 16-byte binary column."""

    def sql_type(converter):
        """Return the column type name, always 'BINARY(16)'."""
        column_type = 'BINARY(16)'
        return column_type
class MySQLJsonConverter(dbapiprovider.JsonConverter):
    """JSON converter that refuses to initialise on servers older than 5.7.8."""

    EQ = 'JSON_EQ'
    NE = 'JSON_NE'

    def init(self, kwargs):
        """Raise NotImplementedError when the provider's server version is below 5.7.8."""
        if not self.provider.server_version < (5, 7, 8):
            return
        version_parts = imap(str, self.provider.server_version)
        version = '.'.join(version_parts)
        raise NotImplementedError("MySQL %s has no JSON support" % version)
class MySQLProvider(DBAPIProvider):
dialect = 'MySQL'
max_time_precision = default_time_precision = 0
varchar_default_max_len = 255
uint64_support = True
dbapi_module = mysql_module
dbschema_cls = MySQLSchema
translator_cls = MySQLTranslator
sqlbuilder_cls = MySQLBuilder
fk_types = { 'SERIAL' : 'BIGINT UNSIGNED' }
converter_classes = [
(NoneType, dbapiprovider.NoneConverter),
(bool, dbapiprovider.BoolConverter),
(basestring, MySQLStrConverter),
(int_types, dbapiprovider.IntConverter),
(float, MySQLRealConverter),
(Decimal, dbapiprovider.DecimalConverter),
(datetime, dbapiprovider.DatetimeConverter),
(date, dbapiprovider.DateConverter),
(time, MySQLTimeConverter),
(timedelta, MySQLTimedeltaConverter),
(UUID, MySQLUuidConverter),
(buffer, MySQLBlobConverter),
(ormtypes.Json, MySQLJsonConverter),
]
def normalize_name(provider, name):
    """Cut ``name`` to ``provider.max_name_len`` characters and lowercase it."""
    truncated = name[:provider.max_name_len]
    return truncated.lower()
@wrap_dbapi_exceptions
def inspect_connection(provider, connection):
def drop_table(provider, connection, table_name):
    """Execute ``DROP TABLE <quoted name> CASCADE`` on a cursor of ``connection``."""
    cur = connection.cursor()
    # The table name is quoted by the provider before being placed in the statement.
    cur.execute('DROP TABLE %s CASCADE' % provider.quote_name(table_name))
converter_classes = [
(NoneType, dbapiprovider.NoneConverter),
(bool, dbapiprovider.BoolConverter),
(basestring, PGStrConverter),
(int_types, PGIntConverter),
(float, PGRealConverter),
(Decimal, dbapiprovider.DecimalConverter),
(datetime, PGDatetimeConverter),
(date, dbapiprovider.DateConverter),
(time, dbapiprovider.TimeConverter),
(timedelta, PGTimedeltaConverter),
(UUID, PGUuidConverter),
(buffer, PGBlobConverter),
(ormtypes.Json, PGJsonConverter),
]
provider_cls = PGProvider
if converter.max_len:
return 'VARCHAR(%d)' % converter.max_len
attr = converter.attr
if attr is not None and (attr.is_unique or attr.composite_keys):
return 'VARCHAR(8000)'
return 'VARCHAR(MAX)'
class MSRealConverter(dbapiprovider.RealConverter):
    """Real-number converter that uses a FLOAT column."""

    def sql_type(converter):
        """Return the column type name, always 'FLOAT'."""
        column_type = 'FLOAT'
        return column_type
class MSBlobConverter(dbapiprovider.BlobConverter):
    """Blob converter that uses a VARBINARY(MAX) column."""

    def sql_type(converter):
        """Return the column type name, always 'VARBINARY(MAX)'."""
        column_type = 'VARBINARY(MAX)'
        return column_type
class MSTimeConverter(dbapiprovider.TimeConverter):
    """Time converter that turns timedelta values from the driver into ``time``."""

    def sql2py(converter, val):
        """Convert a database value to ``time`` where possible.

        A ``timedelta`` in the range [0, 24h) becomes the matching ``time``;
        a ``timedelta`` outside that range is returned unchanged. A ``time``
        is returned as is. Any other type results in a ValueError being thrown.
        """
        # Original comment: "MySQLdb returns timedeltas instead of times".
        # NOTE(review): confirm that the driver used here does the same.
        if isinstance(val, timedelta):
            total_seconds = val.days * (24 * 60 * 60) + val.seconds
            # Strictly less than one day: exactly 24h would give hour == 24,
            # which time() rejects with ValueError.
            if 0 <= total_seconds < 24 * 60 * 60:
                minutes, seconds = divmod(total_seconds, 60)
                hours, minutes = divmod(minutes, 60)
                return time(hours, minutes, seconds, val.microseconds)
        elif not isinstance(val, time):
            # Leading space keeps the message readable: "...database for attribute X: ..."
            attr_note = ' for attribute %s' % converter.attr if converter.attr else ''
            throw(ValueError,
                  'Value of unexpected type received from database%s: instead of time or timedelta got %s'
                  % (attr_note, type(val)))
        return val
class MSTimedeltaConverter(dbapiprovider.TimedeltaConverter):
    """Timedelta converter whose column type name is 'TIME'."""

    sql_type_name = 'TIME'
class MSUuidConverter(dbapiprovider.UuidConverter):
class PGBlobConverter(dbapiprovider.BlobConverter):
    """Blob converter that uses a BYTEA column."""

    def sql_type(converter):
        """Return the column type name, always 'BYTEA'."""
        column_type = 'BYTEA'
        return column_type
class PGTimedeltaConverter(dbapiprovider.TimedeltaConverter):
    """Timedelta converter whose column type name is 'INTERVAL DAY TO SECOND'."""

    sql_type_name = 'INTERVAL DAY TO SECOND'
class PGDatetimeConverter(dbapiprovider.DatetimeConverter):
    """Datetime converter whose column type name is 'TIMESTAMP'."""

    sql_type_name = 'TIMESTAMP'
class PGUuidConverter(dbapiprovider.UuidConverter):
    """UUID converter that hands values to the database without conversion."""

    def py2sql(converter, val):
        """Return ``val`` unchanged."""
        unchanged = val
        return unchanged
class PGJsonConverter(dbapiprovider.JsonConverter):
    """JSON converter that uses a JSONB column."""

    def sql_type(self):
        """Return the column type name, always 'JSONB'."""
        column_type = "JSONB"
        return column_type
class PGArrayConverter(dbapiprovider.ArrayConverter):
    """Array converter with a per-item-type table of (SQL type name, converter class)."""

    # Python item type -> (SQL element type name, converter class for the items).
    array_types = {
        int: ('int', PGIntConverter),
        unicode: ('text', PGStrConverter),
        float: ('double precision', PGRealConverter),
    }
class PGPool(Pool):
def _connect(pool):
    """Open a connection, store it on ``pool.con`` and default its encoding to UTF8.

    The client encoding is only set when the caller did not supply a
    'client_encoding' keyword argument.
    """
    pool.con = pool.dbapi_module.connect(*pool.args, **pool.kwargs)
    if 'client_encoding' in pool.kwargs:
        return
    pool.con.set_client_encoding('UTF8')
def release(pool, con):
dbschema_cls = MSSchema
translator_cls = MSTranslator
sqlbuilder_cls = MSBuilder
default_schema_name = 'dbo'
name_before_table = 'db_name'
converter_classes = [
(NoneType, dbapiprovider.NoneConverter),
(bool, MSBoolConverter),
(basestring, MSStrConverter),
(int_types, MSIntConverter),
(float, MSRealConverter),
(Decimal, dbapiprovider.DecimalConverter),
(datetime, dbapiprovider.DatetimeConverter),
(date, MSDateConverter),
(time, MSTimeConverter),
(timedelta, MSTimedeltaConverter),
(UUID, MSUuidConverter),
(buffer, MSBlobConverter),
]
def normalize_name(provider, name):
    """Cut ``name`` to ``provider.max_name_len`` characters and lowercase it."""
    truncated = name[:provider.max_name_len]
    return truncated.lower()
@wrap_dbapi_exceptions
def inspect_connection(provider, connection):
cursor = connection.cursor()
cursor.execute('select @@version')
row = cursor.fetchone()
assert row is not None
return Value.__unicode__(self)
class PGSQLBuilder(sqlbuilding.SQLBuilder):
    """SQL builder for the 'PostgreSQL' dialect."""

    dialect = 'PostgreSQL'
    make_value = PGValue

    def INSERT(builder, table_name, columns, values, returning=None):
        """Build an INSERT via the base builder, appending a RETURNING clause if requested."""
        sql = sqlbuilding.SQLBuilder.INSERT(builder, table_name, columns, values)
        if returning is None:
            return sql
        sql.extend([' RETURNING ', builder.quote_name(returning)])
        return sql

    def TO_INT(builder, expr):
        """Return the built expression wrapped in a '(...)::int' cast."""
        inner = builder(expr)
        return '(', inner, ')::int'

    def DATE(builder, expr):
        """Return the built expression wrapped in a '(...)::date' cast."""
        inner = builder(expr)
        return '(', inner, ')::date'
class PGUnicodeConverter(dbapiprovider.UnicodeConverter):
    """Unicode converter that exchanges UTF-8 encoded strings with the database."""

    def py2sql(converter, val):
        """Encode ``val`` as UTF-8."""
        encoded = val.encode('utf-8')
        return encoded

    def sql2py(converter, val):
        """Return ``val`` as unicode, decoding it from UTF-8 when it is not unicode already."""
        return val if isinstance(val, unicode) else val.decode('utf-8')
class PGStrConverter(dbapiprovider.StrConverter):
    """String converter that transcodes between ``converter.encoding`` and UTF-8."""

    def py2sql(converter, val):
        """Decode ``val`` from the converter's encoding and re-encode it as UTF-8."""
        text = val.decode(converter.encoding)
        return text.encode('utf-8')

    def sql2py(converter, val):
        """Return ``val`` encoded in the converter's encoding.

        A non-unicode value is returned untouched when ``converter.utf8`` is
        set; otherwise it is first decoded from UTF-8. Characters that cannot
        be encoded are handled with the 'replace' error policy.
        """
        if isinstance(val, unicode):
            return val.encode(converter.encoding, 'replace')
        if converter.utf8:
            return val
        return val.decode('utf-8').encode(converter.encoding, 'replace')
class PGLongConverter(dbapiprovider.IntConverter):
max_params_count = 10000
table_if_not_exists_syntax = True
index_if_not_exists_syntax = False
max_time_precision = default_time_precision = 0
varchar_default_max_len = 255
uint64_support = True
dbapi_module = mysql_module
dbschema_cls = MySQLSchema
translator_cls = MySQLTranslator
sqlbuilder_cls = MySQLBuilder
fk_types = { 'SERIAL' : 'BIGINT UNSIGNED' }
converter_classes = [
(NoneType, dbapiprovider.NoneConverter),
(bool, dbapiprovider.BoolConverter),
(basestring, MySQLStrConverter),
(int_types, dbapiprovider.IntConverter),
(float, MySQLRealConverter),
(Decimal, dbapiprovider.DecimalConverter),
(datetime, dbapiprovider.DatetimeConverter),
(date, dbapiprovider.DateConverter),
(time, MySQLTimeConverter),
(timedelta, MySQLTimedeltaConverter),
(UUID, MySQLUuidConverter),
(buffer, MySQLBlobConverter),
(ormtypes.Json, MySQLJsonConverter),
]
def normalize_name(provider, name):
    """Cut ``name`` to ``provider.max_name_len`` characters and lowercase it."""
    truncated = name[:provider.max_name_len]
    return truncated.lower()
return val
def py2sql(converter, val):
    """Build a timedelta from the hour, minute, second and microsecond of ``val``."""
    parts = {
        'hours': val.hour,
        'minutes': val.minute,
        'seconds': val.second,
        'microseconds': val.microsecond,
    }
    return timedelta(**parts)
class OraTimedeltaConverter(dbapiprovider.TimedeltaConverter):
    """Timedelta converter whose column type name is 'INTERVAL DAY TO SECOND'."""

    sql_type_name = 'INTERVAL DAY TO SECOND'

    def __init__(converter, provider, py_type, attr=None):
        """Initialise through the base class, then force precision to 0 for attributes."""
        dbapiprovider.TimedeltaConverter.__init__(converter, provider, py_type, attr)
        if attr is None:
            return
        if converter.precision > 0:
            # cx_Oracle 5.1.3 corrupts microseconds for values of DAY TO SECOND type
            converter.precision = 0
class OraDatetimeConverter(dbapiprovider.DatetimeConverter):
    """Datetime converter whose column type name is 'TIMESTAMP'."""

    sql_type_name = 'TIMESTAMP'
class OraUuidConverter(dbapiprovider.UuidConverter):
    """UUID converter that uses a RAW(16) column."""

    def sql_type(converter):
        """Return the column type name, always 'RAW(16)'."""
        column_type = 'RAW(16)'
        return column_type
class OraJsonConverter(dbapiprovider.JsonConverter):
    """JSON converter that stores values in a CLOB column."""

    json_kwargs = {
        'separators': (',', ':'),
        'sort_keys': True,
        'ensure_ascii': False,
    }
    # CLOBs cannot be compared with strings, and TO_CHAR(CLOB) returns first 4000 chars only
    optimistic = False

    def sql2py(converter, dbval):
        """Read ``dbval`` if it has a ``read()`` method, then delegate to the base converter."""
        payload = dbval.read() if hasattr(dbval, 'read') else dbval
        return dbapiprovider.JsonConverter.sql2py(converter, payload)

    def sql_type(converter):
        """Return the column type name, always 'CLOB'."""
        column_type = 'CLOB'
        return column_type
class OraProvider(DBAPIProvider):
dialect = 'Oracle'
paramstyle = 'named'
max_name_len = 30