Examples
Each example is self-contained. Can't find what you're looking for? Create a new issue.
Topic
Topic can be used to send and receive Kafka messages.
Data is sent to Kafka when topic is called as a function.
Messages can be consumed by iterating over the topic object.
from snapstream import Topic
topic = Topic('emoji', {
'bootstrap.servers': 'localhost:29091',
'auto.offset.reset': 'earliest',
'group.instance.id': 'demo',
'group.id': 'demo',
}, offset=-2)
topic('π')
topic(key='key', val='π')
for msg in topic:
    print(msg.key(), msg.value().decode())
None π
b'key' π
Topic uses confluent-kafka.
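To make the produce/consume pattern concrete without a broker, here is a minimal in-memory sketch of the same interface shape: calling the object appends a message, and iterating yields what was appended. InMemoryTopic and Message are hypothetical stand-ins, not part of snapstream; the real Topic talks to Kafka through confluent-kafka and keeps waiting for new messages instead of stopping.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class Message:
    """Hypothetical stand-in for a consumed Kafka message."""
    _key: Optional[bytes]
    _value: bytes
    _offset: int

    def key(self) -> Optional[bytes]:
        return self._key

    def value(self) -> bytes:
        return self._value

    def offset(self) -> int:
        return self._offset


class InMemoryTopic:
    """Hypothetical in-memory topic: call to produce, iterate to consume."""

    def __init__(self, name: str):
        self.name = name
        self._log: List[Message] = []

    def __call__(self, val=None, key=None):
        # Mirror topic(x) and topic(key=..., val=...); str is encoded to bytes.
        k = key.encode() if isinstance(key, str) else key
        v = val.encode() if isinstance(val, str) else val
        self._log.append(Message(k, v, len(self._log)))

    def __iter__(self) -> Iterator[Message]:
        # Unlike a Kafka consumer, this iterator ends when the log is exhausted.
        return iter(list(self._log))


topic = InMemoryTopic('emoji')
topic('hello')
topic(key='key', val='world')
for msg in topic:
    print(msg.key(), msg.value().decode())  # None hello, then b'key' world
```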
Cache
Cache can be used to persist data.
Data is cached when cache is called as a function.
Data is stored in SST files in the specified folder, db in this example.
from snapstream import Cache
cache = Cache('db')
cache('prize', 'π')
cache['phone'] = 'π'
for x, y in cache.items():
    print(x, y)
phone π
prize π
To avoid race conditions, lock database keys using the transaction context manager:
with cache.transaction('fish'):
    cache['fish'] = 'π'
Cache is a basic wrapper around rocksdict.
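The idea behind transaction('fish') is that only one writer at a time may hold a given key. A minimal sketch of per-key locking around a plain dict, assuming thread-level concurrency, could look like this; KeyLockedCache is hypothetical and does not reflect how rocksdict implements transactions.

```python
import threading
from collections import defaultdict
from contextlib import contextmanager


class KeyLockedCache:
    """Hypothetical dict-backed cache with one lock per key."""

    def __init__(self):
        self._data = {}
        self._locks = defaultdict(threading.Lock)
        self._guard = threading.Lock()  # protects creation of per-key locks

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, val):
        self._data[key] = val

    @contextmanager
    def transaction(self, key):
        # Look up (or create) the lock for this key under the guard lock.
        with self._guard:
            lock = self._locks[key]
        # Hold the key's lock for the duration of the with-block.
        with lock:
            yield


cache = KeyLockedCache()
cache['count'] = 0


def increment():
    for _ in range(1000):
        # The read-modify-write is safe because the key is locked.
        with cache.transaction('count'):
            cache['count'] = cache['count'] + 1


threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cache['count'])  # 4000
```

Without the transaction, concurrent increments could interleave between the read and the write and lose updates.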
Conf
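Conf can be used to set default Kafka configurations.
Conf is a singleton class: only one instance exists, so later calls add to the existing defaults.
Configurations can be overridden per topic, and each topic uses the defaults in place when it is created.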
from snapstream import Conf, Topic
Conf({
'bootstrap.servers': 'localhost:29091',
'group.id': 'default-demo',
})
topic1 = Topic('emoji', {'bootstrap.servers': 'localhost:29092'})
Conf({
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': 'myuser',
'sasl.password': 'mypass',
})
topic2 = Topic('conf', {'group.id': 'demo'})
print(topic1.conf)
print(topic2.conf)
{'bootstrap.servers': 'localhost:29092', 'group.id': 'default-demo'}
{'bootstrap.servers': 'localhost:29091', 'group.id': 'demo', 'security.protocol': 'SASL_SSL', 'sasl.mechanism': 'PLAIN', 'sasl.username': 'myuser', 'sasl.password': 'mypass'}
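The merge behavior in the output above can be reproduced with a small singleton that accumulates defaults, where each topic copies the defaults at creation time and lets its own settings win. DefaultConf and make_topic_conf are hypothetical names; this is a sketch consistent with the output shown, not snapstream's actual implementation.

```python
from typing import Any, Dict, Optional


class DefaultConf:
    """Hypothetical singleton holding default Kafka settings."""

    _instance: Optional['DefaultConf'] = None

    def __new__(cls, conf: Optional[Dict[str, Any]] = None):
        # Create the single instance on first use, reuse it afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.conf = {}
        if conf:
            # Later calls add to (and overwrite) earlier defaults.
            cls._instance.conf.update(conf)
        return cls._instance


def make_topic_conf(overrides: Dict[str, Any]) -> Dict[str, Any]:
    # Copy the defaults as they are now; per-topic settings take precedence.
    return {**DefaultConf().conf, **overrides}


DefaultConf({'bootstrap.servers': 'localhost:29091', 'group.id': 'default-demo'})
conf1 = make_topic_conf({'bootstrap.servers': 'localhost:29092'})
DefaultConf({'security.protocol': 'SASL_SSL'})
conf2 = make_topic_conf({'group.id': 'demo'})
print(conf1)
print(conf2)
```

Because conf1 is a copy taken before the second call, it does not pick up security.protocol, matching topic1 above.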
Yield
When your handler function may produce zero or more values for each input, use yield instead of return.
from snapstream import snap, stream
@snap(range(5), sink=[print])
def handler(n):
    if n % 2 == 0:
        yield f'even: {n}'
    if n == 0:
        yield f'zero: {n}'
stream()
even: 0
zero: 0
even: 2
even: 4
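One way to picture what happens here: the handler is called for every element, and because it is a generator function, each yielded value is forwarded to every sink. The run function below is a hypothetical, single-threaded sketch of that dispatch, not snapstream's internals; it also accepts plain return values, and treating a None result as "nothing to emit" is an assumption of this sketch.

```python
import inspect
from typing import Callable, Iterable, List


def run(source: Iterable, handler: Callable, sinks: List[Callable]) -> None:
    """Hypothetical dispatch loop: call handler per element, fan out results."""
    for item in source:
        result = handler(item)
        # Generator handlers may produce zero or more outputs per input.
        outputs = result if inspect.isgenerator(result) else [result]
        for out in outputs:
            if out is None:
                continue  # assumption: None means nothing to emit
            for sink in sinks:
                sink(out)


def handler(n):
    if n % 2 == 0:
        yield f'even: {n}'
    if n == 0:
        yield f'zero: {n}'


collected = []
run(range(5), handler, sinks=[print, collected.append])
```

Odd inputs yield nothing, so no sink is called for them, while 0 yields twice.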
Timer
If there's no incoming data, generators can be used to trigger handler functions.
The timer() function returns a generator that yields None every interval seconds, 1.0 by default:
from time import localtime, sleep, strftime
from snapstream import snap, stream
def timer(interval=1.0):
    while True:
        yield
        sleep(interval)
@snap(timer())
def handler(msg):
    # msg is the None yielded by timer() on each tick
    print(strftime('%H:%M:%S', localtime()))
stream()
23:25:10
23:25:11
23:25:12
...
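Since timer() never ends, it can be convenient when testing to cap the number of ticks. A sketch using itertools.islice from the standard library, with a short interval so it finishes quickly (the interval and tick count are arbitrary choices). Assuming snap accepts any iterable, as the range(5) source in the Yield example suggests, a bounded generator like this could also serve as a source.

```python
from itertools import islice
from time import monotonic, sleep


def timer(interval=1.0):
    """Yield None immediately, then again after every `interval` seconds."""
    while True:
        yield
        sleep(interval)


start = monotonic()
# islice stops after 3 ticks, so the otherwise infinite loop terminates.
ticks = [tick for tick in islice(timer(0.05), 3)]
# Two sleeps happen between three ticks, so elapsed is roughly 0.1 s.
elapsed = monotonic() - start
print(ticks)  # [None, None, None]
```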
Codec
Codecs are used for serializing and deserializing data.
Using JsonCodec, values are automatically converted to and from JSON:
from snapstream import Topic
from snapstream.codecs import JsonCodec, ICodec
topic = Topic('codecs', {
'bootstrap.servers': 'localhost:29091',
'auto.offset.reset': 'earliest',
'group.instance.id': 'demo',
'group.id': 'demo',
}, offset=-2, codec=JsonCodec())
topic({'msg': 'π'})
for msg in topic:
    print(msg.value())
{'msg': 'π'}
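Conceptually, a codec is a pair of functions between Python objects and bytes. A standard-library sketch of a JSON codec, with a local abstract base class standing in for ICodec; the names Codec and JsonCodecSketch are hypothetical, and snapstream's JsonCodec may differ in details such as encoding options.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class Codec(ABC):
    """Hypothetical stand-in for snapstream's ICodec interface."""

    @abstractmethod
    def encode(self, obj: Any) -> bytes: ...

    @abstractmethod
    def decode(self, s: bytes) -> object: ...


class JsonCodecSketch(Codec):
    """Serialize objects to UTF-8 JSON bytes and back."""

    def encode(self, obj: Any) -> bytes:
        # ensure_ascii=False keeps non-ASCII characters as UTF-8 bytes.
        return json.dumps(obj, ensure_ascii=False).encode('utf-8')

    def decode(self, s: bytes) -> object:
        return json.loads(s.decode('utf-8'))


codec = JsonCodecSketch()
payload = codec.encode({'msg': 'hello'})
print(payload)                # b'{"msg": "hello"}'
print(codec.decode(payload))  # {'msg': 'hello'}
```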
It's possible to create custom codecs by extending ICodec, for example to handle Avro messages:
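# Requires the avro package (pip install avro)
from io import BytesIO
from typing import Any
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from avro.schema import parse
from snapstream.codecs import ICodec
class AvroCodec(ICodec):
    """Serializes/deserializes avro messages."""
    def __init__(self, path: str):
        """Load avro schema."""
        with open(path) as a:
            self.schema = parse(a.read())
    def encode(self, obj: Any) -> bytes:
        """Serialize message to Avro binary."""
        buf = BytesIO()
        DatumWriter(self.schema).write(obj, BinaryEncoder(buf))
        return buf.getvalue()
    def decode(self, s: bytes) -> object:
        """Deserialize Avro binary using the same schema."""
        return DatumReader(self.schema).read(BinaryDecoder(BytesIO(s)))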
Slicing
To read a specific range or a single offset from Kafka, use slice notation:
from snapstream import Topic
topic = Topic('slicing', {
'bootstrap.servers': 'localhost:29091',
'auto.offset.reset': 'earliest',
'group.instance.id': 'demo',
'group.id': 'demo',
})
for x in 'π', 'π', 'π', 'π':
    topic(x)
for x in topic[3]:
    print(x.value().decode(), x.offset())
Avoid calling list() or any other operation that eagerly collects all the data, as the stream may be unbounded.
Here's the message at offset 3:
π 3
To get offsets 0 up to, but not including, 3, slice the topic:
for x in topic[0:3]:
    print(x.value().decode(), x.offset())
π 0
π 1
π 2
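Slicing is safe on an unbounded stream because reading can stop as soon as the stop offset is reached, instead of collecting everything first. A minimal sketch of that idea over any iterable of (offset, value) pairs; take_offsets is a hypothetical helper, not snapstream's implementation, and it assumes non-negative offsets that arrive in increasing order.

```python
from itertools import count
from typing import Iterable, Iterator, Tuple, Union


def take_offsets(
    messages: Iterable[Tuple[int, str]], key: Union[int, slice]
) -> Iterator[Tuple[int, str]]:
    """Lazily yield messages whose offsets fall in key (int or slice)."""
    if isinstance(key, int):
        key = slice(key, key + 1)  # a single offset is a one-element range
    start = key.start or 0
    step = key.step or 1
    for offset, value in messages:
        if key.stop is not None and offset >= key.stop:
            return  # past the stop offset: stop reading instead of draining
        if offset >= start and (offset - start) % step == 0:
            yield offset, value


# An infinite stream of (offset, value) pairs; list() on it would never finish.
messages_gen = ((i, f'msg-{i}') for i in count())
first_three = list(take_offsets(messages_gen, slice(0, 3)))
print(first_three)  # [(0, 'msg-0'), (1, 'msg-1'), (2, 'msg-2')]
```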
If the first message may already have been deleted because its retention period expired, it's worth using -2 as the first offset. In confluent-kafka, -2 is OFFSET_BEGINNING: the earliest offset still available.
In the following snippet, we also pass a step of 2, taking every second message:
for x in topic[-2::2]:
    print(x.value().decode(), x.offset())
π 0
π 2
...
You'll also notice that the program keeps waiting until the stop condition has been met.
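The offset arithmetic behind a slice such as [-2::2] can be sketched as follows. resolve_offsets and its earliest argument are hypothetical; clamping an explicit start that falls before the earliest retained offset, and counting the step from the resolved start, are assumptions of this sketch rather than documented snapstream behavior.

```python
from itertools import count, islice
from typing import Iterator, Optional

OFFSET_BEGINNING = -2  # same value as confluent_kafka.OFFSET_BEGINNING


def resolve_offsets(
    start: int, stop: Optional[int], step: int, earliest: int
) -> Iterator[int]:
    """Yield the offsets a [start:stop:step] read would visit (hypothetical)."""
    if start == OFFSET_BEGINNING or start < earliest:
        # Anything before `earliest` has been deleted by retention, so begin
        # at the oldest offset the broker still has.
        start = earliest
    if stop is None:
        # No stop offset: the sequence never ends, like a consumer that keeps
        # waiting for new messages.
        return count(start, step)
    return iter(range(start, stop, step))


# Offsets 0-4 were removed by retention, so -2 resolves to offset 5.
print(list(islice(resolve_offsets(-2, None, 2, earliest=5), 3)))  # [5, 7, 9]
# With nothing deleted, this visits 0, 2, ... like topic[-2::2] above.
print(list(islice(resolve_offsets(-2, None, 2, earliest=0), 2)))  # [0, 2]
# A hard-coded start of 0 is clamped to the earliest retained offset.
print(list(resolve_offsets(0, 9, 2, earliest=5)))  # [5, 7]
```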