Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _zunioninter(self, func, dest, numkeys, *args):
# Validates `numkeys`, loads the first `numkeys` entries of `args` as sorted
# sets, then parses the remaining entries as `weights` / `aggregate` options.
# Raises redis.ResponseError on an invalid key count or malformed options.
# NOTE(review): the visible listing ends after option parsing; `func`, `dest`,
# `sets`, `weights` and `aggregate` are not consumed here, so the combining
# step presumably follows in the full file -- confirm before relying on this.
# At least one key is required.
if numkeys < 1:
raise redis.ResponseError(ZUNIONSTORE_KEYS_MSG)
# There must be at least `numkeys` arguments to read the keys from.
if numkeys > len(args):
raise redis.ResponseError(SYNTAX_ERROR_MSG)
# Default aggregation mode when no `aggregate` option is supplied.
aggregate = b'sum'
sets = []
for i in range(numkeys):
# A key with no stored item falls back to an empty ZSet().
item = CommandItem(args[i], self._db, item=self._db.get(args[i]), default=ZSet())
sets.append(self._get_zset(item.value))
# One weight per key, defaulting to 1.0.
weights = [1.0] * numkeys
# Options start right after the keys.
i = numkeys
while i < len(args):
arg = args[i]
# `weights` must be followed by exactly `numkeys` values; if fewer remain,
# this branch is skipped and the argument ends up in the syntax-error branch.
if casematch(arg, b'weights') and i + numkeys < len(args):
weights = [Float.decode(x) for x in args[i + 1:i + numkeys + 1]]
i += numkeys + 1
# `aggregate` must be followed by one value, which (after casenorm) has to
# be b'sum', b'min' or b'max'.
elif casematch(arg, b'aggregate') and i + 1 < len(args):
aggregate = casenorm(args[i + 1])
if aggregate not in (b'sum', b'min', b'max'):
raise redis.ResponseError(SYNTAX_ERROR_MSG)
i += 2
else:
# Unknown option, or a known option without enough trailing values.
raise redis.ResponseError(SYNTAX_ERROR_MSG)
def _brpoplpush_pass(self, source, destination, first_pass):
# NOTE(review): this copy of the function is cut off after the final
# `if destination != source:` (its body and the return are not shown).
# Pops the last element of the list at `source` and inserts it at the front
# of the list at `destination`.
src = CommandItem(source, self._db, item=self._db.get(source), default=[])
if not isinstance(src.value, list):
# A non-list source is an error only on the first pass; on later passes
# it is treated as "nothing to pop".
if first_pass:
raise redis.ResponseError(WRONGTYPE_MSG)
else:
return None
if not src.value:
return None # Empty list
dst = CommandItem(destination, self._db, item=self._db.get(destination), default=[])
# A non-list destination is always an error, regardless of `first_pass`.
if not isinstance(dst.value, list):
raise redis.ResponseError(WRONGTYPE_MSG)
# Move the tail of the source to the head of the destination.
el = src.value.pop()
dst.value.insert(0, el)
src.updated()
src.writeback()
if destination != source:
# Ensure writeback only happens once
def _brpoplpush_pass(self, source, destination, first_pass):
    """Try once to move the last element of ``source`` to the front of ``destination``.

    Returns the moved element, or ``None`` when the source list is empty
    (or, on a pass other than the first, when the source is not a list).

    Raises ``redis.ResponseError(WRONGTYPE_MSG)`` when the source is not a
    list on the first pass, or when the destination is not a list.
    """
    source_item = CommandItem(source, self._db, item=self._db.get(source), default=[])
    if not isinstance(source_item.value, list):
        # Only the first pass reports a wrong-typed source; later passes
        # simply produce no result.
        if not first_pass:
            return None
        raise redis.ResponseError(WRONGTYPE_MSG)
    if not source_item.value:
        # Empty list: nothing to move.
        return None

    dest_item = CommandItem(
        destination, self._db, item=self._db.get(destination), default=[]
    )
    if not isinstance(dest_item.value, list):
        raise redis.ResponseError(WRONGTYPE_MSG)

    moved = source_item.value.pop()
    dest_item.value.insert(0, moved)

    source_item.updated()
    source_item.writeback()
    if destination != source:
        # When both names refer to the same key, the source item above has
        # already been written back; only write the destination separately
        # when it is a different key.
        dest_item.updated()
        dest_item.writeback()
    return moved
def _bpop_pass(self, keys, op, first_pass):
    """Apply ``op`` to the first non-empty list found among ``keys``.

    Keys are examined in order. For the first key whose value is a non-empty
    list, ``op`` is called with that list, the item is marked updated and
    written back, and ``[key, result_of_op]`` is returned. ``None`` is
    returned when no key yields a non-empty list.

    Raises ``redis.ResponseError(WRONGTYPE_MSG)`` if a key holds a non-list
    value and ``first_pass`` is true; on other passes such keys are skipped.
    """
    for key in keys:
        entry = CommandItem(key, self._db, item=self._db.get(key), default=[])
        if isinstance(entry.value, list):
            if entry.value:
                popped = op(entry.value)
                entry.updated()
                entry.writeback()
                return [key, popped]
        elif first_pass:
            # Wrong-typed keys are only reported on the first pass.
            raise redis.ResponseError(WRONGTYPE_MSG)
    return None
# NOTE(review): the start of this routine (its `def` line and the opening of
# the nested sort-key helper) is not visible in this listing; comments below
# describe only the visible statements.
# Sort score: decoded from `byval` when present, otherwise 0.0.
score = SortFloat.decode(byval) if byval is not None else 0.0
# Sort by score first, then by `v`.
return (score, v)
items.sort(key=sort_key, reverse=desc)
# No sort was performed in this branch; list / ZSet values are just reversed.
elif isinstance(key.value, (list, ZSet)):
items.reverse()
out = []
# For every row in the requested window, emit one value per `get` pattern.
for row in items[start:end]:
for g in get:
v = self._lookup_key(row, g)
# When the result is stored, a failed lookup is written as empty bytes
# instead of None.
if store is not None and v is None:
v = b''
out.append(v)
if store is not None:
# Store mode: write the collected values to `store` and return their count.
item = CommandItem(store, self._db, item=self._db.get(store))
item.value = out
item.writeback()
return len(out)
else:
# Otherwise return the collected values directly.
return out
"""Python implementation of lookupKeyByPattern from redis"""
# NOTE(review): the `def` line for this body is not visible in this listing;
# it reads `self`, `key` and `pattern`, and is presumably the `_lookup_key`
# helper -- confirm against the full file.
# The special pattern b'#' yields the key itself.
if pattern == b'#':
return key
# The pattern must contain a '*' placeholder; otherwise there is no result.
p = pattern.find(b'*')
if p == -1:
return None
prefix = pattern[:p]
suffix = pattern[p+1:]
# Look for a '->' separator in the suffix. The search excludes the final
# byte (end=-1), so '->' only matches when at least one byte follows it.
arrow = suffix.find(b'->', 0, -1)
if arrow != -1:
# Text after '->' is the field name; text before it stays in the key.
field = suffix[arrow+2:]
suffix = suffix[:arrow]
else:
field = None
# Substitute the key for the first '*' in the pattern.
new_key = prefix + key + suffix
item = CommandItem(new_key, self._db, item=self._db.get(new_key))
if item.value is None:
return None
if field is not None:
# Field lookup requires a dict value; a missing field yields None.
if not isinstance(item.value, dict):
return None
return item.value.get(field)
else:
# Plain lookup only returns bytes values.
if not isinstance(item.value, bytes):
return None
return item.value