How to use the faust.serializers.codecs.Codec function in faust

To help you get started, we’ve selected a few faust examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github marcosschroh / cookiecutter-faust / {{cookiecutter.project_slug}} / {{cookiecutter.project_slug}} / codecs / codec.py View on Github external
import typing

import msgpack

from faust.serializers import codecs

# Codec example from https://faust.readthedocs.io/en/latest/userguide/models.html#codecs


class raw_msgpack(codecs.Codec):
    def _dumps(self, obj: typing.Any) -> bytes:
        return msgpack.dumps(obj)

    def _loads(self, s: bytes) -> typing.Any:
        return msgpack.loads(s)


def msgpack_codec() -> codecs.Codec:
    return raw_msgpack() | codecs.binary()
github marcosschroh / python-schema-registry-client / schema_registry / serializer / faust_avro_serializer.py View on Github external
from schema_registry.serializer.message_serializer import MessageSerializer

from faust.serializers.codecs import Codec

from typing import Dict


class AvroSerializer(MessageSerializer, Codec):
    """
    Subclass of faust.serializers.codecs.Codec and
    datamountaineer.schema_registry.serializers.MessageSerializer

    schama_registry_client: SchemaRegistryClient
        Client used to call the schema-registry service
    destination_topic: str
        Topic used to send the encoded message
    schema: str
        Parsed avro schema. Must be a parsed schema from the python avro library
    is_key: bool
    """

    def __init__(self, schema_registry_client, destination_topic, schema, is_key=False):
        self.schema_registry_client = schema_registry_client
        self.destination_topic = destination_topic