Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def serve_file(reader, writer):
    """Send the entire file to ``writer`` and then close the connection.

    The path is taken from the name ``file`` defined outside this function
    and is opened in binary mode through ``threadpool.open``.

    Args:
        reader: Unused.
        writer: Object the file contents are written to; it is closed on
            every exit path.
    """
    full_filename = str(file)
    try:
        f = yield from threadpool.open(full_filename, mode='rb')
        try:
            writer.write((yield from f.read()))
        finally:
            # Release the file handle even if the read or the write raised.
            yield from f.close()
    finally:
        # Close the writer on every path (including a failed open) so the
        # connection is never left dangling.
        writer.close()
def serve_file(_, writer):
    """Stream the file named by the outer ``filename`` to ``writer``.

    The file is opened in binary mode via ``aiofiles.threadpool.open`` and
    forwarded one byte per read, draining the writer after each byte and once
    more after the last one.

    Args:
        _: Ignored.
        writer: Object the bytes are written to; it is closed, followed by
            the file, when the coroutine finishes or fails.
    """
    handle = yield from aiofiles.threadpool.open(filename, mode='rb')
    try:
        # Prime the loop with the first byte; an empty read means EOF.
        chunk = yield from handle.read(1)
        while chunk:
            writer.write(chunk)
            yield from writer.drain()
            chunk = yield from handle.read(1)
        # Final drain after the loop has finished.
        yield from writer.drain()
    finally:
        writer.close()
        yield from handle.close()
def test_slow_file(monkeypatch, unused_tcp_port):
    """Prepare a slow ``open`` for aiofiles and a byte-wise file server.

    Reads the contents of ``../resources/multiline_file.txt`` (relative to
    this module), replaces ``aiofiles.threadpool.sync_open`` with a wrapper
    that sleeps one second before delegating to the builtin ``open``, and
    defines the ``serve_file`` coroutine that streams that file.

    NOTE(review): nothing in the visible part starts a server or compares
    the result with ``contents``; the remainder of the test presumably
    follows this section -- confirm.
    """
    filename = join(dirname(__file__), '..', 'resources', 'multiline_file.txt')

    with open(filename, mode='rb') as source:
        contents = source.read()

    def slow_open(*args, **kwargs):
        # Delay every open by one second before calling the real open().
        time.sleep(1)
        opened = open(*args, **kwargs)
        return opened

    monkeypatch.setattr(aiofiles.threadpool, 'sync_open', value=slow_open)

    @asyncio.coroutine
    def serve_file(_, writer):
        # Forward the file one byte per read, draining after every write.
        handle = yield from aiofiles.threadpool.open(filename, mode='rb')
        try:
            chunk = yield from handle.read(1)
            while chunk:
                writer.write(chunk)
                yield from writer.drain()
                chunk = yield from handle.read(1)
            # Final drain after the loop has finished.
            yield from writer.drain()
        finally:
            writer.close()
            yield from handle.close()