Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_0030_test_render_iteration(self):
"""
Test render iteration

The lines visible here only build fixtures: a work with an opened
project attached to it, followed by a second work named 'User Story'.
NOTE(review): the excerpt appears to end before any iteration is
rendered -- ``Iteration`` and ``app`` are not used in the lines shown.
"""
Iteration = POOL.get('project.iteration')
# Everything below runs inside one database transaction.
with Transaction().start(DB_NAME, USER, context=CONTEXT):
self.create_defaults()
app = self.get_app(DEBUG=True)
# Records are created with the test company set in the context.
with Transaction().set_context(company=self.company.id):
# Work record that the project below points at.
work1, = self.Work.create([{
'name': 'ABC Project',
'company': self.company.id,
}])
project, = self.Project.create([{
'work': work1.id,
'type': 'project',
'state': 'opened',
'sequence': 1,
}])
# Sanity check: creation returned a truthy record.
self.assert_(project)
# Second work record; presumably backs a task/story created later
# in the test -- TODO confirm against the full file.
work2, = self.Work.create([{
'name': 'User Story',
'company': self.company.id,
}])
def mkcol(self, uri):
    """Create a WebDAV collection at ``uri``.

    ``self._get_dburi`` splits the URI into a database name and a path
    inside that database.  The collection is created through the
    ``webdav.collection`` model and the cursor is committed on success.

    Returns the value returned by ``Collection.mkcol``.

    Raises ``DAV_Forbidden`` when the URI yields no database name or no
    path.  A DAV exception raised while creating the collection is
    logged and re-raised unchanged after a rollback; any other exception
    is logged, rolled back and reported to the client as
    ``DAV_Error(500)``.
    """
    dbname, dburi = self._get_dburi(uri)
    if not dbname or not dburi:
        raise DAV_Forbidden
    pool = Pool(Transaction().cursor.database_name)
    Collection = pool.get('webdav.collection')
    try:
        res = Collection.mkcol(dburi, cache=CACHE)
        Transaction().cursor.commit()
    # ``except ... as`` is valid on Python 2.6+ and on Python 3, unlike
    # the comma form which is a syntax error on Python 3.
    except (DAV_Error, DAV_NotFound, DAV_Secret, DAV_Forbidden) as exception:
        # DAV errors already carry the right status: propagate as-is.
        self._log_exception(exception)
        Transaction().cursor.rollback()
        raise
    except Exception as exception:
        # Anything unexpected is turned into a generic server error.
        self._log_exception(exception)
        Transaction().cursor.rollback()
        raise DAV_Error(500)
    return res
def add_fk(self, column_name, reference, on_delete=None):
    """Make sure ``column_name`` has a foreign key to ``reference``.

    ``on_delete`` is upper-cased before use and defaults to
    ``'SET NULL'``.  When a constraint with the computed name already
    exists it is kept if the delete rule recorded in
    ``self._fk_deltypes`` matches, otherwise it is dropped and created
    again.  The constraint definitions are refreshed at the end.
    """
    delete_rule = 'SET NULL' if on_delete is None else on_delete.upper()

    cursor = Transaction().connection.cursor()
    fk_name = self.convert_name(
        self.table_name + '_' + column_name + '_fkey')

    if fk_name not in self._constraints:
        must_create = True
    elif self._fk_deltypes.get(column_name) == delete_rule:
        # Constraint is present with the requested rule: nothing to add.
        must_create = False
    else:
        # Constraint is present with a different rule: replace it.
        self.drop_fk(column_name)
        must_create = True

    if must_create:
        statement = ''.join([
            'ALTER TABLE "', self.table_name, '" ',
            'ADD CONSTRAINT "', fk_name, '" ',
            'FOREIGN KEY ("', column_name, '") ',
            'REFERENCES "', reference, '" ',
            'ON DELETE ', delete_rule,
        ])
        cursor.execute(statement)
    self._update_definitions(constraints=True)
def _get_cache(self):
    """Return the cache store to use for the current transaction.

    The store shared by the whole database is returned unless the
    transaction is listed in ``self._reset`` or started before the lower
    bound registered for its database (``self._default_lower`` when none
    is registered).  In those cases a store private to the transaction is
    returned, created on first use.
    """
    current = Transaction()
    database_name = current.database.name
    threshold = self._transaction_lower.get(
        database_name, self._default_lower)

    needs_private = (
        current in self._reset or current.started_at < threshold)
    if not needs_private:
        return self._database_cache[database_name]

    try:
        private_cache = self._transaction_cache[current]
    except KeyError:
        # First access from this transaction: build an empty store with
        # the same factory the database-level cache uses.
        private_cache = self._database_cache.default_factory()
        self._transaction_cache[current] = private_cache
    return private_cache
