Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@command((Key(),))
def type(self, key):
if key.value is None:
return SimpleString(b'none')
elif isinstance(key.value, bytes):
return SimpleString(b'string')
elif isinstance(key.value, list):
return SimpleString(b'list')
elif isinstance(key.value, set):
return SimpleString(b'set')
elif isinstance(key.value, ZSet):
return SimpleString(b'zset')
elif isinstance(key.value, dict):
return SimpleString(b'hash')
else:
assert False # pragma: nocover
@command((Key(set, 0), Key(set), bytes))
def smove(self, src, dst, member):
try:
src.value.remove(member)
src.updated()
except KeyError:
return 0
else:
dst.value.add(member)
dst.updated() # TODO: is it updated if member was already present?
return 1
@command((Key(ZSet), bytes))
def zrank(self, key, member):
try:
return key.value.rank(member)
except KeyError:
return None
@command((Key(list, None), Key(list)))
def rpoplpush(self, src, dst):
el = self.rpop(src)
self.lpush(dst, el)
return el
@command((Key(bytes),))
def get(self, key):
return key.get(None)
"""
self.check_arity(args)
if self.repeat:
delta = len(args) - len(self.fixed)
if delta % len(self.repeat) != 0:
raise redis.ResponseError(WRONG_ARGS_MSG.format(self.name))
types = list(self.fixed)
for i in range(len(args) - len(types)):
types.append(self.repeat[i % len(self.repeat)])
args = list(args)
# First pass: convert/validate non-keys, and short-circuit on missing keys
for i, (arg, type_) in enumerate(zip(args, types)):
if isinstance(type_, Key):
if type_.missing_return is not Key.UNSPECIFIED and arg not in db:
return (type_.missing_return,)
elif type_ != bytes:
args[i] = type_.decode(args[i])
# Second pass: read keys and check their types
command_items = []
for i, (arg, type_) in enumerate(zip(args, types)):
if isinstance(type_, Key):
item = db.get(arg)
default = None
if type_.type_ is not None:
if item is not None and type(item.value) != type_.type_:
raise redis.ResponseError(WRONGTYPE_MSG)
if item is None:
if type_.type_ is not bytes:
default = type_.type_()
@command((Key(Hash), bytes, bytes), (bytes, bytes))
def hset(self, key, *args):
h = key.value
created = 0
for i in range(0, len(args), 2):
if args[i] not in h:
created += 1
h[args[i]] = args[i + 1]
key.updated()
return created
@command((Key(bytes, 0),), (bytes,))
def bitcount(self, key, *args):
# Redis checks the argument count before decoding integers. That's why
# we can't declare them as Int.
if args:
if len(args) != 2:
raise redis.ResponseError(SYNTAX_ERROR_MSG)
start = Int.decode(args[0])
end = Int.decode(args[1])
start, end = self._fix_range_string(start, end, len(key.value))
value = key.value[start:end]
else:
value = key.value
return sum([bin(byte_to_int(l)).count('1') for l in value])
- a single containing a short-circuit return value
"""
self.check_arity(args)
if self.repeat:
delta = len(args) - len(self.fixed)
if delta % len(self.repeat) != 0:
raise redis.ResponseError(WRONG_ARGS_MSG.format(self.name))
types = list(self.fixed)
for i in range(len(args) - len(types)):
types.append(self.repeat[i % len(self.repeat)])
args = list(args)
# First pass: convert/validate non-keys, and short-circuit on missing keys
for i, (arg, type_) in enumerate(zip(args, types)):
if isinstance(type_, Key):
if type_.missing_return is not Key.UNSPECIFIED and arg not in db:
return (type_.missing_return,)
elif type_ != bytes:
args[i] = type_.decode(args[i])
# Second pass: read keys and check their types
command_items = []
for i, (arg, type_) in enumerate(zip(args, types)):
if isinstance(type_, Key):
item = db.get(arg)
default = None
if type_.type_ is not None:
if item is not None and type(item.value) != type_.type_:
raise redis.ResponseError(WRONGTYPE_MSG)
if item is None:
if type_.type_ is not bytes:
@command((Key(set, missing_return=[]),), (Key(),))
def sinter(self, *keys):
return self._setop(lambda a, b: a & b, True, None, *keys)