Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
2,u2,17,f
3,u3,16,m
4,u4,15,f
"""
with StringIO() as f:
f.write(head+content)
await aioload_csv(User_csv_f, f,pk_in_csv=True)
all_cols = SQL('*')
query0 = User_csv_f.select(all_cols).order_by(User_csv_f.id)
user0 = await alist(query0)
s = (os.linesep).join([str(i) for i in user0]).strip()
r = content.strip()
assert s == r
await User_csv_f.drop_table()
async with aiofiles.open(str(filepath)) as ff:
er = await aioload_csv(User_csv_f, ff,pk_in_csv=False)
all_cols = SQL('*')
query0 = User_csv_f.select(all_cols).order_by(User_csv_f.id)
user0 = await alist(query0)
s = (os.linesep).join([str(i) for i in user0]).strip()
r = content.strip()
assert s == r
await User_csv_f.drop_table()
await aioload_csv(User_csv_f, '0_user_out.csv',pk_in_csv=True)
all_cols = SQL('*')
query0 = User_csv_f.select(all_cols).order_by(User_csv_f.id)
user0 = await alist(query0)
s = (os.linesep).join([str(i) for i in user0]).strip()
r = content.strip()
async def test_chunked_upload_async_input(dummy_peony_client, medias):
    """Upload the 'bloom' media through an asynchronously opened file object."""
    media = medias['bloom']
    cache_path = str(media.cache)
    async with aiofiles.open(cache_path, 'rb') as async_file:
        await chunked_upload(dummy_peony_client, media, async_file)
async def file_route(request, filename):
    """Serve *filename* from the static directory.

    HEAD requests get a header-only HTTPResponse; everything else gets the
    full file body via ``file()``. Content-Length is sent in both cases.
    """
    resolved = os.path.abspath(unquote(os.path.join(static_file_directory, filename)))
    stats = await async_os.stat(resolved)
    headers = {
        "Accept-Ranges": "bytes",
        "Content-Length": str(stats.st_size),
    }
    content_type = guess_type(resolved)[0] or "text/plain"
    if request.method == "HEAD":
        return HTTPResponse(headers=headers, content_type=content_type)
    return file(resolved, headers=headers, mime_type=content_type)
# NOTE(review): fragment of a sendfile round-trip test — the enclosing
# generator/coroutine header is not visible in this excerpt, and `f`,
# `filename`, and `tmp_filename` are defined outside this view.
contents = f.read()
# Open source and destination through aiofiles' thread-pool wrappers.
input_file = yield from aiofiles.open(filename)
output_file = yield from aiofiles.open(str(tmp_filename), mode='w+')
size = (yield from aiofiles.os.stat(filename)).st_size
# os.sendfile operates on raw file descriptors, not file objects.
input_fd = input_file.fileno()
output_fd = output_file.fileno()
yield from aiofiles.os.sendfile(output_fd, input_fd, 0, size)
# Rewind the destination and read it back to verify the copy.
yield from output_file.seek(0)
actual_contents = yield from output_file.read()
actual_size = (yield from aiofiles.os.stat(str(tmp_filename))).st_size
assert contents == actual_contents
assert size == actual_size
async def file_route(request, filename):
    """Stream *filename* from the static directory in 32-byte chunks.

    HEAD requests get a plain HTTPResponse (not a StreamingHTTPResponse)
    carrying only the headers, including Content-Length.
    """
    resolved = os.path.abspath(unquote(os.path.join(static_file_directory, filename)))
    headers = {"Accept-Ranges": "bytes"}
    content_type = guess_type(resolved)[0] or "text/plain"
    if request.method == "HEAD":
        # Only HEAD needs the stat: Content-Length is omitted when streaming.
        stats = await async_os.stat(resolved)
        headers["Content-Length"] = str(stats.st_size)
        return HTTPResponse(headers=headers, content_type=content_type)
    return file_stream(
        resolved,
        chunk_size=32,
        headers=headers,
        mime_type=content_type,
    )
async def test_simple_peek(mode, tmpdir):
    """Peek at the start of a file and check it agrees with a real read."""
    target = tmpdir.join('file.bin')
    target.write_binary(b'0123456789')
    async with aioopen(str(target), mode=mode) as handle:
        if 'a' in mode:
            # Append modes position at EOF; rewind so peek sees the start.
            await handle.seek(0)
        peeked = await handle.peek(1)
        # A peek may legitimately return fewer bytes than requested.
        if peeked:
            assert peeked.startswith(b'0')
            first_byte = await handle.read(1)
            assert peeked.startswith(first_byte)
def serve_file(reader, writer):
    """Read the whole file via the threadpool wrapper and send it at once."""
    path = str(file)  # `file` is bound in the enclosing test scope
    handle = yield from threadpool.open(path, mode='rb')
    payload = yield from handle.read()
    writer.write(payload)
    yield from handle.close()
    writer.close()
async def test_simple_iteration(mode):
"""Test iterating over lines from a file."""
# Fixture whose lines read "line 1", "line 2", ...
filename = join(dirname(__file__), '..', 'resources', 'multiline_file.txt')
async with aioopen(filename, mode=mode) as file:
# Append mode needs us to seek.
await file.seek(0)
counter = 1
# The old iteration pattern:
while True:
line = await file.readline()
if not line:
break
assert line.strip() == 'line ' + str(counter)
counter += 1
# Rewind and reset for a second pass over the same lines.
# NOTE(review): the body of that second pass appears to be cut off
# in this excerpt — confirm against the original test file.
await file.seek(0)
counter = 1
async def test_simple_close_ctx_mgr(mode, buffering, tmpdir):
    """Check the async context manager closes both wrapper and raw file."""
    payload = b'0' * 4 * io.DEFAULT_BUFFER_SIZE
    target = tmpdir.join('bigfile.bin')
    target.write_binary(payload)
    async with aioopen(str(target), mode=mode, buffering=buffering) as handle:
        # Inside the context both layers are open.
        assert not handle.closed
        assert not handle._file.closed
    # Leaving the context must close the wrapper and the wrapped raw file.
    assert handle.closed
    assert handle._file.closed
def serve_file(_, writer):
    """Stream the file one byte at a time to the client, then clean up."""
    handle = yield from aiofiles.threadpool.open(filename, mode='rb')
    try:
        while True:
            chunk = yield from handle.read(1)
            if not chunk:
                break
            writer.write(chunk)
            yield from writer.drain()
        # Final drain after the loop flushes any remaining buffered data.
        yield from writer.drain()
    finally:
        # Always release both the transport and the file handle.
        writer.close()
        yield from handle.close()