Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
start = utime.ticks_ms()
while len(d):
ns = s.send(d) # OSError if client fails
d = d[ns:] # Possible partial write
await asyncio.sleep_ms(100)
if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
raise OSError
def close(self):
print('Closing sockets.')
if isinstance(self.sock, socket.socket):
self.sock.close()
loop = asyncio.get_event_loop()
client = Client(1500, loop) # Server timeout set by server side app: 1.5s
try:
loop.run_forever()
finally:
client.close() # Close sockets in case of ctrl-C or bug