Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from peewee import *
from playhouse.db_url import connect as db_url_connect
from huey.api import Huey
from huey.constants import EmptyData
from huey.exceptions import ConfigurationError
from huey.storage import BaseStorage
class BytesBlobField(BlobField):
    """Blob field whose values are handed back to Python as ``bytes``."""

    def python_value(self, value):
        """Coerce a value read from the database to ``bytes``.

        :param value: raw column value; may already be ``bytes``, ``None``,
            or another object that ``bytes()`` accepts.
        :return: ``value`` unchanged when it is ``bytes`` or ``None``,
            otherwise ``bytes(value)``.
        """
        # bytes(None) raises TypeError, so let a NULL column pass through
        # untouched instead of failing while the row is being read.
        if value is None or isinstance(value, bytes):
            return value
        return bytes(value)
class SqlStorage(BaseStorage):
    """Storage configured with a peewee database or a connection URL."""

    def __init__(self, name='huey', database=None, **kwargs):
        """Resolve the database, then build the models and their tables.

        :param name: name forwarded to ``BaseStorage.__init__``.
        :param database: a peewee ``Database`` instance, or a value handed
            to ``db_url_connect`` as a connection string.
        :param kwargs: accepted but not used by this constructor.
        :raises ConfigurationError: if ``database`` is ``None``.
        """
        super(SqlStorage, self).__init__(name)

        if database is None:
            raise ConfigurationError('Use of SqlStorage requires a '
                                     'database= argument, which should be a '
                                     'peewee database or a connection string.')

        # Anything that is not already a peewee Database is treated as a URL
        # connection string.
        already_connected = isinstance(database, Database)
        self.database = (database if already_connected
                         else db_url_connect(database))

        models = self.create_models()
        self.KV, self.Schedule, self.Task = models
        self.create_tables()
class Meta:
# Model options; the model class this belongs to is outside this excerpt.
# Declares one index over the ('queue', 'timestamp') column pair. In peewee's
# ``indexes`` format the second tuple element is the uniqueness flag, so
# False here means the index is not unique.
indexes = (
(('queue', 'timestamp'), False),
)
class KeyValue(BaseModel):
# Key/value row model with three columns: queue, key and value.
queue = CharField()
key = CharField()
# NOTE(review): ``_BytesField`` is not defined in this excerpt -- confirm
# where it comes from.
value = _BytesField()
class Meta:
# Rows are identified by the (queue, key) pair rather than a surrogate id.
primary_key = CompositeKey('queue', 'key')
class SqliteStorage(BaseStorage):
    """Storage that keeps its tables in a ``SqliteDatabase``."""

    def __init__(self, name='huey', filename='huey.db', **storage_kwargs):
        """Open the database and make sure the tables exist.

        :param name: name forwarded to ``BaseStorage.__init__``.
        :param filename: value handed to ``SqliteDatabase``.
        :param storage_kwargs: extra keyword arguments for ``SqliteDatabase``.
        """
        self.filename = filename
        self.database = SqliteDatabase(filename, **storage_kwargs)
        super(SqliteStorage, self).__init__(name)
        self.initialize_task_table()

    def initialize_task_table(self):
        """Bind the three models to this database and create their tables."""
        models = [Task, Schedule, KeyValue]
        self.database.bind(models)
        with self.database:
            self.database.create_tables(models)

    def tasks(self, *columns):
        """Return a ``Task`` select query restricted to this storage's name.

        :param columns: columns forwarded to ``Task.select``.
        """
        query = Task.select(*columns)
        return query.where(Task.queue == self.name)
def delete(self):
# NOTE(review): this body does not match the signature above. It calls
# ``super(_ConnectionState, self).__init__`` and reads ``kwargs``, which
# ``delete(self)`` never defines. It looks like the tail of a different
# definition was spliced in here -- confirm against the original sources.
super(_ConnectionState, self).__init__(**kwargs)
self.reset()
def reset(self):
    """Drop the stored connection and mark the state as closed."""
    self.conn, self.closed = None, True
def set_connection(self, conn):
    """Store ``conn`` and mark the state as open.

    :param conn: the connection object to remember.
    """
    self.conn, self.closed = conn, False
class _ConnectionLocal(_ConnectionState, threading.local):
    """``_ConnectionState`` combined with ``threading.local`` storage."""
def to_bytes(b):
    """Return ``b`` as ``bytes``.

    Python 2.x may return a buffer object for BLOB columns, so anything that
    is not already ``bytes`` is converted.
    """
    if isinstance(b, bytes):
        return b
    return bytes(b)


