# Source code for snapstream.core

"""Snapstream core objects."""

import logging
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from queue import Queue
from re import sub
from threading import Thread, current_thread
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    Optional,
    Set,
    Tuple,
)

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.error import KafkaException
from pubsub import pub
from toolz import pipe

from snapstream.codecs import ICodec
from snapstream.utils import KafkaIgnoredPropertyFilter, Singleton

# Module-wide logger. KafkaIgnoredPropertyFilter (from snapstream.utils) is
# attached so it can filter every record logged here -- presumably librdkafka
# "ignored property" noise; TODO confirm against snapstream.utils.
logger: logging.Logger = logging.getLogger(__name__)
logger.addFilter(KafkaIgnoredPropertyFilter())

# Special starting offsets; these match librdkafka's OFFSET_BEGINNING (-2)
# and OFFSET_END (-1) sentinels.
READ_FROM_START: int = -2
READ_FROM_END: int = -1


class Conf(metaclass=Singleton):
    """Define default kafka configuration, optionally.

    >>> Conf({'bootstrap.servers': 'localhost:29091'})
    {'bootstrap.servers': 'localhost:29091'}
    """

    # Registered entries consumed by ``start``, which unpacks each one as
    # ``(_, iterable)``. This is a class-level set until ``start`` rebinds an
    # instance attribute in its ``finally`` clause.
    iterables: Set[Tuple[str, Iterable]] = set()

    def register_iterables(self, *it):
        """Add iterables to global Conf.

        NOTE: ``set.add`` takes exactly one element, so exactly one
        positional argument is accepted -- presumably a ``(key, iterable)``
        tuple, per the ``iterables`` annotation.
        """
        self.iterables.add(*it)

    @staticmethod
    def distribute_messages(it, queue, kwargs):
        """Publish messages from iterable.

        Every element of ``it`` is sent through ``pubsub`` on the topic
        ``str(id(it))`` together with ``kwargs``. This runs in a worker
        thread. Any exception is handed to the main thread by putting it on
        ``queue``, and a ``None`` sentinel is always put last to signal
        completion.
        """
        iterable_key = str(id(it))
        try:
            for el in it:
                pub.sendMessage(iterable_key, msg=el, kwargs=kwargs)
        except BaseException as e:
            logger.debug(f'Exception in thread {current_thread().name}.')
            queue.put(e)
        finally:
            queue.put(None)
            logger.debug(f'Stopping thread {current_thread().name}.')

    def start(self, **kwargs):
        """Start the streams.

        Spawns one daemon thread per registered iterable and blocks until all
        of them have finished. ``kwargs`` is passed along with every published
        message.

        The first exception raised in a worker is re-raised here.
        ``KeyboardInterrupt`` is turned into ``SystemExit``. The registry of
        iterables is cleared afterwards in every case.
        """
        queue = Queue(maxsize=1)
        threads = [
            Thread(target=self.distribute_messages, args=(it, queue, kwargs))
            for _, it in self.iterables
        ]
        try:
            for t in threads:
                logger.debug(f'Spawning thread {t.name}.')
                t.daemon = True
                t.start()
            # Wait for exactly one ``None`` sentinel per worker instead of
            # polling ``is_alive()``. A worker that has already posted its
            # sentinel can still be alive for a moment. Looping back into
            # ``queue.get()`` in that window would block forever, since no
            # further item will ever arrive.
            pending = len(threads)
            while pending:
                item = queue.get()
                if item is None:
                    pending -= 1
                else:
                    raise item
        except KeyboardInterrupt:
            # Same effect as the interactive-only ``exit()`` builtin, without
            # depending on the ``site`` module having installed it.
            raise SystemExit
        finally:
            self.iterables = set()

    def __init__(self, conf: Optional[dict] = None) -> None:
        """Define init behavior."""
        self.conf: Dict[str, Any] = {}
        self.__update__(conf)

    def __update__(self, conf: Optional[dict] = None):
        """Set default app configuration.

        Merges ``conf`` over ``self.conf`` and also exposes each key as an
        attribute, with runs of non-alphanumeric characters replaced by
        ``_`` (e.g. ``'bootstrap.servers'`` -> ``bootstrap_servers``).
        """
        if conf is None:
            conf = {}
        self.conf = {**self.conf, **conf}
        for key, value in conf.items():
            key = sub('[^0-9a-zA-Z]+', '_', key)
            setattr(self, key, value)

    def __repr__(self) -> str:
        """Represent config."""
        return str(self.conf)
