snapstream package
Snapstream public objects.
- class snapstream.Cache(path: str, options: Options | None = None, column_families: ~typing.Dict[str, Options] | None = None, access_type=<builtins.AccessType object>, target_table_size=26214400, number_of_locks=16)[source]
Bases: object
Create a RocksDB database in the specified folder.
>>> cache = Cache('db/mycache')
The cache instance acts as a callable to store data:
>>> cache('key', {'msg': 'Hello World!'})
>>> cache['key']
{'msg': 'Hello World!'}
- compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions = <builtins.CompactOptions object>) None[source]
Run manual compaction on range for the current column family.
- create_column_family(name: str, options: Options = <builtins.Options object>) Rdict[source]
Create a column family.
- delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]
Delete item from database.
- delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]
Delete database items, excluding end.
- get(key: str | int | float | bytes | bool | List[str] | List[int] | List[float] | List[bytes] | List[bool], default: Any | None = None, read_opt: ReadOptions | None = None) Any | None[source]
Get item from database by key.
- ingest_external_file(paths: ~typing.List[str], opts: IngestExternalFileOptions = <builtins.IngestExternalFileOptions object>) None[source]
Load list of SST files into current column family.
- items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) RdictItems[source]
Get tuples of key-value pairs.
- key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt=None) bool | Tuple[bool, Any][source]
Check whether a key may exist without performing IO operations.
- keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) RdictKeys[source]
Get keys.
- live_files() List[Dict[str, Any]][source]
Get a list of all table files with their level, start key, and end key.
- property_int_value(name: str) int | None[source]
Get property as int by name from current column family.
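The "excluding end" wording on delete_range above describes a half-open interval: keys from begin up to, but not including, end are removed. The following is a minimal sketch of that semantics on a plain dictionary. `delete_range_sketch` is a hypothetical stand-in written for illustration; it does not call snapstream or RocksDB, and it uses Python's own key ordering rather than the ordering of serialized keys in a real database.

```python
from bisect import bisect_left


def delete_range_sketch(store: dict, begin, end) -> None:
    """Remove every key k with begin <= k < end (hypothetical helper)."""
    keys = sorted(store)
    lo = bisect_left(keys, begin)  # index of the first key >= begin
    hi = bisect_left(keys, end)    # index of the first key >= end, which is kept
    for key in keys[lo:hi]:
        del store[key]


store = {"a": 1, "b": 2, "c": 3, "d": 4}
delete_range_sketch(store, "b", "d")
remaining = sorted(store)
print(remaining)
```

Here "b" and "c" are removed while "d", the end key, survives.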
- class snapstream.Conf(*args, **kwargs)[source]
Bases: object
Define default kafka configuration, optionally.
>>> Conf({'bootstrap.servers': 'localhost:29091'})
{'bootstrap.servers': 'localhost:29091'}
- iterables: Set[Tuple[str, Iterable]] = {}
- class snapstream.Topic(name: str, conf: dict = {}, offset: int | None = None, codec: ~snapstream.codecs.ICodec | None = None, flush_timeout: float = -1.0, poll_timeout: float = 1.0, pusher=<function _producer_handler>, poller=<function _consumer_handler>, dry: bool = False, raise_error: bool = False)[source]
Bases: ITopic
Act as a consumer and producer.
>>> topic = Topic('emoji', {
...     'bootstrap.servers': 'localhost:29091',
...     'auto.offset.reset': 'earliest',
...     'group.id': 'demo',
... })
Loop over topic (iterable) to consume from it:
>>> for msg in topic:
...     print(msg.value())
Call topic (callable) with data to produce to it:
>>> topic({'msg': 'Hello World!'})
- snapstream.snap(*iterable: Iterable, sink: Iterable[Callable[[...], None]] = [])[source]
Snap a function to a stream.
- Ex:
>>> topic = Topic('demo')
>>> cache = Cache('state/demo')
>>> @snap(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key(), msg.value()
- snapstream.stream(**kwargs)[source]
Start the streams.
- Ex:
>>> args = {
...     'env': 'DEV',
... }
>>> stream(**args)
Submodules
snapstream.caching module
Snapstream caching.
- class snapstream.caching.Cache(path: str, options: Options | None = None, column_families: ~typing.Dict[str, Options] | None = None, access_type=<builtins.AccessType object>, target_table_size=26214400, number_of_locks=16)[source]
Bases: object
Create a RocksDB database in the specified folder.
>>> cache = Cache('db/mycache')
The cache instance acts as a callable to store data:
>>> cache('key', {'msg': 'Hello World!'})
>>> cache['key']
{'msg': 'Hello World!'}
- compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions = <builtins.CompactOptions object>) None[source]
Run manual compaction on range for the current column family.
- create_column_family(name: str, options: Options = <builtins.Options object>) Rdict[source]
Create a column family.
- delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]
Delete item from database.
- delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]
Delete database items, excluding end.
- get(key: str | int | float | bytes | bool | List[str] | List[int] | List[float] | List[bytes] | List[bool], default: Any | None = None, read_opt: ReadOptions | None = None) Any | None[source]
Get item from database by key.
- ingest_external_file(paths: ~typing.List[str], opts: IngestExternalFileOptions = <builtins.IngestExternalFileOptions object>) None[source]
Load list of SST files into current column family.
- items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) RdictItems[source]
Get tuples of key-value pairs.
- key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt=None) bool | Tuple[bool, Any][source]
Check whether a key may exist without performing IO operations.
- keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) RdictKeys[source]
Get keys.
- live_files() List[Dict[str, Any]][source]
Get a list of all table files with their level, start key, and end key.
- property_int_value(name: str) int | None[source]
Get property as int by name from current column family.
snapstream.codecs module
Snapstream codecs.
- class snapstream.codecs.AvroCodec(schema: str | Schema)[source]
Bases: ICodec
Serialize/deserialize avro messages.
snapstream.core module
Snapstream core objects.
- class snapstream.core.Conf(*args, **kwargs)[source]
Bases: object
Define default kafka configuration, optionally.
>>> Conf({'bootstrap.servers': 'localhost:29091'})
{'bootstrap.servers': 'localhost:29091'}
- conf: Dict[str, Any]
- iterables: Set[Tuple[str, Iterable]] = {}
- class snapstream.core.ITopic(name: str, conf: Dict[str, Any] = {}, offset: int | None = None, codec: ICodec | None = None, **kwargs: Dict[str, Any])[source]
Bases: object
Base class for topic implementations.
- class snapstream.core.Topic(name: str, conf: dict = {}, offset: int | None = None, codec: ~snapstream.codecs.ICodec | None = None, flush_timeout: float = -1.0, poll_timeout: float = 1.0, pusher=<function _producer_handler>, poller=<function _consumer_handler>, dry: bool = False, raise_error: bool = False)[source]
Bases: ITopic
Act as a consumer and producer.
>>> topic = Topic('emoji', {
...     'bootstrap.servers': 'localhost:29091',
...     'auto.offset.reset': 'earliest',
...     'group.id': 'demo',
... })
Loop over topic (iterable) to consume from it:
>>> for msg in topic:
...     print(msg.value())
Call topic (callable) with data to produce to it:
>>> topic({'msg': 'Hello World!'})
snapstream.utils module
Snapstream utilities.
- class snapstream.utils.KafkaIgnoredPropertyFilter(name='')[source]
Bases: Filter
Filter out specific kafka logging.
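A class like this builds on the standard library's `logging.Filter`, whose `filter` method decides whether a record is kept. The sketch below shows that general mechanism only. `IgnoredPropertyFilterSketch` and the matched fragment are assumptions made for illustration; the text the real filter matches is not stated in this reference.

```python
import logging


class IgnoredPropertyFilterSketch(logging.Filter):
    """Drop log records whose message contains a given fragment."""

    # Assumed fragment, chosen for illustration only.
    FRAGMENT = "will be ignored"

    def filter(self, record: logging.LogRecord) -> bool:
        # Returning False tells the logging machinery to discard the record.
        return self.FRAGMENT not in record.getMessage()


def make_record(message: str) -> logging.LogRecord:
    """Build a bare warning record so the filter can be exercised directly."""
    return logging.LogRecord("demo", logging.WARNING, "sketch.py", 0, message, None, None)


flt = IgnoredPropertyFilterSketch()
kept = flt.filter(make_record("connection established"))
dropped = flt.filter(make_record("property group.id will be ignored by this producer"))
print(kept, dropped)
```

Such a filter would typically be attached with `logger.addFilter(flt)` on the logger that receives the messages.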
- class snapstream.utils.Singleton(name, bases, dct)[source]
Bases: type
Maintain a single instance of a class.
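Because the class derives from `type` and takes `(name, bases, dct)`, it is a metaclass. A common way to write a singleton metaclass in Python is sketched below. This is the generic pattern, not necessarily how snapstream implements it; `SingletonSketch` and `Settings` are hypothetical names.

```python
class SingletonSketch(type):
    """Metaclass that returns the same instance on every instantiation."""

    # Maps each class using this metaclass to its single instance.
    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            # First call: construct the instance normally and remember it.
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Settings(metaclass=SingletonSketch):
    def __init__(self, value=None):
        self.value = value


first = Settings("a")
second = Settings("b")  # arguments are ignored here; the first instance is reused
print(first is second, second.value)
```

Note that in this variant `__init__` runs only once, so arguments passed to later calls have no effect.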
- snapstream.utils.get_prefixed_variables(prefix: str, secrets_base_path='', key_sep='.') dict[source]
Get environment or file variables having prefix.
>>> environ['DEFAULT_EXAMPLE'] = 'test'
>>> get_prefixed_variables(prefix='DEFAULT_')
{'example': 'test'}
>>> del environ['DEFAULT_EXAMPLE']
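The example output suggests that the prefix is stripped and the remainder of the name is lower-cased. The following sketch reproduces just that behaviour for environment variables using the standard library. `prefixed_env_sketch` is a hypothetical helper, and it does not model the `secrets_base_path` or `key_sep` parameters, whose exact behaviour is not described here.

```python
import os


def prefixed_env_sketch(prefix: str) -> dict:
    """Collect environment variables whose names start with prefix.

    The prefix is stripped and the remainder lower-cased, matching the
    documented example output.
    """
    return {
        name[len(prefix):].lower(): value
        for name, value in os.environ.items()
        if name.startswith(prefix)
    }


# Use a prefix that is unlikely to collide with real environment variables.
os.environ["SKETCH_DEMO_EXAMPLE"] = "test"
result = prefixed_env_sketch("SKETCH_DEMO_")
del os.environ["SKETCH_DEMO_EXAMPLE"]
print(result)
```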