Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_unknown_error(self):
client = self.make_client([b'foobarbaz\r\n'])
def _get():
client.get(b'key')
pytest.raises(MemcacheUnknownError, _get)
self.check_spans(1, ['get'], ['get key'])
if line == b'END' or line == b'OK':
return result
elif line.startswith(b'VALUE'):
key, value, buf = self._extract_value(expect_cas, line, buf,
remapped_keys,
prefixed_keys)
result[key] = value
elif name == b'stats' and line.startswith(b'STAT'):
key_value = line.split()
result[key_value[1]] = key_value[2]
elif name == b'stats' and line.startswith(b'ITEM'):
# For 'stats cachedump' commands
key_value = line.split()
result[key_value[1]] = b' '.join(key_value[2:])
else:
raise MemcacheUnknownError(line[:32])
except Exception:
self.close()
if self.ignore_exc:
return {}
raise
buf = b''
for key in keys:
buf, line = _readline(self.sock, buf)
self._raise_errors(line, name)
if line in VALID_STORE_RESULTS[name]:
if line == b'STORED':
results[key] = True
if line == b'NOT_STORED':
results[key] = False
if line == b'NOT_FOUND':
results[key] = None
if line == b'EXISTS':
results[key] = False
else:
raise MemcacheUnknownError(line[:32])
return results
except Exception:
self.close()
raise
def version(self):
"""
The memcached "version" command.
Returns:
A string of the memcached version.
"""
cmd = b"version\r\n"
results = self._misc_cmd([cmd], b'version', False)
before, _, after = results[0].partition(b' ')
if before != b'VERSION':
raise MemcacheUnknownError(
"Received unexpected response: %s" % results[0])
return after