From 222250bc41140574675f0f52731e82c79ad98011 Mon Sep 17 00:00:00 2001 From: snowy Date: Fri, 16 Aug 2024 21:38:22 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=96=B0=E5=A2=9E=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=AE=89=E5=85=A8=E5=85=B1=E4=BA=AB=E5=86=85=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AD=98=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- liteyuki/__init__.py | 19 ++- liteyuki/bot/__init__.py | 9 +- liteyuki/comm/__init__.py | 19 ++- liteyuki/comm/channel.py | 34 ++++-- liteyuki/comm/shared_memory.py | 14 --- liteyuki/comm/storage.py | 114 ++++++++++++++++++ liteyuki/config.py | 2 +- liteyuki/core/manager.py | 40 ++++-- liteyuki/plugins/plugins_loader/__init__.py | 20 ++- liteyuki/utils.py | 2 + src/liteyuki_main/loader.py | 2 +- .../nonebot_launcher/__init__.py | 10 +- .../divided_by_lifespan/__init__.py | 3 +- .../divided_by_lifespan/after_start.py | 4 +- src/nonebot_plugins/ts_ly_comm.py | 20 +++ 15 files changed, 246 insertions(+), 66 deletions(-) delete mode 100644 liteyuki/comm/shared_memory.py create mode 100644 liteyuki/comm/storage.py create mode 100644 src/nonebot_plugins/ts_ly_comm.py diff --git a/liteyuki/__init__.py b/liteyuki/__init__.py index ea4561fa..2ff0eee6 100644 --- a/liteyuki/__init__.py +++ b/liteyuki/__init__.py @@ -7,7 +7,6 @@ from liteyuki.bot import ( from liteyuki.comm import ( Channel, - chan, Event ) @@ -17,7 +16,19 @@ from liteyuki.plugin import ( ) from liteyuki.log import ( - logger, - init_log - + init_log, + logger ) + +__all__ = [ + "LiteyukiBot", + "get_bot", + "get_config", + "get_config_with_compat", + "Channel", + "Event", + "load_plugin", + "load_plugins", + "init_log", + "logger" +] \ No newline at end of file diff --git a/liteyuki/bot/__init__.py b/liteyuki/bot/__init__.py index c0684f0e..ba26122e 100644 --- a/liteyuki/bot/__init__.py +++ b/liteyuki/bot/__init__.py @@ -47,9 +47,9 @@ class LiteyukiBot: """ 启动逻辑 """ - self.lifespan.before_start() # 启动前钩子 + self.lifespan.before_start() # 启动前钩子 self.process_manager.start_all() - self.lifespan.after_start() # 启动后钩子 + self.lifespan.after_start() # 启动后钩子 self.keep_alive() def keep_alive(self): @@ -98,10 +98,9 @@ class LiteyukiBot: Args: name: 进程名称, 默认为None, 所有进程 Returns: - """ - self.loop.create_task(self.lifespan.before_process_shutdown()) # 重启前钩子 - self.loop.create_task(self.lifespan.before_process_shutdown()) # 停止前钩子 + self.lifespan.before_process_shutdown() # 重启前钩子 + self.lifespan.before_process_shutdown() # 停止前钩子 if name is not None: chan_active = get_channel(f"{name}-active") diff --git a/liteyuki/comm/__init__.py b/liteyuki/comm/__init__.py index ac4c6a78..c6efd16b 100644 --- a/liteyuki/comm/__init__.py +++ b/liteyuki/comm/__init__.py @@ -11,20 +11,31 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved """ from liteyuki.comm.channel import ( Channel, - chan, get_channel, set_channel, set_channels, - get_channels + get_channels, + active_channel, + passive_channel ) from liteyuki.comm.event import Event __all__ = [ "Channel", - "chan", "Event", "get_channel", "set_channel", "set_channels", - "get_channels" + "get_channels", + "active_channel", + "passive_channel" ] + +from liteyuki.utils import IS_MAIN_PROCESS + +# 第一次引用必定为赋值 +_ref_count = 0 +if not IS_MAIN_PROCESS: + if (active_channel is None or passive_channel is None) and _ref_count > 0: + raise RuntimeError("Error: Channel not initialized in sub process") + _ref_count += 1 diff --git a/liteyuki/comm/channel.py b/liteyuki/comm/channel.py index 3af074f2..b7ae95d2 100644 --- a/liteyuki/comm/channel.py +++ b/liteyuki/comm/channel.py @@ -17,7 +17,7 @@ from multiprocessing import Pipe from typing import Any, Optional, Callable, Awaitable, List, TypeAlias from uuid import uuid4 -from liteyuki.utils import is_coroutine_callable, run_coroutine +from liteyuki.utils import IS_MAIN_PROCESS, is_coroutine_callable, run_coroutine SYNC_ON_RECEIVE_FUNC: TypeAlias = Callable[[Any], Any] ASYNC_ON_RECEIVE_FUNC: TypeAlias = Callable[[Any], Awaitable[Any]] @@ -27,11 +27,13 @@ SYNC_FILTER_FUNC: TypeAlias = Callable[[Any], bool] ASYNC_FILTER_FUNC: TypeAlias = Callable[[Any], Awaitable[bool]] FILTER_FUNC: TypeAlias = SYNC_FILTER_FUNC | ASYNC_FILTER_FUNC - - _channel: dict[str, "Channel"] = {} _callback_funcs: dict[str, ON_RECEIVE_FUNC] = {} +"""子进程可用的主动和被动通道""" +active_channel: Optional["Channel"] = None +passive_channel: Optional["Channel"] = None + class Channel: """ @@ -40,8 +42,6 @@ class Channel: """ def __init__(self, _id: str): - # self.main_send_conn, self.sub_receive_conn = Pipe() - # self.sub_send_conn, self.main_receive_conn = Pipe() self.conn_send, self.conn_recv = Pipe() self._closed = False self._on_main_receive_funcs: list[str] = [] @@ -102,12 +102,16 @@ class Channel: async def wrapper(data: Any) -> Any: if filter_func is not None: if is_coroutine_callable(filter_func): - if not await filter_func(data): + if not (await filter_func(data)): return else: if not filter_func(data): return - return await func(data) + + if is_coroutine_callable(func): + return await func(data) + else: + return func(data) function_id = str(uuid4()) _callback_funcs[function_id] = wrapper @@ -164,10 +168,6 @@ class Channel: return self.receive() -"""默认通道实例,可直接从模块导入使用""" -chan = Channel("default") - - def set_channel(name: str, channel: Channel): """ 设置通道实例 @@ -175,6 +175,9 @@ def set_channel(name: str, channel: Channel): name: 通道名称 channel: 通道实例 """ + if not IS_MAIN_PROCESS: + raise RuntimeError(f"Function {__name__} should only be called in the main process.") + if not isinstance(channel, Channel): raise TypeError(f"channel must be an instance of Channel, {type(channel)} found") _channel[name] = channel @@ -186,6 +189,9 @@ def set_channels(channels: dict[str, Channel]): Args: channels: 通道名称 """ + if not IS_MAIN_PROCESS: + raise RuntimeError(f"Function {__name__} should only be called in the main process.") + for name, channel in channels.items(): set_channel(name, channel) @@ -197,6 +203,9 @@ def get_channel(name: str) -> Optional[Channel]: name: 通道名称 Returns: """ + if not IS_MAIN_PROCESS: + raise RuntimeError(f"Function {__name__} should only be called in the main process.") + return _channel.get(name, None) @@ -205,4 +214,7 @@ def get_channels() -> dict[str, Channel]: 获取通道实例 Returns: """ + if not IS_MAIN_PROCESS: + raise RuntimeError(f"Function {__name__} should only be called in the main process.") + return _channel diff --git a/liteyuki/comm/shared_memory.py b/liteyuki/comm/shared_memory.py deleted file mode 100644 index 1cfc6f67..00000000 --- a/liteyuki/comm/shared_memory.py +++ /dev/null @@ -1,14 +0,0 @@ -# -*- coding: utf-8 -*- -""" -共享内存模块。类似于redis,但是更加轻量级。 -""" - -memory_database = {} - - -def set_memory(key: str, value: any) -> None: - pass - - -def get_mem_data(key: str) -> any: - pass diff --git a/liteyuki/comm/storage.py b/liteyuki/comm/storage.py new file mode 100644 index 00000000..6c815036 --- /dev/null +++ b/liteyuki/comm/storage.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +""" +共享内存模块。类似于redis,但是更加轻量级并且线程安全 +""" + +import threading +from typing import Any, Optional + +from liteyuki.utils import IS_MAIN_PROCESS +from liteyuki.comm.channel import Channel + +if IS_MAIN_PROCESS: + _locks = {} + + +def _get_lock(key): + if IS_MAIN_PROCESS: + if key not in _locks: + _locks[key] = threading.Lock() + return _locks[key] + else: + raise RuntimeError("Cannot get lock in sub process.") + + +class KeyValueStore: + def __init__(self): + self._store = {} + + self.active_chan = Channel(_id="shared_memory-active") + self.passive_chan = Channel(_id="shared_memory-passive") + + def set(self, key: str, value: any) -> None: + if IS_MAIN_PROCESS: + lock = _get_lock(key) + with lock: + self._store[key] = value + else: + # 向主进程发送请求拿取 + self.passive_chan.send(("set", key, value)) + + def get(self, key: str, default: Optional[any] = None) -> any: + if IS_MAIN_PROCESS: + lock = _get_lock(key) + with lock: + return self._store.get(key, default) + else: + self.passive_chan.send(("get", key, default)) + return self.active_chan.receive() + + def delete(self, key: str) -> None: + if IS_MAIN_PROCESS: + lock = _get_lock(key) + with lock: + if key in self._store: + del self._store[key] + del _locks[key] + else: + # 向主进程发送请求删除 + self.passive_chan.send(("delete", key)) + + def get_all(self) -> dict[str, any]: + if IS_MAIN_PROCESS: + return self._store + else: + self.passive_chan.send(("get_all",)) + return self.active_chan.receive() + + +class GlobalKeyValueStore: + _instance = None + _lock = threading.Lock() + + @classmethod + def get_instance(cls): + if IS_MAIN_PROCESS: + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = KeyValueStore() + return cls._instance + else: + raise RuntimeError("Cannot get instance in sub process.") + + +shared_memory: Optional[KeyValueStore] = None + +# 全局单例访问点 +if IS_MAIN_PROCESS: + shared_memory = GlobalKeyValueStore.get_instance() + + + @shared_memory.passive_chan.on_receive(lambda d: d[0] == "get") + def on_get(d): + print(shared_memory.get_all()) + shared_memory.active_chan.send(shared_memory.get(d[1], d[2])) + print("发送数据:", shared_memory.get(d[1], d[2])) + + + @shared_memory.passive_chan.on_receive(lambda d: d[0] == "set") + def on_set(d): + shared_memory.set(d[1], d[2]) + + + @shared_memory.passive_chan.on_receive(lambda d: d[0] == "delete") + def on_delete(d): + shared_memory.delete(d[1]) +else: + shared_memory = None + +_ref_count = 0 # 引用计数 +if not IS_MAIN_PROCESS: + if (shared_memory is None) and _ref_count > 1: + raise RuntimeError("Shared memory not initialized.") + _ref_count += 1 diff --git a/liteyuki/config.py b/liteyuki/config.py index b50adc5e..e020bac7 100644 --- a/liteyuki/config.py +++ b/liteyuki/config.py @@ -16,7 +16,7 @@ import toml import yaml from pydantic import BaseModel -from liteyuki import logger +from liteyuki.log import logger _SUPPORTED_CONFIG_FORMATS = (".yaml", ".yml", ".json", ".toml") diff --git a/liteyuki/core/manager.py b/liteyuki/core/manager.py index f2e3cbca..c28346a5 100644 --- a/liteyuki/core/manager.py +++ b/liteyuki/core/manager.py @@ -10,18 +10,21 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved """ import atexit -import threading import signal +import threading from multiprocessing import Process -from typing import Any, Callable, Optional, Protocol, TYPE_CHECKING, TypeAlias +from typing import Any, Callable, TYPE_CHECKING, TypeAlias from liteyuki.comm import Channel, get_channel, set_channels +from liteyuki.comm.storage import shared_memory from liteyuki.log import logger +from liteyuki.utils import IS_MAIN_PROCESS -TARGET_FUNC: TypeAlias = Callable[[Channel, Channel, ...], Any] +TARGET_FUNC: TypeAlias = Callable[..., Any] if TYPE_CHECKING: from liteyuki.bot import LiteyukiBot + from liteyuki.comm.storage import KeyValueStore TIMEOUT = 10 @@ -30,9 +33,29 @@ __all__ = [ ] +# Update the delivery_channel_wrapper function to return the top-level wrapper +def _delivery_channel_wrapper(func: TARGET_FUNC, chan_active: Channel, chan_passive: Channel, sm: "KeyValueStore", *args, **kwargs): + """ + 子进程入口函数 + """ + # 给子进程设置通道 + if IS_MAIN_PROCESS: + raise RuntimeError("Function should only be called in a sub process.") + + from liteyuki.comm import channel + channel.active_channel = chan_active + channel.passive_channel = chan_passive + + # 给子进程创建共享内存实例 + from liteyuki.comm import storage + storage.shared_memory = sm + + func(*args, **kwargs) + + class ProcessManager: """ - 在主进程中被调用 + 进程管理器 """ def __init__(self, bot: "LiteyukiBot"): @@ -61,7 +84,6 @@ class ProcessManager: process = Process(target=self.targets[name][0], args=self.targets[name][1], kwargs=self.targets[name][2]) self.processes[name] = process - process.start() # 启动进程并监听信号 @@ -114,9 +136,9 @@ class ProcessManager: kwargs = {} chan_active = Channel(_id=f"{name}-active") chan_passive = Channel(_id=f"{name}-passive") - kwargs["chan_active"] = chan_active - kwargs["chan_passive"] = chan_passive - self.targets[name] = (target, args, kwargs) + + self.targets[name] = (_delivery_channel_wrapper, (target, chan_active, chan_passive, shared_memory, *args), kwargs) + # 主进程通道 set_channels( { f"{name}-active" : chan_active, @@ -124,7 +146,7 @@ class ProcessManager: } ) - def join(self): + def join_all(self): for name, process in self.targets: process.join() diff --git a/liteyuki/plugins/plugins_loader/__init__.py b/liteyuki/plugins/plugins_loader/__init__.py index 8b90c530..facefd15 100644 --- a/liteyuki/plugins/plugins_loader/__init__.py +++ b/liteyuki/plugins/plugins_loader/__init__.py @@ -8,7 +8,7 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved @File : __init__.py.py @Software: PyCharm """ -from liteyuki import get_config_with_compat, load_plugin +from liteyuki import get_config, load_plugin, get_bot from liteyuki.plugin import PluginMetadata, load_plugins __plugin_meta__ = PluginMetadata( @@ -18,9 +18,17 @@ __plugin_meta__ = PluginMetadata( description="插件加载器,用于加载轻雪原生插件" ) -load_plugins("src/liteyuki_plugins") -for plugin in get_config_with_compat("liteyuki.plugins", ("plugins", ), []): - load_plugin(plugin) -for plugin_dir in get_config_with_compat("liteyuki.plugin_dirs", ("plugins_dirs", ), []): - load_plugins(plugin_dir) \ No newline at end of file +def default_plugins_loader(): + """ + 默认插件加载器,应在初始化时调用 + """ + load_plugins("src/liteyuki_plugins") + for plugin in get_config("liteyuki.plugins", []): + load_plugin(plugin) + + for plugin_dir in get_config("liteyuki.plugin_dirs", []): + load_plugins(plugin_dir) + + +default_plugins_loader() diff --git a/liteyuki/utils.py b/liteyuki/utils.py index 785dfd62..90a2048f 100644 --- a/liteyuki/utils.py +++ b/liteyuki/utils.py @@ -12,6 +12,7 @@ from liteyuki.log import logger IS_MAIN_PROCESS = multiprocessing.current_process().name == "MainProcess" + def is_coroutine_callable(call: Callable[..., Any]) -> bool: """ 判断是否为协程可调用对象 @@ -87,5 +88,6 @@ def async_wrapper(func: Callable[..., Any]) -> Callable[..., Coroutine]: async def wrapper(*args, **kwargs): return func(*args, **kwargs) + wrapper.__signature__ = inspect.signature(func) return wrapper diff --git a/src/liteyuki_main/loader.py b/src/liteyuki_main/loader.py index 2a533856..e86ce88b 100644 --- a/src/liteyuki_main/loader.py +++ b/src/liteyuki_main/loader.py @@ -8,7 +8,7 @@ from src.utils.base.data_manager import InstalledPlugin, plugin_db from src.utils.base.resource import load_resources from src.utils.message.tools import check_for_package -from liteyuki import get_bot, chan +from liteyuki import get_bot from nonebot_plugin_apscheduler import scheduler diff --git a/src/liteyuki_plugins/nonebot_launcher/__init__.py b/src/liteyuki_plugins/nonebot_launcher/__init__.py index 62e146a0..73fa373c 100644 --- a/src/liteyuki_plugins/nonebot_launcher/__init__.py +++ b/src/liteyuki_plugins/nonebot_launcher/__init__.py @@ -8,12 +8,9 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved @File : __init__.py.py @Software: PyCharm """ -import asyncio -import time import nonebot -from liteyuki.comm import Channel, set_channel from liteyuki.core import IS_MAIN_PROCESS from liteyuki.plugin import PluginMetadata from .nb_utils import adapter_manager, driver_manager @@ -23,20 +20,15 @@ __plugin_meta__ = PluginMetadata( ) -def nb_run(chan_active: "Channel", chan_passive: "Channel", *args, **kwargs): +def nb_run(*args, **kwargs): """ 初始化NoneBot并运行在子进程 Args: - - chan_active: - chan_passive: **kwargs: Returns: """ # 给子进程传递通道对象 - set_channel("nonebot-active", chan_active) - set_channel("nonebot-passive", chan_passive) kwargs.update(kwargs.get("nonebot", {})) # nonebot配置优先 nonebot.init(**kwargs) diff --git a/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/__init__.py b/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/__init__.py index 8f9b0e0a..a74b17b2 100644 --- a/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/__init__.py +++ b/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/__init__.py @@ -7,4 +7,5 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved @Email : snowykami@outlook.com @File : __init__.py @Software: PyCharm -""" \ No newline at end of file +""" +from .after_start import * \ No newline at end of file diff --git a/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/after_start.py b/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/after_start.py index de8e256b..bd3cc2a1 100644 --- a/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/after_start.py +++ b/src/liteyuki_plugins/scheduled_tasks/divided_by_lifespan/after_start.py @@ -11,13 +11,15 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved import time from liteyuki import get_bot +from liteyuki.comm.storage import shared_memory liteyuki = get_bot() -@liteyuki.on_after_start +@liteyuki.on_before_start def save_startup_timestamp(): """ 储存启动的时间戳 """ startup_timestamp = time.time() + shared_memory.set("startup_timestamp", startup_timestamp) diff --git a/src/nonebot_plugins/ts_ly_comm.py b/src/nonebot_plugins/ts_ly_comm.py new file mode 100644 index 00000000..e01da08f --- /dev/null +++ b/src/nonebot_plugins/ts_ly_comm.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +""" +Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved + +@Time : 2024/8/16 下午8:30 +@Author : snowykami +@Email : snowykami@outlook.com +@File : ts_ly_comm.py +@Software: PyCharm +""" +from nonebot.plugin import PluginMetadata +from liteyuki.comm.storage import shared_memory + +__plugin_meta__ = PluginMetadata( + name="轻雪通信测试", + description="用于测试轻雪插件通信", + usage="不面向用户", +) + +print("共享内存数据:", shared_memory.get("startup_timestamp", default=None)) \ No newline at end of file