class ITopic(metaclass=ABCMeta):
    """Base class for topic implementations."""

    @abstractmethod
    def __init__(
        self,
        name: str,
        conf: Optional[Dict[str, Any]] = None,
        offset: Optional[int] = None,
        codec: Optional[ICodec] = None,
        **kwargs: Any
    ) -> None:
        """Initialize topic instance.

        - Should use Conf().conf as default kafka configuration.
        - ``conf=None`` means no per-topic overrides.
        """
        raise NotImplementedError

    @abstractmethod
    def create_topic(self, name: str, *args: Any, **kwargs: Any) -> None:
        """Create topic.

        - Should allow `*args`, `**kwargs` passthrough to kafka client.
        - Should log warning if topic already exists.
        """
        raise NotImplementedError

    @abstractmethod
    def __iter__(self) -> Iterator[Any]:
        """Consume from topic.

        - Should instantiate consumer when called (iterated over).
        - Should deserialize using topic codec.
        - Should use topic offset attribute.
        """
        raise NotImplementedError

    @abstractmethod
    def __call__(
        self,
        val,
        key=None,
        *args: Any,
        dry: bool = False,
        **kwargs: Any
    ) -> None:
        """Produce to topic.

        - Should instantiate producer when first called (function call).
        - Should set producer on topic producer attribute.
        - Should use topic producer attribute in subsequent calls.
        - Should serialize using topic codec.
        - Should skip sending message when dry is True.
        - Should show a warning when skipped.
        """
        raise NotImplementedError
def _consumer_handler(c, conf, poll_timeout, codec, raise_error):
    """Poll consumer ``c`` forever, yielding (optionally decoded) messages.

    Polls that return nothing are skipped. A message carrying an error either
    raises ``KafkaException`` or is logged and skipped. It raises when
    ``raise_error`` is set, when the error is fatal, or when it is not
    retriable.

    When ``conf['enable.auto.commit']`` stringifies to ``'false'``
    (case-insensitive), ``c.commit()`` runs once the caller resumes the
    generator after each yielded message.
    """
    auto_commit_setting = str(conf.get('enable.auto.commit')).lower()
    manual_commit = auto_commit_setting == 'false'
    while True:
        message = c.poll(poll_timeout)
        if message is None:
            continue
        error = message.error()
        if error:
            must_raise = raise_error or error.fatal() or not error.retriable()
            if must_raise:
                raise KafkaException(error)
            logger.error(message.error())
            continue
        if codec:
            message.set_value(codec.decode(message.value()))
        yield message
        if manual_commit:
            c.commit()
@contextmanager
def get_consumer(
    topic: str,
    conf: dict,
    offset=None,
    codec: Optional[ICodec] = None,
    poll_timeout: float = 1.0,
    poller=_consumer_handler,
    raise_error=False
) -> Iterator[Iterable[Any]]:
    """Yield an iterable to consume from kafka.

    The ``Consumer`` is created when the context is entered. The topic
    subscription happens only when the yielded generator is first iterated.
    The consumer is always closed when the context exits.

    When ``offset`` is not ``None``, every partition handed to the
    ``on_assign`` callback starts at that offset. Messages are produced by
    ``poller(consumer, conf, poll_timeout, codec, raise_error)``.
    """
    consumer = Consumer(conf, logger=logger)

    def _assign_with_offset(client, partitions):
        # Override each assigned partition's starting offset, if requested.
        if offset is not None:
            for partition in partitions:
                partition.offset = offset
        client.assign(partitions)

    def _consume():
        logger.debug(f'Subscribing to topic: {topic}.')
        consumer.subscribe([topic], on_assign=_assign_with_offset)
        logger.debug(f'Consuming from topic: {topic}.')
        yield from poller(consumer, conf, poll_timeout, codec, raise_error)

    try:
        yield _consume()
    finally:
        consumer.close()
