Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def lexer(socket, buf_size=ijson.backend.BUFSIZE):
data = socket.recv() if IS_WEBSOCKET else socket.recv(buf_size)
if not data:
return
buf = data[1:-1].encode('utf-8') if IS_WEBSOCKET else data
pos = 0
discarded = 0
while True:
match = LEXEME_RE.search(buf, pos)
if pos < len(buf) and match:
lexeme = match.group()
if lexeme == b'"':
pos = match.start()
start = pos + 1
while True:
try:
def basic_parse(response, buf_size=ijson.backend.BUFSIZE):
    '''
    Iterator yielding unprefixed events.

    Parameters:

    - response: a stream response from requests
    - buf_size: passed through unchanged to IncrementalJsonParser.lexer

    Raises ijson.common.JSONError('Additional data') if the lexer still
    produces a lexeme after the value has been fully parsed.
    '''
    tokens = iter(IncrementalJsonParser.lexer(response, buf_size))

    # Forward every event produced while parsing the single top-level value.
    for event in ijson.backend.parse_value(tokens):
        yield event

    # The token stream must be exhausted once the value is complete; a
    # unique sentinel tells "no more tokens" apart from any real lexeme.
    exhausted = object()
    if next(tokens, exhausted) is not exhausted:
        raise ijson.common.JSONError('Additional data')
def lexer(response, buf_size=ijson.backend.BUFSIZE):
it = response.iter_content(buf_size)
data = next(it, None)
if data is None:
yield None
buf = data.decode('utf-8')
pos = 0
discarded = 0
while True:
match = LEXEME_RE.search(buf, pos)
if pos < len(buf) and match:
lexeme = match.group()
if lexeme == '"':
pos = match.start()
start = pos + 1
while True:
try: