Testingο
The following snippet is out main.py
file. Letβs say that we want to test our produce
and consume
handler functions as shown in the Kafka example. Letβs also throw a Cache in there:
from snapstream import Cache, Topic, snap, stream
from time import sleep
messages = ('π', 'π', 'π', 'π')
c = Cache('db')
t = Topic('emoji', {
'bootstrap.servers': 'localhost:29091',
'auto.offset.reset': 'earliest',
'group.instance.id': 'demo',
'group.id': 'demo',
})
@snap(messages, sink=[t, c])
def produce(msg):
sleep(0.5)
print(f'producing {msg}')
yield msg, msg
@snap(t, sink=[print])
def consume(msg):
val = msg.value().decode()
return f'got: {val}'
stream()
Avoiding Side Effectsο
Itβs good practice to avoid triggering side effects when invoking a script by putting code with side effects in the following block:
c = Cache('db') # this triggers the 'db' folder to be created
if __name__ == "__main__":
# Things that should not run when importing from, or running tests in this module
pass
Mockingο
This test example β letβs call it test_main.py
β uses pytest-mock
(mocker fixture).
It uses:
FakeMessage: allows us to pass some value that our code can get using the
.value()
method, just like a Kafka messageFakeCache: allows us to replace a cache with a defaultdict that also has the
.values()
methodtest_produce: mocks the Cache and Topic, calls the function under test, then asserts that the topic was called with the correct output
from main import produce
class FakeMessage:
def __init__(self, return_value):
self.return_value = return_value
def value(self):
return self.return_value
def timestamp(self):
return (1, dt.now().timestamp() * 1000)
class FakeCache(defaultdict):
def __init__(self, contents={}):
return super().__init__(lambda: None, contents)
def __call__(self, key, val, *args) -> None:
self.__setitem__(key, val)
def values(self, *args, **kwargs):
return [_ for _ in super().values() if _]
def test_produce(mocker):
"""Should produce message to kafka topic."""
topic = mocker.stub(name='topic')
cache = FakeCache()
mocker.patch('snapstream.Topic.__call__', topic)
mocker.patch('main.c', cache)
produce(FakeMessage('π', {})) # type: ignore
# Assert that cache was updated
assert cache['π'] == 'π'
# Check if expected key and val have been produced
key, val = 'π', 'π'
topic.assert_called_once_with(key=key, val=val)