Examples

Spin up a local Kafka broker using docker-compose.yml to follow along:

docker compose up broker -d

Can’t find what you seek? Create a new issue.

Topic

Topic can be used to send and receive Kafka messages.

  • Data is sent to Kafka when topic is called as a function

from snapstream import Topic

topic = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2)

topic('🐟')
topic(key='key', val='🐟')

for msg in topic:
    print(msg.key(), msg.value().decode())

  • Messages are consumed when topic is iterated over

None 🐟
b'key' 🐟

Topic uses confluent-kafka.
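Topic's two roles — produce by calling, consume by iterating — can be sketched without a broker using an in-memory queue. MemoryTopic below is a hypothetical stand-in (it yields plain tuples rather than confluent-kafka Message objects, and it drains instead of blocking):

```python
from collections import deque


class MemoryTopic:
    """Toy stand-in for Topic: callable to produce, iterable to consume."""

    def __init__(self, name):
        self.name = name
        self._queue = deque()

    def __call__(self, val, key=None):
        # Producing: calling the topic enqueues a (key, value) pair.
        self._queue.append((key, val))

    def __iter__(self):
        # Consuming: iterating the topic dequeues messages in order.
        while self._queue:
            yield self._queue.popleft()


topic = MemoryTopic('emoji')
topic('🐟')
topic(key='key', val='🐟')

for key, val in topic:
    print(key, val)
```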

Cache

Cache can be used to persist data.

  • Data is cached when cache is called as a function

from snapstream import Cache

cache = Cache('db')

cache('prize', '🏆')

  • Data is stored in sst files in the provided folder: db

cache['phone'] = '📞'

for x, y in cache.items():
    print(x, y)

  • Cache is also subscriptable

phone 📞
prize 🏆

Cache is a basic wrapper around rocksdict.
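Cache's dual interface — callable and subscriptable — can be mimicked with a plain dict. MemoryCache below is a hypothetical in-memory sketch of that interface only; the real Cache persists entries to RocksDB sst files:

```python
class MemoryCache(dict):
    """Toy stand-in for Cache: callable and subscriptable, in-memory only."""

    def __call__(self, key, val):
        # Calling the cache stores a key/value pair, like cache('prize', '🏆').
        self[key] = val


cache = MemoryCache()
cache('prize', '🏆')       # callable
cache['phone'] = '📞'      # subscriptable

for x, y in sorted(cache.items()):
    print(x, y)
```

Sorting the items mirrors the output above: RocksDB iterates keys in sorted order, which is why phone precedes prize.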

Conf

Conf can be used to set default Kafka configurations.

  • Conf is a singleton class; there can only be a single instance

from snapstream import Conf, Topic

Conf({
    'bootstrap.servers': 'localhost:29091',
    'group.id': 'default-demo',
})

topic1 = Topic('emoji', {'bootstrap.servers': 'localhost:29092'})

Conf({
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'myuser',
    'sasl.password': 'mypass',
})

topic2 = Topic('tweets', {'group.id': 'demo'})

print(topic1.conf)
print(topic2.conf)

  • Default configurations can be overridden per topic

{'bootstrap.servers': 'localhost:29092', 'group.id': 'default-demo'}
{'bootstrap.servers': 'localhost:29091', 'group.id': 'demo', 'security.protocol': 'SASL_SSL', 'sasl.mechanism': 'PLAIN', 'sasl.username': 'myuser', 'sasl.password': 'mypass'}
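Note in the output above that topic1, created before the second Conf(...) call, never picks up the SASL settings: each Topic combines the defaults accumulated so far with its own conf, and per-topic keys win. That override order amounts to a dict merge, sketched here in plain Python (merge_conf is illustrative, not snapstream's internals):

```python
def merge_conf(defaults: dict, topic_conf: dict) -> dict:
    """Per-topic settings take precedence over accumulated defaults."""
    return {**defaults, **topic_conf}


defaults = {'bootstrap.servers': 'localhost:29091', 'group.id': 'default-demo'}

# The topic's own group.id wins; bootstrap.servers falls back to the default.
print(merge_conf(defaults, {'group.id': 'demo'}))
```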

Yield

When your handler function produces zero or more values, use yield instead of return.

from snapstream import snap, stream

@snap(range(5), sink=[print])
def handler(n):
    if n % 2 == 0:
        yield f'equal: {n}'
    if n == 0:
        yield f'zero: {n}'

stream()

equal: 0
zero: 0
equal: 2
equal: 4
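The fan-out behind this can be modeled in plain Python: the source is iterated, the handler is called per item, and every yielded value is forwarded to each sink. snap_sketch below is a hypothetical, simplified model — not snapstream's actual implementation:

```python
def snap_sketch(source, sink=()):
    """Simplified model of @snap: wire an iterable to a handler and sinks."""
    def decorator(handler):
        def run():
            for item in source:
                result = handler(item)
                if result is not None:
                    # A generator handler yields zero or more values;
                    # each one is forwarded to every sink.
                    for value in result:
                        for s in sink:
                            s(value)
        return run
    return decorator


collected = []

@snap_sketch(range(5), sink=[collected.append])
def handler(n):
    if n % 2 == 0:
        yield f'even: {n}'

handler()
print(collected)  # ['even: 0', 'even: 2', 'even: 4']
```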

Timer

If there’s no incoming data, generators can be used to trigger handler functions.

from time import localtime, sleep, strftime

from snapstream import snap, stream

def timer(interval=1.0):
    while True:
        yield
        sleep(interval)

@snap(timer())
def handler():
    print(strftime('%H:%M:%S', localtime()))

stream()

  • The timer() function returns a generator that yields None every 1.0 seconds

23:25:10
23:25:11
23:25:12
...
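Because timer() is an ordinary generator, it can also be bounded with itertools.islice instead of ticking forever — handy for trying it out without stream(). This is a plain-Python aside, independent of snapstream:

```python
from itertools import islice
from time import sleep


def timer(interval=0.01):
    """Yield None forever, pausing between ticks."""
    while True:
        yield
        sleep(interval)


# Take only three ticks instead of looping forever.
ticks = list(islice(timer(), 3))
print(ticks)  # [None, None, None]
```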

Codec

Codecs are used for serializing and deserializing data.

  • Data that’s passed to topic is automatically JSON-serialized

from snapstream import Topic
from snapstream.codecs import JsonCodec, ICodec

topic = Topic('codec-demo', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2, codec=JsonCodec())

topic({'msg': '🐟'})

for msg in topic:
    print(msg.value())

  • Data that’s read from topic is automatically deserialized

{'msg': '🐟'}
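The round trip above comes down to JSON bytes on the wire. A minimal codec with the same encode/decode shape shows the idea without a broker — MiniJsonCodec is a sketch mirroring the ICodec interface, not snapstream's JsonCodec itself:

```python
import json


class MiniJsonCodec:
    """Sketch of a JSON codec: objects to UTF-8 bytes and back."""

    def encode(self, obj) -> bytes:
        # Objects become JSON bytes before being produced to the topic.
        return json.dumps(obj).encode()

    def decode(self, s: bytes) -> object:
        # Consumed bytes are parsed back into Python objects.
        return json.loads(s.decode())


codec = MiniJsonCodec()
raw = codec.encode({'msg': '🐟'})
print(codec.decode(raw))  # {'msg': '🐟'}
```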
  • It’s possible to create custom codecs by extending ICodec

# Assumed imports: a schema `parse` function from the avro package, and the
# avro serialize/deserialize helpers shipped in snapstream.codecs.
from typing import Any, cast

from avro.schema import parse

from snapstream.codecs import ICodec, deserialize_avro, serialize_avro


class AvroCodec(ICodec):
    """Serializes/deserializes avro messages."""

    def __init__(self, path: str):
        """Load avro schema."""
        with open(path) as a:
            self.schema = parse(a.read())

    def encode(self, obj: Any) -> bytes:
        """Serialize message."""
        val = serialize_avro(self.schema, obj)
        return cast(bytes, val)

    def decode(self, s: bytes) -> object:
        """Deserialize message."""
        val = deserialize_avro(self.schema, s)
        return cast(object, val)