Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _zrangebylex(self, key, min, max, reverse, *args):
if args:
if len(args) != 3 or not casematch(args[0], b'limit'):
raise redis.ResponseError(SYNTAX_ERROR_MSG)
offset = Int.decode(args[1])
count = Int.decode(args[2])
else:
offset = 0
count = -1
zset = key.value
items = zset.irange_lex(min.value, max.value,
inclusive=(not min.exclusive, not max.exclusive),
reverse=reverse)
items = self._limit_items(items, offset, count)
return items
out = int(value)
if not cls.valid(out) or six.ensure_binary(str(out)) != value:
raise ValueError
except ValueError:
raise redis.ResponseError(cls.DECODE_ERROR)
return out
@classmethod
def encode(cls, value):
if cls.valid(value):
return six.ensure_binary(str(value))
else:
raise redis.ResponseError(cls.ENCODE_ERROR)
class BitOffset(Int):
"""Argument converter for unsigned bit positions"""
DECODE_ERROR = INVALID_BIT_OFFSET_MSG
MIN_VALUE = 0
MAX_VALUE = 8 * MAX_STRING_SIZE - 1 # Redis imposes 512MB limit on keys
class BitValue(Int):
DECODE_ERROR = INVALID_BIT_VALUE_MSG
MIN_VALUE = 0
MAX_VALUE = 1
class DbIndex(Int):
"""Argument converter for database indices"""
class BitValue(Int):
DECODE_ERROR = INVALID_BIT_VALUE_MSG
MIN_VALUE = 0
MAX_VALUE = 1
class DbIndex(Int):
"""Argument converter for database indices"""
DECODE_ERROR = INVALID_DB_MSG
MIN_VALUE = 0
MAX_VALUE = 15
class Timeout(Int):
"""Argument converter for timeouts"""
DECODE_ERROR = TIMEOUT_NEGATIVE_MSG
MIN_VALUE = 0
class Float(object):
"""Argument converter for floating-point values.
Redis uses long double for some cases (INCRBYFLOAT, HINCRBYFLOAT)
and double for others (zset scores), but Python doesn't support
long double.
"""
DECODE_ERROR = INVALID_FLOAT_MSG
class BitOffset(Int):
"""Argument converter for unsigned bit positions"""
DECODE_ERROR = INVALID_BIT_OFFSET_MSG
MIN_VALUE = 0
MAX_VALUE = 8 * MAX_STRING_SIZE - 1 # Redis imposes 512MB limit on keys
class BitValue(Int):
DECODE_ERROR = INVALID_BIT_VALUE_MSG
MIN_VALUE = 0
MAX_VALUE = 1
class DbIndex(Int):
"""Argument converter for database indices"""
DECODE_ERROR = INVALID_DB_MSG
MIN_VALUE = 0
MAX_VALUE = 15
class Timeout(Int):
"""Argument converter for timeouts"""
DECODE_ERROR = TIMEOUT_NEGATIVE_MSG
MIN_VALUE = 0
class Float(object):
"""Argument converter for floating-point values.
def encode(cls, value):
if cls.valid(value):
return six.ensure_binary(str(value))
else:
raise redis.ResponseError(cls.ENCODE_ERROR)
class BitOffset(Int):
"""Argument converter for unsigned bit positions"""
DECODE_ERROR = INVALID_BIT_OFFSET_MSG
MIN_VALUE = 0
MAX_VALUE = 8 * MAX_STRING_SIZE - 1 # Redis imposes 512MB limit on keys
class BitValue(Int):
DECODE_ERROR = INVALID_BIT_VALUE_MSG
MIN_VALUE = 0
MAX_VALUE = 1
class DbIndex(Int):
"""Argument converter for database indices"""
DECODE_ERROR = INVALID_DB_MSG
MIN_VALUE = 0
MAX_VALUE = 15
class Timeout(Int):
"""Argument converter for timeouts"""
@command((Key(), Int, bytes), (bytes,))
def zinterstore(self, dest, numkeys, *args):
return self._zunioninter('ZINTERSTORE', dest, numkeys, *args)