Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, name='huey', database=None, **kwargs):
    """
    Set up SQL-backed storage on top of a peewee database.

    :param name: storage name, forwarded to the parent initializer.
    :param database: required. A ``Database`` instance is used as-is;
        any other value is passed to ``db_url_connect`` as a connection
        string.
    :param kwargs: accepted, but not consumed by this initializer.
    :raises ConfigurationError: if ``database`` is None.
    """
    super(SqlStorage, self).__init__(name)

    if database is None:
        raise ConfigurationError(
            'Use of SqlStorage requires a database= argument, which '
            'should be a peewee database or a connection string.')

    # Anything that is not already a Database is treated as a URL
    # connection string.
    already_connected = isinstance(database, Database)
    self.database = (
        database if already_connected else db_url_connect(database))

    # Build the model classes first, then make sure their tables exist.
    models = self.create_models()
    self.KV, self.Schedule, self.Task = models
    self.create_tables()
def start(self):
    """
    Begin consumer start-up: validate the Huey instance and log a banner.

    Refuses to run when the Huey instance is in immediate mode, then logs
    the worker configuration, the scheduler interval and whether periodic
    tasks are enabled.

    :raises ConfigurationError: if ``self.huey.immediate`` is truthy.
    """
    huey = self.huey
    if huey.immediate:
        raise ConfigurationError(
            'Consumer cannot be run with Huey instances where immediate '
            'is enabled. Please check your configuration and ensure that '
            '"huey.immediate = False".')

    log = self._logger.info

    # Startup banner.
    log('Huey consumer started with %s %s, PID %s at %s',
        self.workers, self.worker_type, os.getpid(),
        huey._get_timestamp())
    log('Scheduler runs every %s second(s).', self.scheduler_interval)

    periodic_state = 'disabled'
    if self.periodic:
        periodic_state = 'enabled'
    log('Periodic tasks are %s.', periodic_state)

    # One line per registered command. NOTE(review): within the code
    # visible here the assembled list is never emitted -- confirm against
    # the full method.
    msg = ['The following commands are available:']
    msg.extend('+ %s' % command for command in huey._registry._registry)
def __init__(self, name='huey', blocking=True, read_timeout=1,
             connection_pool=None, url=None, client_name=None,
             **connection_params):
    """
    Configure the Redis connection used by this storage backend.

    Exactly one way of describing the connection may be supplied: a
    ``url``, a ready-made ``connection_pool``, or keyword
    ``connection_params``. If none is given, a ``ConnectionPool`` is
    created with default arguments.

    :param name: storage name; stored after passing through
        ``self.clean_name``.
    :param blocking: not used in this portion of the initializer.
    :param read_timeout: not used in this portion of the initializer.
    :param connection_pool: existing connection pool to use.
    :param url: connection URL handed to ``ConnectionPool.from_url``.
    :param client_name: not used in this portion of the initializer.
    :param connection_params: keyword arguments for ``ConnectionPool``;
        ``host``, ``port`` and ``db`` entries that are None are dropped.
    :raises ConfigurationError: if the redis module is unavailable, or if
        more than one connection source is specified.
    """
    if Redis is None:
        raise ConfigurationError('"redis" python module not found, cannot '
                                 'use Redis storage backend. Run "pip '
                                 'install redis" to install.')

    # Drop common empty values from the connection_params so that they
    # do not count as an explicit connection configuration below.
    for key in ('host', 'port', 'db'):
        if key in connection_params and connection_params[key] is None:
            del connection_params[key]

    # Count how many of the mutually exclusive sources were provided.
    sources = (url, connection_pool, connection_params)
    if sum(1 for source in sources if source) > 1:
        raise ConfigurationError(
            'The connection configuration is over-determined. '
            'Please specify only one of the following: '
            '"url", "connection_pool", or "connection_params"')

    if url:
        connection_pool = ConnectionPool.from_url(
            url, decode_components=True)
    elif connection_pool is None:
        connection_pool = ConnectionPool(**connection_params)

    self.pool = connection_pool
    self.conn = self.redis_client(connection_pool=connection_pool)
    self.connection_params = connection_params
    self._pop = self.conn.register_script(SCHEDULE_POP_LUA)

    self.name = self.clean_name(name)
def __init__(self, compression=False, compression_level=6, use_zlib=False):
    """
    Store the compression settings and verify the needed module exists.

    :param compression: enable compression when truthy.
    :param compression_level: stored as ``self.comp_level``.
    :param use_zlib: select zlib rather than gzip as the backend.
    :raises ConfigurationError: if compression is enabled and the module
        for the selected backend is unavailable (None).
    """
    self.comp = compression
    self.comp_level = compression_level
    self.use_zlib = use_zlib

    if not self.comp:
        return

    # Only check the backend that was actually selected. Previously a
    # missing gzip module raised an error even when use_zlib was set and
    # zlib was available.
    if self.use_zlib:
        if zlib is None:
            raise ConfigurationError('use_zlib specified, but zlib module '
                                     'not found.')
    elif gzip is None:
        raise ConfigurationError('gzip module required to enable '
                                 'compression.')
