Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _zrangebyscore(self, key, min, max, reverse, *args):
withscores = False
offset = 0
count = -1
i = 0
while i < len(args):
if casematch(args[i], b'withscores'):
withscores = True
i += 1
elif casematch(args[i], b'limit') and i + 2 < len(args):
offset = Int.decode(args[i + 1])
count = Int.decode(args[i + 2])
i += 3
else:
raise redis.ResponseError(SYNTAX_ERROR_MSG)
zset = key.value
items = list(zset.irange_score(min.lower_bound, max.upper_bound, reverse=reverse))
items = self._limit_items(items, offset, count)
items = self._apply_withscores(items, withscores)
return items
xx = False
nx = False
while i < len(args):
if casematch(args[i], b'nx'):
nx = True
i += 1
elif casematch(args[i], b'xx'):
xx = True
i += 1
elif casematch(args[i], b'ex') and i + 1 < len(args):
ex = Int.decode(args[i + 1])
if ex <= 0:
raise redis.ResponseError(INVALID_EXPIRE_MSG.format('set'))
i += 2
elif casematch(args[i], b'px') and i + 1 < len(args):
px = Int.decode(args[i + 1])
if px <= 0:
raise redis.ResponseError(INVALID_EXPIRE_MSG.format('set'))
i += 2
else:
raise redis.ResponseError(SYNTAX_ERROR_MSG)
if (xx and nx) or (px is not None and ex is not None):
raise redis.ResponseError(SYNTAX_ERROR_MSG)
if nx and key:
return None
if xx and not key:
return None
key.value = value
if ex is not None:
key.expireat = self._db.time + ex
if px is not None:
def hincrby(self, key, field, amount):
c = Int.decode(key.value.get(field, b'0')) + amount
key.value[field] = Int.encode(c)
key.updated()
return c
def _zrangebylex(self, key, min, max, reverse, *args):
if args:
if len(args) != 3 or not casematch(args[0], b'limit'):
raise redis.ResponseError(SYNTAX_ERROR_MSG)
offset = Int.decode(args[1])
count = Int.decode(args[2])
else:
offset = 0
count = -1
zset = key.value
items = zset.irange_lex(min.value, max.value,
inclusive=(not min.exclusive, not max.exclusive),
reverse=reverse)
items = self._limit_items(items, offset, count)
return items
grabbing the full set of keys over which we are investigating subsets.
It also doesn't adhere to the guarantee that every key will be iterated
at least once even if the database is modified during the scan.
However, provided the database is not modified, every key will be
returned exactly once.
"""
pattern = None
count = 10
if len(args) % 2 != 0:
raise redis.ResponseError(SYNTAX_ERROR_MSG)
for i in range(0, len(args), 2):
if casematch(args[i], b'match'):
pattern = args[i + 1]
elif casematch(args[i], b'count'):
count = Int.decode(args[i + 1])
if count <= 0:
raise redis.ResponseError(SYNTAX_ERROR_MSG)
else:
raise redis.ResponseError(SYNTAX_ERROR_MSG)
if cursor >= len(keys):
return [0, []]
data = sorted(keys)
result_cursor = cursor + count
result_data = []
if pattern is not None:
regex = compile_pattern(pattern)
for val in itertools.islice(data, cursor, result_cursor):
compare_val = val[0] if isinstance(val, tuple) else val
if regex.match(compare_val):
def set_(self, key, value, *args):
i = 0
ex = None
px = None
xx = False
nx = False
while i < len(args):
if casematch(args[i], b'nx'):
nx = True
i += 1
elif casematch(args[i], b'xx'):
xx = True
i += 1
elif casematch(args[i], b'ex') and i + 1 < len(args):
ex = Int.decode(args[i + 1])
if ex <= 0:
raise redis.ResponseError(INVALID_EXPIRE_MSG.format('set'))
i += 2
elif casematch(args[i], b'px') and i + 1 < len(args):
px = Int.decode(args[i + 1])
if px <= 0:
raise redis.ResponseError(INVALID_EXPIRE_MSG.format('set'))
i += 2
else:
raise redis.ResponseError(SYNTAX_ERROR_MSG)
if (xx and nx) or (px is not None and ex is not None):
raise redis.ResponseError(SYNTAX_ERROR_MSG)
if nx and key:
return None
if xx and not key: