"""Snapstream caching."""
import os
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from rocksdict import (AccessType, ColumnFamily, CompactOptions,
DBCompactionStyle, DBCompressionType,
FifoCompactOptions, IngestExternalFileOptions, Options,
Rdict, RdictIter, ReadOptions, Snapshot, WriteOptions)
from rocksdict.rocksdict import RdictItems, RdictKeys, RdictValues
MB, MINUTES = 1024 * 1024, 60
[docs]class Cache:
"""Create a RocksDB database in the specified folder.
>>> cache = Cache('db/mycache') # doctest: +SKIP
The cache instance acts as a callable to store data:
>>> cache('key', {'msg': 'Hello World!'}) # doctest: +SKIP
>>> cache['key'] # doctest: +SKIP
{'msg': 'Hello World!'}
"""
def __init__(
self,
path: str,
options: Union[Options, None] = None,
column_families: Union[Dict[str, Options], None] = None,
access_type=AccessType.read_write(),
target_table_size=25 * MB
) -> None:
"""Create instance that holds rocksdb reference.
This configuration setup optimizes for low disk usage (25mb per table/cf).
The oldest records may be removed during compaction.
https://congyuwang.github.io/RocksDict/rocksdict.html
"""
self.name = path
options = options or self._default_options(target_table_size)
column_families = column_families or {
key: options
for key in Rdict.list_cf(path, options)
} if os.path.exists(path + '/CURRENT') else {}
self.db = Rdict(path, options, column_families, access_type)
@staticmethod
def _default_options(target_table_size: int):
options = Options()
compaction_options = FifoCompactOptions()
compaction_options.max_table_files_size = target_table_size
options.create_if_missing(True)
options.set_max_background_jobs(os.cpu_count() or 2)
options.increase_parallelism(os.cpu_count() or 2)
options.set_log_file_time_to_roll(30 * MINUTES)
options.set_keep_log_file_num(1)
options.set_max_log_file_size(int(0.1 * MB))
options.set_max_manifest_file_size(MB)
options.set_fifo_compaction_options(compaction_options)
options.set_compaction_style(DBCompactionStyle.fifo())
options.set_level_zero_file_num_compaction_trigger(4)
options.set_level_zero_slowdown_writes_trigger(6)
options.set_level_zero_stop_writes_trigger(8)
options.set_max_write_buffer_number(2)
options.set_write_buffer_size(1 * MB)
options.set_target_file_size_base(256 * MB)
options.set_max_bytes_for_level_base(1024 * MB)
options.set_max_bytes_for_level_multiplier(4.0)
options.set_compression_type(DBCompressionType.lz4())
options.set_delete_obsolete_files_period_micros(10 * 1000)
return options
def __call__(self, key, val, *args) -> None:
"""Call cache to set item."""
self.__setitem__(key, val)
def __contains__(self, key) -> bool:
"""Key exists in db."""
return key in self.db
def __delitem__(self, key) -> None:
"""Delete item from db."""
del self.db[key]
def __getitem__(self, key) -> Any:
"""Get item from db or None."""
try:
return self.db[key]
except KeyError:
pass
def __setitem__(self, key, val) -> None:
"""Set item in db."""
self.db[key] = val
def __enter__(self) -> 'Cache':
"""Contextmanager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit contextmanager."""
self.close()
[docs] def set_dumps(self, dumps: Callable[[Any], bytes]) -> None:
"""Set custom dumps function."""
return self.db.set_dumps(dumps)
[docs] def set_loads(self, dumps: Callable[[bytes], Any]) -> None:
"""Set custom loads function."""
return self.db.set_loads(dumps)
[docs] def set_read_options(self, read_opt: ReadOptions) -> None:
"""Set custom read options."""
return self.db.set_read_options(read_opt)
[docs] def set_write_options(self, write_opt: WriteOptions) -> None:
"""Set custom write options."""
return self.db.set_write_options(write_opt)
[docs] def get(
self,
key: Union[str, int, float, bytes, bool, List[str], List[int], List[float], List[bytes], List[bool]],
default: Any = None,
read_opt: Union[ReadOptions, None] = None
) -> Optional[Any]:
"""Get item from database by key."""
return self.db.get(key, default, read_opt)
[docs] def put(
self,
key: Union[str, int, float, bytes, bool],
value: Any,
write_opt: Union[WriteOptions, None] = None
) -> None:
"""Put item in database using key."""
return self.db.put(key, value, write_opt)
[docs] def delete(
self,
key: Union[str, int, float, bytes, bool],
write_opt: Union[WriteOptions, None] = None
) -> None:
"""Delete item from database."""
return self.db.delete(key, write_opt)
[docs] def key_may_exist(
self,
key: Union[str, int, float, bytes, bool],
fetch: bool = False,
read_opt=None
) -> Union[bool, Tuple[bool, Any]]:
"""Check if a key exist without performing IO operations."""
return self.db.key_may_exist(key, fetch, read_opt)
[docs] def iter(self, read_opt: Union[ReadOptions, None] = None) -> RdictIter:
"""Get iterable."""
return self.db.iter(read_opt)
[docs] def items(
self,
backwards: bool = False,
from_key: Union[str, int, float, bytes, bool, None] = None,
read_opt: Union[ReadOptions, None] = None
) -> RdictItems:
"""Get tuples of key-value pairs."""
return self.db.items(backwards, from_key, read_opt)
[docs] def keys(
self,
backwards: bool = False,
from_key: Union[str, int, float, bytes, bool, None] = None,
read_opt: Union[ReadOptions, None] = None
) -> RdictKeys:
"""Get keys."""
return self.db.keys(backwards, from_key, read_opt)
[docs] def values(
self,
backwards: bool = False,
from_key: Union[str, int, float, bytes, bool, None] = None,
read_opt: Union[ReadOptions, None] = None
) -> RdictValues:
"""Get values."""
return self.db.values(backwards, from_key, read_opt)
[docs] def ingest_external_file(
self,
paths: List[str],
opts: IngestExternalFileOptions = IngestExternalFileOptions()
) -> None:
"""Load list of SST files into current column family."""
return self.db.ingest_external_file(paths, opts)
[docs] def get_column_family(self, name: str) -> Rdict:
"""Get column family by name."""
return self.db.get_column_family(name)
[docs] def get_column_family_handle(self, name: str) -> ColumnFamily:
"""Get column family handle by name."""
return self.db.get_column_family_handle(name)
[docs] def drop_column_family(self, name: str) -> None:
"""Drop column family by name."""
return self.db.drop_column_family(name)
[docs] def create_column_family(self, name: str, options: Options = Options()) -> Rdict:
"""Craete column family."""
return self.db.create_column_family(name, options)
[docs] def delete_range(
self,
begin: Union[str, int, float, bytes, bool],
end: Union[str, int, float, bytes, bool],
write_opt: Union[WriteOptions, None] = None
) -> None:
"""Delete database items, excluding end."""
return self.db.delete_range(begin, end, write_opt)
[docs] def snapshot(self) -> Snapshot:
"""Create snapshot of current column family."""
return self.db.snapshot()
[docs] def path(self) -> str:
"""Get current database path."""
return self.db.path()
[docs] def set_options(self, options: Dict[str, str]) -> None:
"""Set options for current column family."""
return self.db.set_options(options)
[docs] def property_value(self, name: str) -> Union[str, None]:
"""Get property by name from current column family."""
return self.db.property_value(name)
[docs] def property_int_value(self, name: str) -> Union[int, None]:
"""Get property as int by name from current column family."""
return self.db.property_int_value(name)
[docs] def latest_sequence_number(self) -> int:
"""Get sequence number of the most recent transaction."""
return self.db.latest_sequence_number()
[docs] def live_files(self) -> List[Dict[str, Any]]:
"""Get list of all table files with their level, start- and end key."""
return self.db.live_files()
[docs] def compact_range(
self, begin: Union[str, int, float, bytes, bool, None],
end: Union[str, int, float, bytes, bool, None],
compact_opt: CompactOptions = CompactOptions()
) -> None:
"""Run manual compaction on range for the current column family."""
return self.db.compact_range(begin, end, compact_opt)
[docs] def close(self) -> None:
"""Flush memory to disk, and drop the current column family."""
return self.db.close()
[docs] def flush(self, wait: bool = True) -> None:
"""Manually flush the current column family."""
return self.db.flush(wait)
[docs] def flush_wal(self, sync: bool = True) -> None:
"""Manually flush the WAL buffer."""
return self.db.flush_wal(sync)
[docs] def destroy(self, options: Options = Options()) -> None:
"""Delete the database."""
return Rdict.destroy(self.name, options)