def to_blob(b):
    """Wrap ``b`` in ``sqlite3.Binary`` for storage in a BLOB column."""
    return sqlite3.Binary(b)
class BaseSqlStorage(BaseStorage):
    """Base storage that tracks its connection in a ``_ConnectionLocal``."""

    begin_sql = 'begin'
    ddl = []

    def __init__(self, *args, **kwargs):
        """Initialise the base storage, connection state and schema.

        All arguments are forwarded unchanged to ``BaseStorage.__init__``.
        """
        super(BaseSqlStorage, self).__init__(*args, **kwargs)
        self._state = _ConnectionLocal()
        self.initialize_schema()

    def close(self):
        """Close the current connection if one is open.

        :return: ``True`` if a connection was closed, ``False`` if the state
            was already marked closed.
        """
        state = self._state
        if not state.closed:
            state.conn.close()
            state.reset()
            return True
        return False
@property
def conn(self):
from huey.api import Huey
from huey.constants import EmptyData
from huey.storage import BaseStorage
from simpledb import Client
class SimpleStorage(BaseStorage):
    """Queue-only storage that talks to a simpledb server through ``Client``.

    Only the task queue is backed by the server. The schedule and result-store
    methods are stubs that do nothing and report empty state.
    """

    def __init__(self, name='huey', host='127.0.0.1', port=31337,
                 **storage_kwargs):
        """Create the storage and connect a client.

        :param name: name forwarded to ``BaseStorage.__init__``; also used as
            the key of the queue list.
        :param host: server host passed to ``Client``.
        :param port: server port passed to ``Client``.
        :param storage_kwargs: extra keyword arguments for ``BaseStorage``.
        """
        super(SimpleStorage, self).__init__(name=name, **storage_kwargs)
        self.client = Client(host=host, port=port)

    def enqueue(self, data, priority=None):
        """Left-push ``data`` onto the queue list.

        :param data: payload to enqueue.
        :param priority: accepted so the signature matches the other storage
            backends; this backend ignores it.
        """
        self.client.lpush(self.name, data)

    def dequeue(self):
        """Right-pop and return the next payload from the queue list."""
        return self.client.rpop(self.name)

    def unqueue(self, data):
        """Remove ``data`` from the queue list.

        :param data: payload to remove.
        :return: whatever the client's ``lrem`` call returns.
        """
        # The payload has to be sent along with the key; without it the
        # server has no way to know which queued item to remove.
        return self.client.lrem(self.name, data)

    def queue_size(self):
        """Return the length of the queue list."""
        return self.client.llen(self.name)

    def flush_queue(self):
        pass

    # -- Schedule: not supported by this backend, always empty. ------------

    def add_to_schedule(self, data, ts, utc):
        pass

    def read_schedule(self, ts):
        return []

    def schedule_size(self):
        return 0

    def scheduled_items(self, limit=None):
        return []

    def flush_schedule(self):
        pass

    # -- Result store: not supported by this backend, always empty. --------

    def put_data(self, key, value, is_result=False):
        pass

    def peek_data(self, key):
        return EmptyData

    def pop_data(self, key):
        return EmptyData

    def has_data_for_key(self, key):
        return False

    def result_store_size(self):
        return 0

    def result_items(self):
        return {}

    def flush_results(self):
        pass
class MemoryStorage(BaseStorage):
    """Storage that keeps the queue, schedule and results in memory."""

    def __init__(self, *args, **kwargs):
        """Set up empty containers and the lock that guards them.

        All arguments are forwarded unchanged to ``BaseStorage.__init__``.
        """
        super(MemoryStorage, self).__init__(*args, **kwargs)
        # Monotonic counter used as a tie-breaker so that items with equal
        # priority leave the heap in FIFO order.
        self._c = 0
        self._queue = []
        self._results = {}
        self._schedule = []
        self._lock = threading.RLock()

    def enqueue(self, data, priority=None):
        """Push ``data`` onto the heap-based queue.

        :param data: payload to enqueue.
        :param priority: optional priority; it is negated before being pushed
            because ``heapq`` pops the smallest entry first. ``None`` is
            stored as 0.
        """
        with self._lock:
            self._c += 1
            if priority is None:
                priority = 0
            else:
                priority = -priority
            entry = (priority, self._c, data)
            heapq.heappush(self._queue, entry)
