snapstream package

Snapstream public objects.

class snapstream.Cache(path: str, options: Options | None = None, column_families: Dict[str, Options] | None = None, access_type=<builtins.AccessType object>, target_table_size=26214400, number_of_locks=16)

Bases: object

Create a RocksDB database in the specified folder.

>>> cache = Cache('db/mycache')

The cache instance acts as a callable to store data:

>>> cache('key', {'msg': 'Hello World!'})
>>> cache['key']
{'msg': 'Hello World!'}

close() → None

Flush memory to disk and close the handle to the current column family. The stored data is kept.

compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions = <builtins.CompactOptions object>) → None

Run manual compaction on the key range from begin to end in the current column family. None leaves that side of the range open.

create_column_family(name: str, options: Options = <builtins.Options object>) → Rdict

Create a column family.

delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) → None

Delete the item stored under key.

delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) → None

Delete all items with keys from begin (inclusive) up to end (exclusive).
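To make the half-open range concrete, here is a minimal sketch of the same semantics on a plain dict with sortable keys, standing in for the column family. The helper name delete_range is reused for illustration only; it is not the Cache method.

```python
from bisect import bisect_left


def delete_range(store: dict, begin, end) -> None:
    """Delete every key k with begin <= k < end; end itself is kept."""
    keys = sorted(store)
    # bisect_left gives the first index whose key is >= the bound.
    for key in keys[bisect_left(keys, begin):bisect_left(keys, end)]:
        del store[key]


store = {'a': 1, 'b': 2, 'c': 3, 'd': 4}
delete_range(store, 'b', 'd')
print(store)  # {'a': 1, 'd': 4}
```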

destroy(options: Options = <builtins.Options object>) → None

Delete the database.

drop_column_family(name: str) → None

Drop column family by name.

flush(wait: bool = True) → None

Manually flush the in-memory data of the current column family to disk. With wait=True, block until the flush completes.

flush_wal(sync: bool = True) → None

Manually flush the write-ahead log (WAL) buffer. With sync=True, also sync the WAL file to disk.

get(key: str | int | float | bytes | bool | List[str | int | float | bytes | bool], default: Any | None = None, read_opt: ReadOptions | None = None) → Any | None

Get the value stored under key, or a list of values when key is a list of keys. Return default when a key is not found.

get_column_family(name: str) → Rdict

Get column family by name.

get_column_family_handle(name: str) → ColumnFamily

Get column family handle by name.

ingest_external_file(paths: List[str], opts: IngestExternalFileOptions = <builtins.IngestExternalFileOptions object>) → None

Load a list of external SST files into the current column family.

items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) → RdictItems

Iterate over (key, value) tuples in key order, optionally backwards or starting at from_key.

iter(read_opt: ReadOptions | None = None) → RdictIter

Get an iterator over the current column family.

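key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt=None) → bool | Tuple[bool, Any]

Check whether a key may exist, without performing I/O. False means the key is definitely absent; True can be a false positive. With fetch=True, a (bool, value) tuple is returned instead.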

keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) → RdictKeys

Iterate over keys in key order, optionally backwards or starting at from_key.

latest_sequence_number() → int

Get the sequence number of the most recent transaction.

live_files() → List[Dict[str, Any]]

Get a list of all live SST files, each with its level and its start and end keys.

path() → str

Get the database path.

property_int_value(name: str) → int | None

Get a RocksDB property of the current column family by name, as an int.

property_value(name: str) → str | None

Get a RocksDB property of the current column family by name, as a string.

put(key: str | int | float | bytes | bool, value: Any, write_opt: WriteOptions | None = None) → None

Store value under key.

set_dumps(dumps: Callable[[Any], bytes]) → None

Set a custom function that serializes values to bytes before they are stored.

set_loads(dumps: Callable[[bytes], Any]) → None

Set a custom function that deserializes stored bytes back into values.
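set_dumps and set_loads take plain functions with the documented signatures. Below is a minimal sketch of a compact JSON pair that could be registered this way. The function bodies are illustrative, and cache stands for an existing Cache instance, so those calls are shown commented out.

```python
import json
from typing import Any


def dumps(value: Any) -> bytes:
    # Callable[[Any], bytes]: compact JSON, encoded as UTF-8.
    return json.dumps(value, separators=(',', ':')).encode('utf-8')


def loads(data: bytes) -> Any:
    # Callable[[bytes], Any]: the inverse of dumps.
    return json.loads(data.decode('utf-8'))


# cache.set_dumps(dumps)
# cache.set_loads(loads)
print(dumps({'msg': 'Hi'}))  # b'{"msg":"Hi"}'
```

Register both functions before writing. Values stored with a different dumps function will generally not decode with the new loads.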

set_options(options: Dict[str, str]) → None

Set options for the current column family.

set_read_options(read_opt: ReadOptions) → None

Set custom read options.

set_write_options(write_opt: WriteOptions) → None

Set custom write options.

snapshot() → Snapshot

Create a snapshot of the current column family.

transaction(key) → Any

Return a context manager that locks the entry for key for the duration of the with block.
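The constructor's number_of_locks=16 suggests that transaction() draws from a fixed pool of locks shared by all keys (lock striping). That link is an assumption; this page does not confirm it. The sketch below shows the technique with threading and a hypothetical StripedLocks class. The comment inside increment shows the read-modify-write pattern transaction() is meant for, using the documented cache(key, value) and get calls.

```python
import threading
from contextlib import contextmanager
from typing import Any, Dict, Iterator


class StripedLocks:
    """Hypothetical: share a fixed pool of locks among all keys."""

    def __init__(self, number_of_locks: int = 16) -> None:
        self._locks = [threading.Lock() for _ in range(number_of_locks)]

    @contextmanager
    def transaction(self, key: Any) -> Iterator[None]:
        # Equal keys always map to the same lock; unrelated keys may share one.
        with self._locks[hash(key) % len(self._locks)]:
            yield


locks = StripedLocks(number_of_locks=16)
counts: Dict[str, int] = {}


def increment(key: str) -> None:
    # Read-modify-write under the key's lock; with a Cache this would read:
    #   with cache.transaction(key):
    #       cache(key, cache.get(key, 0) + 1)
    with locks.transaction(key):
        counts[key] = counts.get(key, 0) + 1


threads = [threading.Thread(target=increment, args=('hits',)) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counts['hits'])  # 100
```

Python's hash() of a str is randomized per interpreter process, so this particular mapping only coordinates threads within one process.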

values(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) → RdictValues

Iterate over values in key order, optionally backwards or starting at from_key.

class snapstream.Conf(*args, **kwargs)

Bases: object

Optionally define a default Kafka configuration.

>>> Conf({'bootstrap.servers': 'localhost:29091'})
{'bootstrap.servers': 'localhost:29091'}

static distribute_messages(it, queue, kwargs)

Publish messages from an iterable.

iterables: Set[Tuple[str, Iterable]] = {}

register_iterables(*it)

Add iterables to the global Conf.

start(**kwargs)

Start the streams.

class snapstream.Topic(name: str, conf: dict = {}, offset: int | None = None, codec: ICodec | None = None, flush_timeout: float = -1.0, poll_timeout: float = 1.0, pusher=<function _producer_handler>, poller=<function _consumer_handler>, dry: bool = False, raise_error: bool = False, commit_each_message: bool = False)

Bases: ITopic

Act as both a consumer and a producer of a Kafka topic.

>>> topic = Topic('emoji', {
...     'bootstrap.servers': 'localhost:29091',
...     'auto.offset.reset': 'earliest',
...     'group.id': 'demo',
... })

Loop over topic (iterable) to consume from it:

>>> for msg in topic:
...     print(msg.value())

Call topic (callable) with data to produce to it:

>>> topic({'msg': 'Hello World!'})

admin()

Get an admin client.

property consumer: Consumer | None

Get the underlying consumer object.

create_topic(*args, **kwargs) → None

Create the topic.

property producer: Producer

Get the underlying producer object.
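Because Topic is both iterable (consume) and callable (produce), the same object can serve as a source and as a sink. The sketch below is a minimal in-memory stand-in for that interface. The class name is hypothetical and no Kafka is involved. Values pass through JSON the way a JSON codec would encode them, and it yields decoded values rather than the message objects a real Topic returns.

```python
import json
from collections import deque
from typing import Any, Deque, Iterator


class InMemoryTopic:
    """Hypothetical stand-in: iterate to consume, call to produce."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._log: Deque[bytes] = deque()

    def __call__(self, value: Any) -> None:
        # Produce: encode the value and append it to the in-memory log.
        self._log.append(json.dumps(value).encode('utf-8'))

    def __iter__(self) -> Iterator[Any]:
        # Consume: yield decoded values until the log is empty.
        while self._log:
            yield json.loads(self._log.popleft())


topic = InMemoryTopic('emoji')
topic({'msg': 'Hello World!'})
for value in topic:
    print(value)  # {'msg': 'Hello World!'}
```

A real Topic keeps polling the broker instead of stopping when nothing is buffered, so iterating over it does not end on its own.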

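snapstream.snap(*iterable: Iterable, sink: Iterable[Callable[..., None]] = [])

Snap a function to one or more streams. Each message from the iterables is passed to the decorated function, and its output is passed on to every sink.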

Ex:
>>> topic = Topic('demo')
>>> cache = Cache('state/demo')
>>> @snap(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key(), msg.value()

snapstream.stream(**kwargs)

Start the streams. Keyword arguments are passed on to the handlers, which receive them as **kwargs.

Ex:
>>> args = {
...     'env': 'DEV',
... }
>>> stream(**args)
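The decorator-plus-runner pattern can be sketched in a few lines. This is a hypothetical, single-threaded re-implementation for illustration only, not snapstream's code. snap only registers the handler with its iterables and sinks. stream feeds every message to the handler together with the keyword arguments. It also assumes that a tuple result is unpacked into separate sink arguments, which is what the print and cache sinks in the example above need.

```python
from typing import Any, Callable, Iterable, List, Tuple

# Hypothetical registry of (iterables, handler, sinks) entries.
_registry: List[Tuple[tuple, Callable[..., Any], list]] = []


def snap(*iterables: Iterable, sink: Iterable[Callable[..., None]] = ()) -> Callable:
    """Register the decorated handler; nothing runs until stream()."""
    def decorator(handler: Callable[..., Any]) -> Callable[..., Any]:
        _registry.append((iterables, handler, list(sink)))
        return handler
    return decorator


def stream(**kwargs: Any) -> None:
    """Feed every message to its handler and fan the result out to sinks."""
    for iterables, handler, sinks in _registry:
        for it in iterables:
            for msg in it:
                output = handler(msg, **kwargs)
                # Assumption: a tuple result becomes separate sink arguments.
                args = output if isinstance(output, tuple) else (output,)
                for s in sinks:
                    s(*args)


results: dict = {}


@snap(['a', 'b'], sink=[print, results.__setitem__])
def handler(msg: str, **kwargs: Any) -> Tuple[str, str]:
    return msg, f"{msg}-{kwargs['env']}"


stream(env='DEV')  # prints "a a-DEV" then "b b-DEV"
```

Real topics are unbounded, so the actual runner presumably consumes its iterables concurrently rather than one after another.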

Submodules

snapstream.caching module

Snapstream caching.

class snapstream.caching.Cache(path: str, options: Options | None = None, column_families: Dict[str, Options] | None = None, access_type=<builtins.AccessType object>, target_table_size=26214400, number_of_locks=16)

Bases: object

Create a RocksDB database in the specified folder.

Re-exported as snapstream.Cache; see that entry for the usage example and the full list of members.

snapstream.codecs module

Snapstream codecs.

class snapstream.codecs.AvroCodec(schema: str | Schema)

Bases: ICodec

Serialize and deserialize Avro messages.

decode(s: bytes) → object

Deserialize message.

encode(obj: Any) → bytes

Serialize message.

class snapstream.codecs.ICodec

Bases: object

Base class for codecs.

abstract decode(s: bytes) → object

Deserialize object.

abstract encode(obj: Any) → bytes

Serialize object.
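A new codec subclasses ICodec and implements both methods. The sketch below declares a local stand-in for ICodec with the same two abstract methods, so it runs without snapstream installed. With the package available, subclass snapstream.codecs.ICodec instead. The UTF-8 TextCodec is hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any


class ICodec(ABC):
    """Local stand-in mirroring snapstream.codecs.ICodec."""

    @abstractmethod
    def encode(self, obj: Any) -> bytes: ...

    @abstractmethod
    def decode(self, s: bytes) -> object: ...


class TextCodec(ICodec):
    """Hypothetical codec for plain UTF-8 text messages."""

    def encode(self, obj: Any) -> bytes:
        return str(obj).encode('utf-8')

    def decode(self, s: bytes) -> object:
        return s.decode('utf-8')


codec = TextCodec()
print(codec.decode(codec.encode('héllo')))  # héllo
```

An instance can then be handed to a topic through its codec parameter, for example Topic('demo', conf, codec=TextCodec()).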

class snapstream.codecs.JsonCodec

Bases: ICodec

Serialize and deserialize JSON messages.

decode(s: bytes) → object

Deserialize message.

encode(obj: Any) → bytes

Serialize message.

snapstream.codecs.deserialize_json(msg: bytes) → dict

Deserialize a JSON message.

snapstream.codecs.serialize_json(msg: dict) → bytes

Serialize a JSON message.

snapstream.core module

Snapstream core objects.

class snapstream.core.Conf(*args, **kwargs)

Bases: object

Optionally define a default Kafka configuration.

conf: Dict[str, Any]

Re-exported as snapstream.Conf; see that entry for the usage example and the remaining members.

class snapstream.core.ITopic(name: str, conf: Dict[str, Any] = {}, offset: int | None = None, codec: ICodec | None = None, **kwargs: Dict[str, Any])

Bases: object

Base class for topic implementations.

abstract create_topic(name: str, *args: Any, **kwargs: Dict[str, Any]) → None

Create the topic.

  • Should pass *args and **kwargs through to the Kafka client.

  • Should log a warning if the topic already exists.
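A subclass can meet both requirements as in the sketch below. Everything here is hypothetical. It uses a local ITopic stand-in (constructor omitted) and a dict in place of the Kafka admin client. The passthrough is modelled by recording the forwarded arguments, where a real implementation would hand them to the client.

```python
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

logger = logging.getLogger('itopic-demo')


class ITopic(ABC):
    """Local stand-in for snapstream.core.ITopic (constructor omitted)."""

    @abstractmethod
    def create_topic(self, name: str, *args: Any, **kwargs: Any) -> None: ...


class InMemoryAdminTopic(ITopic):
    """Hypothetical: the dict plays the role of the Kafka cluster."""

    def __init__(self) -> None:
        self.created: Dict[str, tuple] = {}

    def create_topic(self, name: str, *args: Any, **kwargs: Any) -> None:
        if name in self.created:
            # Second requirement: warn instead of failing on an existing topic.
            logger.warning('Topic %s already exists', name)
            return
        # First requirement: forward args and kwargs untouched to the "client".
        self.created[name] = (args, kwargs)
```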

class snapstream.core.Topic(name: str, conf: dict = {}, offset: int | None = None, codec: ICodec | None = None, flush_timeout: float = -1.0, poll_timeout: float = 1.0, pusher=<function _producer_handler>, poller=<function _consumer_handler>, dry: bool = False, raise_error: bool = False, commit_each_message: bool = False)

Bases: ITopic

Act as both a consumer and a producer of a Kafka topic.

Re-exported as snapstream.Topic; see that entry for the usage example and the full list of members.

snapstream.utils module

Snapstream utilities.

class snapstream.utils.KafkaIgnoredPropertyFilter(name='')

Bases: Filter

Filter out specific Kafka log messages.

filter(record)

Suppress CONFWARN messages that mention specific configuration keys.
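A logging.Filter drops a record by returning False from filter(). The sketch below shows that technique. The class name, the key list, and the assumption that the CONFWARN text contains the property name are all hypothetical and not taken from snapstream.

```python
import logging
from typing import Iterable


class IgnoredPropertyFilter(logging.Filter):
    """Hypothetical: drop CONFWARN records that mention given config keys."""

    def __init__(self, keys: Iterable[str], name: str = '') -> None:
        super().__init__(name)
        self.keys = tuple(keys)

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        # Keep every record except CONFWARN lines about the listed keys.
        return not ('CONFWARN' in message and any(k in message for k in self.keys))


logger = logging.getLogger('kafka-demo')
logger.addFilter(IgnoredPropertyFilter(['group.id', 'auto.offset.reset']))
```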

class snapstream.utils.Singleton(name, bases, dct)

Bases: type

Metaclass that maintains a single instance of each class using it.
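Because Singleton derives from type, it is applied with metaclass=. The sketch below shows the usual form of this pattern. It illustrates the technique and may differ in detail from snapstream's own implementation. Settings is a hypothetical class.

```python
from typing import Any, Dict


class Singleton(type):
    """Cache one instance per class that uses this metaclass."""

    _instances: Dict[type, Any] = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        # Build the instance on the first call; return the cached one afterwards.
        if cls not in Singleton._instances:
            Singleton._instances[cls] = super().__call__(*args, **kwargs)
        return Singleton._instances[cls]


class Settings(metaclass=Singleton):
    def __init__(self, env: str = 'DEV') -> None:
        self.env = env


print(Settings() is Settings('PROD'))  # True
print(Settings().env)  # DEV
```

Constructor arguments passed after the first call are ignored, because the cached instance is returned unchanged.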

snapstream.utils.folder_size(folder: str, unit='mb')

Get the total size of the files in a folder, expressed in unit (megabytes by default).
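Such a size can be computed with the standard library alone, as in the sketch below. It could, for example, be used to watch how large a Cache folder such as 'db/mycache' grows. The unit table (binary multiples of 1024) and the choice to skip symlinks are assumptions, since this page only documents the default 'mb'.

```python
import os

# Assumption: binary units (1 kb = 1024 bytes).
UNITS = {'b': 1, 'kb': 1024, 'mb': 1024 ** 2, 'gb': 1024 ** 3}


def folder_size(folder: str, unit: str = 'mb') -> float:
    """Sum the sizes of all regular files below folder."""
    total = 0
    for root, _dirs, files in os.walk(folder):
        for name in files:
            path = os.path.join(root, name)
            if not os.path.islink(path):  # do not count symlink targets
                total += os.path.getsize(path)
    return total / UNITS[unit]
```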

snapstream.utils.get_prefixed_variables(prefix: str, secrets_base_path='', key_sep='.') → dict

Get environment or file variables whose names start with prefix. The prefix is stripped and the remaining name is lowercased.

>>> from os import environ
>>> environ['DEFAULT_EXAMPLE'] = 'test'
>>> get_prefixed_variables(prefix='DEFAULT_')
{'example': 'test'}
>>> del environ['DEFAULT_EXAMPLE']

snapstream.utils.get_variable(secret: str, secrets_base_path='', required=False) → str | None

Get a single variable from the environment or from a file.
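The environment half of get_prefixed_variables can be sketched as follows. Only the behaviour shown in its doctest comes from this page: the prefix is stripped and the remainder lowercased. Turning the remaining underscores into key_sep is an assumption. Under it, DEFAULT_BOOTSTRAP_SERVERS would become bootstrap.servers. Secret files are not handled, and prefixed_env is a hypothetical name.

```python
import os
from typing import Dict, Mapping, Optional


def prefixed_env(prefix: str, key_sep: str = '.',
                 environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical: collect variables whose names start with prefix."""
    env = os.environ if environ is None else environ
    result = {}
    for name, value in env.items():
        if name.startswith(prefix):
            # Strip the prefix, lowercase, and (assumed) map '_' to key_sep.
            key = name[len(prefix):].lower().replace('_', key_sep)
            result[key] = value
    return result


print(prefixed_env('DEFAULT_', environ={'DEFAULT_BOOTSTRAP_SERVERS': 'localhost:29091'}))
# {'bootstrap.servers': 'localhost:29091'}
```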

snapstream.utils.with_type_hint(func: Callable[..., Any]) → Callable[..., Any]

Retain function types after applying decorators.