Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_eof_error():
    """Reading a schemaless record whose last byte was cut off raises EOFError."""
    float_field = {
        "name": "test",
        "type": "float",
    }
    schema = {
        "type": "record",
        "name": "test_eof_error",
        "fields": [float_field],
    }
    payload = {"test": 1.234}

    stream = MemoryIO()
    fastavro.schemaless_writer(stream, schema, payload)

    # Step back one byte from the current position and truncate there, so the
    # encoded record is one byte short; then rewind for reading.
    stream.seek(-1, 1)
    stream.truncate()
    stream.seek(0)

    with pytest.raises(EOFError):
        fastavro.schemaless_reader(stream, schema)
def serialize(schema, data):
    """Return ``data`` written with ``fastavro.schemaless_writer`` under ``schema``.

    The record is written into an in-memory ``BytesIO`` buffer and the
    buffer's full contents are returned.
    """
    with BytesIO() as buffer:
        fastavro.schemaless_writer(buffer, schema, data)
        return buffer.getvalue()
"items": "string"
},
"string"
],
}],
"name": "description",
"doc": "A description of the thing."
}
other_type_schema = CustomDict(schema)
record = {
'description': 'value',
}
new_file = MemoryIO()
fastavro.schemaless_writer(new_file, schema, record)
new_file.seek(0)
new_record = fastavro.schemaless_reader(new_file, other_type_schema)
assert record == new_record
"name": "test_boolean_roundtrip",
"fields": [{
"name": "field",
"type": "boolean"
}]
}
record = {"field": True}
new_file = MemoryIO()
fastavro.schemaless_writer(new_file, schema, record)
new_file.seek(0)
new_record = fastavro.schemaless_reader(new_file, schema)
assert record == new_record
record = {"field": False}
new_file = MemoryIO()
fastavro.schemaless_writer(new_file, schema, record)
new_file.seek(0)
new_record = fastavro.schemaless_reader(new_file, schema)
assert record == new_record
`_io.BytesIO`
Encoded data.
Examples
----------
>>> with open(ztf_alert_sample, mode='rb') as file_data:
... data = readschemadata(file_data)
... # Read the schema
... schema = data.schema
... for record in data:
... bytes = writeavrodata(record, schema)
>>> print(type(bytes))
"""
bytes_io = io.BytesIO()
fastavro.schemaless_writer(bytes_io, json_schema, json_data)
return bytes_io
def encode(self, obj):
    """Return ``obj`` encoded by ``fastavro.schemaless_writer``.

    ``obj`` is first passed to ``self._validate_object_type``; its
    ``__dict__`` is then written against ``self._schema`` into an in-memory
    buffer, and the buffer's contents are returned.
    """
    self._validate_object_type(obj)
    with io.BytesIO() as stream:
        fastavro.schemaless_writer(stream, self._schema, obj.__dict__)
        return stream.getvalue()
def _dumps(self, obj):
    """Serialize ``obj`` with ``schemaless_writer`` and return the bytes.

    When ``self.encoding_method`` is truthy it is called on ``obj`` and its
    result is what gets written; otherwise ``obj`` itself is written.

    Args:
        obj: The value to serialize against ``self.schema_dict``.

    Returns:
        The contents of the in-memory buffer after writing.
    """
    # Previously the no-encoder branch wrote ``datum``, which is only bound
    # when an encoder exists, so it failed with UnboundLocalError.
    datum = self.encoding_method(obj) if self.encoding_method else obj
    bytes_writer = io.BytesIO()
    schemaless_writer(bytes_writer, self.schema_dict, datum)
    return bytes_writer.getvalue()
def write_func(datum):
    """Write ``datum`` via ``schemaless_writer``.

    ``output`` and ``schema`` are free variables resolved from outside this
    function.
    """
    schemaless_writer(output, schema, datum)
return write_func
def write_schemaless(schema, records, runs=1):
    """Time ``schemaless_writer`` over ``records`` and print the average.

    Each record is written to its own fresh ``BytesIO``; only the
    ``schemaless_writer`` call is timed. The printed figure is the total
    timed duration divided by ``runs``.

    Args:
        schema: Schema, passed through ``parse_schema`` before use.
        records: Iterable of records. It is iterated once per run, so a
            one-shot iterator only contributes to the first run.
        runs: Number of passes over ``records``.

    Returns:
        The ``BytesIO`` holding the last record written, or an empty
        ``BytesIO`` if no record was written.

    Raises:
        ZeroDivisionError: If ``runs`` is 0 (the average divides by ``runs``).
    """
    schema = parse_schema(schema)
    durations = []
    # Bind up front so the final ``return`` cannot hit an unbound local when
    # ``records`` is empty.
    iostream = BytesIO()
    for _ in range(runs):
        for record in records:
            iostream = BytesIO()
            # perf_counter is the clock intended for measuring short
            # intervals; time.time() can jump if the system clock changes.
            started = time.perf_counter()
            schemaless_writer(iostream, schema, record)
            durations.append(time.perf_counter() - started)
    print('... {0} runs averaged {1} seconds'.format(runs, (sum(durations) / runs)))
    return iostream