Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def defaultFromSchema(self, colClass, defaultstr):
    """
    Translate a column default taken from the schema into a Python value.

    For a ``col.BoolCol`` column the strings ``'false'`` and ``'true'``
    are converted to ``False`` and ``True``.  Every other default is
    looked up by name on ``sqlbuilder.const`` and returned as a
    sqlbuilder constant.
    """
    if colClass == col.BoolCol:
        # Only these two spellings are recognised as boolean literals;
        # anything else falls through to the sqlbuilder lookup below.
        for literal, value in (('false', False), ('true', True)):
            if defaultstr == literal:
                return value
    return getattr(sqlbuilder.const, defaultstr)
def accumulate(self, *expressions):
    """Run accumulate expression(s) over this select via its connection.

    Each argument that is not already a ``sqlbuilder.SQLExpression`` is
    wrapped in ``sqlbuilder.SQLConstant``.  The resulting expressions are
    passed, together with this object, to the connection's
    ``accumulateSelect``; its result is returned unchanged.
    """
    connection = self._getConnection()
    wrapped = [
        item if isinstance(item, sqlbuilder.SQLExpression)
        else sqlbuilder.SQLConstant(item)
        for item in expressions
    ]
    return connection.accumulateSelect(self, *wrapped)
class JoinToTable(sqlbuilder.SQLExpression):
    """SQL expression that links a table to an intermediate table.

    Renders as ``<interTable>.<joinColumn> = <table>.<idName>``.
    """

    def __init__(self, table, idName, interTable, joinColumn):
        self.table, self.idName = table, idName
        self.interTable, self.joinColumn = interTable, joinColumn

    def tablesUsedImmediate(self):
        """Return the two tables this expression refers to."""
        used = [self.table]
        used.append(self.interTable)
        return used

    def __sqlrepr__(self, db):
        """Return the join condition as a string; ``db`` is not consulted."""
        parts = (self.interTable, self.joinColumn, self.table, self.idName)
        return '%s.%s = %s.%s' % parts
class TableToId(sqlbuilder.SQLExpression):
    """SQL expression comparing a table's id column with a given value.

    Renders as ``<table>.<idName> = <idValue>``.
    """

    def __init__(self, table, idName, idValue):
        self.table, self.idName, self.idValue = table, idName, idValue

    def tablesUsedImmediate(self):
        """Return the single table this expression refers to."""
        only_table = self.table
        return [only_table]

    def __sqlrepr__(self, db):
        """Return the comparison as a string; ``db`` is not consulted.

        ``idValue`` is interpolated with ``%s`` as-is, with no quoting
        applied here.
        """
        parts = (self.table, self.idName, self.idValue)
        return '%s.%s = %s' % parts
