Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"fields": [{
"name": "field",
"type": {
"logicalType": "decimal",
"precision": "5",
"scale": 2,
"type": "bytes",
},
}],
}
with pytest.raises(
SchemaParseException,
match="decimal precision must be a postive integer",
):
parse_schema(schema)
def test_parse_schema():
    """parse_schema marks its output with the parse hint and is idempotent.

    Parsing an already-parsed schema must return an equal schema, so the
    hint key ``__fastavro_parsed`` doubles as a "no re-work needed" flag.
    """
    record_schema = {
        "type": "record",
        "name": "test_parse_schema",
        "fields": [{"name": "field", "type": "string"}],
    }
    first_pass = parse_schema(record_schema)
    # The hint key is injected on the first parse...
    assert "__fastavro_parsed" in first_pass
    # ...and a second parse is a no-op on the result.
    assert parse_schema(first_pass) == first_pass
def test_aliases_are_preserved():
    """Field-level ``aliases`` entries survive a trip through parse_schema."""
    aliased_schema = {
        "type": "record",
        "name": "test_parse_schema",
        "fields": [
            {"name": "field", "type": "string", "aliases": ["test"]},
        ],
    }
    parsed = parse_schema(aliased_schema)
    # The alias list must still be attached to the parsed field definition.
    assert "aliases" in parsed["fields"][0]
def test_unknown_type():
    """A schema whose type name is not recognized raises UnknownType."""
    with pytest.raises(UnknownType):
        parse_schema({"type": "unknown"})
def __init__(self, schema, metadata=None, validator=None):
    """Prepare writer state: parsed schema, validator, and file metadata.

    Parameters
    ----------
    schema: dict or str
        Schema to write records with; parsed via ``parse_schema``.
    metadata: dict, optional
        Extra key/value metadata to embed in the file header.
    validator: optional
        ``True`` selects the default ``validate`` function; any other
        truthy value is used as the validation callable itself.
    """
    self.schema = parse_schema(schema)
    if validator is True:
        self.validate_fn = validate
    else:
        self.validate_fn = validator
    self.metadata = metadata if metadata else {}
    # Keep the parse hint out of the schema that gets embedded in the
    # file header (strip after parse_schema, which may add the hint).
    if isinstance(schema, dict):
        schema = dict(
            (key, value)
            for key, value in iteritems(schema)
            if key != "__fastavro_parsed"
        )
    self.metadata['avro.schema'] = json.dumps(schema)
----------
fo: file-like
Input stream
writer_schema: dict
Schema used to write the json file
Example::
from fastavro.json_reader import reader
with open('some-file.json', 'r') as fo:
for record in reader(fo, schema):
print(record)
"""
parsed_schema = parse_schema(writer_schema, _write_hint=False)
for line in fo:
json_loaded = json.loads(line.strip())
yield _read_json(json_loaded, parsed_schema)
{'name': 'time', 'type': 'long'},
{'name': 'temp', 'type': 'int'},
],
}
records = [
{u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
{u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
{u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
{u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
]
with open('weather.json', 'w') as out:
writer(out, schema, records)
"""
parsed_schema = parse_schema(schema, _write_hint=False)
for record in records:
json.dump(_write_json(record, parsed_schema), fo)
fo.write('\n')
Output file
schema: dict
Schema
record: dict
Record to write
Example::
parsed_schema = fastavro.parse_schema(schema)
with open('file.avro', 'rb') as fp:
fastavro.schemaless_writer(fp, parsed_schema, record)
Note: The ``schemaless_writer`` can only write a single record.
"""
schema = parse_schema(schema)
encoder = BinaryEncoder(fo)
write_data(encoder, record, schema)
encoder.flush()
if isinstance(fo, BinaryEncoder):
self.encoder = fo
else:
self.encoder = BinaryEncoder(fo)
self.io = BinaryEncoder(MemoryIO())
self.block_count = 0
self.sync_interval = sync_interval
self.compression_level = compression_level
if appendable(self.encoder._fo):
# Seed to the beginning to read the header
self.encoder._fo.seek(0)
avro_reader = reader(self.encoder._fo)
header = avro_reader._header
file_writer_schema = parse_schema(avro_reader.writer_schema)
if self.schema != file_writer_schema:
msg = "Provided schema {} does not match file writer_schema {}"
raise ValueError(msg.format(self.schema, file_writer_schema))
codec = avro_reader.metadata.get("avro.codec", "null")
self.sync_marker = header["sync"]
# Seek to the end of the file
self.encoder._fo.seek(0, 2)
self.block_writer = BLOCK_WRITERS[codec]
else:
self.sync_marker = sync_marker or urandom(SYNC_SIZE)
try:
self._header = read_data(self.decoder, HEADER_SCHEMA, None,
self.return_record_name)
except StopIteration:
raise ValueError('cannot read header - is it an avro file?')
# `meta` values are bytes. So, the actual decoding has to be external.
self.metadata = {
k: btou(v) for k, v in iteritems(self._header['meta'])
}
self._schema = json.loads(self.metadata['avro.schema'])
self.codec = self.metadata.get('avro.codec', 'null')
# Always parse the writer schema since it might have named types that
# need to be stored in SCHEMA_DEFS
self.writer_schema = parse_schema(
self._schema, _write_hint=False, _force=True
)