Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def enqueued_items(self, limit=None):
query = self.tasks(Task.data).tuples()
if limit is not None:
query = query.limit(limit)
return [item[0] for item in query]
def dequeue(self):
try:
task = (self
.tasks()
.order_by(Task.id)
.limit(1)
.get())
except Task.DoesNotExist:
return
res = self.delete().where(Task.id == task.id).execute()
if res == 1:
return task.data
def dequeue(self):
try:
task = (self
.tasks()
.order_by(Task.id)
.limit(1)
.get())
except Task.DoesNotExist:
return
res = self.delete().where(Task.id == task.id).execute()
if res == 1:
return task.data
def initialize_task_table(self):
self.database.bind([Task, Schedule, KeyValue])
with self.database:
self.database.create_tables([Task, Schedule, KeyValue])
def tasks(self, *columns):
return Task.select(*columns).where(Task.queue == self.name)
def delete(self):
return Task.delete().where(Task.queue == self.name)
def enqueue(self, data):
Task.create(queue=self.name, data=data)