Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def hincrbyfloat(self, key, field, amount):
    """Add ``amount`` to the float stored under ``field`` and return the result.

    The current field value (``b'0'`` when the field is absent) and
    ``amount`` are both decoded with ``Float.decode`` and summed. The sum
    is re-encoded with ``Float.encode(sum, True)``, written back to
    ``key.value[field]``, and ``key.updated()`` is called before the
    encoded value is returned.

    Raises:
        redis.ResponseError: with ``NONFINITE_MSG`` when the sum is not
            finite (infinity or NaN); nothing is stored in that case.
    """
    current = Float.decode(key.value.get(field, b'0'))
    delta = Float.decode(amount)
    total = current + delta
    if isfinite(total):
        result = Float.encode(total, True)
        key.value[field] = result
        key.updated()
        return result
    raise redis.ResponseError(NONFINITE_MSG)
def incrbyfloat(self, key, amount):
    """Add ``amount`` to the float stored at ``key`` and return the result.

    The current value is read with ``key.get(b'0')`` and, like ``amount``,
    decoded with ``Float.decode``. The sum is re-encoded with
    ``Float.encode(sum, True)``, stored through ``key.update`` and returned.

    Raises:
        redis.ResponseError: with ``NONFINITE_MSG`` when the sum is not
            finite (infinity or NaN); the key is not updated in that case.
    """
    # TODO: introduce convert_order so that we can specify amount is Float
    base = Float.decode(key.get(b'0'))
    total = base + Float.decode(amount)
    if isfinite(total):
        result = Float.encode(total, True)
        key.update(result)
        return result
    raise redis.ResponseError(NONFINITE_MSG)
def zscore(self, key, member):
    """Return the encoded score of ``member``, or ``None`` if it is absent.

    The score is looked up in ``key.value`` and passed through
    ``Float.encode(score, False)``. A ``KeyError`` from either step is
    reported as ``None``.
    """
    try:
        score = key.value[member]
        encoded = Float.encode(score, False)
    except KeyError:
        encoded = None
    return encoded
def zincrby(self, key, increment, member):
    """Add ``increment`` to the score of ``member`` and return the new score.

    When adding to the stored score raises ``TypeError`` (as it does when
    ``key.value.get`` yields ``None`` for a missing member), the new score
    is ``increment`` itself. The result is stored in ``key.value[member]``,
    ``key.updated()`` is called, and the score is returned encoded with
    ``Float.encode(score, False)``.

    Raises:
        redis.ResponseError: with ``SCORE_NAN_MSG`` when the resulting
            score is NaN; nothing is stored in that case.
    """
    # A missing member must not be treated as an old score of 0.0: in
    # IEEE754, adding 0.0 to something isn't a nop (e.g. 0.0 + -0.0 == 0.0),
    # so the increment is used verbatim instead.
    try:
        new_score = key.value.get(member, None) + increment
    except TypeError:
        new_score = increment
    if math.isnan(new_score):
        raise redis.ResponseError(SCORE_NAN_MSG)
    key.value[member] = new_score
    key.updated()
    return Float.encode(new_score, False)