Examples
Spin up a local Kafka broker using docker-compose.yml to follow along:
docker compose up broker -d
Can't find what you seek? Create a new issue.
Topic

Topic can be used to send and receive Kafka messages.

Data is sent to Kafka when topic is called as a function:
from snapstream import Topic

topic = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2)

topic('π')
topic(key='key', val='π')

for msg in topic:
    print(msg.key(), msg.value().decode())
Messages are consumed when topic is iterated over:
None π
b'key' π
Topic uses confluent-kafka.
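The call-to-produce, iterate-to-consume convention can be tried without a broker; the MiniTopic below is a hypothetical in-memory stand-in for illustration, not part of snapstream or confluent-kafka:

```python
from collections import deque


class MiniTopic:
    """Hypothetical in-memory stand-in mimicking Topic's call/iterate interface."""

    def __init__(self, name):
        self.name = name
        self._messages = deque()

    def __call__(self, val=None, key=None):
        # Calling the topic "produces" a (key, value) message.
        self._messages.append((key, val))

    def __iter__(self):
        # Iterating the topic "consumes" messages in order.
        while self._messages:
            yield self._messages.popleft()


topic = MiniTopic('emoji')
topic('hello')
topic(key='key', val='world')
for key, val in topic:
    print(key, val)
```

Unlike a real Topic, nothing is persisted and messages vanish once consumed; the point is only the calling convention.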
Cache

Cache can be used to persist data.

Data is cached when cache is called as a function:
from snapstream import Cache
cache = Cache('db')
cache('prize', 'π')
Data is stored in sst files in the provided folder, db.
cache['phone'] = 'π'

for x, y in cache.items():
    print(x, y)

Cache is also subscriptable:
phone π
prize π
Cache is a basic wrapper around rocksdict.
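The dict-like surface can be sketched without rocksdict; the DictCache below is a hypothetical in-memory stand-in that drops persistence, unlike the rocksdict-backed original:

```python
class DictCache:
    """Hypothetical in-memory sketch of Cache's interface (no persistence)."""

    def __init__(self):
        self._data = {}

    def __call__(self, key, val):
        # Calling the cache stores a value, like cache('prize', ...).
        self._data[key] = val

    def __setitem__(self, key, val):
        # Subscript assignment, like cache['phone'] = ...
        self._data[key] = val

    def __getitem__(self, key):
        return self._data[key]

    def items(self):
        return self._data.items()


cache = DictCache()
cache('prize', 'cup')
cache['phone'] = 'ring'
for x, y in cache.items():
    print(x, y)
```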
Conf

Conf can be used to set default Kafka configurations.

Conf is a Singleton class; there can only be a single instance:
from snapstream import Conf, Topic

Conf({
    'bootstrap.servers': 'localhost:29091',
    'group.id': 'default-demo',
})

topic1 = Topic('emoji', {'bootstrap.servers': 'localhost:29092'})

Conf({
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'myuser',
    'sasl.password': 'mypass',
})

topic2 = Topic('tweets', {'group.id': 'demo'})

print(topic1.conf)
print(topic2.conf)
Default configurations can be overridden per topic:
{'bootstrap.servers': 'localhost:29092', 'group.id': 'default-demo'}
{'bootstrap.servers': 'localhost:29091', 'group.id': 'demo', 'security.protocol': 'SASL_SSL', 'sasl.mechanism': 'PLAIN', 'sasl.username': 'myuser', 'sasl.password': 'mypass'}
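The override behaviour shown above amounts to merging a topic's own configuration over the accumulated defaults, roughly like a dict update (a sketch of the semantics, not snapstream's actual code):

```python
defaults = {
    'bootstrap.servers': 'localhost:29091',
    'group.id': 'default-demo',
}
topic_conf = {'group.id': 'demo'}

# Per-topic settings win over the defaults.
merged = {**defaults, **topic_conf}
print(merged)
# {'bootstrap.servers': 'localhost:29091', 'group.id': 'demo'}
```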
Yield

When your handler function returns zero or more values, use yield instead of return.
from snapstream import snap, stream


@snap(range(5), sink=[print])
def handler(n):
    if n % 2 == 0:
        yield f'equal: {n}'
    if n == 0:
        yield f'zero: {n}'


stream()
equal: 0
zero: 0
equal: 2
equal: 4
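Because the handler yields, each input can fan out into zero, one, or several sink calls. The dispatch loop below is a hypothetical stdlib-only sketch of that idea, not snapstream's implementation:

```python
def dispatch(source, handler, sinks):
    """Feed each item to the handler; send every yielded value to all sinks."""
    for item in source:
        for out in handler(item):
            for sink in sinks:
                sink(out)


def handler(n):
    if n % 2 == 0:
        yield f'equal: {n}'
    if n == 0:
        yield f'zero: {n}'


collected = []
dispatch(range(5), handler, [collected.append])
print(collected)
# ['equal: 0', 'zero: 0', 'equal: 2', 'equal: 4']
```

Odd inputs yield nothing and simply produce no sink calls, which is why a generator handler is more flexible than a single return value.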
Timer

If there's no incoming data, generators can be used to trigger handler functions.
from time import localtime, sleep, strftime

from snapstream import snap, stream


def timer(interval=1.0):
    while True:
        yield
        sleep(interval)


@snap(timer())
def handler():
    print(strftime('%H:%M:%S', localtime()))


stream()
The timer() function returns a generator that yields None every 1.0 seconds:
23:25:10
23:25:11
23:25:12
...
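Any generator can act as a trigger. For a quick check without real waiting, the interval can be set to zero and the infinite generator bounded with itertools.islice (a stdlib-only sketch):

```python
from itertools import islice
from time import sleep


def timer(interval=1.0):
    while True:
        yield
        sleep(interval)


# Take three ticks from a zero-interval timer; each tick is None.
ticks = list(islice(timer(interval=0), 3))
print(ticks)
# [None, None, None]
```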
Codec

Codecs are used for serializing and deserializing data.

Data that's passed to topic is automatically JSON serialized:
from snapstream import Topic
from snapstream.codecs import JsonCodec, ICodec

topic = Topic('codec-demo', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
}, offset=-2, codec=JsonCodec())

topic({'msg': 'π'})

for msg in topic:
    print(msg.value())
Data that's read from topic is automatically deserialized:
{'msg': 'π'}
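A codec is just an encode/decode pair; the JSON case can be sketched in plain Python (an approximation of what a JSON codec does, not snapstream's JsonCodec itself):

```python
import json


def encode(obj) -> bytes:
    """Serialize an object to JSON bytes."""
    return json.dumps(obj).encode()


def decode(s: bytes) -> object:
    """Deserialize JSON bytes back into an object."""
    return json.loads(s)


payload = encode({'msg': 'hi'})
print(payload)          # b'{"msg": "hi"}'
print(decode(payload))  # {'msg': 'hi'}
```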
It's possible to create custom codecs by extending ICodec:
# Imports assumed for this example: avro and the snapstream avro helpers.
from typing import Any, cast

from avro.schema import parse

from snapstream.codecs import ICodec, deserialize_avro, serialize_avro


class AvroCodec(ICodec):
    """Serializes/deserializes avro messages."""

    def __init__(self, path: str):
        """Load avro schema."""
        with open(path) as a:
            self.schema = parse(a.read())

    def encode(self, obj: Any) -> bytes:
        """Serialize message."""
        val = serialize_avro(self.schema, obj)
        return cast(bytes, val)

    def decode(self, s: bytes) -> object:
        """Deserialize message."""
        val = deserialize_avro(self.schema, s)
        return cast(object, val)