🐛 fix 通道类回调函数在进程间传递时无法序列号的问题

This commit is contained in:
远野千束 2024-08-10 22:25:41 +08:00
parent 3bd40e7271
commit 7107d03b72
66 changed files with 5112 additions and 4916 deletions

View File

@ -1,25 +1,26 @@
import asyncio
import os
import platform
import sys
import threading
import time
import asyncio
from typing import Any, Optional
from multiprocessing import freeze_support
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from liteyuki.bot.lifespan import (LIFESPAN_FUNC, Lifespan)
from liteyuki.comm.channel import Channel
from liteyuki.comm.channel import Channel, set_channel
from liteyuki.core import IS_MAIN_PROCESS
from liteyuki.core.manager import ProcessManager
from liteyuki.core.spawn_process import mb_run, nb_run
from liteyuki.log import init_log, logger
from liteyuki.plugin import load_plugins
from liteyuki.utils import run_coroutine
__all__ = [
"LiteyukiBot",
"get_bot"
]
"""是否为主进程"""
class LiteyukiBot:
def __init__(self, *args, **kwargs):
@ -29,11 +30,12 @@ class LiteyukiBot:
self.init(**self.config) # 初始化
self.lifespan: Lifespan = Lifespan()
self.chan = Channel() # 进程通信通道
self.pm: ProcessManager = ProcessManager(bot=self, chan=self.chan)
self.process_manager: ProcessManager = ProcessManager(bot=self)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop_thread = threading.Thread(target=self.loop.run_forever, daemon=True)
self.call_restart_count = 0
print("\033[34m" + r"""
__ ______ ________ ________ __ __ __ __ __ __ ______
@ -53,15 +55,83 @@ $$$$$$$$/ $$$$$$/ $$/ $$$$$$$$/ $$/ $$$$$$/ $$/ $$/ $$$$$$/
self.loop_thread.start() # 启动事件循环
asyncio.run(self.lifespan.before_start()) # 启动前钩子
self.pm.add_target("nonebot", nb_run, **self.config)
self.pm.start("nonebot")
self.process_manager.add_target("nonebot", nb_run, **self.config)
self.process_manager.start("nonebot")
self.pm.add_target("melobot", mb_run, **self.config)
self.pm.start("melobot")
self.process_manager.add_target("melobot", mb_run, **self.config)
self.process_manager.start("melobot")
asyncio.run(self.lifespan.after_start()) # 启动后钩子
def restart(self, name: Optional[str] = None):
self.start_watcher() # 启动文件监视器
def start_watcher(self):
if self.config.get("debug", False):
code_directories = {}
src_directories = (
"liteyuki",
"src/liteyuki_main",
"src/liteyuki_plugins",
"src/nonebot_plugins",
"src/utils",
)
src_excludes_extensions = (
"pyc",
)
logger.debug("Liteyuki Reload enabled, watching for file changes...")
restart = self.restart_process
class CodeModifiedHandler(FileSystemEventHandler):
"""
Handler for code file changes
"""
def on_modified(self, event):
if event.src_path.endswith(
src_excludes_extensions) or event.is_directory or "__pycache__" in event.src_path:
return
logger.info(f"{event.src_path} modified, reloading bot...")
restart()
code_modified_handler = CodeModifiedHandler()
observer = Observer()
for directory in src_directories:
observer.schedule(code_modified_handler, directory, recursive=True)
observer.start()
def restart(self, delay: int = 0):
"""
重启轻雪本体
Returns:
"""
if self.call_restart_count < 1:
executable = sys.executable
args = sys.argv
logger.info("Restarting LiteyukiBot...")
time.sleep(delay)
if platform.system() == "Windows":
cmd = "start"
elif platform.system() == "Linux":
cmd = "nohup"
elif platform.system() == "Darwin":
cmd = "open"
else:
cmd = "nohup"
self.process_manager.terminate_all()
# 等待所有进程退出
self.process_manager.chan_active.receive("main")
# 进程退出后重启
threading.Thread(target=os.system, args=(f"{cmd} {executable} {' '.join(args)}",)).start()
sys.exit(0)
self.call_restart_count += 1
def restart_process(self, name: Optional[str] = None):
"""
停止轻雪
Args:
@ -75,10 +145,10 @@ $$$$$$$$/ $$$$$$/ $$/ $$$$$$$$/ $$/ $$$$$$/ $$/ $$/ $$$$$$/
self.loop.create_task(self.lifespan.before_shutdown()) # 停止前钩子
if name:
self.chan.send(1, name)
self.chan_active.send(1, name)
else:
for name in self.pm.targets:
self.chan.send(1, name)
for name in self.process_manager.targets:
self.chan_active.send(1, name)
def init(self, *args, **kwargs):
"""

View File

@ -9,11 +9,22 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Software: PyCharm
该模块用于轻雪主进程和Nonebot子进程之间的通信
"""
from liteyuki.comm.channel import Channel, chan
from liteyuki.comm.channel import (
Channel,
chan,
get_channel,
set_channel,
set_channels,
get_channels
)
from liteyuki.comm.event import Event
__all__ = [
"Channel",
"chan",
"Event",
"get_channel",
"set_channel",
"set_channels",
"get_channels"
]

View File

@ -10,8 +10,12 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
本模块定义了一个通用的通道类用于进程间通信
"""
import functools
import multiprocessing
import threading
from multiprocessing import Pipe
from typing import Any, Optional, Callable, Awaitable, List, TypeAlias
from uuid import uuid4
from liteyuki.utils import is_coroutine_callable, run_coroutine
@ -23,76 +27,89 @@ SYNC_FILTER_FUNC: TypeAlias = Callable[[Any], bool]
ASYNC_FILTER_FUNC: TypeAlias = Callable[[Any], Awaitable[bool]]
FILTER_FUNC: TypeAlias = SYNC_FILTER_FUNC | ASYNC_FILTER_FUNC
IS_MAIN_PROCESS = multiprocessing.current_process().name == "MainProcess"
_channel: dict[str, "Channel"] = {}
_callback_funcs: dict[str, ON_RECEIVE_FUNC] = {}
class Channel:
"""
通道类用于进程间通信
通道类用于进程间通信进程内不可用仅限主进程和子进程之间通信
有两种接收工作方式但是只能选择一种主动接收和被动接收主动接收使用 `receive` 方法被动接收使用 `on_receive` 装饰器
"""
def __init__(self):
self.receive_conn, self.send_conn = Pipe()
def __init__(self, _id: str):
self.main_send_conn, self.sub_receive_conn = Pipe()
self.sub_send_conn, self.main_receive_conn = Pipe()
self._closed = False
self._on_receive_funcs: List[ON_RECEIVE_FUNC] = []
self._on_receive_funcs_with_receiver: dict[str, List[ON_RECEIVE_FUNC]] = {}
self._on_main_receive_funcs: list[str] = []
self._on_sub_receive_funcs: list[str] = []
self.name: str = _id
def send(self, data: Any, receiver: Optional[str] = None):
self.is_main_receive_loop_running = False
self.is_sub_receive_loop_running = False
def __str__(self):
return f"Channel({self.name})"
def send(self, data: Any):
"""
发送数据
Args:
data: 数据
receiver: 接收者如果为None则广播
"""
if self._closed:
raise RuntimeError("Cannot send to a closed channel")
self.send_conn.send((data, receiver))
if IS_MAIN_PROCESS:
print("主进程发送数据:", data)
self.main_send_conn.send(data)
else:
print("子进程发送数据:", data)
self.sub_send_conn.send(data)
def receive(self, receiver: str = None) -> Any:
def receive(self) -> Any:
"""
接收数据
Args:
receiver: 接收者如果为None则接收任意数据
"""
if self._closed:
raise RuntimeError("Cannot receive from a closed channel")
while True:
# 判断receiver是否为None或者receiver是否等于接收者是则接收数据否则不动数据
data, receiver_ = self.receive_conn.recv()
if receiver is None or receiver == receiver_:
self._run_on_receive_funcs(data, receiver_)
return data
self.send_conn.send((data, receiver_))
if IS_MAIN_PROCESS:
data = self.main_receive_conn.recv()
print("主进程接收数据:", data)
else:
data = self.sub_receive_conn.recv()
print("子进程接收数据:", data)
def peek(self) -> Optional[Any]:
"""
查看管道中的数据不移除
Returns:
"""
if self._closed:
raise RuntimeError("Cannot peek from a closed channel")
if self.receive_conn.poll():
data, receiver = self.receive_conn.recv()
self.receive_conn.send((data, receiver))
return data
return None
def close(self):
"""
关闭通道
"""
self._closed = True
self.receive_conn.close()
self.send_conn.close()
self.sub_receive_conn.close()
self.main_send_conn.close()
self.sub_send_conn.close()
self.main_receive_conn.close()
def on_receive(self, filter_func: Optional[FILTER_FUNC] = None, receiver: Optional[str] = None) -> Callable[[ON_RECEIVE_FUNC], ON_RECEIVE_FUNC]:
def on_receive(self, filter_func: Optional[FILTER_FUNC] = None) -> Callable[[ON_RECEIVE_FUNC], ON_RECEIVE_FUNC]:
"""
接收数据并执行函数
Args:
filter_func: 过滤函数为None则不过滤
receiver: 接收者, 为None则接收任意数据
Returns:
装饰器装饰一个函数在接收到数据后执行
"""
if (not self.is_sub_receive_loop_running) and not IS_MAIN_PROCESS:
threading.Thread(target=self._start_sub_receive_loop).start()
if (not self.is_main_receive_loop_running) and IS_MAIN_PROCESS:
threading.Thread(target=self._start_main_receive_loop).start()
def decorator(func: ON_RECEIVE_FUNC) -> ON_RECEIVE_FUNC:
async def wrapper(data: Any) -> Any:
@ -105,28 +122,53 @@ class Channel:
return
return await func(data)
if receiver is None:
self._on_receive_funcs.append(wrapper)
function_id = str(uuid4())
_callback_funcs[function_id] = wrapper
if IS_MAIN_PROCESS:
self._on_main_receive_funcs.append(function_id)
else:
if receiver not in self._on_receive_funcs_with_receiver:
self._on_receive_funcs_with_receiver[receiver] = []
self._on_receive_funcs_with_receiver[receiver].append(wrapper)
self._on_sub_receive_funcs.append(function_id)
return func
return decorator
def _run_on_receive_funcs(self, data: Any, receiver: Optional[str] = None):
def _run_on_main_receive_funcs(self, data: Any):
"""
运行接收函数
Args:
data: 数据
"""
if receiver is None:
for func in self._on_receive_funcs:
run_coroutine(func(data))
else:
for func in self._on_receive_funcs_with_receiver.get(receiver, []):
run_coroutine(func(data))
for func_id in self._on_main_receive_funcs:
func = _callback_funcs[func_id]
run_coroutine(func(data))
def _run_on_sub_receive_funcs(self, data: Any):
"""
运行接收函数
Args:
data: 数据
"""
for func_id in self._on_sub_receive_funcs:
func = _callback_funcs[func_id]
run_coroutine(func(data))
def _start_main_receive_loop(self):
"""
开始接收数据
"""
self.is_main_receive_loop_running = True
while not self._closed:
data = self.main_receive_conn.recv()
self._run_on_main_receive_funcs(data)
def _start_sub_receive_loop(self):
"""
开始接收数据
"""
self.is_sub_receive_loop_running = True
while not self._closed:
data = self.sub_receive_conn.recv()
self._run_on_sub_receive_funcs(data)
def __iter__(self):
return self
@ -136,4 +178,42 @@ class Channel:
"""默认通道实例,可直接从模块导入使用"""
chan = Channel()
chan = Channel("default")
def set_channel(name: str, channel: Channel):
"""
设置通道实例
Args:
name: 通道名称
channel: 通道实例
"""
_channel[name] = channel
def set_channels(channels: dict[str, Channel]):
"""
设置通道实例
Args:
channels: 通道名称
"""
for name, channel in channels.items():
_channel[name] = channel
def get_channel(name: str) -> Optional[Channel]:
"""
获取通道实例
Args:
name: 通道名称
Returns:
"""
return _channel.get(name, None)
def get_channels() -> dict[str, Channel]:
"""
获取通道实例
Returns:
"""
return _channel

View File

@ -13,7 +13,7 @@ import threading
from multiprocessing import Process
from typing import TYPE_CHECKING
from liteyuki.comm import Channel
from liteyuki.comm import Channel, get_channel, set_channels
from liteyuki.log import logger
if TYPE_CHECKING:
@ -31,12 +31,18 @@ class ProcessManager:
在主进程中被调用
"""
def __init__(self, bot: "LiteyukiBot", chan: Channel):
def __init__(self, bot: "LiteyukiBot"):
self.bot = bot
self.chan = chan
self.targets: dict[str, tuple[callable, tuple, dict]] = {}
self.processes: dict[str, Process] = {}
set_channels({
"nonebot-active" : Channel(_id="nonebot-active"),
"melobot-active" : Channel(_id="melobot-active"),
"nonebot-passive": Channel(_id="nonebot-passive"),
"melobot-passive": Channel(_id="melobot-passive"),
})
def start(self, name: str, delay: int = 0):
"""
开启后自动监控进程并添加到进程字典中
@ -47,19 +53,21 @@ class ProcessManager:
Returns:
"""
if name not in self.targets:
raise KeyError(f"Process {name} not found.")
def _start():
should_exit = False
while not should_exit:
process = Process(target=self.targets[name][0], args=(self.chan, *self.targets[name][1]), kwargs=self.targets[name][2])
chan_active = get_channel(f"{name}-active")
chan_passive = get_channel(f"{name}-passive")
process = Process(target=self.targets[name][0], args=(chan_active, chan_passive, *self.targets[name][1]),
kwargs=self.targets[name][2])
self.processes[name] = process
process.start()
while not should_exit:
# 0退出 1重启
data = self.chan.receive(name)
data = chan_active.receive()
if data == 1:
logger.info(f"Restarting process {name}")
asyncio.run(self.bot.lifespan.before_shutdown())
@ -103,3 +111,7 @@ class ProcessManager:
process.join(TIMEOUT)
if process.is_alive():
process.kill()
def terminate_all(self):
for name in self.targets:
self.terminate(name)

View File

@ -3,6 +3,7 @@ from typing import Optional, TYPE_CHECKING
import nonebot
from liteyuki.core.nb import adapter_manager, driver_manager
from liteyuki.comm.channel import set_channel
if TYPE_CHECKING:
from liteyuki.comm.channel import Channel
@ -10,23 +11,23 @@ if TYPE_CHECKING:
timeout_limit: int = 20
"""导出对象用于主进程与nonebot通信"""
chan_in_spawn_nb: Optional["Channel"] = None
_channels = {}
def nb_run(chan, *args, **kwargs):
def nb_run(chan_active: "Channel", chan_passive: "Channel", *args, **kwargs):
"""
初始化NoneBot并运行在子进程
Args:
chan:
*args:
chan_active:
chan_passive:
**kwargs:
Returns:
"""
global chan_in_spawn_nb
chan_in_spawn_nb = chan
set_channel("nonebot-active", chan_active)
set_channel("nonebot-passive", chan_passive)
nonebot.init(**kwargs)
driver_manager.init(config=kwargs)
adapter_manager.init(kwargs)
@ -35,17 +36,21 @@ def nb_run(chan, *args, **kwargs):
nonebot.run()
def mb_run(chan, *args, **kwargs):
def mb_run(chan_active: "Channel", chan_passive: "Channel", *args, **kwargs):
"""
初始化MeloBot并运行在子进程
Args:
chan
chan_active
chan_passive
*args:
**kwargs:
Returns:
"""
set_channel("melobot-active", chan_active)
set_channel("melobot-passive", chan_passive)
# bot = MeloBot(__name__)
# bot.init(AbstractConnector(cd_time=0))
# bot.run()

View File

@ -7,14 +7,19 @@
# @Email : snowykami@outlook.com
# @File : asa.py
# @Software: PyCharm
import asyncio
from liteyuki.plugin import PluginMetadata
from liteyuki import get_bot, logger
from liteyuki.comm.channel import get_channel
__plugin_meta__ = PluginMetadata(
name="lifespan_monitor",
)
bot = get_bot()
nbp_chan = get_channel("nonebot-passive")
mbp_chan = get_channel("melobot-passive")
@bot.on_before_start
@ -24,6 +29,7 @@ def _():
@bot.on_before_shutdown
def _():
print(get_channel("main"))
logger.info("生命周期监控器:准备停止")
@ -35,3 +41,17 @@ def _():
@bot.on_after_start
def _():
logger.info("生命周期监控器:启动完成")
@bot.on_after_start
async def _():
logger.info("生命周期监控器:启动完成")
while True:
await asyncio.sleep(3)
nbp_chan.send("send by main")
@mbp_chan.on_receive()
@nbp_chan.on_receive()
async def _(data):
print("主进程收到数据", data)

View File

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/8/10 下午5:18
@Author : snowykami
@Email : snowykami@outlook.com
@File : reloader_monitor.py
@Software: PyCharm
"""

Binary file not shown.

View File

@ -6,29 +6,25 @@ import nonebot
import pip
from nonebot import Bot, get_driver, require
from nonebot.adapters import onebot, satori
from nonebot.adapters.onebot.v11 import Message, escape, unescape
from nonebot.adapters.onebot.v11 import Message, unescape
from nonebot.exception import MockApiException
from nonebot.internal.matcher import Matcher
from nonebot.permission import SUPERUSER
from liteyuki import Channel
# from src.liteyuki.core import Reloader
from src.utils import event as event_utils, satori_utils
from src.utils.base.config import get_config, load_from_yaml
from src.utils.base.data_manager import StoredConfig, TempConfig, common_db
from src.utils.base.language import get_user_lang
from src.utils.base.ly_typing import T_Bot, T_MessageEvent
from src.utils.message.message import MarkdownMessage as md, broadcast_to_superusers
# from src.liteyuki.core import Reloader
from src.utils import event as event_utils, satori_utils
from liteyuki.core.spawn_process import chan_in_spawn_nb
from .api import update_liteyuki
from liteyuki.bot import get_bot
from ..utils.base import reload
from ..utils.base.ly_function import get_function
require("nonebot_plugin_alconna")
require("nonebot_plugin_apscheduler")
from nonebot_plugin_alconna import UniMessage, on_alconna, Alconna, Args, Subcommand, Arparma, MultiVar
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma, MultiVar
from nonebot_plugin_apscheduler import scheduler
driver = get_driver()
@ -81,7 +77,6 @@ async def _(bot: T_Bot, event: T_MessageEvent):
).handle()
# Satori OK
async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
global channel_in_spawn_process
await matcher.send("Liteyuki reloading")
temp_data = common_db.where_one(TempConfig(), default=TempConfig())
@ -91,8 +86,8 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
"reload_time" : time.time(),
"reload_bot_id" : bot.self_id,
"reload_session_type": event_utils.get_message_type(event),
"reload_session_id" : (event.group_id if event.message_type == "group" else event.user_id) if not isinstance(event,
satori.event.Event) else event.chan.id,
"reload_session_id" : (event.group_id if event.message_type == "group" else event.user_id)
if not isinstance(event, satori.event.Event) else event.chan_active.id,
"delta_time" : 0
}
)
@ -358,7 +353,7 @@ async def _(bot: T_Bot):
delta_time = temp_data.data.get("delta_time", 0)
common_db.save(temp_data) # 更新数据
if delta_time <= 20.0: # 启动时间太长就别发了,丢人
if delta_time <= 20.0: # 启动时间太长就别发了,丢人
if isinstance(bot, satori.Bot):
await bot.send_message(
channel_id=reload_session_id,

View File

@ -11,35 +11,12 @@ if get_config("debug", False):
liteyuki_bot = get_bot()
src_directories = (
"src/liteyuki_main",
"src/plugins",
"src/utils",
)
src_excludes_extensions = (
"pyc",
)
res_directories = (
"src/resources",
"resources",
)
nonebot.logger.info("Liteyuki Reload enabled, watching for file changes...")
class CodeModifiedHandler(FileSystemEventHandler):
"""
Handler for code file changes
"""
def on_modified(self, event):
if event.src_path.endswith(
src_excludes_extensions) or event.is_directory or "__pycache__" in event.src_path:
return
nonebot.logger.info(f"{event.src_path} modified, reloading bot...")
reload()
class ResourceModifiedHandler(FileSystemEventHandler):
"""
@ -51,12 +28,9 @@ if get_config("debug", False):
load_resources()
code_modified_handler = CodeModifiedHandler()
resource_modified_handle = ResourceModifiedHandler()
observer = Observer()
for directory in src_directories:
observer.schedule(code_modified_handler, directory, recursive=True)
for directory in res_directories:
observer.schedule(resource_modified_handle, directory, recursive=True)
observer.start()

View File

@ -21,7 +21,7 @@ liteyuki_bot = get_bot()
@driver.on_startup
async def load_plugins():
nonebot.plugin.load_plugins("src/plugins")
nonebot.plugin.load_plugins("src/nonebot_plugins")
# 从数据库读取已安装的插件
if not get_config("safe_mode", False):
# 安全模式下,不加载插件

View File

@ -0,0 +1,3 @@
# 说明
此目录为**轻雪插件**目录,非其他插件目录。

View File

@ -1,6 +1,7 @@
import multiprocessing
from nonebot.plugin import PluginMetadata
from liteyuki.comm import get_channel
from .rt_guide import *
from .crt_matchers import *
@ -16,3 +17,11 @@ __plugin_meta__ = PluginMetadata(
"default_enable": True,
}
)
chan = get_channel("nonebot-passive")
@chan.on_receive()
async def _(d):
print("CRT子进程接收到数据", d)
chan.send("CRT子进程已接收到数据")

View File

@ -6,7 +6,7 @@ from src.utils.base.ly_typing import T_MessageEvent
from src.utils import satori_utils
from nonebot.adapters import satori
from nonebot_plugin_alconna.typings import Event
from src.plugins.liteyuki_status.counter_for_satori import satori_counter
from src.nonebot_plugins.liteyuki_status.counter_for_satori import satori_counter
@event_preprocessor

View File

@ -1,7 +1,7 @@
import threading
from nonebot import logger
from liteyuki.core.spawn_process import chan_in_spawn_nb
from liteyuki.comm.channel import get_channel
def reload(delay: float = 0.0, receiver: str = "nonebot"):
@ -14,6 +14,13 @@ def reload(delay: float = 0.0, receiver: str = "nonebot"):
Returns:
"""
chan = get_channel(receiver + "-active")
if chan is None:
logger.error(f"Channel {receiver}-active not found, cannot reload.")
return
chan_in_spawn_nb.send(1, receiver)
logger.info(f"Reloading LiteyukiBot({receiver})...")
if delay > 0:
threading.Timer(delay, chan.send, args=(1,)).start()
return
else:
chan.send(1)