Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
rows = sql(select_filename, (10,)).fetchall()
if not rows:
break
count += len(rows)
delete = (
'DELETE FROM Cache WHERE rowid IN (%s)'
% select_policy.format(fields='rowid', now=now)
)
sql(delete, (10,))
for filename, in rows:
cleanup(filename)
except Timeout:
raise Timeout(count)
return count
while True:
with self._transact(retry) as (sql, cleanup):
rows = sql(select, args).fetchall()
if not rows:
break
count += len(rows)
sql(delete % ','.join(str(row[0]) for row in rows))
for row in rows:
args[arg_index] = row[row_index]
cleanup(row[-1])
except Timeout:
raise Timeout(count)
return count
def _remove(self, name, args=(), retry=False):
total = 0
for shard in self._shards:
method = getattr(shard, name)
while True:
try:
count = method(*args, retry=retry)
total += count
except Timeout as timeout:
total += timeout.args[0]
else:
break
return total
with self._transact(retry) as (sql, cleanup):
rows = sql(select_filename, (10,)).fetchall()
if not rows:
break
count += len(rows)
delete = (
'DELETE FROM Cache WHERE rowid IN (%s)'
% select_policy.format(fields='rowid', now=now)
)
sql(delete, (10,))
for filename, in rows:
cleanup(filename)
except Timeout:
raise Timeout(count)
return count