# NOTE(review): this excerpt is cut off right after the first assignment in
# performJoin; the remainder of the method and class is not shown here.
class SOSQLRelatedJoin(SORelatedJoin):
def performJoin(self, inst):
# When the instance's sqlmeta is flagged as per-connection, use the
# connection stored on the instance itself.
if inst.sqlmeta._perConnection:
conn = inst._connection
# NOTE(review): this copy of generateQuery stops after the "Width" filter; it
# never combines or returns `queries`. A complete definition with the same
# opening lines follows directly below in this file.
def generateQuery(self, data):
def ensureList(v):
# Wrap a non-list value in a one-element list so IN always gets a list.
if not isinstance(v,types.ListType):
v = [v]
return v
queries = []
i = sqlbuilder.table.Image
# Each recognised key is popped from `data` (the caller's dict is mutated)
# and turned into an IN condition on the matching Image column.
if data.has_key("ParameterFile"):
pids = ensureList(data.pop("ParameterFile"))
queries.append(sqlbuilder.IN(i.parameterfile_ID,
map(int, pids)))
# "parameterfileID" filters on the same column as "ParameterFile".
if data.has_key("parameterfileID"):
pids = ensureList(data.pop("parameterfileID"))
queries.append(sqlbuilder.IN(i.parameterfile_ID,
map(int, pids)))
if data.has_key("enzorunID"):
pids = ensureList(data.pop("enzorunID"))
queries.append(sqlbuilder.IN(i.enzorun_ID,
map(int, pids)))
# Widths are converted float -> int and compared against ROUND(Width).
if data.has_key("Width"):
widths = ensureList(data.pop("Width"))
queries.append(sqlbuilder.IN(func.ROUND(i.Width),
map(int, map(float, widths))))
def generateQuery(self, data):
    """Build an AND-ed sqlbuilder clause over ``sqlbuilder.table.Image``.

    ``data`` maps filter names to a single value or a list of values.
    The keys ``"ParameterFile"``, ``"parameterfileID"``, ``"enzorunID"``
    and ``"Width"`` get dedicated handling; every remaining key is used
    as a column name on the Image table and matched against the
    string form of its value(s).

    Note that the dedicated keys are popped from ``data``, so the
    caller's dict is modified.

    Returns the result of ``sqlbuilder.AND`` applied to all generated
    ``IN`` conditions.
    """
    def ensureList(v):
        # A single value is wrapped so that IN always receives a list.
        return v if isinstance(v, list) else [v]

    image = sqlbuilder.table.Image
    queries = []

    # Both spellings filter on the same parameterfile_ID column.
    for key in ("ParameterFile", "parameterfileID"):
        if key in data:
            pids = ensureList(data.pop(key))
            queries.append(sqlbuilder.IN(image.parameterfile_ID,
                                         [int(p) for p in pids]))

    if "enzorunID" in data:
        pids = ensureList(data.pop("enzorunID"))
        queries.append(sqlbuilder.IN(image.enzorun_ID,
                                     [int(p) for p in pids]))

    if "Width" in data:
        # Widths may arrive as float-like strings; compare their integer
        # part against the rounded column value.
        widths = ensureList(data.pop("Width"))
        queries.append(sqlbuilder.IN(func.ROUND(image.Width),
                                     [int(float(w)) for w in widths]))

    # Real lists (not lazy map objects) are passed to IN so the values
    # are concrete sequences on Python 3 as well as Python 2.
    for column_name, values in data.items():
        # __getattr__ is called directly, as in the original code, so the
        # name is always resolved through the table's field lookup.
        queries.append(sqlbuilder.IN(image.__getattr__(column_name),
                                     [str(x) for x in ensureList(values)]))

    return sqlbuilder.AND(*queries)
def __setstate__(self, d):
    """Restore this instance from the state dict ``d``.

    The instance is re-initialised with ``_SO_fetch_no_create=1``, its
    validator state, write lock and create-values dict are recreated,
    and ``d`` is merged into ``__dict__``.  The instance is then
    registered in its connection's cache.

    Raises:
        ValueError: if the cache already holds an object for this id
            and class.
    """
    self.__init__(_SO_fetch_no_create=1)
    self._SO_validatorState = sqlbuilder.SQLObjectState(self)
    self._SO_writeLock = threading.Lock()
    self._SO_createValues = {}
    # The saved state is applied last so it overrides the fresh defaults.
    self.__dict__.update(d)

    klass = self.__class__
    row_cache = self._connection.cache
    existing = row_cache.tryGet(self.id, klass)
    if existing is not None:
        message = (
            "Cannot unpickle %s row with id=%s - "
            "a different instance with the id already exists "
            "in the cache")
        raise ValueError(message % (klass.__name__, self.id))
    row_cache.created(self.id, klass, self)
