# Source code for snapstream

"""Snapstream public objects."""

import logging
from inspect import signature
from typing import Any, Callable, Generator, Iterable

from pubsub import pub

from snapstream.caching import Cache
from snapstream.core import READ_FROM_END, READ_FROM_START, Conf, Topic

# Public API of the package: the names exposed by `from snapstream import *`.
# `snap` and `stream` are defined in this module; the rest are re-exported
# from `snapstream.caching` and `snapstream.core`.
__all__ = [
    'snap',
    'stream',
    'Conf',
    'Topic',
    'Cache',
    'READ_FROM_START',
    'READ_FROM_END',
]

# Import-time side effect: with no arguments, basicConfig() attaches a default
# stream handler to the root logger if the root logger has no handlers yet.
# NOTE(review): this affects the logging setup of any application importing
# the package -- confirm that is intended before changing or removing it.
logging.basicConfig()


def _sink_output(s: Callable[..., None], output: Any) -> None:
    """Deliver a single output value to one sink.

    Tuple outputs sent to a ``Cache`` or ``Topic`` sink are unpacked into
    ``key`` and ``val`` and passed as keyword arguments. Every other
    combination calls the sink with the output as its only positional
    argument, except that a ``Cache`` sink refuses non-tuple output.

    Raises:
        ValueError: If ``s`` is a ``Cache`` and ``output`` is not a tuple
            (also raised by the unpacking when the tuple does not hold
            exactly two items).
    """
    if isinstance(output, tuple):
        if isinstance(s, (Cache, Topic)):
            # Keyed sinks take the pair as explicit keyword arguments.
            key, val = output
            s(key=key, val=val)
            return
    elif isinstance(s, Cache):
        raise ValueError('Cache sink expects: Tuple[key, val].')

    # Plain callables, and Topic sinks given a non-tuple, get the raw output.
    s(output)


def _handle_generator_or_function(
    sink: Iterable[Callable[..., None]],
    output: Any
) -> None:
    if isinstance(output, Generator):
        for val in output:
            for s in sink:
                _sink_output(s, val)
    else:
        for s in sink:
            _sink_output(s, output)


def snap(
    *iterable: Iterable,
    sink: Iterable[Callable[..., None]] = ()
):
    """Snaps function to stream.

    Returns a decorator. Applying it registers every given iterable with
    ``Conf`` under the key ``str(id(iterable))`` and subscribes a wrapper
    around the decorated function to the pubsub topic of the same name.
    For each message the wrapper calls the function and forwards the result
    to every sink (generator results are forwarded value by value).

    The decorated function is called as ``f(msg, **kwargs)`` when it declares
    ``**kwargs``, as ``f(msg)`` when it declares any other parameter, and as
    ``f()`` when it declares no parameters at all.

    Args:
        *iterable: Sources whose messages should be handled by the function.
        sink: Callables that receive the function's output. It is iterated
            again for every output, so pass a re-iterable collection such as
            a list or tuple.

    Returns:
        The decorator. Note that the decorated name ends up bound to the
        subscribed wrapper ``_handler(msg, kwargs=None)``, not to the
        original function.

    Ex:
        >>> topic = Topic('demo')               # doctest: +SKIP
        >>> cache = Cache('state/demo')         # doctest: +SKIP

        >>> @snap(topic, sink=[print, cache])   # doctest: +SKIP
        ... def handler(msg, **kwargs):
        ...     return msg.key(), msg.value()
    """
    c = Conf()

    def _deco(f):
        # The signature cannot change between messages, so inspect it once
        # at decoration time instead of on every delivery.
        parameters = signature(f).parameters.values()
        takes_kwargs = any(p.kind == p.VAR_KEYWORD for p in parameters)
        takes_msg = bool(parameters)

        # NOTE(review): functools.wraps is intentionally not applied here. It
        # would set __wrapped__, which inspect.signature follows, and pubsub
        # appears to derive the topic's message arguments from the listener
        # signature (msg, kwargs) -- confirm before changing.
        def _handler(msg, kwargs=None):
            if takes_kwargs:
                output = f(msg, **({} if kwargs is None else kwargs))
            elif takes_msg:
                output = f(msg)
            else:
                output = f()
            _handle_generator_or_function(sink, output)

        for it in iterable:
            # The object's id doubles as the registration key and topic name.
            iterable_key = str(id(it))
            c.register_iterables((iterable_key, it))
            pub.subscribe(_handler, iterable_key)
        return _handler

    return _deco
def stream(**kwargs):
    """Start the streams.

    Calls ``Conf()`` and hands every keyword argument, unchanged, to its
    ``start`` method.

    Ex:
        >>> args = {
        ...     'env': 'DEV',
        ... }
        >>> stream(**args)
    """
    conf = Conf()
    conf.start(**kwargs)