Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add(ids, target_ids):
    """Collect the relation values missing between ``ids`` and ``target_ids``.

    ``target_ids`` is converted to a list of ints; nothing happens when it is
    empty.  Relations already found by ``Relation.search`` are skipped, and
    one value dict per remaining (record, target) pair is appended to
    ``relation_to_create`` (defined outside this function).
    """
    wanted_targets = [int(target) for target in target_ids]
    if len(wanted_targets) == 0:
        return

    # (origin id, target id) pairs for which a relation already exists
    linked_pairs = set()
    for chunk in grouped_slice(wanted_targets):
        domain = [
            search_clause(ids),
            (self.target, 'in', list(chunk)),
            ]
        for link in Relation.search(domain):
            origin_id = getattr(link, self.origin).id
            target_id = getattr(link, self.target).id
            linked_pairs.add((origin_id, target_id))

    for target_id in wanted_targets:
        for origin_id in ids:
            if (origin_id, target_id) not in linked_pairs:
                relation_to_create.append({
                        self.origin: field_value(origin_id),
                        self.target: target_id,
                        })
# NOTE(review): fragment -- the enclosing definition, the binding of ``id_``,
# ``values``, ``columns`` and ``table``, and the ``if`` matched by the ``else``
# below are all outside the visible source.
to_delete.append(id_)
else:
to_update.append(id_)
values = list(values)
# Attempt an UPDATE of the row whose id equals ``id_``.
cursor.execute(*table.update(columns, values,
where=table.id == id_))
rowcount = cursor.rowcount
# The cursor reported no usable row count (-1 or None): count the matching
# rows with an explicit SELECT instead.
if rowcount == -1 or rowcount is None:
cursor.execute(*table.select(table.id,
where=table.id == id_))
rowcount = len(cursor.fetchall())
# Fewer than one row matched: INSERT the values instead.
if rowcount < 1:
cursor.execute(*table.insert(columns, [values]))
# Delete the collected ids chunk by chunk, then pass them to
# ``_insert_history`` with ``True`` as second argument.
if to_delete:
for sub_ids in grouped_slice(to_delete):
where = reduce_ids(table.id, sub_ids)
cursor.execute(*table.delete(where=where))
cls._insert_history(to_delete, True)
# Ids collected in ``to_update`` go to ``_insert_history`` without that flag.
if to_update:
cls._insert_history(to_update)
def get_related_records(Model, field_name, sub_ids):
    """Return the ``Model`` records whose ``field_name`` is one of ``sub_ids``.

    Subclasses of ``ModelSQL`` are queried through the cursor on their own
    table and the resulting ids are browsed; any other model is searched
    with ``active_test`` set to False in the context.
    """
    if not issubclass(Model, ModelSQL):
        with transaction.set_context(active_test=False):
            return Model.search([(field_name, 'in', sub_ids)])

    foreign_table = Model.__table__()
    referencing_column = Column(foreign_table, field_name)
    id_filter = reduce_ids(referencing_column, sub_ids)
    query = foreign_table.select(foreign_table.id, where=id_filter)
    cursor.execute(*query)
    matching_ids = [row[0] for row in cursor.fetchall()]
    return Model.browse(matching_ids)
# NOTE(review): fragment -- the enclosing definition and the bindings of
# ``ids``, ``records``, ``table``, ``transaction`` and
# ``foreign_keys_toupdate`` are outside the visible source.
# Walk ``ids`` and ``records`` in parallel chunks.
for sub_ids, sub_records in zip(
grouped_slice(ids), grouped_slice(records)):
sub_ids = list(sub_ids)
red_sql = reduce_ids(table.id, sub_ids)
# Add this chunk's ids to the set stored under the model name in
# ``transaction.delete_records``.
transaction.delete_records.setdefault(cls.__name__,
set()).update(sub_ids)
for Model, field_name in foreign_keys_toupdate:
# Models lacking either ``search`` or ``write`` are skipped.
if (not hasattr(Model, 'search')
or not hasattr(Model, 'write')):
continue
records = get_related_records(Model, field_name, sub_ids)
# Set the referencing field to None on every related record found.
if records:
Model.write(records, {
field_name: None,
})
def get_unread(cls, ids, name):
    """Return a dict mapping each id in ``ids`` to its unread flag.

    The flag is False when a row of ``ir.note.read`` joins the note for the
    user of the current transaction, and True otherwise.  ``name`` is not
    used by the computation.
    """
    NoteRead = Pool().get('ir.note.read')
    cursor = Transaction().connection.cursor()
    current_user = Transaction().user
    note = cls.__table__()
    read_mark = NoteRead.__table__()

    flags = {}
    for chunk in grouped_slice(ids):
        id_filter = reduce_ids(note.id, chunk)
        # LEFT JOIN so that notes without a read row are still returned
        joined = note.join(
            read_mark, 'LEFT',
            condition=(note.id == read_mark.note)
            & (read_mark.user == current_user))
        query = joined.select(
            note.id,
            Case((read_mark.user != Null, False), else_=True),
            where=id_filter)
        cursor.execute(*query)
        flags.update(cursor.fetchall())
    return flags
