Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
return Sequence(repeat, MapStart())
elif record_type == "array":
repeat = Repeater(
ArrayEnd(),
ItemEnd(),
self._parse(schema["items"]),
)
return Sequence(repeat, ArrayStart())
elif record_type == "enum":
return Sequence(EnumLabels(schema["symbols"]), Enum())
elif record_type == "null":
return Null()
elif record_type == "boolean":
return Boolean()
elif record_type == "string":
return String()
elif record_type == "bytes":
return Bytes()
elif record_type == "int":
return Int()
elif record_type == "long":
return Long()
elif record_type == "float":
return Float()
elif record_type == "double":
return Double()
elif record_type == "fixed":
return Fixed()
def read_null(self):
    """Advance the parser over a ``Null`` symbol and read the value.

    Returns:
        Whatever ``self.read_value()`` returns.
    """
    parser = self._parser
    parser.advance(Null())
    value = self.read_value()
    return value
def write_null(self):
    """Advance the parser over a ``Null`` symbol and write ``None``.

    The value is emitted through ``self.write_value``.
    """
    parser = self._parser
    parser.advance(Null())
    self.write_value(None)
def write_index(self, index, schema):
    """Select one alternative of a union and prepare the parser for it.

    Advances the parser over a ``Union`` symbol, pops the symbol that
    follows it, and looks up the alternative at ``index``. When that
    alternative is not equal to ``Null()``, an object is opened and the
    alternative's label is written as its key, and a ``UnionEnd`` symbol
    is pushed. The chosen alternative's symbol is pushed last.

    Args:
        index: Position of the chosen alternative; passed to the popped
            symbol's ``get_symbol`` and ``get_label``.
        schema: Not used by this method.
    """
    self._parser.advance(Union())
    alternatives = self._parser.pop_symbol()
    chosen = alternatives.get_symbol(index)

    if chosen != Null():
        # Non-null alternative: open an object keyed by the branch label.
        self.write_object_start()
        branch_label = alternatives.get_label(index)
        self.write_object_key(branch_label)
        # TODO: Do we need this symbol?
        self._parser.push_symbol(UnionEnd())

    # NOTE(review): indentation was lost in this paste; this push is
    # assumed to run for both null and non-null alternatives -- confirm.
    self._parser.push_symbol(chosen)