🐛 在结束进程时无法杀死进程的问题

This commit is contained in:
远野千束(神羽) 2024-08-15 16:40:29 +08:00
parent 0d5f9fee52
commit a61357f4e2
14 changed files with 108 additions and 86 deletions

View File

@ -4,10 +4,7 @@ import platform
import sys
import threading
import time
from typing import Any, Iterable, Optional
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from typing import Any, Optional
from liteyuki.bot.lifespan import (LIFESPAN_FUNC, Lifespan)
from liteyuki.comm import get_channel
@ -38,6 +35,7 @@ class LiteyukiBot:
self.lifespan = Lifespan()
self.process_manager: ProcessManager = ProcessManager(bot=self)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop_thread = threading.Thread(target=self.loop.run_forever, daemon=True)
@ -50,15 +48,9 @@ class LiteyukiBot:
"""
启动逻辑
"""
self.loop_thread.start() # 启动事件循环
asyncio.run(self.lifespan.before_start()) # 启动前钩子
asyncio.run(self.lifespan.after_start()) # 启动后钩子
self.start_watcher() # 启动文件监视器,后续准备插件化
self.keep_running()
def start_watcher(self):
pass
self.lifespan.before_start() # 启动前钩子
self.process_manager.start_all()
self.lifespan.after_start() # 启动后钩子
def restart(self, delay: int = 0):
"""
@ -120,6 +112,15 @@ class LiteyukiBot:
def init_config(self):
pass
def stop(self):
"""
停止轻雪
Returns:
"""
self.stop_event.set()
self.loop.stop()
def on_before_start(self, func: LIFESPAN_FUNC):
"""
注册启动前的函数
@ -198,9 +199,6 @@ class LiteyukiBot:
"""
return self.lifespan.on_after_nonebot_init(func)
def keep_running(self):
self.stop_event.wait()
_BOT_INSTANCE: Optional[LiteyukiBot] = None

View File

@ -8,23 +8,27 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@File : lifespan.py
@Software: PyCharm
"""
import asyncio
from typing import Any, Awaitable, Callable, TypeAlias
from liteyuki.log import logger
from liteyuki.utils import is_coroutine_callable
from liteyuki.utils import is_coroutine_callable, async_wrapper
SYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Any]
ASYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Awaitable[Any]]
LIFESPAN_FUNC: TypeAlias = SYNC_LIFESPAN_FUNC | ASYNC_LIFESPAN_FUNC
SYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Any]
ASYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Awaitable[Any]]
PROCESS_LIFESPAN_FUNC: TypeAlias = SYNC_PROCESS_LIFESPAN_FUNC | ASYNC_PROCESS_LIFESPAN_FUNC
class Lifespan:
def __init__(self) -> None:
"""
轻雪生命周期管理启动停止重启
"""
self.life_flag: int = 0 # 0: 启动前1: 启动后2: 停止前3: 停止后
self.life_flag: int = 0
self._before_start_funcs: list[LIFESPAN_FUNC] = []
self._after_start_funcs: list[LIFESPAN_FUNC] = []
@ -38,18 +42,26 @@ class Lifespan:
self._after_nonebot_init_funcs: list[LIFESPAN_FUNC] = []
@staticmethod
async def _run_funcs(funcs: list[LIFESPAN_FUNC]) -> None:
def _run_funcs(funcs: list[LIFESPAN_FUNC | PROCESS_LIFESPAN_FUNC], *args, **kwargs) -> None:
"""
运行函数
Args:
funcs:
Returns:
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = []
for func in funcs:
if is_coroutine_callable(func):
await func()
tasks.append(func(*args, **kwargs))
else:
func()
tasks.append(async_wrapper(func)(*args, **kwargs))
loop.run_until_complete(asyncio.gather(*tasks))
def on_before_start(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
"""
@ -131,59 +143,51 @@ class Lifespan:
self._after_nonebot_init_funcs.append(func)
return func
async def before_start(self) -> None:
def before_start(self) -> None:
"""
启动前
Returns:
"""
logger.debug("Running before_start functions")
await self._run_funcs(self._before_start_funcs)
self._run_funcs(self._before_start_funcs)
async def after_start(self) -> None:
def after_start(self) -> None:
"""
启动后
Returns:
"""
logger.debug("Running after_start functions")
await self._run_funcs(self._after_start_funcs)
self._run_funcs(self._after_start_funcs)
async def before_process_shutdown(self) -> None:
def before_process_shutdown(self) -> None:
"""
停止前
Returns:
"""
logger.debug("Running before_shutdown functions")
await self._run_funcs(self._before_process_shutdown_funcs)
self._run_funcs(self._before_process_shutdown_funcs)
async def after_shutdown(self) -> None:
def after_shutdown(self) -> None:
"""
停止后
Returns:
"""
logger.debug("Running after_shutdown functions")
await self._run_funcs(self._after_shutdown_funcs)
self._run_funcs(self._after_shutdown_funcs)
async def before_process_restart(self) -> None:
def before_process_restart(self) -> None:
"""
重启前
Returns:
"""
logger.debug("Running before_restart functions")
await self._run_funcs(self._before_process_restart_funcs)
self._run_funcs(self._before_process_restart_funcs)
async def after_restart(self) -> None:
def after_restart(self) -> None:
"""
重启后
Returns:
"""
logger.debug("Running after_restart functions")
await self._run_funcs(self._after_restart_funcs)
async def after_nonebot_init(self) -> None:
"""
NoneBot 初始化后
Returns:
"""
logger.debug("Running after_nonebot_init functions")
await self._run_funcs(self._after_nonebot_init_funcs)
self._run_funcs(self._after_restart_funcs)

