Source code for snapstream.caching

"""Snapstream caching."""

import os
from contextlib import contextmanager
from threading import RLock
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from rocksdict import (
    AccessType,
    ColumnFamily,
    CompactOptions,
    DBCompactionStyle,
    DBCompressionType,
    FifoCompactOptions,
    IngestExternalFileOptions,
    Options,
    Rdict,
    RdictIter,
    ReadOptions,
    Snapshot,
    WriteOptions,
)
from rocksdict.rocksdict import RdictItems, RdictKeys, RdictValues

MB = 1024 ** 2  # bytes in one mebibyte; used for table/buffer/log sizes
MINUTES = 60  # seconds in one minute; used for the log roll interval


class Cache:
    """Create a RocksDB database in the specified folder.

    >>> cache = Cache('db/mycache')  # doctest: +SKIP

    The cache instance acts as a callable to store data:

    >>> cache('key', {'msg': 'Hello World!'})  # doctest: +SKIP
    >>> cache['key']  # doctest: +SKIP
    {'msg': 'Hello World!'}

    Single-key writes (``__setitem__``/``__call__``, ``put``, ``__delitem__``
    and ``delete``) take a per-key lock from a striped pool of ``RLock``s,
    so they serialize with code running inside ``transaction(key)``.
    """

    def __init__(
        self,
        path: str,
        options: Union[Options, None] = None,
        column_families: Union[Dict[str, Options], None] = None,
        access_type=AccessType.read_write(),
        target_table_size=25 * MB,
        number_of_locks=16
    ) -> None:
        """Create instance that holds rocksdb reference.

        This configuration setup optimizes for low disk usage (25mb per
        table/cf). The oldest records may be removed during compaction.

        https://congyuwang.github.io/RocksDict/rocksdict.html

        Args:
            path: Folder that holds the RocksDB database.
            options: RocksDB options; when falsy,
                ``_default_options(target_table_size)`` is used.
            column_families: Mapping of column family name to its options.
                When falsy and a database already exists at ``path`` (its
                ``CURRENT`` file is present), every existing column family
                is opened with ``options``.
            access_type: Rocksdict access type (read/write by default).
            target_table_size: FIFO size limit used by the default options.
            number_of_locks: Size of the striped per-key lock pool.
        """
        self.name = path
        self._number_of_locks = number_of_locks
        self._locks = [RLock() for _ in range(self._number_of_locks)]
        options = options or self._default_options(target_table_size)
        # The conditional is parenthesized so that a caller-supplied mapping
        # is honoured even when the database does not exist yet; discovery
        # of existing column families only runs for an existing database.
        column_families = column_families or (
            {key: options for key in Rdict.list_cf(path, options)}
            if os.path.exists(path + '/CURRENT')
            else {}
        )
        self.db = Rdict(path, options, column_families, access_type)

    @staticmethod
    def _default_options(target_table_size: int):
        """Build the low-disk-usage default RocksDB options.

        Args:
            target_table_size: Value for the FIFO compaction
                ``max_table_files_size`` limit.

        Returns:
            A configured ``Options`` instance.
        """
        options = Options()
        compaction_options = FifoCompactOptions()
        compaction_options.max_table_files_size = target_table_size
        options.create_if_missing(True)
        # Allows explicitly requested column families to be created when a
        # new database is opened with a ``column_families`` mapping.
        options.create_missing_column_families(True)
        options.set_max_background_jobs(os.cpu_count() or 2)
        options.increase_parallelism(os.cpu_count() or 2)
        options.set_log_file_time_to_roll(30 * MINUTES)
        options.set_keep_log_file_num(1)
        options.set_max_log_file_size(int(0.1 * MB))
        options.set_max_manifest_file_size(MB)
        # FIFO compaction bounds total table size; oldest data is dropped.
        options.set_fifo_compaction_options(compaction_options)
        options.set_compaction_style(DBCompactionStyle.fifo())
        options.set_level_zero_file_num_compaction_trigger(4)
        options.set_level_zero_slowdown_writes_trigger(6)
        options.set_level_zero_stop_writes_trigger(8)
        options.set_max_write_buffer_number(2)
        options.set_write_buffer_size(1 * MB)
        options.set_target_file_size_base(256 * MB)
        options.set_max_bytes_for_level_base(1024 * MB)
        options.set_max_bytes_for_level_multiplier(4.0)
        options.set_compression_type(DBCompressionType.lz4())
        options.set_delete_obsolete_files_period_micros(10 * 1000)  # 10 ms
        return options

    @contextmanager
    def _get_lock(self, key):
        """Get lock from a pool of locks based on key.

        Keys are mapped onto ``self._locks`` via ``hash(key)``, so distinct
        keys may share a lock. The locks are ``RLock``s, so a thread already
        holding a key's lock (e.g. inside ``transaction``) can write that
        key again without deadlocking.
        """
        index = hash(key) % self._number_of_locks
        with (lock := self._locks[index]):
            yield lock

    def __call__(self, key, val, *args) -> None:
        """Call cache to set item; extra positional args are ignored."""
        self.__setitem__(key, val)

    def __contains__(self, key) -> bool:
        """Key exists in db."""
        return key in self.db

    def __delitem__(self, key) -> None:
        """Delete item from db.

        Takes the key's lock so a delete cannot interleave with a
        read-modify-write running inside ``transaction(key)``.
        """
        with self._get_lock(key):
            del self.db[key]

    def __getitem__(self, key) -> Any:
        """Get item from db, or ``None`` when the key is missing."""
        try:
            return self.db[key]
        except KeyError:
            return None

    def __setitem__(self, key, val) -> None:
        """Set item in db while holding the key's lock."""
        with self._get_lock(key):
            self.db[key] = val

    def __enter__(self) -> 'Cache':
        """Contextmanager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit contextmanager by closing the database."""
        self.close()

    def set_dumps(self, dumps: Callable[[Any], bytes]) -> None:
        """Set custom dumps (serialization) function."""
        return self.db.set_dumps(dumps)

    def set_loads(self, dumps: Callable[[bytes], Any]) -> None:
        """Set custom loads (deserialization) function.

        The parameter is named ``dumps`` for backward compatibility; it is
        the function that turns stored bytes back into a value.
        """
        return self.db.set_loads(dumps)

    def set_read_options(self, read_opt: ReadOptions) -> None:
        """Set custom read options."""
        return self.db.set_read_options(read_opt)

    def set_write_options(self, write_opt: WriteOptions) -> None:
        """Set custom write options."""
        return self.db.set_write_options(write_opt)

    @contextmanager
    def transaction(self, key) -> Any:
        """Lock the db entry while using the context manager.

        Yields this cache. Other threads' locked writes/deletes to keys that
        map to the same lock block until the ``with`` block exits.
        """
        with self._get_lock(key):
            yield self

    def get(
        self,
        key: Union[str, int, float, bytes, bool, List[str], List[int],
                   List[float], List[bytes], List[bool]],
        default: Any = None,
        read_opt: Union[ReadOptions, None] = None
    ) -> Optional[Any]:
        """Get item from database by key."""
        return self.db.get(key, default, read_opt)

    def put(
        self,
        key: Union[str, int, float, bytes, bool],
        value: Any,
        write_opt: Union[WriteOptions, None] = None
    ) -> None:
        """Put item in database using key, holding the key's lock."""
        with self._get_lock(key):
            return self.db.put(key, value, write_opt)

    def delete(
        self,
        key: Union[str, int, float, bytes, bool],
        write_opt: Union[WriteOptions, None] = None
    ) -> None:
        """Delete item from database, holding the key's lock."""
        with self._get_lock(key):
            return self.db.delete(key, write_opt)

    def key_may_exist(
        self,
        key: Union[str, int, float, bytes, bool],
        fetch: bool = False,
        read_opt=None
    ) -> Union[bool, Tuple[bool, Any]]:
        """Check if a key exists without performing IO operations."""
        return self.db.key_may_exist(key, fetch, read_opt)

    def iter(self, read_opt: Union[ReadOptions, None] = None) -> RdictIter:
        """Get iterable."""
        return self.db.iter(read_opt)

    def items(
        self,
        backwards: bool = False,
        from_key: Union[str, int, float, bytes, bool, None] = None,
        read_opt: Union[ReadOptions, None] = None
    ) -> RdictItems:
        """Get tuples of key-value pairs."""
        return self.db.items(backwards, from_key, read_opt)

    def keys(
        self,
        backwards: bool = False,
        from_key: Union[str, int, float, bytes, bool, None] = None,
        read_opt: Union[ReadOptions, None] = None
    ) -> RdictKeys:
        """Get keys."""
        return self.db.keys(backwards, from_key, read_opt)

    def values(
        self,
        backwards: bool = False,
        from_key: Union[str, int, float, bytes, bool, None] = None,
        read_opt: Union[ReadOptions, None] = None
    ) -> RdictValues:
        """Get values."""
        return self.db.values(backwards, from_key, read_opt)

    def ingest_external_file(
        self,
        paths: List[str],
        opts: Optional[IngestExternalFileOptions] = None
    ) -> None:
        """Load list of SST files into current column family.

        A fresh ``IngestExternalFileOptions()`` is built per call when
        ``opts`` is omitted, rather than sharing one default instance.
        """
        if opts is None:
            opts = IngestExternalFileOptions()
        return self.db.ingest_external_file(paths, opts)

    def get_column_family(self, name: str) -> Rdict:
        """Get column family by name."""
        return self.db.get_column_family(name)

    def get_column_family_handle(self, name: str) -> ColumnFamily:
        """Get column family handle by name."""
        return self.db.get_column_family_handle(name)

    def drop_column_family(self, name: str) -> None:
        """Drop column family by name."""
        return self.db.drop_column_family(name)

    def create_column_family(
        self, name: str, options: Optional[Options] = None
    ) -> Rdict:
        """Create column family.

        A fresh ``Options()`` is used when ``options`` is omitted.
        """
        if options is None:
            options = Options()
        return self.db.create_column_family(name, options)

    def delete_range(
        self,
        begin: Union[str, int, float, bytes, bool],
        end: Union[str, int, float, bytes, bool],
        write_opt: Union[WriteOptions, None] = None
    ) -> None:
        """Delete database items in ``[begin, end)`` (end is excluded)."""
        return self.db.delete_range(begin, end, write_opt)

    def snapshot(self) -> Snapshot:
        """Create snapshot of current column family."""
        return self.db.snapshot()

    def path(self) -> str:
        """Get current database path."""
        return self.db.path()

    def set_options(self, options: Dict[str, str]) -> None:
        """Set options for current column family."""
        return self.db.set_options(options)

    def property_value(self, name: str) -> Union[str, None]:
        """Get property by name from current column family."""
        return self.db.property_value(name)

    def property_int_value(self, name: str) -> Union[int, None]:
        """Get property as int by name from current column family."""
        return self.db.property_int_value(name)

    def latest_sequence_number(self) -> int:
        """Get sequence number of the most recent transaction."""
        return self.db.latest_sequence_number()

    def live_files(self) -> List[Dict[str, Any]]:
        """Get list of all table files with their level, start- and end key."""
        return self.db.live_files()

    def compact_range(
        self,
        begin: Union[str, int, float, bytes, bool, None],
        end: Union[str, int, float, bytes, bool, None],
        compact_opt: Optional[CompactOptions] = None
    ) -> None:
        """Run manual compaction on range for the current column family.

        A fresh ``CompactOptions()`` is used when ``compact_opt`` is omitted.
        """
        if compact_opt is None:
            compact_opt = CompactOptions()
        return self.db.compact_range(begin, end, compact_opt)

    def close(self) -> None:
        """Flush memory to disk and release this handle to the database."""
        return self.db.close()

    def flush(self, wait: bool = True) -> None:
        """Manually flush the current column family."""
        return self.db.flush(wait)

    def flush_wal(self, sync: bool = True) -> None:
        """Manually flush the WAL buffer."""
        return self.db.flush_wal(sync)

    def destroy(self, options: Optional[Options] = None) -> None:
        """Delete the database stored at ``self.name``.

        A fresh ``Options()`` is used when ``options`` is omitted.
        """
        if options is None:
            options = Options()
        return Rdict.destroy(self.name, options)