Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if last_char == b'\r' and buf[0:1] == b'\n':
# Strip the last character from the last chunk.
chunks[-1] = chunks[-1][:-1]
return buf[1:], b''.join(chunks)
elif buf.find(b'\r\n') != -1:
before, sep, after = buf.partition(b"\r\n")
chunks.append(before)
return after, b''.join(chunks)
if buf:
chunks.append(buf)
last_char = buf[-1:]
buf = _recv(sock, RECV_SIZE)
if not buf:
raise MemcacheUnexpectedCloseError()
Returns:
A tuple of (buf, value) where value is the bytes read from the
socket (there will be exactly size bytes) and buf is trailing
characters read after the "\r\n" following the bytes (but not
including the \r\n).
"""
chunks = []
rlen = size + 2
while rlen - len(buf) > 0:
if buf:
rlen -= len(buf)
chunks.append(buf)
buf = _recv(sock, RECV_SIZE)
if not buf:
raise MemcacheUnexpectedCloseError()
# Now we need to remove the \r\n from the end. There are two cases we care
# about: the \r\n is all in the last buffer, or only the \n is in the last
# buffer, and we need to remove the \r from the penultimate buffer.
if rlen == 1:
# replace the last chunk with the same string minus the last character,
# which is always '\r' in this case.
chunks[-1] = chunks[-1][:-1]
else:
# Just remove the "\r\n" from the latest chunk
chunks.append(buf[:rlen - 2])
return buf[rlen:], b''.join(chunks)