def dequeue(self):
try:
from functools import partial
import time
from ukt import KT_NONE
from ukt import KyotoTycoon
from huey.api import Huey
from huey.constants import EmptyData
from huey.storage import BaseStorage
from huey.utils import decode
class KyotoTycoonStorage(BaseStorage):
    """Storage that delegates to a ``KyotoTycoon`` client."""

    priority = True

    def __init__(self, name='huey', host='127.0.0.1', port=1978, db=None,
                 timeout=None, max_age=3600, queue_db=None, client=None,
                 blocking=False, result_expire_time=None):
        """Store the configuration and create a client when none is given.

        :param name: name forwarded to ``BaseStorage.__init__``.
        :param host: passed to ``KyotoTycoon`` when a client is created.
        :param port: passed to ``KyotoTycoon`` when a client is created.
        :param db: passed to ``KyotoTycoon`` and stored as ``_db``.
        :param timeout: passed to ``KyotoTycoon`` when a client is created.
        :param max_age: passed to ``KyotoTycoon`` when a client is created.
        :param queue_db: stored as ``_queue_db``; falls back to ``db`` when
            it is ``None``.
        :param client: existing client to use instead of creating one.
        :param blocking: stored as ``blocking``.
        :param result_expire_time: stored as ``expire_time``.
        """
        super(KyotoTycoonStorage, self).__init__(name)

        # Only build a client when the caller did not supply one.
        if client is None:
            client = KyotoTycoon(host, port, timeout, db,
                                 serializer=KT_NONE, max_age=max_age)

        self.blocking = blocking
        self.expire_time = result_expire_time
        self.kt = client
        self._db = db
        self._queue_db = db if queue_db is None else queue_db
:return: No return value.
"""
raise NotImplementedError
def flush_all(self):
    """
    Clear the queue, then the schedule, then the results.

    :return: No return value.
    """
    # Each flush method is looked up right before it is called, in order.
    for flush_name in ('flush_queue', 'flush_schedule', 'flush_results'):
        getattr(self, flush_name)()
class BlackHoleStorage(BaseStorage):
    """Storage that discards everything written and always reports empty."""

    # -- Queue --------------------------------------------------------------

    def enqueue(self, data, priority=None):
        return None

    def dequeue(self):
        return None

    def queue_size(self):
        return 0

    def enqueued_items(self, limit=None):
        return []

    def flush_queue(self):
        return None

    # -- Schedule -----------------------------------------------------------

    def add_to_schedule(self, data, ts, utc):
        return None

    def read_schedule(self, ts):
        return []

    def schedule_size(self):
        return 0

    def scheduled_items(self, limit=None):
        return []

    def flush_schedule(self):
        return None

    # -- Result store -------------------------------------------------------

    def put_data(self, key, value, is_result=False):
        return None

    def peek_data(self, key):
        return EmptyData

    def pop_data(self, key):
        return EmptyData

    def has_data_for_key(self, key):
        return False

    def result_store_size(self):
        return 0

    def result_items(self):
        return {}
self._results = {}
# A custom Lua script to pass to Redis that reads tasks from the schedule
# and atomically pops them from the sorted set and returns them.
#
# KEYS[1] is the sorted-set key and ARGV[1] is the upper score bound (named
# ``unix_ts`` in the script). The script reads every member with a score up to
# that bound, then removes the same score range. The members are returned only
# when the number removed equals the number read; otherwise nothing is
# returned.
# NOTE(review): in Lua the number 0 is truthy, so the leading `#res and` does
# not by itself filter out an empty result -- confirm this is intended.
SCHEDULE_POP_LUA = """\
local key = KEYS[1]
local unix_ts = ARGV[1]
local res = redis.call('zrangebyscore', key, '-inf', unix_ts)
if #res and redis.call('zremrangebyscore', key, '-inf', unix_ts) == #res then
return res
end"""
class RedisStorage(BaseStorage):
priority = False # Use PriorityRedisStorage instead. Requires Redis>=5.0.
redis_client = Redis
def __init__(self, name='huey', blocking=True, read_timeout=1,
connection_pool=None, url=None, client_name=None,
**connection_params):
if Redis is None:
raise ConfigurationError('"redis" python module not found, cannot '
'use Redis storage backend. Run "pip '
'install redis" to install.')
# Drop common empty values from the connection_params.
for p in ('host', 'port', 'db'):
if p in connection_params and connection_params[p] is None:
del connection_params[p]