import pytest

import fastavro
from io import BytesIO as MemoryIO

# SCHEMA_DEFS is fastavro's internal registry of named schemas; its import
# path has moved between fastavro versions, so adjust this to match yours.
from fastavro._schema_common import SCHEMA_DEFS


def test_unsupported_codec():
    schema = {
        "type": "record",
        "name": "test_unsupported_codec",
        "fields": [
            {"name": "station", "type": "string"},
            {"name": "time", "type": "long"},
            {"name": "temp", "type": "int"},
        ],
    }
    records = [
        {"station": "011990-99999", "temp": 0, "time": 1433269388},
        {"station": "011990-99999", "temp": 22, "time": 1433270389},
        {"station": "011990-99999", "temp": -11, "time": 1433273379},
        {"station": "012650-99999", "temp": 111, "time": 1433275478},
    ]

    file = MemoryIO()
    with pytest.raises(ValueError, match="unrecognized codec"):
        fastavro.writer(file, schema, records, codec="unsupported")

    file = MemoryIO()
    fastavro.writer(file, schema, records, codec="deflate")
    # Change the avro binary to act as if it were written with a codec called
    # `unsupported`
    modified_avro = file.getvalue().replace(b"\x0edeflate", b"\x16unsupported")
    modified_file = MemoryIO(modified_avro)
    with pytest.raises(ValueError, match="Unrecognized codec"):
        list(fastavro.reader(modified_file))
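
# Why the byte swap above keeps the header well-formed (a sketch, not part
# of the original suite): the codec name is stored as an Avro string, i.e.
# length-prefixed with a zigzag varint, and 0x0e decodes to 7 ==
# len("deflate") while 0x16 decodes to 11 == len("unsupported").
def test_codec_name_length_prefix():
    def zigzag_varint(n):
        # Single-byte zigzag encoding; enough for small non-negative lengths.
        return bytes([n << 1])

    assert zigzag_varint(len("deflate")) == b"\x0e"
    assert zigzag_varint(len("unsupported")) == b"\x16"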

def test_eof_error():
    schema = {
        "type": "record",
        "name": "test_eof_error",
        "fields": [{
            "name": "test",
            "type": "float",
        }]
    }
    new_file = MemoryIO()
    record = {"test": 1.234}
    fastavro.schemaless_writer(new_file, schema, record)

    # Back up one byte and truncate
    new_file.seek(-1, 1)
    new_file.truncate()
    new_file.seek(0)

    with pytest.raises(EOFError):
        fastavro.schemaless_reader(new_file, schema)
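
# Context for the single-byte truncation (a sanity-check sketch, assuming a
# bare primitive schema is accepted here): Avro encodes a "float" as exactly
# 4 little-endian IEEE-754 bytes, so the truncated buffer leaves the reader
# one byte short mid-value.
def test_schemaless_float_is_four_bytes():
    buf = MemoryIO()
    fastavro.schemaless_writer(buf, {"type": "float"}, 1.234)
    assert len(buf.getvalue()) == 4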
"name": "name",
"type": "string"
}, {
"name": "age",
"type": "long"
}]
}]
}]
}
records = [
{'item': {'description': 'test', 'size': 1}},
{'item': {'id': {'id': '#1'}, 'name': 'foobar', 'age': 12}}
]
file = MemoryIO()
fastavro.writer(file, fastavro.parse_schema(schema), records)
file.seek(0)
# Clean the schema entries to simulate reading from a fresh process (no
# cached schemas)
del SCHEMA_DEFS['Outer']
del SCHEMA_DEFS['Inner1']
del SCHEMA_DEFS['Inner2']
del SCHEMA_DEFS['UUID']
# This should not raise a KeyError
fastavro.reader(file)
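
# The final read succeeds because an Avro container file embeds the complete
# writer schema in its header, so the reader re-registers Outer, Inner1,
# Inner2 and UUID from the file itself rather than from any cached state.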

def roundtrip(schema, records, new_schema):
    new_file = MemoryIO()
    fastavro.writer(new_file, schema, records)
    new_file.seek(0)
    reader = fastavro.reader(new_file, new_schema)
    new_records = list(reader)
    return new_records
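
# Example use of the helper (hypothetical schemas, not from the original
# suite): Avro permits promoting a written "long" to a reader's "double".
def test_roundtrip_promotes_long_to_double():
    writer_schema = {"type": "record", "name": "Example",
                     "fields": [{"name": "n", "type": "long"}]}
    reader_schema = {"type": "record", "name": "Example",
                     "fields": [{"name": "n", "type": "double"}]}
    assert roundtrip(writer_schema, [{"n": 1}], reader_schema) == [{"n": 1.0}]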
"type": "array",
"items": "string"
},
"string"
],
}],
"name": "description",
"doc": "A description of the thing."
}
other_type_schema = CustomDict(schema)
record = {
'description': 'value',
}
new_file = MemoryIO()
fastavro.schemaless_writer(new_file, schema, record)
new_file.seek(0)
new_record = fastavro.schemaless_reader(new_file, other_type_schema)
assert record == new_record
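
# Note: a schemaless blob carries no embedded schema, so schemaless_reader
# must be handed the writer's schema explicitly; the assertion above shows a
# plain dict and a dict subclass are interchangeable for that purpose.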

def test_schema_migration_remove_field():
    schema = {
        "type": "record",
        "name": "test_schema_migration_remove_field",
        "fields": [{
            "name": "test",
            "type": "string",
        }]
    }
    new_schema = {
        "type": "record",
        "name": "test_schema_migration_remove_field_new",
        "fields": []
    }
    new_file = MemoryIO()
    records = [{'test': 'test'}]
    fastavro.writer(new_file, schema, records)
    new_file.seek(0)
    new_reader = fastavro.reader(new_file, new_schema)
    new_records = list(new_reader)
    assert new_records == [{}]
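
# Per Avro schema resolution, writer fields missing from the reader schema
# are decoded and then discarded, which is why each record comes back empty.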

def test_metadata():
    schema = {
        "type": "record",
        "name": "test_metadata",
        "fields": []
    }
    new_file = MemoryIO()
    records = [{}]
    metadata = {'key': 'value'}
    fastavro.writer(new_file, schema, records, metadata=metadata)
    new_file.seek(0)
    new_reader = fastavro.reader(new_file)
    assert new_reader.metadata['key'] == metadata['key']
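
# User-supplied metadata lands in the container header alongside the
# reserved "avro.schema" and "avro.codec" entries, so it round-trips with
# the file itself rather than with any individual record.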
"fields": [{
"name": "test",
"type": ["string", "int"]
}]
}
new_schema = {
"type": "record",
"name": "test_schema_migration_writer_union_new",
"fields": [{
"name": "test",
"type": "int"
}]
}
new_file = MemoryIO()
records = [{"test": 1}]
fastavro.writer(new_file, schema, records)
new_file.seek(0)
new_reader = fastavro.reader(new_file, new_schema)
new_records = list(new_reader)
assert new_records == records
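
# A writer union resolves against a non-union reader type branch by branch:
# each datum only has to match the reader schema with the branch it was
# actually written with ("int" here), so the unused "string" branch is fine.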

# The helpers below have the shape of fastavro's internal codec hooks: each
# data block arrives as a length-prefixed chunk of compressed bytes.
# `read_long` is assumed to be the decoder primitive defined alongside these
# hooks in fastavro's reader module; lz4 and zstandard are optional
# third-party packages.
import lz4.block
import zstandard as zstd


def lz4_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return MemoryIO(lz4.block.decompress(data))


def zstandard_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return MemoryIO(zstd.ZstdDecompressor().decompress(data))
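
# For context, every data block in an Avro container file is laid out as
#   <record count: long> <compressed size: long> <payload> <16-byte sync marker>
# and the helpers above consume the <size, payload> pair, handing the
# decompressed bytes back to the record decoder as a fresh file-like object.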