# NOTE(review): this excerpt is cut off inside the final join expression, and
# its indentation has been lost, so the nesting of the statements below
# cannot be confirmed from here.
def _SO_columnClause(self, soClass, kw):
# Collects (database column name, value) pairs from the keyword dict `kw`,
# popping each key it recognises. Returns None when nothing was collected.
from . import main
data = []
if 'id' in kw:
data.append((soClass.sqlmeta.idName, kw.pop('id')))
for soColumn in soClass.sqlmeta.columnList:
key = soColumn.name
if key in kw:
val = kw.pop(key)
# Convert the Python value with the column's from_python hook, if it has one.
if soColumn.from_python:
val = soColumn.from_python(
val,
sqlbuilder.SQLObjectState(soClass, connection=self))
data.append((soColumn.dbName, val))
elif soColumn.foreignName in kw:
obj = kw.pop(soColumn.foreignName)
# A SQLObject instance is replaced by its id; other values are used as given.
if isinstance(obj, main.SQLObject):
data.append((soColumn.dbName, obj.id))
else:
data.append((soColumn.dbName, obj))
if kw:
# Recognised keys were popped above, so whatever is still in kw was not
# matched; the error message lists all of the remaining keys.
raise TypeError("got an unexpected keyword argument(s): "
"%r" % kw.keys())
if not data:
return None
return ' AND '.join(
['%s %s %s' %
# NOTE(review): this excerpt begins in the middle of a comment that describes
# the possible values of `childUpdate`, and it ends inside the nested helper
# `_get_patched`; the enclosing function's signature is not shown.
# select is run by application. If this class is inheritance
# child, delegate query to the parent class to utilize
# InheritableIteration optimizations. Selected records
# are restricted to this (child) class by adding childName
# filter to the where clause.
# False:
# select is delegated from inheritance child which is parent
# of another class. Delegate the query to parent if possible,
# but don't add childName restriction: selected records
# will be filtered by join to the table filtered by childName.
if (not childUpdate) and parentClass:
if childUpdate is None:
# this is the first parent in deep hierarchy
addClause = parentClass.q.childName == cls.sqlmeta.childName
# if the clause was one of the TRUE variants (None, SQLTrueClause
# or the string 'all'), replace it
if (clause is None) or (clause is sqlbuilder.SQLTrueClause) \
or (
isinstance(clause, string_type)
and (clause == 'all')):
clause = addClause
else:
# patch WHERE condition:
# change ID field of this class to ID of parent class
# XXX the clause is patched in place; it would be better
# to build a new one if we have to replace field
clsID = cls.q.id
parentID = parentClass.q.id
def _get_patched(clause):
# SQLOp clauses are patched by _patch_id_clause (defined outside this
# excerpt) and None is returned for them.
if isinstance(clause, sqlbuilder.SQLOp):
_patch_id_clause(clause)
return None
def filter(self, filter_clause):
    """Return a result set narrowed by ``filter_clause``.

    Passing ``None`` is a no-op and returns this object itself.
    Otherwise the current clause is AND-ed with ``filter_clause`` and
    handed to ``newClause``; a string clause is first wrapped in
    parentheses and turned into a ``sqlbuilder.SQLConstant``.
    """
    if filter_clause is not None:
        current = self.clause
        if isinstance(current, string_type):
            # Parenthesise raw SQL text so AND binds to the whole clause.
            current = sqlbuilder.SQLConstant('(%s)' % current)
        return self.newClause(sqlbuilder.AND(current, filter_clause))
    # None doesn't filter anything.
    return self
# NOTE(review): this excerpt begins in the middle of a function; the
# condition guarding the first branch and the enclosing signature are not
# shown. Presumably a leading marker on `orderBy` requests descending
# order - confirm against the full source.
orderBy = orderBy[1:]
desc = True
else:
desc = False
if isinstance(orderBy, string_type):
# A string naming a known column is resolved to that column's attribute on
# sourceClass.q.
if orderBy in self.sourceClass.sqlmeta.columns:
val = getattr(self.sourceClass.q,
self.sourceClass.sqlmeta.columns[orderBy].name)
if desc:
return sqlbuilder.DESC(val)
else:
return val
else:
# Any other string is wrapped in an SQLConstant.
orderBy = sqlbuilder.SQLConstant(orderBy)
if desc:
return sqlbuilder.DESC(orderBy)
else:
return orderBy
else:
# Non-string values are returned unchanged.
return orderBy