liteyuki.comm.storage
大约 1 分钟 · API
def run_subscriber_receive_funcs(channel_: str, data: Any) -> None
运行订阅者接收函数
Args:
channel_: 频道
data: 数据
def on_get(data: tuple[str, dict[str, Any]]) -> None
def on_set(data: tuple[str, dict[str, Any]]) -> None
def on_delete(data: tuple[str, dict[str, Any]]) -> None
def on_get_all(data: tuple[str, dict[str, Any]]) -> None
def on_publish(data: tuple[str, Any]) -> None
def decorator(func: ON_RECEIVE_FUNC) -> ON_RECEIVE_FUNC
async def wrapper(data: Any) -> None
class Subscriber
def __init__(self) -> None
def receive(self) -> Any
def unsubscribe(self) -> None
class KeyValueStore
def __init__(self) -> None
def set(self, key: str, value: Any) -> None
设置键值对
Args:
key: 键
value: 值
def get(self, key: str, default: Optional[Any]) -> Optional[Any]
获取键对应的值
Args:
key: 键
default: 默认值
Returns:
Optional[Any]: 键对应的值
def delete(self, key: str, ignore_key_error: bool) -> None
删除键值对
Args:
key: 键
ignore_key_error: 是否忽略键不存在的错误
Returns:
None
def get_all(self) -> dict[str, Any]
获取所有键值对
Returns:
dict[str, Any]: 键值对
def publish(self, channel_: str, data: Any) -> None
发布消息
Args:
channel_: 频道
data: 数据
Returns:
None
def on_subscriber_receive(self, channel_: str) -> Callable[[ON_RECEIVE_FUNC], ON_RECEIVE_FUNC]
订阅者接收消息时的回调
Args:
channel_: 频道
Returns:
装饰器
@staticmethod
def run_subscriber_receive_funcs(channel_: str, data: Any) -> None
运行订阅者接收函数
Args:
channel_: 频道
data: 数据
class GlobalKeyValueStore
@classmethod
def get_instance(cls: Any) -> None