Examples
Each example is self-contained. Can't find what you're looking for? Create a new issue.
Topic
Topic can be used to send and receive kafka messages.
- Data is sent to kafka when topic is called as a function
- Messages can be consumed by iterating over the topic object
from snapstream import Topic

topic = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2)

topic('🙂')
topic(key='key', val='🙂')

for msg in topic:
    print(msg.key(), msg.value().decode())

None 🙂
b'key' 🙂
Topic uses confluent-kafka.
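The call-to-produce, iterate-to-consume protocol above can be sketched with a minimal in-memory stand-in (no broker involved; InMemoryTopic and Message are illustrative names, not part of snapstream or confluent-kafka):

```python
class Message:
    """Mimics the key()/value() accessors used in the example above."""

    def __init__(self, key, val):
        self._key = key
        self._val = val

    def key(self):
        return self._key

    def value(self):
        return self._val


class InMemoryTopic:
    """Toy topic: calling it produces, iterating over it consumes."""

    def __init__(self, name):
        self.name = name
        self._messages = []

    def __call__(self, val=None, key=None):
        # values are stored as bytes, like kafka does
        self._messages.append(Message(key, str(val).encode()))

    def __iter__(self):
        yield from self._messages


topic = InMemoryTopic('emoji')
topic('hi')
topic(key='key', val='hi')

for msg in topic:
    print(msg.key(), msg.value().decode())
```

The real Topic delegates these same dunder methods to a confluent-kafka producer and consumer.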
Cache
Cache can be used to persist data.
- Data is cached when cache is called as a function
- Data is stored in sst files in the specified folder: db
from snapstream import Cache

cache = Cache('db')

cache('prize', '🏆')
cache['phone'] = '📞'

for x, y in cache.items():
    print(x, y)

phone 📞
prize 🏆
To avoid race conditions, lock database keys using the transaction
context manager:
with cache.transaction('fish'):
    cache['fish'] = '🐟'
Cache is a basic wrapper around rocksdict.
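The same interface can be sketched with a plain dict and a per-key lock (a toy stand-in for illustration, not rocksdict; DictCache is an invented name):

```python
from collections import defaultdict
from contextlib import contextmanager
from threading import Lock


class DictCache:
    """Toy cache: callable and subscriptable, with per-key locking."""

    def __init__(self):
        self._data = {}
        self._locks = defaultdict(Lock)

    def __call__(self, key, val):
        self._data[key] = val

    def __setitem__(self, key, val):
        self._data[key] = val

    def __getitem__(self, key):
        return self._data[key]

    def items(self):
        # rocksdb iterates in sorted key order; emulate that here
        return sorted(self._data.items())

    @contextmanager
    def transaction(self, key):
        # hold a lock for a single key so concurrent writers don't race
        with self._locks[key]:
            yield


cache = DictCache()
cache('prize', 'first')
cache['phone'] = 'second'
with cache.transaction('fish'):
    cache['fish'] = 'third'
```

The per-key lock is what makes transaction useful: two handlers updating the same key serialize, while writes to unrelated keys proceed in parallel.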
Conf
Conf can be used to set default kafka configurations.
- Conf is a singleton class; only one instance exists
- Configurations can be overridden per topic
from snapstream import Conf, Topic

Conf({
    'bootstrap.servers': 'localhost:29091',
    'group.id': 'default-demo',
})
topic1 = Topic('emoji', {'bootstrap.servers': 'localhost:29092'})

Conf({
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'myuser',
    'sasl.password': 'mypass',
})
topic2 = Topic('conf', {'group.id': 'demo'})
print(topic1.conf)
print(topic2.conf)
{'bootstrap.servers': 'localhost:29092', 'group.id': 'default-demo'}
{'bootstrap.servers': 'localhost:29091', 'group.id': 'demo', 'security.protocol': 'SASL_SSL', 'sasl.mechanism': 'PLAIN', 'sasl.username': 'myuser', 'sasl.password': 'mypass'}
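The merging behaviour can be sketched with a small singleton that accumulates defaults (SingletonConf is an illustrative toy, not snapstream's implementation):

```python
class SingletonConf:
    """Toy singleton: every instantiation merges into shared defaults."""

    _instance = None

    def __new__(cls, conf=None):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.conf = {}
        if conf:
            # later calls layer new keys on top of existing defaults
            cls._instance.conf.update(conf)
        return cls._instance


SingletonConf({'bootstrap.servers': 'localhost:29091'})
SingletonConf({'group.id': 'demo'})

# both calls returned the same instance holding the merged configuration
print(SingletonConf().conf)
# {'bootstrap.servers': 'localhost:29091', 'group.id': 'demo'}
```

This explains the output above: topic1 only sees the defaults that existed when it was created, while topic2 also picks up the SASL settings added later.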
Yield
When your handler function may return zero or more values, use yield instead of return.
from snapstream import snap, stream

@snap(range(5), sink=[print])
def handler(n):
    if n % 2 == 0:
        yield f'even: {n}'
    if n == 0:
        yield f'zero: {n}'

stream()
even: 0
zero: 0
even: 2
even: 4
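The reason yield fits here is that a generator naturally produces zero, one, or many items per input; the same fan-out can be reproduced with plain Python, without snapstream:

```python
def handler(n):
    # a generator may yield any number of items per input value
    if n % 2 == 0:
        yield f'even: {n}'
    if n == 0:
        yield f'zero: {n}'


# flattening the per-input generators reproduces the stream's output
results = [item for n in range(5) for item in handler(n)]
print(results)
# ['even: 0', 'zero: 0', 'even: 2', 'even: 4']
```

A plain return could only ever hand one value to the sinks; odd inputs, which should produce nothing, would have to return some sentinel instead.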
Timer
If there's no incoming data, generators can be used to trigger handler functions.
The timer() function below returns a generator that yields None every 1.0 seconds:
from time import localtime, sleep, strftime
from snapstream import snap, stream

def timer(interval=1.0):
    while True:
        yield
        sleep(interval)

@snap(timer())
def handler():
    print(strftime('%H:%M:%S', localtime()))

stream()
23:25:10
23:25:11
23:25:12
...
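Because the stream ends when its sources are exhausted, a bounded generator gives a timer that fires a fixed number of times and then lets the program exit (the ticks parameter is an assumption for this sketch, not part of snapstream):

```python
from time import sleep


def timer(interval=1.0, ticks=3):
    """Yield None `ticks` times, then stop."""
    for _ in range(ticks):
        yield
        sleep(interval)


# consuming the generator directly shows it yields exactly `ticks` values
fired = sum(1 for _ in timer(interval=0))
print(fired)
# 3
```

Swapping `while True` for a bounded loop is a convenient way to unit-test timer-driven handlers.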
Codec
Codecs are used for serializing and deserializing data.
Using JsonCodec, values are automatically converted to and from json:
from snapstream import Topic
from snapstream.codecs import JsonCodec, ICodec

topic = Topic('codecs', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2, codec=JsonCodec())

topic({'msg': '🙂'})

for msg in topic:
    print(msg.value())

{'msg': '🙂'}
It's possible to create custom codecs by extending ICodec:
from typing import Any, cast

from avro.schema import parse
# serialize_avro and deserialize_avro are assumed to be available,
# e.g. from snapstream.codecs or your own avro helpers

class AvroCodec(ICodec):
    """Serializes/deserializes avro messages."""

    def __init__(self, path: str):
        """Load avro schema."""
        with open(path) as a:
            self.schema = parse(a.read())

    def encode(self, obj: Any) -> bytes:
        """Serialize message."""
        val = serialize_avro(self.schema, obj)
        return cast(bytes, val)

    def decode(self, s: bytes) -> object:
        """Deserialize message."""
        val = deserialize_avro(self.schema, s)
        return cast(object, val)
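A codec only needs encode and decode. Here's what a JSON codec might look like using only the standard library (a sketch of the idea, not snapstream's actual JsonCodec; MyJsonCodec is an invented name):

```python
import json
from typing import Any


class MyJsonCodec:
    """Round-trips Python objects through UTF-8 encoded JSON."""

    def encode(self, obj: Any) -> bytes:
        # object -> json text -> bytes, ready to be produced to kafka
        return json.dumps(obj).encode()

    def decode(self, s: bytes) -> object:
        # bytes from kafka -> json text -> object
        return json.loads(s.decode())


codec = MyJsonCodec()
raw = codec.encode({'msg': 'hi'})
print(codec.decode(raw))
# {'msg': 'hi'}
```

Any object with this encode/decode pair can serve the same role; the avro example above follows the identical shape with a schema loaded up front.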
Slicing
To read a specific range or a single offset from kafka, use slice notation:
from snapstream import Topic

topic = Topic('slicing', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
})

for x in '🍏', '🍋', '🍊', '🍇':
    topic(x)

for x in topic[3]:
    print(x.value().decode(), x.offset())
It's recommended never to use list or any other operation that collects all data, as the stream may be unbounded.
Here's the message at offset 3:
🍇 3
If we want to get offsets 0 up until 3, we can slice the topic:
for x in topic[0:3]:
    print(x.value().decode(), x.offset())

🍏 0
🍋 1
🍊 2
If the retention period could have surpassed that of the first message, it's worth using -2 as the first offset.
In the following snippet, we also pass a step of 2, taking every second message:

for x in topic[-2::2]:
    print(x.value().decode(), x.offset())

🍏 0
🍊 2
...
You'll also notice that the program keeps waiting until the stop condition has been met.
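The offset arithmetic behind these slices can be sketched with a slice over a simulated stream of offsets (illustrative only; the real Topic resolves positions against the broker, where -2 is kafka's OFFSET_BEGINNING sentinel):

```python
from itertools import count, islice


def read(start, stop=None, step=1):
    """Simulate reading offsets start..stop (exclusive) with a step."""
    offsets = count(start)  # an unbounded stream, like a topic
    width = None if stop is None else stop - start
    return islice(offsets, 0, width, step)


# topic[0:3] -> offsets 0, 1, 2
print(list(read(0, 3)))
# [0, 1, 2]

# topic[0::2] -> every second offset of an unbounded stream;
# with no stop, iteration never ends on its own, so we cap it here
print(list(islice(read(0, None, 2), 3)))
# [0, 2, 4]
```

The open-ended case is exactly why the program above keeps waiting: without a stop offset there is no stop condition.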
Endpoint
To share streaming data through API endpoints, install uvicorn and fastapi, then use Thread to start the server.
In this example, dictquery is used to query the cache:
from threading import Thread
from time import sleep

import dictquery as dq
from fastapi import FastAPI
from snapstream import Cache, snap, stream
from uvicorn import run

app, cache = FastAPI(), Cache('db')

@snap(range(1000), sink=[cache])
def add_unicode_to_cache(key):
    yield key, {'number': key, 'unicode': chr(key)}
    sleep(5)

@app.get('/query/{query}')
async def query_cache(query: str):
    return list(dq.filter(cache.values(), query))

if __name__ == '__main__':
    Thread(target=run, args=(app,), daemon=True).start()
    stream()
When we call the following url: http://127.0.0.1:8000/query/number >= 2, it will return the entries that are available so far:
[
{"number": 2, "unicode": "\u0002"},
{"number": 3, "unicode": "\u0003"},
...
]
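The filtering dictquery performs here can be approximated with a plain predicate over the cached dicts (a stand-in for dq.filter on this one query, not dictquery's actual implementation):

```python
# the same shape of entries the handler above writes to the cache
values = [{'number': n, 'unicode': chr(n)} for n in range(5)]

# equivalent of the query 'number >= 2'
matches = [v for v in values if v['number'] >= 2]
print([v['number'] for v in matches])
# [2, 3, 4]
```

dictquery generalizes this by parsing the query string into such a predicate at request time, which is what lets the endpoint accept arbitrary queries in the URL.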