View File

@ -36,53 +36,63 @@ class ProcessManager:
self.targets: dict[str, tuple[callable, tuple, dict]] = {}
self.processes: dict[str, Process] = {}
def start(self, name: str, delay: int = 0):
def start(self, name: str):
"""
开启后自动监控进程并添加到进程字典中
Args:
name:
delay:
Returns:
"""
if name not in self.targets:
raise KeyError(f"Process {name} not found.")
def _start():
should_exit = False
while not should_exit:
chan_active = get_channel(f"{name}-active")
process = Process(target=self.targets[name][0], args=self.targets[name][1],
kwargs=self.targets[name][2])
self.processes[name] = process
process.start()
while not should_exit:
# 0退出 1重启
data = chan_active.receive()
if data == 1:
# 重启
if self.is_process_alive(name):
logger.info(f"Restarting process {name}")
asyncio.run(self.bot.lifespan.before_process_shutdown())
asyncio.run(self.bot.lifespan.before_process_restart())
self.terminate(name)
break
else:
logger.warning(f"Process {name} is not restartable, cannot restart.")
chan_active = get_channel(f"{name}-active")
elif data == 0:
def _start_process():
process = Process(target=self.targets[name][0], args=self.targets[name][1],
kwargs=self.targets[name][2])
self.processes[name] = process
process.start()
# 启动进程并监听信号
_start_process()
def _start_monitor():
while True:
try:
data = chan_active.receive()
if data == 0:
# 停止
logger.info(f"Stopping process {name}")
asyncio.run(self.bot.lifespan.before_process_shutdown())
should_exit = True
self.bot.lifespan.before_process_shutdown()
self.terminate(name)
break
elif data == 1:
# 重启
logger.info(f"Restarting process {name}")
self.bot.lifespan.before_process_shutdown()
self.bot.lifespan.before_process_restart()
self.terminate(name)
_start_process()
continue
else:
logger.warning("Unknown data received, ignored.")
except KeyboardInterrupt:
logger.info(f"Stopping process {name}")
self.bot.lifespan.before_process_shutdown()
self.terminate_all()
break
if delay:
threading.Timer(delay, _start).start()
else:
threading.Thread(target=_start).start()
threading.Thread(target=_start_monitor).start()
def start_all(self):
"""
启动所有进程
"""
for name in self.targets:
self.start(name)
def add_target(self, name: str, target, args: tuple = (), kwargs=None):
"""
@ -100,7 +110,6 @@ class ProcessManager:
kwargs["chan_active"] = chan_active
kwargs["chan_passive"] = chan_passive
self.targets[name] = (target, args, kwargs)
set_channels(
{
f"{name}-active" : chan_active,
@ -128,6 +137,7 @@ class ProcessManager:
process.join(TIMEOUT)
if process.is_alive():
process.kill()
logger.success(f"Process {name} terminated.")
def terminate_all(self):
for name in self.targets:
@ -144,5 +154,4 @@ class ProcessManager:
"""
if name not in self.targets:
raise logger.warning(f"Process {name} not found.")
process = self.processes[name]
return process.is_alive()
return self.processes[name].is_alive()

View File

@ -72,3 +72,18 @@ def path_to_module_name(path: Path) -> str:
return ".".join(rel_path.parts[:-1])
else:
return ".".join(rel_path.parts[:-1] + (rel_path.stem,))
def async_wrapper(func: Callable[..., Any]) -> Callable[..., Coroutine]:
"""
异步包装器
Args:
func: Sync Callable
Returns:
Coroutine: Asynchronous Callable
"""
async def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper.__signature__ = inspect.signature(func)
return wrapper

View File

@ -21,9 +21,6 @@ __plugin_meta__ = PluginMetadata(
bot = get_bot()
# nbp_chan = get_channel("nonebot-passive")
# mbp_chan = get_channel("melobot-passive")
@bot.on_before_start
def _():
logger.info("生命周期监控器:准备启动")
@ -40,5 +37,6 @@ def _():
@bot.on_after_start
def _():
async def _():
await asyncio.sleep(6)
logger.info("生命周期监控器:启动完成")

View File

@ -11,11 +11,9 @@ Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
import nonebot
from liteyuki.plugin import PluginMetadata
from liteyuki import get_bot
from liteyuki.comm import Channel, set_channel
from liteyuki.core import IS_MAIN_PROCESS
from liteyuki.plugin import PluginMetadata
from .nb_utils import adapter_manager, driver_manager
__plugin_meta__ = PluginMetadata(
@ -54,7 +52,6 @@ if IS_MAIN_PROCESS:
liteyuki = get_bot()
@liteyuki.on_after_start
def start_run_nonebot():
@liteyuki.on_before_start
async def start_run_nonebot():
liteyuki.process_manager.add_target(name="nonebot", target=nb_run, args=(), kwargs=liteyuki.config)
liteyuki.process_manager.start("nonebot")

View File

@ -6,6 +6,7 @@ from .rt_guide import *
from .crt_matchers import *
__plugin_meta__ = PluginMetadata(
name="CRT生成工具",
description="一些CRT牌子生成器",
usage="我觉得你应该会用",