Kafka
Spin up a local Kafka broker using the docker-compose.yml to follow along:
docker compose up broker -d
Note: check the logs before sending messages to Kafka; messages may not be delivered until the broker is ready.
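You can follow the logs with docker compose logs -f broker, or check readiness from Python with a metadata request. The snippet below is a minimal sketch that assumes the confluent-kafka package is installed (snapstream builds on it) and uses the broker address from the docker-compose.yml:

from confluent_kafka.admin import AdminClient

# Broker address taken from the docker-compose.yml used in this example
admin = AdminClient({'bootstrap.servers': 'localhost:29091'})

try:
    # list_topics performs a metadata request; it raises if no broker
    # responds within the given timeout
    metadata = admin.list_topics(timeout=5)
    print(f'broker is up, topics: {list(metadata.topics)}')
except Exception as exc:
    print(f'broker not ready yet: {exc}')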
Here's the hello-world application using Kafka:
from time import sleep

from snapstream import Topic, snap, stream

messages = ('🏆', '📞', '🐟', '👌')

# The Topic acts both as a sink (when producing) and as a source (when consuming)
t = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.instance.id': 'demo',
    'group.id': 'demo',
})

@snap(messages, sink=[t])
def produce(msg):
    sleep(0.5)
    print(f'producing {msg}')
    return msg

@snap(t, sink=[print])
def consume(msg):
    val = msg.value().decode()
    return f'got: {val}'

stream()
The principles remain the same:
Any iterable may act as a source of data
Any callable can be used as a sink
When we call stream(), each iterable is processed in a separate thread
Elements are published to each handler function (that's decorated using snap)
producing 🏆
got: 🏆
producing 📞
got: 📞
producing 🐟
got: 🐟
producing 👌
got: 👌
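These principles aren't tied to Kafka. Below is a minimal sketch (the names numbers, double, label and alert are made up for illustration) that snaps two handler functions to the same iterable and uses an ordinary function as a sink; it assumes, per the principles above, that every element is published to each handler that subscribes to the iterable:

from snapstream import snap, stream

def alert(msg):
    # an ordinary function acting as a sink
    print(f'ALERT: {msg}')

numbers = range(3)  # any iterable can act as a source

@snap(numbers, sink=[print])
def double(n):
    return n * 2

@snap(numbers, sink=[alert])
def label(n):
    return f'saw {n}'

stream()

Running this should print the doubled numbers alongside the ALERT lines.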
These simple concepts make it possible to compose arbitrarily complex stateful streams.
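For instance, because handlers are plain Python functions, they can keep state in ordinary objects. The sketch below counts how often each emoji arrives on the topic from the example above; the group id 'tally' is made up, and the Counter lives in memory, so the counts do not survive restarts (for durable state you would plug in a persistent store as a sink):

from collections import Counter

from snapstream import Topic, snap, stream

t = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.id': 'tally',  # hypothetical consumer group for this example
})

counts = Counter()  # in-memory state shared by the handler

@snap(t, sink=[print])
def tally(msg):
    val = msg.value().decode()
    counts[val] += 1
    return f'{val} seen {counts[val]} time(s)'

stream()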
Note: advanced features such as stream synchronization could be offered out of the box in the future; feel free to contribute!