Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return 'BEGIN;'
@staticmethod
def column_info(row):
return row['name'], row['dflt_value'], row['type']
@staticmethod
def extrapolator(field):
return ':%s' % field
@staticmethod
def row_ids(db, primary_key):
return [db.cursor().lastrowid]
class DatabaseConnection(BaseConnection):
DRIVER = sqlite3
LEXICON = Lexicon
@memoized_property
def connect_args(self):
return [self.db_config.database]
@memoized_property
def connect_kwargs(self):
return {'isolation_level': 'DEFERRED'}
@staticmethod
def format_args(args):
return args.values()
@staticmethod
def standardize_interpolators(sql, params):
sql, params = super(Lexicon, Lexicon).standardize_interpolators(sql, params)
param_names = re.findall(r'\%\((\w+)\)s', sql)
if len(param_names):
sql = re.sub(r'\%\(\w+\)s', '%s', sql)
if isinstance(params, dict):
params = list(map(lambda x: params[x], param_names))
return sql, params
class DatabaseConnection(BaseConnection):
DRIVER = sf
LEXICON = Lexicon
def __init__(self, db_config, name):
super(DatabaseConnection, self).__init__(db_config, name)
@memoized_property
def connect_kwargs(self):
kwargs = dict(
user=self.db_config.username,
password=self.db_config.password,
account=self.db_config.account,
database=self.db_config.database,
schema=self.db_config.schema
)
result = '(' \
+ ', '.join(fields) \
+ ') = '
result += ', '.join(
['(' + ', '.join(ext) + ')' for ext in value_extrapolators]
)
return result
@staticmethod
def row_ids(db, primary_key):
row_ids = db.cursor().fetchall()
row_ids = [r[primary_key] for r in row_ids]
return row_ids
class DatabaseConnection(BaseConnection):
DRIVER = pg
LEXICON = Lexicon
@retry(pg.OperationalError, tries=3)
def connect(self):
connection = super(DatabaseConnection, self).connect()
connection.initialize(config.logger)
return connection
@memoized_property
def connect_kwargs(self):
kwargs = super(DatabaseConnection, self).connect_kwargs
kwargs.update(connection_factory=extras.MinTimeLoggingConnection)
return kwargs
@staticmethod
def column_info(row):
return row['Field'], row['Default'], row['Type']
@staticmethod
def row_ids(db, primary_key):
db.execute('SELECT LAST_INSERT_ID();')
return [db.cursor().fetchall()[0][0]]
@staticmethod
def apply_watermark(query, watermark):
return ' '.join([watermark, query])
class DatabaseConnection(BaseConnection):
DRIVER = MySQLdb
LEXICON = Lexicon
@retry(DRIVER.OperationalError, tries=3)
def connect(self):
return super(DatabaseConnection, self).connect()
@retry(DRIVER.InterfaceError, tries=3)
def execute(self, *query):
return super(DatabaseConnection, self).execute(*query)