from io import BytesIO, SEEK_SET
from os.path import join

import fastavro
import fastavro.schema
import pandas as pd
import pytest


def bytes_with_schema_to_avro(avro_read_schema, binary):
    # Decode an in-memory Avro container file and return its first record.
    with BytesIO(binary) as bytes_io:
        reader = fastavro.reader(bytes_io, avro_read_schema)
        return next(reader)
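
# A minimal usage sketch (an illustration, not from the original source): write one
# record to an in-memory Avro container file, then decode it back with the helper
# above. The schema and record below are assumptions made for the example.
_schema = {
    "type": "record",
    "name": "Example",
    "fields": [{"name": "test", "type": "int"}],
}
_buf = BytesIO()
fastavro.writer(_buf, _schema, [{"test": 1}])
assert bytes_with_schema_to_avro(_schema, _buf.getvalue()) == {"test": 1}
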
# MemoryIO is the in-memory buffer used by these snippets (io.BytesIO on Python 3).
MemoryIO = BytesIO

# Assumed writer schema: a string/int union that the plain-int reader schema below can resolve.
schema = {
    "type": "record",
    "name": "test_schema_migration_writer_union",
    "fields": [{
        "name": "test",
        "type": ["string", "int"]
    }]
}
new_schema = {
    "type": "record",
    "name": "test_schema_migration_writer_union_new",
    "fields": [{
        "name": "test",
        "type": "int"
    }]
}
new_file = MemoryIO()
records = [{"test": 1}]
fastavro.writer(new_file, schema, records)
new_file.seek(0)
new_reader = fastavro.reader(new_file, new_schema)
new_records = list(new_reader)
assert new_records == records
def test_reader_schema_attributes_throws_deprecation():
    """https://github.com/fastavro/fastavro/issues/246"""
    schema = {
        "type": "record",
        "name": "test_reader_schema_attributes_throws_deprecation",
        "fields": []
    }
    stream = MemoryIO()
    fastavro.writer(stream, schema, [{}])
    stream.seek(0)
    reader = fastavro.reader(stream)
    with pytest.warns(DeprecationWarning):
        reader.schema
"type": "record",
"name": "test_schema_migration_maps_with_union_promotion_new",
"fields": [{
"name": "test",
"type": {
"type": "map",
"values": ["string", "long"]
},
}]
}
new_file = MemoryIO()
records = [{"test": {"foo": 1}}]
fastavro.writer(new_file, schema, records)
new_file.seek(0)
new_reader = fastavro.reader(new_file, new_schema)
new_records = list(new_reader)
assert new_records == records
def roundtrip(record, writer_schema, reader_schema):
    # Write a single record with the writer schema, read it back with the
    # reader schema, and return the (possibly migrated) record.
    new_file = MemoryIO()
    fastavro.writer(new_file, writer_schema, [record])
    new_file.seek(0)
    new_records = list(fastavro.reader(new_file, reader_schema))
    return new_records[0]
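
# A minimal usage sketch (an illustration, not from the original source): Avro
# permits int -> long promotion, so roundtrip() can migrate a record between
# the two assumed schemas below.
_writer_schema = {
    "type": "record",
    "name": "roundtrip_example",
    "fields": [{"name": "n", "type": "int"}],
}
_reader_schema = {
    "type": "record",
    "name": "roundtrip_example",
    "fields": [{"name": "n", "type": "long"}],
}
assert roundtrip({"n": 1}, _writer_schema, _reader_schema) == {"n": 1}
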
def test_check_concatenate(source_codec, output_codec):
    blocks1, records1 = make_blocks(codec=source_codec)
    blocks2, records2 = make_blocks(codec=source_codec)
    new_file = MemoryIO()
    w = fastavro.write.Writer(new_file, schema, codec=output_codec)
    for block in blocks1:
        w.write_block(block)
    for block in blocks2:
        w.write_block(block)
    # Read the file back to make sure we get back the same stuff
    new_file.seek(0)
    new_records = list(fastavro.reader(new_file, schema))
    assert new_records == records1 + records2
schema_path = join(data_dir, 'Parent.avsc')
schema = fastavro.schema.load_schema(schema_path)
records = [{'child': {}, 'child1': {}}]
new_file = MemoryIO()
fastavro.writer(new_file, schema, records)
new_file.seek(0)
# Clean the Child and Parent entries so we are forced to get them from the
# schema
del SCHEMA_DEFS['Child']
del SCHEMA_DEFS['Child1']
del SCHEMA_DEFS['Parent']
reader = fastavro.reader(new_file)
new_records = list(reader)
assert new_records == records
keys = ['first', 'second', 'third', 'fourth']
testdata = [{key: gen_id() for key in keys} for _ in range(50)]
schema = {
    "fields": [{'name': key, 'type': 'string'} for key in keys],
    "namespace": "namespace",
    "name": "zerobyte",
    "type": "record"
}
buf = BytesIO()
fastavro.writer(buf, schema, testdata)
buf.seek(0, SEEK_SET)
for i, rec in enumerate(fastavro.reader(buf), 1):
    pass
size = len(testdata)
assert i == size, 'bad number of records'
assert rec == testdata[-1], 'bad last record'
def __file_to_dataframe(f, schema, **kwargs):
    # Read every record from an Avro file object into a pandas DataFrame.
    reader = fastavro.reader(f, reader_schema=schema)
    return pd.DataFrame.from_records(list(reader), **kwargs)
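
# A minimal usage sketch (an illustration, not from the original source): build a
# tiny Avro file in memory and load it as a DataFrame with the helper above. The
# schema and record are assumptions made for the example.
_df_schema = {
    "type": "record",
    "name": "df_example",
    "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}],
}
_df_buf = BytesIO()
fastavro.writer(_df_buf, _df_schema, [{"name": "ann", "age": 30}])
_df_buf.seek(0)
df = __file_to_dataframe(_df_buf, None)
assert list(df.columns) == ["name", "age"]
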
def _open_avro(self):
    # Lazily yield records ('packets') from the Avro container file at self.fname.
    with open(self.fname, 'rb') as f:
        freader = fastavro.reader(f)
        # in principle there can be multiple packets per file
        for packet in freader:
            yield packet