Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
}],
}
with pytest.raises(SchemaParseException):
fastavro.parse_schema(record_schema)
error_schema = {
"type": "error",
"fields": [{
"name": "field",
"type": "string",
}],
}
with pytest.raises(SchemaParseException):
fastavro.parse_schema(error_schema)
fixed_schema = {
"type": "fixed",
"size": 1,
}
with pytest.raises(SchemaParseException):
fastavro.parse_schema(fixed_schema)
enum_schema = {
"type": "enum",
"symbols": ["FOO"],
}
with pytest.raises(SchemaParseException):
fastavro.parse_schema(enum_schema)
def test_named_types_have_names():
record_schema = {
"type": "record",
"fields": [{
"name": "field",
"type": "string",
}],
}
with pytest.raises(SchemaParseException):
fastavro.parse_schema(record_schema)
error_schema = {
"type": "error",
"fields": [{
"name": "field",
"type": "string",
}],
}
with pytest.raises(SchemaParseException):
fastavro.parse_schema(error_schema)
fixed_schema = {
"type": "fixed",
"size": 1,
}
def test_parse_schema_rejects_unordered_references():
try:
fastavro.parse_schema({
"type": "record",
"name": "test_parse_schema_rejects_unordered_references",
"fields": [{
"name": "left",
"type": "Thinger"
}, {
"name": "right",
"type": {
"type": "record",
"name": "Thinger",
"fields": [{
"name": "the_thing",
"type": "string"
}]
}
}]
named_schema = {
"name": "named_schema_with_logical_type",
"namespace": "com.example",
"type": "record",
"fields": [
{
"name": "item",
"type": {
"type": "int",
"logicalType": "date"
}
}
]
}
fastavro.parse_schema(named_schema)
schema = {
"type": "record",
"name": "test_named_schema_with_logical_type",
"fields": [{
"name": "item",
"type": [
"null",
"com.example.named_schema_with_logical_type"
]
}]
}
records = [
{"item": None},
{"item": {"item": "2019-05-06"}}
def write(schema, records, runs=1):
times = []
schema = parse_schema(schema)
for _ in range(runs):
iostream = BytesIO()
start = time.time()
writer(iostream, schema, records)
end = time.time()
times.append(end - start)
print('... {0} runs averaged {1} seconds'.format(runs, (sum(times) / runs)))
return iostream
def __init__(self, file_handle, schema):
"""Initialize an AvroRowWriter.
Args:
file_handle (io.IOBase): Output stream to write Avro records to.
schema (Dict[Text, Any]): BigQuery table schema.
"""
if not file_handle.writable():
raise ValueError("Output stream must be writable")
self._file_handle = file_handle
avro_schema = fastavro.parse_schema(
get_avro_schema_from_table_schema(schema))
self._avro_writer = fastavro.write.Writer(self._file_handle, avro_schema)
def validater(schema, records, runs=1):
times = []
valid = []
schema = parse_schema(schema)
for _ in range(runs):
start = time.time()
valid = validate_many(records, schema)
end = time.time()
times.append(end - start)
print('... {0} runs averaged {1} seconds'.format(runs, (sum(times) / runs)))
return valid
def read_schemaless(iostream, schema, num_records, runs=1):
times = []
schema = parse_schema(schema)
for _ in range(runs):
for _ in range(num_records):
iostream.seek(0)
start = time.time()
record = schemaless_reader(iostream, schema)
end = time.time()
times.append(end - start)
print('... {0} runs averaged {1} seconds'.format(runs, (sum(times) / runs)))
return records
def __init__(self, schema):
self.parsed_schema = parse_schema(json.loads(schema))