Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def schedule(self, *columns):
return (Schedule
.select(*columns)
.where(Schedule.queue == self.name)
.order_by(Schedule.timestamp))
def schedule(self, *columns):
return (Schedule
.select(*columns)
.where(Schedule.queue == self.name)
.order_by(Schedule.timestamp))
def read_schedule(self, ts):
tasks = (self
.schedule(Schedule.id, Schedule.data)
.where(Schedule.timestamp <= ts)
.tuples())
id_list, data = [], []
for task_id, task_data in tasks:
id_list.append(task_id)
data.append(task_data)
if id_list:
(Schedule
.delete()
.where(Schedule.id << id_list)
.execute())
return data
def add_to_schedule(self, data, ts):
Schedule.create(data=data, timestamp=ts, queue=self.name)
def schedule(self, *columns):
return (Schedule
.select(*columns)
.where(Schedule.queue == self.name)
.order_by(Schedule.timestamp))
def read_schedule(self, ts):
tasks = (self
.schedule(Schedule.id, Schedule.data)
.where(Schedule.timestamp <= ts)
.tuples())
id_list, data = [], []
for task_id, task_data in tasks:
id_list.append(task_id)
data.append(task_data)
if id_list:
(Schedule
.delete()
.where(Schedule.id << id_list)
.execute())
return data