import re
from .._compat import integer_types, long
from .base import SQLAdapter
from . import adapters
@adapters.register_for('sapdb')
class SAPDB(SQLAdapter):
    dbengine = 'sapdb'
    drivers = ('sapdb',)

    REGEX_URI = re.compile(
        '^(?P<user>[^:@]+)(\:(?P<password>[^@]*))?@(?P<host>\[[^/]+\]|' +
        '[^\:@]+)(\:(?P<port>[0-9]+))?/(?P<db>[^\?]+)' +
        '(\?sslmode=(?P<sslmode>.+))?$')
    def _initialize_(self, do_connect):
        super(SAPDB, self)._initialize_(do_connect)
        ruri = self.uri.split('://', 1)[1]
        m = self.REGEX_URI.match(ruri)
        if not m:
            raise SyntaxError("Invalid URI string in DAL")
        user = self.credential_decoder(m.group('user'))
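
# Illustrative check (an assumption, not part of pydal): with the group names
# restored above, REGEX_URI splits a "user:password@host:port/dbname" style
# connection string into the named parts that _initialize_ reads back out.
_demo_match = SAPDB.REGEX_URI.match('scott:tiger@dbhost:7210/mydb')
assert _demo_match.group('user') == 'scott'
assert _demo_match.group('password') == 'tiger'
assert _demo_match.group('host') == 'dbhost'
assert _demo_match.group('port') == '7210'
assert _demo_match.group('db') == 'mydb'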
import os

# used below by GoogleMigratorMixin and GoogleSQL
from .._compat import pjoin
from .._globals import THREAD_LOCAL
from ..migrator import InDBMigrator
from .base import NoSQLAdapter
from .mysql import MySQL
from .postgres import PostgrePsyco
from . import adapters, with_connection_or_raise
from .._gae import gae

if gae:
    from .._gae import ndb, rdbms, namespace_manager, classobj, NDBPolyModel
    from ..helpers.gae import NDBDecimalProperty
class GoogleMigratorMixin(object):
    migrator_cls = InDBMigrator


@adapters.register_for("google:sql")
class GoogleSQL(GoogleMigratorMixin, MySQL):
    uploads_in_blob = True
    REGEX_URI = "^(?P<instance>.*)/(?P<db>.+)$"

    def _find_work_folder(self):
        super(GoogleSQL, self)._find_work_folder()
        if os.path.isabs(self.folder) and self.folder.startswith(os.getcwd()):
            self.folder = os.path.relpath(self.folder, os.getcwd())

    def _initialize_(self):
        super(GoogleSQL, self)._initialize_()
        self.folder = self.folder or pjoin(
            "$HOME",
            THREAD_LOCAL._pydal_folder_.split(os.sep + "applications" + os.sep, 1)[1],
        )
        ruri = self.uri.split("://", 1)[1]
from ..helpers.classes import (
    FakeCursor, Reference, SQLALL, ConnectionConfigurationMixin)
from ..helpers.methods import use_common_filters, xorify
from ..objects import Field, Row, Query, Expression
from .base import NoSQLAdapter
from . import adapters
try:
    from bson import Binary
    from bson.binary import USER_DEFINED_SUBTYPE
except:
    class Binary(object):
        pass
    USER_DEFINED_SUBTYPE = 0
@adapters.register_for('mongodb')
class Mongo(ConnectionConfigurationMixin, NoSQLAdapter):
    dbengine = 'mongodb'
    drivers = ('pymongo',)

    def find_driver(self):
        super(Mongo, self).find_driver()
        #: ensure pymongo version >= 3.0
        if 'fake_version' in self.driver_args:
            version = self.driver_args['fake_version']
        else:
            from pymongo import version
        if int(version.split('.')[0]) < 3:
            raise RuntimeError(
                "pydal requires pymongo version >= 3.0, found '%s'" % version)

    def _initialize_(self, do_connect):
from ..helpers.classes import ConnectionConfigurationMixin
from .base import SQLAdapter
from . import adapters, with_connection_or_raise
@adapters.register_for('informix')
class Informix(ConnectionConfigurationMixin, SQLAdapter):
    dbengine = 'informix'
    drivers = ('informixdb',)

    def _initialize_(self, do_connect):
        super(Informix, self)._initialize_(do_connect)
        ruri = self.uri.split('://', 1)[1]
        m = self.REGEX_URI.match(ruri)
        if not m:
            raise SyntaxError("Invalid URI string in DAL")
        user = self.credential_decoder(m.group('user'))
        if not user:
            raise SyntaxError('User required')
        password = self.credential_decoder(m.group('password'))
        if not password:
            password = ''
args[0] = to_unicode(args[0])
return super(PostgrePG8000, self).execute(*args, **kwargs)
@adapters.register_for('postgres2')
class PostgreNew(Postgre):
    def _get_json_dialect(self):
        from ..dialects.postgre import PostgreDialectArraysJSON
        return PostgreDialectArraysJSON

    def _get_json_parser(self):
        from ..parsers.postgre import PostgreNewAutoJSONParser
        return PostgreNewAutoJSONParser


@adapters.register_for('postgres2:psycopg2')
class PostgrePsycoNew(PostgrePsyco, PostgreNew):
    pass


@adapters.register_for('postgres2:pg8000')
class PostgrePG8000New(PostgrePG8000, PostgreNew):
    pass


@adapters.register_for('postgres3')
class PostgreBoolean(PostgreNew):
    def _get_json_dialect(self):
        from ..dialects.postgre import PostgreDialectBooleanJSON
        return PostgreDialectBooleanJSON

    def _get_json_parser(self):
        # Older Ingres releases could use rule/trigger like Oracle above.
        if hasattr(table, '_primarykey'):
            modify_tbl_sql = 'modify %s to btree unique on %s' % \
                (table._rname,
                 ', '.join(["'%s'" % x for x in table.primarykey]))
            self.execute(modify_tbl_sql)
        else:
            tmp_seqname = '%s_iisq' % table._raw_rname
            query = query.replace(self.dialect.INGRES_SEQNAME, tmp_seqname)
            self.execute('create sequence %s' % tmp_seqname)
            self.execute(query)
            self.execute(
                'modify %s to btree unique on %s' % (table._rname, 'id'))


@adapters.register_for('ingresu')
class IngresUnicode(Ingres):
    pass
    'mssqln': MSSQL4N,
    'mssqln2': MSSQL1N,
    'mssqln3': MSSQL3N,
    'postgres': PostgreBoolean,
    'postgres:psycopg2': PostgrePsycoBoolean,
    'postgres:pg8000': PostgrePG8000Boolean,
    'postgres2': PostgreNew,
    'postgres2:psycopg2': PostgrePsycoNew,
    'postgres2:pg8000': PostgrePG8000New,
    'postgres3': Postgre,
    'postgres3:psycopg2': PostgrePsyco,
    'postgres3:pg8000': PostgrePG8000
})
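
# Illustrative sketch (an assumption, not pydal's actual implementation): the
# @adapters.register_for(...) decorator used throughout these snippets acts as
# a URI-scheme -> adapter-class registry, roughly along these lines.
class _AdapterRegistrySketch(object):
    def __init__(self):
        self._registry_ = {}

    def register_for(self, *schemes):
        def wrapper(adapter_cls):
            # remember the class under every scheme it was registered for
            for scheme in schemes:
                self._registry_[scheme] = adapter_cls
            return adapter_cls
        return wrapper

    def get_for(self, scheme):
        return self._registry_[scheme]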
@adapters.register_for('sqlite', 'sqlite:memory')
class SQLite(_SQLite):
    def _initialize_(self, do_connect):
        super(SQLite, self)._initialize_(do_connect)
        self.driver_args['isolation_level'] = None

    def begin(self, lock_type=None):
        statement = 'BEGIN %s;' % lock_type if lock_type else 'BEGIN;'
        self.execute(statement)
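
# Illustrative sketch (plain sqlite3, not pydal): setting isolation_level to
# None puts the driver in autocommit mode, so a transaction only starts when
# BEGIN is issued explicitly, which is what the begin() override above does.
import sqlite3

_conn = sqlite3.connect(':memory:', isolation_level=None)
_conn.execute('CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)')
_conn.execute('BEGIN IMMEDIATE;')  # explicit transaction start
_conn.execute("INSERT INTO t (name) VALUES ('alpha')")
_conn.execute('ROLLBACK;')  # the insert was never auto-committed
assert _conn.execute('SELECT COUNT(*) FROM t').fetchone()[0] == 0
_conn.close()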
from functools import wraps


def _wrap_on_obj(f, adapter):
    @wraps(f)
    def wrapped(*args, **kwargs):
        return f(adapter, *args, **kwargs)
    return wrapped
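
# Illustrative usage (an assumption, not from pydal): _wrap_on_obj partially
# applies an adapter as the first argument of a callback, so a plain function
# behaves like a method bound to that adapter instance.
def _on_commit(adapter, message):
    return '%s: %s' % (adapter.__class__.__name__, message)

class _FakeAdapter(object):
    pass

_bound = _wrap_on_obj(_on_commit, _FakeAdapter())
assert _bound('committed') == '_FakeAdapter: committed'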
class MSSQLN(MSSQL):
    def represent(self, obj, field_type):
        rv = super(MSSQLN, self).represent(obj, field_type)
        if field_type in ("string", "text", "json") and rv.startswith("'"):
            rv = "N" + rv
        return rv

    @with_connection_or_raise
    def execute(self, *args, **kwargs):
        if PY2:
            args = list(args)
            args[0] = to_unicode(args[0])
        return super(MSSQLN, self).execute(*args, **kwargs)


@adapters.register_for("mssqln", "mssql2")
class MSSQL1N(MSSQLN, Slicer):
    pass


@adapters.register_for("mssql3n")
class MSSQL3N(MSSQLN):
    pass


@adapters.register_for("mssql4n")
class MSSQL4N(MSSQLN):
    pass
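
# Illustrative sketch (not from pydal): represent() above turns an already
# quoted SQL string literal into an N'...' literal so SQL Server reads it as
# Unicode (NVARCHAR) data rather than using the column's 8-bit collation.
def _nprefix(quoted_literal):
    # mirrors the startswith("'") check in MSSQLN.represent
    if quoted_literal.startswith("'"):
        return 'N' + quoted_literal
    return quoted_literal

assert _nprefix("'caf\xe9'") == "N'caf\xe9'"
assert _nprefix('42') == '42'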
@adapters.register_for("pytds")
class PyTDS(MSSQL):
        handlers = self._build_handlers_for_execution()
        for handler in handlers:
            handler.before_execute(command)
        rv = self.cursor.execute(command, *args[1:], **kwargs)
        for handler in handlers:
            handler.after_execute(command)
        return rv

    def test_connection(self):
        self.execute('SELECT COUNT(*) FROM systables;')

    def lastrowid(self, table):
        return self.cursor.sqlerrd[1]
@adapters.register_for('informix-se')
class InformixSE(Informix):
    def rowslice(self, rows, minimum=0, maximum=None):
        if maximum is None:
            return rows[minimum:]
        return rows[minimum:maximum]
        if self.driver.__version__ >= '2.5.0':
            self.parser = self._get_json_parser()(self)

    def adapt(self, obj):
        adapted = psycopg2_adapt(obj)
        # deal with new relic Connection Wrapper (newrelic>=2.10.0.8)
        cxn = getattr(self.connection, '__wrapped__', self.connection)
        adapted.prepare(cxn)
        rv = adapted.getquoted()
        if not PY2:
            if isinstance(rv, bytes):
                return rv.decode('utf-8')
        return rv
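
# Illustrative sketch (plain psycopg2, if installed; not pydal): adapt() plus
# getquoted() produce a safely quoted SQL literal, which is what the method
# above relies on before decoding the bytes result on Python 3.
try:
    from psycopg2.extensions import adapt as _psycopg2_adapt

    _quoted = _psycopg2_adapt("it's").getquoted()  # bytes under Python 3
    assert _quoted == b"'it''s'"
except ImportError:
    pass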
@adapters.register_for('postgres:pg8000')
class PostgrePG8000(Postgre):
    drivers = ('pg8000',)

    def _config_json(self):
        if self.connection._server_version >= "9.2.0":
            self.dialect = self._get_json_dialect()(self)
            if self.driver.__version__ >= '1.10.2':
                self.parser = self._get_json_parser()(self)

    def adapt(self, obj):
        return "'%s'" % obj.replace("%", "%%").replace("'", "''")

    @with_connection_or_raise
    def execute(self, *args, **kwargs):
        if PY2:
            args = list(args)