Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def receive(self):
sniff = await self.transport.read(SNIFF_LEN)
if not sniff:
raise WatchmanError("empty watchman response")
_1, _2, elen = bser.pdu_info(sniff)
rlen = len(sniff)
buf = bytearray(elen)
buf[:rlen] = sniff
while elen > rlen:
b = await self.transport.read(elen - rlen)
buf[rlen : rlen + len(b)] = b
rlen += len(b)
response = bytes(buf)
try:
res = self._loads(response)
return res
except ValueError as e:
raise WatchmanError("watchman response decode error: %s" % e)