跳至主要內容

liteyuki.comm.storage

作者:远野千束 · 阅读时间:大约 1 分钟 · 分类:API

def run_subscriber_receive_funcs(channel_: str, data: Any) -> None

运行订阅者接收函数

Args:

channel_: 频道

data: 数据

def on_get(data: tuple[str, dict[str, Any]]) -> None

def on_set(data: tuple[str, dict[str, Any]]) -> None

def on_delete(data: tuple[str, dict[str, Any]]) -> None

def on_get_all(data: tuple[str, dict[str, Any]]) -> None

def on_publish(data: tuple[str, Any]) -> None

def decorator(func: ON_RECEIVE_FUNC) -> ON_RECEIVE_FUNC

async def wrapper(data: Any) -> None

class Subscriber

def __init__(self) -> None

def receive(self) -> Any

def unsubscribe(self) -> None

class KeyValueStore

def __init__(self) -> None

def set(self, key: str, value: Any) -> None

 设置键值对

Args:

key: 键

value: 值

def get(self, key: str, default: Optional[Any]) -> Optional[Any]

 获取键值对

Args:

key: 键

default: 默认值

Returns:

Any: 值

def delete(self, key: str, ignore_key_error: bool) -> None

 删除键值对

Args:

key: 键

ignore_key_error: 是否忽略键不存在的错误

Returns:

None: 无返回值

def get_all(self) -> dict[str, Any]

 获取所有键值对

Returns:

dict[str, Any]: 键值对

def publish(self, channel_: str, data: Any) -> None

 发布消息

Args:

channel_: 频道

data: 数据

Returns:

None: 无返回值

def on_subscriber_receive(self, channel_: str) -> Callable[[ON_RECEIVE_FUNC], ON_RECEIVE_FUNC]

 订阅者接收消息时的回调

Args:

channel_: 频道

Returns:

装饰器

@staticmethod

def run_subscriber_receive_funcs(channel_: str, data: Any) -> None

 运行订阅者接收函数

Args:

channel_: 频道

data: 数据

class GlobalKeyValueStore

@classmethod

def get_instance(cls: Any) -> None

attr _instance: None

attr _lock: threading.Lock()

var key = data[1]['key']

var default = data[1]['default']

var recv_chan = data[1]['recv_chan']

var key = data[1]['key']

var value = data[1]['value']

var key = data[1]['key']

var recv_chan = data[1]['recv_chan']

var lock = _get_lock(key)

var lock = _get_lock(key)

var recv_chan = Channel[Optional[Any]]('recv_chan')

var lock = _get_lock(key)

var recv_chan = Channel[dict[str, Any]]('recv_chan')

var data = self.active_chan.receive()

var data = self.publish_channel.receive()