def _producer_handler(p, topic, poll_timeout, codec, dry): def callback(err, msg): if err is not None: logger.error(f'Failed to deliver message: {err}.') # Raise exception by default raise KafkaException(err) else: logger.debug(f'Produced to topic: {msg.topic()}.') def produce(key, val, *args, **kwargs): if codec: logger.debug(f'Encoding using codec: {topic}.') val = codec.encode(val) if dry: logger.warning(f'Skipped sending message to {topic} [dry=True].') return p.produce(topic=topic, key=key, value=val, *args, **kwargs, callback=callback) p.poll(poll_timeout) return produce
@contextmanager
def get_producer(
    topic: str,
    conf: dict,
    dry=False,
    codec: Optional[ICodec] = None,
    poll_timeout: float = 1.0,
    flush_timeout: float = -1.0,
    pusher=_producer_handler
) -> Iterator[Callable[[Any, Any], None]]:
    """Yield kafka produce method.

    Creates a ``Producer`` from ``conf`` and yields
    ``pusher(producer, topic, poll_timeout, codec, dry)``.

    On exit the producer is flushed with ``flush_timeout``. This happens even
    when the ``with`` body raises, so already-produced messages still buffered
    in the client are not silently dropped.
    """
    p = Producer(conf, logger=logger)
    try:
        yield pusher(p, topic, poll_timeout, codec, dry)
    finally:
        logger.debug(f'Flushing messages to kafka, flush_timeout={flush_timeout}.')
        p.flush(flush_timeout)
