Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import typing
import msgpack
from faust.serializers import codecs
# Codec example from https://faust.readthedocs.io/en/latest/userguide/models.html#codecs
class raw_msgpack(codecs.Codec):
    """Codec whose load/dump hooks delegate directly to ``msgpack``."""

    def _loads(self, s: bytes) -> typing.Any:
        """Decode the msgpack payload ``s`` and return the resulting object."""
        decoded = msgpack.loads(s)
        return decoded

    def _dumps(self, obj: typing.Any) -> bytes:
        """Encode ``obj`` with msgpack and return the packed result."""
        packed = msgpack.dumps(obj)
        return packed
def msgpack_codec() -> codecs.Codec:
    """Build a codec combining ``raw_msgpack`` with ``codecs.binary()``.

    The two codecs are joined with the ``|`` operator, with the
    ``raw_msgpack`` instance as the left-hand operand.
    """
    packer = raw_msgpack()
    wrapper = codecs.binary()
    return packer | wrapper
from schema_registry.serializer.message_serializer import MessageSerializer
from faust.serializers.codecs import Codec
from typing import Dict
class AvroSerializer(MessageSerializer, Codec):
"""
Subclass of faust.serializers.codecs.Codec and
datamountaineer.schema_registry.serializers.MessageSerializer
schema_registry_client: SchemaRegistryClient
Client used to call the schema-registry service
destination_topic: str
Topic used to send the encoded message
schema: str
Parsed avro schema. Must be a parsed schema from the python avro library
is_key: bool
"""
def __init__(self, schema_registry_client, destination_topic, schema, is_key=False):
self.schema_registry_client = schema_registry_client
self.destination_topic = destination_topic