def __init__(self, secret=None, salt='huey', **kwargs):
    """
    Initialize the serializer and derive its signing key.

    :param secret: required, must be truthy.
    :param salt: required, must be truthy.
    :param kwargs: forwarded unchanged to the parent initializer.
    :raises ConfigurationError: if either ``secret`` or ``salt`` is empty.
    """
    super(SignedSerializer, self).__init__(**kwargs)

    if not (secret and salt):
        message = ('The secret and salt parameters are '
                   'required by %r' % type(self))
        raise ConfigurationError(message)

    self.secret = encode(secret)
    self.salt = encode(salt)
    self.separator = b':'

    # The key is the SHA-1 digest of the encoded salt followed by the
    # encoded secret.
    key_material = self.salt + self.secret
    self._key = hashlib.sha1(key_material).digest()
def __init__(self, name='huey', blocking=True, read_timeout=1,
connection_pool=None, url=None, client_name=None,
**connection_params):
if Redis is None:
raise ConfigurationError('"redis" python module not found, cannot '
'use Redis storage backend. Run "pip '
'install redis" to install.')
# Drop common empty values from the connection_params.
for p in ('host', 'port', 'db'):
if p in connection_params and connection_params[p] is None:
del connection_params[p]
if sum(1 for p in (url, connection_pool, connection_params) if p) > 1:
raise ConfigurationError(
'The connection configuration is over-determined. '
'Please specify only one of the the following: '
'"url", "connection_pool", or "connection_params"')
if url:
connection_pool = ConnectionPool.from_url(
def start(self):
    """
    Dispatch to the handler that matches the type of ``self.huey_settings``.

    * None                -> ``_no_config``
    * ``RedisHuey``       -> ``_huey_instance_config``
    * dict, legacy layout -> ``_single_config``
    * dict, multi layout  -> ``_multi_config``

    :raises ConfigurationError: if the settings are a dict matching
        neither layout, or are of any other type.
    """
    settings = self.huey_settings

    if settings is None:
        self._no_config()
        return

    if isinstance(settings, RedisHuey):
        self._huey_instance_config()
        return

    if not isinstance(settings, dict):
        raise ConfigurationError('Configuration doesnt match guidelines.')

    # Dictionary settings: the legacy single-queue layout is checked first.
    if SingleConfReader.is_legacy(settings):
        self._single_config()
    elif MultiConfReader.is_multi_config(settings):
        self._multi_config()
    else:
        raise ConfigurationError('HUEY settings dictionary invalid.')
def start(self):
    """
    Select and run the configuration routine for ``self.huey_settings``.

    Missing settings go to ``_no_config`` and a ``RedisHuey`` instance to
    ``_huey_instance_config``. A dict goes to ``_single_config`` or
    ``_multi_config`` depending on which reader recognises it.

    :raises ConfigurationError: for an unrecognised dict or any other
        settings type.
    """
    conf = self.huey_settings

    if conf is None:
        return self._no_config() and None

    if isinstance(conf, RedisHuey):
        return self._huey_instance_config() and None

    if isinstance(conf, dict):
        # The legacy layout takes precedence over the multi-queue layout.
        if SingleConfReader.is_legacy(conf):
            self._single_config()
            return None
        if MultiConfReader.is_multi_config(conf):
            self._multi_config()
            return None
        raise ConfigurationError('HUEY settings dictionary invalid.')

    raise ConfigurationError('Configuration doesnt match guidelines.')
if huey.immediate:
self._logger.warning('Consumer initialized with Huey instance '
'that has "immediate" mode enabled. This '
'must be disabled before the consumer can '
'be run.')
self.huey = huey
self.workers = workers # Number of workers.
self.periodic = periodic # Enable periodic task scheduler?
self.default_delay = initial_delay # Default queue polling interval.
self.backoff = backoff # Exponential backoff factor when queue empty.
self.max_delay = max_delay # Maximum interval between polling events.
# Ensure that the scheduler runs at an interval between 1 and 60s.
self.scheduler_interval = max(min(scheduler_interval, 60), 1)
if 60 % self.scheduler_interval != 0:
raise ConfigurationError('Scheduler interval must be a factor '
'of 60, e.g. 1, 2, 3, 4, 5, 6, 10, 12...')
if worker_type == 'gevent': worker_type = WORKER_GREENLET
self.worker_type = worker_type # What process model are we using?
# Configure health-check and consumer main-loop attributes.
self._stop_flag_timeout = 0.1
self._health_check = check_worker_health
self._health_check_interval = float(health_check_interval)
# Create the execution environment helper.
self.environment = self.get_environment(self.worker_type)
# Create the event used to signal the process should terminate. We'll
# also store a boolean flag to indicate whether we should restart after
# the processes are cleaned up.