def history_revisions(cls, ids):
# Collect (date, id, user name) rows from the history table for ``ids``.
# NOTE(review): the visible source is truncated inside the final list
# comprehension; the end of this function (including any return) is not shown.
pool = Pool()
ModelAccess = pool.get('ir.model.access')
User = pool.get('res.user')
cursor = Transaction().connection.cursor()
# Check 'read' access on this model before querying.
ModelAccess.check(cls.__name__, 'read')
table = cls.__table_history__()
user = User.__table__()
revisions = []
for sub_ids in grouped_slice(ids):
where = reduce_ids(table.id, sub_ids)
# LEFT JOIN on the user table using write_uid, falling back to create_uid;
# the selected date likewise falls back from write_date to create_date.
cursor.execute(*table.join(user, 'LEFT',
Coalesce(table.write_uid, table.create_uid) == user.id)
.select(
Coalesce(table.write_date, table.create_date),
table.id,
user.name,
where=where))
revisions.append(cursor.fetchall())
# Flatten the per-chunk results and sort them in descending order.
revisions = list(chain(*revisions))
revisions.sort(reverse=True)
# SQLite uses char for COALESCE
# When the first date is a string, dates are parsed with the format below.
if revisions and isinstance(revisions[0][0], str):
strptime = datetime.datetime.strptime
format_ = '%Y-%m-%d %H:%M:%S.%f'
revisions = [(strptime(timestamp, format_), id_, name)
def remove(ids, target_ids):
    """Schedule the unlinking of the targets in ``target_ids``.

    ``target_ids`` is converted to a list of ints; nothing happens when it is
    empty.  For each chunk, the targets matching both ``search_clause(ids)``
    and the chunk ids are added to ``to_write`` (defined outside this
    function) together with values setting ``self.field`` to None.
    """
    wanted_targets = [int(target) for target in target_ids]
    if wanted_targets:
        for chunk in grouped_slice(wanted_targets):
            domain = [
                search_clause(ids),
                ('id', 'in', list(chunk)),
                ]
            found = Target.search(domain)
            to_write.extend((found, {self.field: None}))
# NOTE(review): fragment -- this reads like a method body (it uses ``self``)
# but its ``def`` line is outside the visible source.
transaction = Transaction()
Model = Pool().get(self.data['model'])
# Run with the user and context stored in ``self.data``.
with transaction.set_user(self.data['user']), \
transaction.set_context(self.data['context']):
instances = self.data['instances']
# Ensure record ids still exist
if isinstance(instances, int):
# Single id: search with ``active_test`` disabled.
with transaction.set_context(active_test=False):
if Model.search([('id', '=', instances)]):
instances = Model(instances)
else:
instances = None
else:
# Several ids: keep only those still returned by a search, in their
# original order.
ids = set()
with transaction.set_context(active_test=False):
for sub_ids in grouped_slice(instances):
records = Model.search([('id', 'in', list(sub_ids))])
ids.update(map(int, records))
if ids:
instances = Model.browse(
[i for i in instances if i in ids])
else:
instances = None
# Call the stored method only when at least one instance remains.
if instances is not None:
getattr(Model, self.data['method'])(
instances, *self.data['args'], **self.data['kwargs'])
# Set ``dequeued_at`` if it is still empty, then ``finished_at``, and save.
if not self.dequeued_at:
self.dequeued_at = datetime.datetime.now()
self.finished_at = datetime.datetime.now()
self.save()
def reset(cls, model, names, records):
    """Deactivate the clicks stored for the ``names`` buttons on ``records``.

    Every record must be an instance of a class named ``model`` (checked by
    an assertion).  Clicks are searched per chunk of records by button
    model, button name and record id, then written with ``active`` set to
    False in one call.
    """
    assert all(r.__class__.__name__ == model for r in records)
    found_clicks = []
    for chunk in grouped_slice(records):
        record_ids = [r.id for r in chunk]
        domain = [
            ('button.model.model', '=', model),
            ('button.name', 'in', names),
            ('record_id', 'in', record_ids),
            ]
        found_clicks.extend(cls.search(domain))
    cls.write(found_clicks, {'active': False})
# NOTE(review): fragment -- the enclosing definition, the ``if`` matched by
# the ``else`` below and the bindings of ``translations``, ``lang``, ``name``,
# ``ttype``, ``ids`` and ``fuzzy_translation`` are outside the visible source.
to_fetch.append(obj_id)
else:
to_fetch = ids
if to_fetch:
# Get parent translations
parent_lang = get_parent(lang)
if parent_lang:
translations.update(
cls.get_ids(name, ttype, parent_lang, to_fetch))
# Unless fuzzy translations are requested, restrict the search to
# translations whose ``fuzzy`` field is False.
if fuzzy_translation:
fuzzy_clause = []
else:
fuzzy_clause = [('fuzzy', '=', False)]
# Search chunk by chunk for translations with a non-empty, non-None value;
# entries already present in ``translations`` are overwritten.
for sub_to_fetch in grouped_slice(to_fetch):
for translation in cls.search([
('lang', '=', lang),
('type', '=', ttype),
('name', '=', name),
('value', '!=', ''),
('value', '!=', None),
('res_id', 'in', list(sub_to_fetch)),
] + fuzzy_clause):
translations[translation.res_id] = translation.value
# Don't store fuzzy translation in cache
if not fuzzy_translation:
for res_id in to_fetch:
# ``setdefault`` stores None for ids with no translation found, so
# that None is what gets cached for them.
value = translations.setdefault(res_id)
cls._translation_cache.set(
(name, ttype, lang, res_id), value)
return translations