Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_bs_pair():
    """Return two connected BufferedSockets built on ``socket.socketpair()``.

    Three bytes are pushed from the second socket to the first before the
    pair is handed back, so a broken pair fails here instead of somewhere
    inside the calling test.
    """
    x, y = socket.socketpair()
    bx, by = BufferedSocket(x), BufferedSocket(y)

    # sanity check
    by.sendall(b'123')
    # Bind the read to a name first: the bytes must be drained from the
    # buffer even when assertions are disabled (python -O).
    echoed = bx.recv_size(3)
    assert echoed == b'123', 'socketpair sanity check failed'

    return bx, by
def test_short_lines():
    """Read short newline-terminated lines under several ``maxsize`` limits.

    For each limit, the first two lines are read with ``recv_until``, the
    writer is closed, the rest is drained with ``recv_close``, and a further
    read must raise ``ConnectionClosed``.
    """
    payload = b'1\n2\n3\n'
    for limit in (2, 4, 6, 1024, None):
        reader_sock, writer_sock = socket.socketpair()
        buffered = BufferedSocket(reader_sock)
        writer_sock.sendall(payload)

        for expected_line in (b'1', b'2'):
            assert buffered.recv_until(b'\n', maxsize=limit) == expected_line

        writer_sock.close()
        assert buffered.recv_close(maxsize=limit) == b'3\n'

        # Nothing is left and the peer is gone, so one more byte must fail.
        saw_closed = False
        try:
            buffered.recv_size(1)
        except ConnectionClosed:
            saw_closed = True
        assert saw_closed, 'expected ConnectionClosed'

        buffered.close()
    return
for with_delimiter.
"""
delim = b'\r\n'
for with_delim in (True, False):
if with_delim:
cond_delim = b'\r\n'
else:
cond_delim = b''
empty = b''
small_one = b'1'
big_two = b'2' * 2048
for ms in (3, 5, 1024, None):
x, y = socket.socketpair()
bs = BufferedSocket(x)
y.sendall(empty + delim)
y.sendall(small_one + delim)
y.sendall(big_two + delim)
kwargs = {'maxsize': ms, 'with_delimiter': with_delim}
assert bs.recv_until(delim, **kwargs) == empty + cond_delim
assert bs.recv_until(delim, **kwargs) == small_one + cond_delim
try:
assert bs.recv_until(delim, **kwargs) == big_two + cond_delim
except MessageTooLong:
if ms is None:
assert False, 'unexpected MessageTooLong'
else:
if ms is not None:
assert False, 'expected MessageTooLong'
def test_timeout_setters_getters():
    """Check the timeout setters return None and ``gettimeout`` tracks them.

    ``settimeout(1.0)`` must read back as 1.0, ``setblocking(False)`` as 0.0
    and ``setblocking(True)`` as None.
    """
    x, y = socket.socketpair()
    bs = BufferedSocket(x)
    try:
        assert bs.settimeout(1.0) is None
        assert bs.gettimeout() == 1.0

        assert bs.setblocking(False) is None
        assert bs.gettimeout() == 0.0

        assert bs.setblocking(True) is None
        assert bs.gettimeout() is None
    finally:
        # Release both ends even when an assertion above fails.
        bs.close()
        y.close()
def test_props():
    """Check that ``type``, ``proto`` and ``family`` match the wrapped socket."""
    x, y = socket.socketpair()
    bs = BufferedSocket(x)
    try:
        assert bs.type == x.type
        assert bs.proto == x.proto
        assert bs.family == x.family
    finally:
        # Release both ends even when an assertion above fails.
        bs.close()
        y.close()
def test_split_delim():
    """Check ``recv_until`` when the delimiter arrives split over two sends.

    The first send ends with only the first byte of the delimiter, so the
    initial read must time out. Once the second send completes the
    delimiter, the full line is returned and the trailing byte is still
    readable.
    """
    delim = b'\r\n'
    first = b'1234\r'
    second = b'\n5'

    x, y = socket.socketpair()
    bs = BufferedSocket(x)
    try:
        y.sendall(first)
        # Only half of the delimiter is available, so this cannot complete.
        try:
            bs.recv_until(delim, timeout=0.0001)
        except Timeout:
            pass
        else:
            assert False, 'expected Timeout'
        y.sendall(second)

        assert bs.recv_until(delim, with_delimiter=True) == b'1234\r\n'
        assert bs.recv_size(1) == b'5'
    finally:
        # Release both ends even when an assertion above fails.
        bs.close()
        y.close()
def call(self, *commands):
    '''
    Helper that speaks a subset of the RESP protocol used by
    Redis >= 1.2: the commands are sent as one array and the start of
    the reply is read back.

    Raises RedisError when the reply begins with "-".
    '''
    manager = context.get_context().connection_mgr
    sock = BufferedSocket(manager.get_connection(self.address))

    # ARRAY: first byte *, decimal length, \r\n, contents
    pieces = ['*' + str(len(commands))]
    pieces.extend("${0}\r\n{1}".format(len(arg), arg) for arg in commands)
    sock.send("\r\n".join(pieces) + "\r\n")

    # Look at the reply's type byte without consuming it.
    marker = sock.peek(1)
    if marker == "-":  # error string
        raise RedisError(sock.recv_until('\r\n'))
    if marker == '+':  # simple string
        resp = sock.recv_until('\r\n')[1:]
    elif marker == '$':  # bulk string
        size = int(sock.recv_until('\r\n')[1:])
        # A declared length of -1 carries no payload to read.
        resp = None if size == -1 else sock.recv_size(size)
    # NOTE(review): resp is assigned but nothing visible here returns it;
    # confirm whether the remainder of this function is missing.
def __init__(self, sock, timeout=DEFAULT_TIMEOUT, maxsize=DEFAULT_MAXSIZE):
    """Wrap *sock* in a BufferedSocket and record the timeout and size limit."""
    self.bsock = BufferedSocket(sock)
    self.timeout = timeout
    self.maxsize = maxsize
    # Decimal digit count of maxsize (len(str()) stands in for log10),
    # plus one.
    # NOTE(review): the extra one presumably leaves room for a separator
    # after the size prefix; confirm against the code that reads it.
    size_digits = len(str(maxsize))
    self._msgsize_maxsize = size_digits + 1