Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def kv(self, *columns):
return KeyValue.select(*columns).where(KeyValue.queue == self.name)
def put_data(self, key, value):
KeyValue.replace(queue=self.name, key=key, value=value).execute()
def pop_data(self, key):
try:
kv = self.kv().where(KeyValue.key == key).get()
except KeyValue.DoesNotExist:
return EmptyData
else:
dq = KeyValue.delete().where(
(KeyValue.queue == self.name) &
(KeyValue.key == key))
return kv.value if dq.execute() == 1 else EmptyData
def pop_data(self, key):
try:
kv = self.kv().where(KeyValue.key == key).get()
except KeyValue.DoesNotExist:
return EmptyData
else:
dq = KeyValue.delete().where(
(KeyValue.queue == self.name) &
(KeyValue.key == key))
return kv.value if dq.execute() == 1 else EmptyData
def peek_data(self, key):
try:
kv = self.kv(KeyValue.value).where(KeyValue.key == key).get()
except KeyValue.DoesNotExist:
return EmptyData
else:
return kv.value
def put_if_empty(self, key, value):
try:
(KeyValue
.insert(queue=self.name, key=key, value=value)
.on_conflict('abort')
.execute())
except IntegrityError:
return False
else:
return True
def pop_data(self, key):
try:
kv = self.kv().where(KeyValue.key == key).get()
except KeyValue.DoesNotExist:
return EmptyData
else:
dq = KeyValue.delete().where(
(KeyValue.queue == self.name) &
(KeyValue.key == key))
return kv.value if dq.execute() == 1 else EmptyData