Source code for snapstream.codecs

"""Snapstream codecs."""

import logging
from abc import ABCMeta, abstractmethod
from io import BytesIO
from json import dumps, loads
from typing import Any, Union, cast

from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from avro.schema import Schema, parse
from toolz import curry

from snapstream.utils import with_type_hint

logger = logging.getLogger(__name__)


def deserialize_json(msg: bytes) -> dict:
    """Parse a JSON document out of raw bytes.

    The payload is first turned into text with ``bytes.decode()`` (default
    codec, strict error handling) and the text is then handed to
    ``json.loads``.

    :param msg: Encoded JSON document.
    :return: Whatever ``json.loads`` produces for the document; annotated
        as ``dict`` because messages are expected to be JSON objects.
    """
    text = msg.decode()
    return loads(text)
def serialize_json(msg: dict) -> bytes:
    """Render a message as JSON and return it as bytes.

    Values that ``json.dumps`` cannot serialize natively are passed through
    ``str`` (``default=str``) instead of raising. The JSON text is then
    encoded with ``str.encode()`` defaults.

    :param msg: Message to serialize.
    :return: Encoded JSON document.
    """
    return dumps(msg, default=str).encode()
# NOTE(review): `with_type_hint` comes from snapstream.utils and is not visible
# here; presumably it only keeps the signature visible to type checkers after
# `curry` is applied -- confirm against its definition.
@with_type_hint
@curry
def deserialize_avro(schema: Schema, msg: bytes) -> object:
    """Decode an avro binary message according to ``schema``.

    :param schema: Avro schema describing the encoded datum.
    :param msg: Avro binary payload.
    :return: The datum read from ``msg``.
    :raises Exception: Any error raised while decoding is logged together
        with the schema and the message, then re-raised unchanged.
    """
    try:
        avro_decoder = BinaryDecoder(BytesIO(msg))
        return DatumReader(schema).read(avro_decoder)
    except Exception as e:
        logger.error(f'{e}\nschema:\n{schema}\nmsg:\n{str(msg)}.')
        raise


@with_type_hint
@curry
def serialize_avro(schema: Schema, msg: dict) -> bytes:
    """Encode a message to avro binary according to ``schema``.

    :param schema: Avro schema the message must conform to.
    :param msg: Datum to encode.
    :return: The avro binary payload.
    :raises Exception: Any error raised while encoding is logged together
        with the schema and the message, then re-raised unchanged.
    """
    try:
        datum_writer = DatumWriter(schema)
        buffer = BytesIO()
        datum_writer.write(msg, BinaryEncoder(buffer))
        return buffer.getvalue()
    except Exception as e:
        logger.error(f'{e}\nschema:\n{schema}\nmsg:\n{msg}.')
        raise
class ICodec(metaclass=ABCMeta):
    """Abstract interface shared by all codecs.

    Both ``encode`` and ``decode`` are abstract, so the class itself cannot
    be instantiated; subclasses must override both methods.
    """

    @abstractmethod
    def encode(self, obj: Any) -> bytes:
        """Turn ``obj`` into its bytes representation."""
        raise NotImplementedError()

    @abstractmethod
    def decode(self, s: bytes) -> object:
        """Rebuild an object from its bytes representation ``s``."""
        raise NotImplementedError()
class JsonCodec(ICodec):
    """Codec for json messages, delegating to the module's json helpers."""

    def encode(self, obj: Any) -> bytes:
        """Serialize ``obj`` to json bytes via ``serialize_json``."""
        encoded = serialize_json(obj)
        return encoded

    def decode(self, s: bytes) -> object:
        """Deserialize json bytes ``s`` via ``deserialize_json``."""
        decoded = deserialize_json(s)
        return decoded
class AvroCodec(ICodec):
    """Serialize/deserialize avro messages using a fixed schema.

    The schema is resolved once at construction time and stored on
    ``self.schema``; ``encode`` and ``decode`` both use it.
    """

    def __init__(self, schema: Union[str, Schema]):
        """Load avro schema.

        :param schema: Either an already parsed ``avro.schema.Schema`` or a
            ``str`` path to an ``.avsc`` file to read and parse.
        :raises TypeError: If ``schema`` is neither a ``Schema`` nor a ``str``.
        """
        if isinstance(schema, Schema):
            self.schema = schema
        elif isinstance(schema, str):
            # Avro schema files are JSON documents; read them as UTF-8
            # explicitly so parsing does not depend on the platform's
            # locale-specific default encoding.
            with open(schema, encoding='utf-8') as schema_file:
                self.schema = parse(schema_file.read())
        else:
            raise TypeError('Expected .avsc filepath str, or avro.schema.Schema instance.')

    def encode(self, obj: Any) -> bytes:
        """Serialize message to avro binary using ``self.schema``."""
        val = serialize_avro(self.schema, obj)
        # The helper is wrapped by decorators, so narrow the result for type checkers.
        return cast(bytes, val)

    def decode(self, s: bytes) -> object:
        """Deserialize avro binary message ``s`` using ``self.schema``."""
        val = deserialize_avro(self.schema, s)
        # The helper is wrapped by decorators, so narrow the result for type checkers.
        return cast(object, val)