def test_check_concatenate(source_codec, output_codec):
    blocks1, records1 = make_blocks(codec=source_codec)
    blocks2, records2 = make_blocks(codec=source_codec)

    new_file = MemoryIO()
    w = fastavro.write.Writer(new_file, schema, codec=output_codec)
    for block in blocks1:
        w.write_block(block)
    for block in blocks2:
        w.write_block(block)

    # Read the file back to make sure we get back the same stuff
    new_file.seek(0)
    new_records = list(fastavro.reader(new_file, schema))
    assert new_records == records1 + records2
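# The test above depends on a `make_blocks` helper that is not shown in the
# snippet. A minimal sketch of what such a helper could look like, using only
# public fastavro APIs (fastavro.writer and fastavro.block_reader); the schema
# and record values here are illustrative assumptions, not the original test's.
from io import BytesIO as MemoryIO

import fastavro

schema = {
    "type": "record",
    "name": "Example",
    "fields": [{"name": "field", "type": "string"}],
}

def make_blocks(codec="null", num_records=10):
    # Write some records with the requested codec into an in-memory file.
    records = [{"field": "value{}".format(i)} for i in range(num_records)]
    buf = MemoryIO()
    fastavro.writer(buf, schema, records, codec=codec)
    # Read the file back block by block; each Block can later be appended to
    # another container file with Writer.write_block().
    buf.seek(0)
    blocks = list(fastavro.block_reader(buf))
    return blocks, records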
"type": {"type": "string"}
},
{
"name": "field2",
"type": {"type": "int"}
}
]
}
records = [
{"field1": "test1", "field2": -1},
{"field1": "test2", "field2": 5}
]
temp_path = tmpdir.join('test_writer_class.avro')
with temp_path.open('wb') as fo:
w = Writer(fo, schema, codec='deflate')
# Creating the Writer adds the Avro file header. Get file size with
# header only.
size_with_header_only = fo.tell()
for i, record in enumerate(records):
assert w.block_count == i
w.write(record)
# Verify records are being stored *in memory*:
# 1. Block count increases
# 2. File size does not increase
assert w.block_count == i + 1
assert fo.tell() == size_with_header_only
# Flushing the file writes the data. File size should increase now.
w.flush()
"type": {"type": "string"}
},
{
"name": "field2",
"type": {"type": "int"}
}
]
}
records = [
{"field1": "test1", "field2": -1},
{"field1": "test2", "field2": 5}
]
temp_path = tmpdir.join('test_writer_class.avro')
with temp_path.open('wb') as fo:
w = Writer(fo, schema, codec='deflate', sync_interval=0)
# Creating the Writer adds the Avro file header. Get file size with
# header only.
file_size_history = [fo.tell()]
for i, record in enumerate(records):
assert w.block_count == 0
w.write(record)
# Verify records are being stored *in memory*:
# 1. Block count increases
# 2. File size does not increase
assert w.block_count == 0
file_size_history.append(fo.tell())
assert file_size_history[-1] > file_size_history[-2]
# Flushing the file writes the data. File size should increase now.
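# A self-contained version of the two checks above, runnable outside pytest.
# It uses the illustrative single-field schema from the make_blocks sketch;
# the behaviour it asserts is exactly what the tests above rely on.
from io import BytesIO

from fastavro.write import Writer

def demo_sync_interval():
    demo_schema = {
        "type": "record",
        "name": "Example",
        "fields": [{"name": "field", "type": "string"}],
    }

    # Default sync_interval: records are buffered in memory until flush().
    buffered = BytesIO()
    w = Writer(buffered, demo_schema, codec='deflate')
    header_size = buffered.tell()
    w.write({"field": "test"})
    assert w.block_count == 1 and buffered.tell() == header_size
    w.flush()
    assert buffered.tell() > header_size

    # sync_interval=0: every write() dumps a block straight to the file.
    unbuffered = BytesIO()
    w = Writer(unbuffered, demo_schema, codec='deflate', sync_interval=0)
    header_size = unbuffered.tell()
    w.write({"field": "test"})
    assert w.block_count == 0 and unbuffered.tell() > header_size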
def test_writer_class_split_files(tmpdir):  # test name illustrative
    schema = {
        "type": "record",
        "name": "test_writer_class",  # record name assumed
        "fields": [
            {"name": "field", "type": {"type": "string"}}
        ]
    }

    records = []

    def _append_record(writer_):
        record = {"field": "test{}".format(len(records))}
        records.append(record)
        writer_.write(record)

    temp_paths = [
        tmpdir.join('test_writer_class1.avro'),
        tmpdir.join('test_writer_class2.avro')]
    interim_record_counts = []

    # First file: Write records until block_count goes back to 0 for the second
    # time.
    with temp_paths[0].open('wb') as fo:
        w = Writer(fo, schema, codec='deflate')
        _append_record(w)
        while w.block_count > 0:
            _append_record(w)
        _append_record(w)
        while w.block_count > 0:
            _append_record(w)
        w.flush()
    interim_record_counts.append(len(records))

    # Second file: 100 records
    with temp_paths[1].open('wb') as fo:
        w = Writer(fo, schema, codec='deflate')
        for i in range(100):
            _append_record(w)
        w.flush()
    interim_record_counts.append(len(records))

    assert interim_record_counts[1] == interim_record_counts[0] + 100

    # Read the records to verify they were written correctly.
    new_records = []
    new_interim_record_counts = []
    for temp_path in temp_paths:
        new_reader = fastavro.reader(temp_path.open('rb'))
        new_records += list(new_reader)
        new_interim_record_counts.append(len(new_records))

    assert new_records == records
    assert interim_record_counts == new_interim_record_counts
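# The test above relies on the Writer automatically dumping a block once
# roughly sync_interval bytes of records have accumulated (block_count then
# drops back to 0). A sketch of how the block layout of a finished file could
# be inspected with fastavro.block_reader; the path is whatever file was
# written above.
import fastavro

def describe_blocks(path):
    with open(path, 'rb') as fo:
        for i, block in enumerate(fastavro.block_reader(fo)):
            print("block {}: {} records".format(i, block.num_records))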
def open(self, temp_path):
    # Wrap the sink's raw output stream in a fastavro Writer so records can
    # be appended to the temp file as Avro blocks.
    file_handle = super(_FastAvroSink, self).open(temp_path)
    return Writer(file_handle, self._schema, self._codec)
def __init__(self, file_handle, schema):
    """Initialize an AvroRowWriter.

    Args:
      file_handle (io.IOBase): Output stream to write Avro records to.
      schema (Dict[Text, Any]): BigQuery table schema.
    """
    if not file_handle.writable():
        raise ValueError("Output stream must be writable")

    self._file_handle = file_handle
    avro_schema = fastavro.parse_schema(
        get_avro_schema_from_table_schema(schema))
    self._avro_writer = fastavro.write.Writer(self._file_handle, avro_schema)
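# Apache Beam's AvroRowWriter exposes more than this constructor; the sketch
# below is not Beam's actual implementation, only the minimal surface the
# constructor implies: write() forwards a converted row dict to the fastavro
# Writer, and close() flushes the pending block before closing the stream.
def write(self, row):
    # `row` is assumed to already be a dict matching the converted Avro schema.
    self._avro_writer.write(row)

def close(self):
    # Writer.flush() dumps any buffered records as a final block.
    self._avro_writer.flush()
    if not self._file_handle.closed:
        self._file_handle.close()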