class Topic(ITopic):
    """Act as a consumer and producer.

    >>> topic = Topic('emoji', {
    ...     'bootstrap.servers': 'localhost:29091',
    ...     'auto.offset.reset': 'earliest',
    ...     'group.id': 'demo',
    ... })

    Loop over topic (iterable) to consume from it:

    >>> for msg in topic:  # doctest: +SKIP
    ...     print(msg.value())

    Call topic (callable) with data to produce to it:

    >>> topic({'msg': 'Hello World!'})  # doctest: +SKIP
    """

    def __init__(
        self,
        name: str,
        conf: Optional[dict] = None,
        offset: Optional[int] = None,
        codec: Optional[ICodec] = None,
        flush_timeout: float = -1.0,
        poll_timeout: float = 1.0,
        pusher=_producer_handler,
        poller=_consumer_handler,
        dry: bool = False,
        raise_error: bool = False
    ) -> None:
        """Pass topic related configuration.

        Args:
            name: Topic name.
            conf: Per-topic kafka configuration, merged over ``Conf().conf``
                (per-topic keys win). ``None`` means no overrides.
            offset: Starting offset for every assigned partition when
                consuming; ``None`` leaves kafka's own choice in place.
            codec: Optional codec to encode produced / decode consumed values.
            flush_timeout: Timeout passed to ``Producer.flush`` on exit.
            poll_timeout: Timeout used when polling consumer and producer.
            pusher: Factory that builds the produce function.
            poller: Generator function that drives consumption.
            dry: When true, produced messages are not sent.
            raise_error: When true, every consumer error raises.
        """
        c = Conf()
        self.name = name
        self.conf = {**c.conf, **(conf if conf is not None else {})}
        self.starting_offset = offset
        self.flush_timeout = flush_timeout
        self.poll_timeout = poll_timeout
        self.consumer = None
        self.producer = None
        self._producer_ctx = None
        self.pusher = pusher
        self.poller = poller
        self.codec = codec
        self.dry = dry
        self.raise_error = raise_error

    def admin(self):
        """Get admin client."""
        return AdminClient(self.conf)

    def create_topic(self, *args, **kwargs) -> None:
        """Create topic.

        ``*args``/``**kwargs`` are forwarded to ``NewTopic`` after the topic
        name. An already existing topic is only logged as a warning. Any
        other ``KafkaException`` is logged and re-raised.
        """
        futures = self.admin().create_topics([NewTopic(self.name, *args, **kwargs)])
        for t, f in futures.items():
            try:
                f.result()
                logger.debug(f"Topic {t} created.")
            except KafkaException as e:
                if "TOPIC_ALREADY_EXISTS" in str(e):
                    logger.warning(e)
                else:
                    logger.error(e)
                    raise

    def __iter__(self) -> Iterator[Any]:
        """Consume from topic, starting at ``self.starting_offset``."""
        c = get_consumer(self.name, self.conf, self.starting_offset,
                         self.codec, self.poll_timeout, self.poller,
                         self.raise_error)
        with c as consumer:
            for msg in consumer:
                yield msg

    def __next__(self) -> Any:
        """Consume next message from topic.

        One consumer generator is created on first use and reused by later
        calls.
        """
        self.consumer = self.consumer or self.__iter__()
        return next(self.consumer)

    def __getitem__(self, i) -> Iterator[Any]:
        """Consume specific range of messages from topic.

        Int index:

        - ``topic[n]`` with ``n >= 0`` starts at offset ``n`` and stops
          before offset ``n + 1``.
        - A negative int is used as the starting offset, with no upper
          bound.

        Slice ``topic[start:stop:step]``:

        - Consumption starts at ``start``.
        - It stops at the first message whose offset is ``>= stop``.
        - With a ``step``, only offsets whose distance from
          ``max(0, start)`` is a multiple of ``step`` are kept.

        Offsets are compared per message, so on multi-partition topics the
        first partition reaching ``stop`` ends the iteration.

        Raises:
            TypeError: If ``i`` is neither an int nor a slice. This is raised
                immediately, not on first iteration.
        """
        if not isinstance(i, (slice, int)):
            raise TypeError('Expected slice or int.')
        if isinstance(i, int):
            start, step, stop = i, None, (i + 1 if i >= 0 else None)
        else:
            start, step, stop = i.start, i.step, i.stop
        return self._consume_range(start, step, stop)

    def _consume_range(self, start, step, stop) -> Iterator[Any]:
        """Yield messages from ``start`` honoring ``stop`` and ``step``."""
        # Base offset for the step stride. A missing start (consumer uses its
        # committed/reset position) or a negative special offset counts
        # from 0. Previously ``max(0, None)`` raised TypeError here.
        base = max(0, start) if start is not None else 0
        c = get_consumer(self.name, self.conf, start, self.codec,
                         self.poll_timeout, self.poller, self.raise_error)
        with c as consumer:
            for msg in consumer:
                # ``is not None`` so that ``stop=0`` ends immediately instead
                # of being treated as "no stop".
                if stop is not None and msg.offset() >= stop:
                    return
                if step and (msg.offset() - base) % step != 0:
                    continue
                yield msg

    def __call__(self, val, key=None, *args, **kwargs) -> None:
        """Produce to topic.

        The producer context is created and entered on the first call and
        reused afterwards. It is exited (flushing the producer) in
        ``__del__``.
        """
        self._producer_ctx = (
            self._producer_ctx
            or get_producer(self.name, self.conf, self.dry, self.codec,
                            self.poll_timeout, self.flush_timeout,
                            self.pusher)
        )
        self.producer = (
            self.producer
            or self._producer_ctx.__enter__()
        )
        self.producer(key, val, *args, **kwargs)

    def __del__(self):
        """Exit potential producer instance."""
        # getattr: __init__ may have failed before the attribute was set.
        ctx = getattr(self, '_producer_ctx', None)
        if ctx:
            # Clear first so the context is never exited twice.
            self._producer_ctx = None
            self.producer = None
            ctx.__exit__(None, None, None)