def get_action_id(cls, action_id):
# Looks ``action_id`` up first on ``cls`` itself and then on each concrete
# action model.  Searches run with ``active_test=False`` in the context.
# NOTE(review): the excerpt ends right after a matching concrete action is
# unpacked; what is returned in that case is not visible here.
pool = Pool()
with Transaction().set_context(active_test=False):
# The id matches a record of ``cls``: return it unchanged.
if cls.search([
('id', '=', action_id),
]):
return action_id
# Otherwise try the id against every concrete action model in turn.
for action_type in (
'ir.action.report',
'ir.action.act_window',
'ir.action.wizard',
'ir.action.url',
):
Action = pool.get(action_type)
actions = Action.search([
('id', '=', action_id),
])
if actions:
# Searching on ``id`` yields at most one record, hence the unpacking.
action, = actions
def _domain_column(self, operator, column, key=None):
    """Build the SQL expression used to search on ``key`` of this field.

    The parent's column expression is wrapped in the database's JSON
    accessor for ``key``.  For operators ending in ``like`` the result is
    cast to the database's VARCHAR base type, and for ``ilike`` operators
    it is additionally unaccented when ``self.search_unaccented`` is set.
    """
    database = Transaction().database
    parent_column = super()._domain_column(operator, column)
    expression = database.json_get(parent_column, key)

    if not operator.endswith('like'):
        return expression

    # Pattern matching needs a text value rather than a JSON one.
    varchar_type = database.sql_type('VARCHAR').base
    expression = Cast(expression, varchar_type)
    if self.search_unaccented and operator.endswith('ilike'):
        expression = database.unaccent(expression)
    return expression
field = cls._fields[fname]
if not hasattr(field, 'set'):
if (not getattr(field, 'translate', False)
or store_translation):
columns.append(Column(table, fname))
update_values.append(field.sql_format(value))
for sub_ids in grouped_slice(ids):
red_sql = reduce_ids(table.id, sub_ids)
try:
cursor.execute(*table.update(columns, update_values,
where=red_sql))
except backend.DatabaseIntegrityError as exception:
transaction = Transaction()
with Transaction().new_transaction(), \
Transaction().set_context(_check_access=False):
cls.__raise_integrity_error(
exception, values, list(values.keys()),
transaction=transaction)
raise
for fname, value in values.items():
field = cls._fields[fname]
if (getattr(field, 'translate', False)
and not hasattr(field, 'set')):
Translation.set_ids(
'%s,%s' % (cls.__name__, fname), 'model',
transaction.language, ids, [value] * len(ids))
if hasattr(field, 'set'):
fields_to_set.setdefault(fname, []).extend((ids, value))
field_names = list(values.keys())
def write(cls, reports, values, *args):
    """Write reports, stamping the context's module on the values.

    Arguments come as alternating ``records, values`` pairs.  When the
    transaction context has a ``module`` key, each values dict is copied
    and its ``'module'`` entry set from the context before the pairs are
    passed to the parent ``write``.  Without that key the arguments are
    forwarded untouched.
    """
    context = Transaction().context
    if 'module' not in context:
        super(ActionReport, cls).write(reports, values, *args)
        return

    flat = (reports, values) + args
    stamped = []
    # Even positions hold the records, odd positions the matching values.
    for records, record_values in zip(flat[::2], flat[1::2]):
        # Copy so the caller's dict is not modified.
        record_values = record_values.copy()
        record_values['module'] = context['module']
        stamped.extend((records, record_values))
    super(ActionReport, cls).write(*stamped)
def _update_definitions(self, columns=None, indexes=None):
if columns is None and indexes is None:
columns = indexes = True
cursor = Transaction().connection.cursor()
# Fetch columns definitions from the table
if columns:
cursor.execute('PRAGMA table_info("' + self.table_name + '")')
self._columns = {}
for _, column, type_, notnull, hasdef, _ in cursor.fetchall():
column = re.sub(r'^\"|\"$', '', column)
match = re.match(r'(\w+)(\((.*?)\))?', type_)
if match:
typname = match.group(1).upper()
size = match.group(3) and int(match.group(3)) or 0
else:
typname = type_.upper()
size = -1
self._columns[column] = {
'notnull': notnull,
'hasdef': hasdef,
def not_null_action(self, column_name, action='add'):
if not self.column_exist(column_name):
return
with Transaction().connection.cursor() as cursor:
if action == 'add':
if self._columns[column_name]['notnull']:
return
cursor.execute('SELECT id FROM "%s" '
'WHERE "%s" IS NULL'
% (self.table_name, column_name))
if not cursor.rowcount:
cursor.execute('ALTER TABLE "' + self.table_name + '" '
'ALTER COLUMN "' + column_name + '" SET NOT NULL')
self._update_definitions(columns=True)
else:
logger.warning(
'Unable to set column %s '
'of table %s not null !\n'
'Try to re-run: '
'trytond.py --update=module\n'