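# The imports and module-level helpers below were not part of the scraped
# snippet; they are reconstructed so the tests are runnable. SCHEMA_DEFS is
# fastavro's internal registry of named schemas and its import path has moved
# between fastavro versions, so treat that location as an assumption.
import uuid
from io import BytesIO, SEEK_SET
from os.path import abspath, dirname, join

import pytest

import fastavro
import fastavro.write
from fastavro.schema import SCHEMA_DEFS

# The fastavro test suite uses MemoryIO as an alias for an in-memory stream
MemoryIO = BytesIO
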
def bytes_with_schema_to_avro(avro_read_schema, binary):
    """Decode the first record from the bytes of an Avro container file."""
    with BytesIO(binary) as bytes_io:
        reader = fastavro.reader(bytes_io, avro_read_schema)
        return next(reader)
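
# A usage sketch for the helper above (not in the original snippet): write a
# record to an in-memory container file, then decode it back from raw bytes.
def test_bytes_with_schema_to_avro_roundtrip():
    schema = {
        "type": "record",
        "name": "test_bytes_with_schema_to_avro_roundtrip",
        "fields": [{"name": "field", "type": "string"}]
    }
    buf = BytesIO()
    fastavro.writer(buf, schema, [{"field": "foo"}])
    assert bytes_with_schema_to_avro(schema, buf.getvalue()) == {"field": "foo"}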
def test_schema_migration_writer_union():
    # The writer schema was truncated in the snippet (the stray closing brace
    # belonged to it); reconstructed here assuming a union field that the
    # reader schema narrows to a plain "int".
    schema = {
        "type": "record",
        "name": "test_schema_migration_writer_union",
        "fields": [{
            "name": "test",
            "type": ["string", "int"]
        }]
    }
    new_schema = {
        "type": "record",
        "name": "test_schema_migration_writer_union_new",
        "fields": [{
            "name": "test",
            "type": "int"
        }]
    }
    new_file = MemoryIO()
    records = [{"test": 1}]
    fastavro.writer(new_file, schema, records)
    new_file.seek(0)
    new_reader = fastavro.reader(new_file, new_schema)
    new_records = list(new_reader)
    assert new_records == records
def test_reader_schema_attributes_throws_deprecation():
    """https://github.com/fastavro/fastavro/issues/246"""
    schema = {
        "type": "record",
        "name": "test_reader_schema_attributes_throws_deprecation",
        "fields": []
    }
    stream = MemoryIO()
    fastavro.writer(stream, schema, [{}])
    stream.seek(0)
    reader = fastavro.reader(stream)
    with pytest.warns(DeprecationWarning):
        reader.schema
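
# A companion check (not in the original snippet): per the linked issue,
# `reader.writer_schema` is the non-deprecated accessor for the schema
# stored in the file header.
def test_reader_writer_schema_accessor():
    schema = {
        "type": "record",
        "name": "test_reader_writer_schema_accessor",
        "fields": []
    }
    stream = MemoryIO()
    fastavro.writer(stream, schema, [{}])
    stream.seek(0)
    reader = fastavro.reader(stream)
    # writer_schema reflects the schema the file was written with
    assert reader.writer_schema["name"] == "test_reader_writer_schema_accessor"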
"type": "record",
"name": "test_schema_migration_maps_with_union_promotion_new",
"fields": [{
"name": "test",
"type": {
"type": "map",
"values": ["string", "long"]
},
}]
}
new_file = MemoryIO()
records = [{"test": {"foo": 1}}]
fastavro.writer(new_file, schema, records)
new_file.seek(0)
new_reader = fastavro.reader(new_file, new_schema)
new_records = list(new_reader)
assert new_records == records
def roundtrip(record, writer_schema, reader_schema):
    new_file = MemoryIO()
    fastavro.writer(new_file, writer_schema, [record])
    new_file.seek(0)
    new_records = list(fastavro.reader(new_file, reader_schema))
    return new_records[0]
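
# The concatenation test below relies on a module-level `schema` and a
# `make_blocks` helper that were not part of this snippet. A minimal sketch,
# assuming a one-field record schema; `fastavro.block_reader` yields Block
# objects that `Writer.write_block` can copy into a new file.
schema = {
    "type": "record",
    "name": "test_check_concatenate",
    "fields": [{"name": "field", "type": "string"}]
}


def make_blocks(num_records=100, codec='null'):
    records = [{"field": "record-{}".format(i)} for i in range(num_records)]
    new_file = MemoryIO()
    fastavro.writer(new_file, schema, records, codec=codec)
    new_file.seek(0)
    # Iterate over whole file blocks rather than individual records
    blocks = list(fastavro.block_reader(new_file, schema))
    return blocks, records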
# The original test is parametrized over codec pairs; this decorator is a
# reconstruction covering the always-available codecs.
@pytest.mark.parametrize("source_codec,output_codec", [
    ("null", "null"),
    ("null", "deflate"),
    ("deflate", "null"),
    ("deflate", "deflate"),
])
def test_check_concatenate(source_codec, output_codec):
    blocks1, records1 = make_blocks(codec=source_codec)
    blocks2, records2 = make_blocks(codec=source_codec)

    new_file = MemoryIO()
    w = fastavro.write.Writer(new_file, schema, codec=output_codec)
    for block in blocks1:
        w.write_block(block)
    for block in blocks2:
        w.write_block(block)

    # Read the file back to make sure we get back the same stuff
    new_file.seek(0)
    new_records = list(fastavro.reader(new_file, schema))
    assert new_records == records1 + records2
# The snippet began mid-function here; the test name and `data_dir` are
# reconstructions, assuming the Parent.avsc fixture that references the
# Child and Child1 sub-schemas.
data_dir = join(dirname(abspath(__file__)), 'avro-files')


def test_load_schema_resolves_named_references():
    schema_path = join(data_dir, 'Parent.avsc')
    schema = fastavro.schema.load_schema(schema_path)
    records = [{'child': {}, 'child1': {}}]
    new_file = MemoryIO()
    fastavro.writer(new_file, schema, records)
    new_file.seek(0)
    # Clean the Child and Parent entries so we are forced to get them from
    # the schema embedded in the file header
    del SCHEMA_DEFS['Child']
    del SCHEMA_DEFS['Child1']
    del SCHEMA_DEFS['Parent']
    reader = fastavro.reader(new_file)
    new_records = list(reader)
    assert new_records == records
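
# `gen_id` was not included in the snippet. A minimal stand-in that returns
# a fresh random string per call; the original helper may differ.
def gen_id():
    return uuid.uuid4().hex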
def test_zerobyte_record_roundtrip():
    # Function name reconstructed; the snippet began mid-function. Writes
    # fifty random records and checks the count and the final record.
    keys = ['first', 'second', 'third', 'fourth']
    testdata = [{key: gen_id() for key in keys} for _ in range(50)]
    schema = {
        "fields": [{'name': key, 'type': 'string'} for key in keys],
        "namespace": "namespace",
        "name": "zerobyte",
        "type": "record"
    }

    buf = BytesIO()
    fastavro.writer(buf, schema, testdata)
    buf.seek(0, SEEK_SET)

    for i, rec in enumerate(fastavro.reader(buf), 1):
        pass

    size = len(testdata)
    assert i == size, 'bad number of records'
    assert rec == testdata[-1], 'bad last record'
def test_appending_records_different_schema_fails(tmpdir):
    """https://github.com/fastavro/fastavro/issues/276"""
    schema = {
        "type": "record",
        "name": "test_appending_records_different_schema_fails",
        "fields": [{
            "name": "field",
            "type": "string",
        }]
    }

    test_file = str(tmpdir.join("test.avro"))
    with open(test_file, "wb") as new_file:
        fastavro.writer(new_file, schema, [{"field": "foo"}])

    different_schema = {
        "type": "record",
        "name": "test_appending_records",
        "fields": [{
            "name": "field",
            "type": "int",
        }]
    }

    with open(test_file, "a+b") as new_file:
        with pytest.raises(
            ValueError, match="does not match file writer_schema"
        ):
            fastavro.writer(new_file, different_schema, [{"field": 1}])
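
# Counterpart behavior (not in the original snippet): appending with the
# *matching* schema succeeds, because fastavro detects the existing file
# header when the stream is opened in "a+b" mode.
def test_appending_records_with_matching_schema(tmpdir):
    schema = {
        "type": "record",
        "name": "test_appending_records_with_matching_schema",
        "fields": [{"name": "field", "type": "string"}]
    }
    test_file = str(tmpdir.join("append.avro"))
    with open(test_file, "wb") as new_file:
        fastavro.writer(new_file, schema, [{"field": "foo"}])
    with open(test_file, "a+b") as new_file:
        fastavro.writer(new_file, schema, [{"field": "bar"}])
    with open(test_file, "rb") as fo:
        assert list(fastavro.reader(fo)) == [{"field": "foo"}, {"field": "bar"}]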