Examples

Each example is self-contained. Can’t find what you seek? Create a new issue.

Topic

Topic can be used to send and receive kafka messages.

  • Data is sent to kafka when topic is called as a function

  • Messages can be consumed by iterating over the topic object

from snapstream import Topic

topic = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2)

topic('🐟')
topic(key='key', val='🐟')

for msg in topic:
    print(msg.key(), msg.value().decode())
None 🐟
b'key' 🐟

Topic uses confluent-kafka.

Cache

Cache can be used to persist data.

  • Data is cached when cache is called as a function

  • Data is stored in sst files in the specified folder: db

from snapstream import Cache

cache = Cache('db')

cache('prize', 'πŸ†')
cache['phone'] = 'πŸ“ž'

for x, y in cache.items():
    print(x, y)
phone πŸ“ž
prize πŸ†

To avoid race conditions, lock database keys using the transaction context manager:

with cache.transaction('fish'):
    cache['fish'] = '🐟'

Cache is a basic wrapper around rocksdict.

Conf

Conf can be used to set default kafka configurations.

  • Conf is a Singleton class, only one instance exists

  • Configurations can be overridden per topic

from snapstream import Conf, Topic

Conf({
    'bootstrap.servers': 'localhost:29091',
    'group.id': 'default-demo',
})

topic1 = Topic('emoji', {'bootstrap.servers': 'localhost:29092'})

Conf({
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'myuser',
    'sasl.password': 'mypass',
})

topic2 = Topic('conf', {'group.id': 'demo'})

print(topic1.conf)
print(topic2.conf)
{'bootstrap.servers': 'localhost:29092', 'group.id': 'default-demo'}
{'bootstrap.servers': 'localhost:29091', 'group.id': 'demo', 'security.protocol': 'SASL_SSL', 'sasl.mechanism': 'PLAIN', 'sasl.username': 'myuser', 'sasl.password': 'mypass'}

Yield

When your handler function returns zero or more values, use yield instead of return.

from snapstream import snap, stream

@snap(range(5), sink=[print])
def handler(n):
    if n % 2 == 0:
        yield f'even: {n}'
    if n == 0:
        yield f'zero: {n}'

stream()
even: 0
zero: 0
even: 2
even: 4

Timer

If there’s no incoming data, generators can be used to trigger handler functions.

  • The timer() function returns a generator that yields None every 1.0 seconds

from time import localtime, sleep, strftime

from snapstream import snap, stream

def timer(interval=1.0):
    while True:
        yield
        sleep(interval)

@snap(timer())
def handler():
    print(strftime('%H:%M:%S', localtime()))

stream()
23:25:10
23:25:11
23:25:12
...

Codec

Codecs are used for serializing and deserializing data.

  • Using JsonCodec values are automatically converted to and from json

from snapstream import Topic
from snapstream.codecs import JsonCodec, ICodec

topic = Topic('codecs', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2, codec=JsonCodec())

topic({'msg': '🐟'})

for msg in topic:
    print(msg.value())
{'msg': '🐟'}
  • It’s possible to create custom codecs by extending ICodec

class AvroCodec(ICodec):
  """Serializes/deserializes avro messages."""

  def __init__(self, path: str):
      """Load avro schema."""
      with open(path) as a:
          self.schema = parse(a.read())

  def encode(self, obj: Any) -> bytes:
      """Serialize message."""
      val = serialize_avro(self.schema, obj)
      return cast(bytes, val)

  def decode(self, s: bytes) -> object:
      """Deserialize message."""
      val = deserialize_avro(self.schema, s)
      return cast(object, val)

Slicing

To read a specific range or single offset from kafka, use the slice notation:

from snapstream import Topic

topic = Topic('slicing', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
})

for x in 'πŸ†', 'πŸ“ž', '🐟', 'πŸ‘Œ':
    topic(x)

for x in topic[3]:
    print(x.value().decode(), x.offset())

It’s recommended to never use list or any active operation that collects all data, as the stream may be unbounded. Here’s the message at offset 3:

πŸ‘Œ 3

If we want to get offsets 0 up until 3, we can slice the topic:

for x in topic[0:3]:
    print(x.value().decode(), x.offset())
πŸ† 0
πŸ“ž 1
🐟 2

If the retention period could have surpassed that of the first message, it’s worth using -2 as the first offset. In the following snippet, we also pass a step of 2, taking every second message:

for x in topic[-2::2]:
    print(x.value().decode(), x.offset())
πŸ† 0
🐟 2
...

You’ll also notice that the program keeps waiting until the stop condition has been met.

Endpoint

To share streaming data using API endpoints, install uvicorn and fastapi, then use Thread to start the server.

In this example dictquery is used to query the cache:

from threading import Thread
from time import sleep

import dictquery as dq
from fastapi import FastAPI
from snapstream import Cache, snap, stream
from uvicorn import run

app, cache = FastAPI(), Cache('db')

@snap(range(1000), sink=[cache])
def add_unicode_to_cache(key):
    yield key, {'number': key, 'unicode': chr(key)}
    sleep(5)

@app.get('/query/{query}')
async def query_cache(query: str):
    return list(dq.filter(cache.values(), query))

if __name__ == '__main__':
    Thread(target=run, args=(app,), daemon=True).start()
    stream()

When we call the following url http://127.0.0.1:8000/query/number >= 2 it will return the entries that are available:

[
  {"number": 2, "unicode": "\u0002"},
  {"number": 3, "unicode": "\u0003"},
  ...
]