LiteyukiBot-TriM/liteyuki/bot/lifespan.py

160 lines
5.7 KiB
Python

# -*- coding: utf-8 -*-
"""
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
@Time : 2024/7/23 下午8:24
@Author : snowykami
@Email : snowykami@outlook.com
@File : lifespan.py
@Software: PyCharm
"""
import asyncio
from typing import Any, Awaitable, Callable, TypeAlias, Sequence
from liteyuki.log import logger
from liteyuki.utils import is_coroutine_callable, async_wrapper
SYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Any] # 同步生命周期函数
ASYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Awaitable[Any]] # 异步生命周期函数
LIFESPAN_FUNC: TypeAlias = SYNC_LIFESPAN_FUNC | ASYNC_LIFESPAN_FUNC # 生命周期函数
SYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Any] # 同步进程生命周期函数
ASYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Awaitable[Any]] # 异步进程生命周期函数
PROCESS_LIFESPAN_FUNC: TypeAlias = SYNC_PROCESS_LIFESPAN_FUNC | ASYNC_PROCESS_LIFESPAN_FUNC # 进程函数
class Lifespan:
def __init__(self) -> None:
"""
轻雪生命周期管理,启动、停止、重启
"""
self.life_flag: int = 0
self._before_start_funcs: list[LIFESPAN_FUNC] = []
self._after_start_funcs: list[LIFESPAN_FUNC] = []
self._before_process_shutdown_funcs: list[PROCESS_LIFESPAN_FUNC] = []
self._after_shutdown_funcs: list[LIFESPAN_FUNC] = []
self._before_process_restart_funcs: list[PROCESS_LIFESPAN_FUNC] = []
self._after_restart_funcs: list[LIFESPAN_FUNC] = []
@staticmethod
async def run_funcs(funcs: Sequence[LIFESPAN_FUNC | PROCESS_LIFESPAN_FUNC], *args, **kwargs) -> None:
"""
并发运行异步函数
Args:
funcs ([`Sequence`](https%3A//docs.python.org/3/library/typing.html#typing.Sequence)[[`ASYNC_LIFESPAN_FUNC`](#var-lifespan-func) | [`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)]): 函数列表
Returns:
"""
tasks = [func(*args, **kwargs) if is_coroutine_callable(func) else async_wrapper(func)(*args, **kwargs) for func in funcs]
await asyncio.gather(*tasks)
def on_before_start(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
"""
注册启动时的函数
Args:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._before_start_funcs.append(func)
return func
def on_after_start(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
"""
注册启动时的函数
Args:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._after_start_funcs.append(func)
return func
def on_before_process_shutdown(self, func: PROCESS_LIFESPAN_FUNC) -> PROCESS_LIFESPAN_FUNC:
"""
注册进程停止前的函数
Args:
func ([`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)): 进程生命周期函数
Returns:
[`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func): 进程生命周期函数
"""
self._before_process_shutdown_funcs.append(func)
return func
def on_after_shutdown(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
"""
注册停止后的函数
Args:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._after_shutdown_funcs.append(func)
return func
def on_before_process_restart(self, func: PROCESS_LIFESPAN_FUNC) -> PROCESS_LIFESPAN_FUNC:
"""
注册进程重启前的函数
Args:
func ([`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)): 进程生命周期函数
Returns:
[`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func): 进程生命周期函数
"""
self._before_process_restart_funcs.append(func)
return func
def on_after_restart(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
"""
注册重启后的函数
Args:
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
Returns:
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
"""
self._after_restart_funcs.append(func)
return func
async def before_start(self) -> None:
"""
启动前钩子
"""
logger.debug("运行 before_start 函数")
await self.run_funcs(self._before_start_funcs)
async def after_start(self) -> None:
"""
启动后钩子
"""
logger.debug("运行 after_start 函数")
await self.run_funcs(self._after_start_funcs)
async def before_process_shutdown(self, *args, **kwargs) -> None:
"""
停止前钩子
"""
logger.debug("运行 before_shutdown 函数")
await self.run_funcs(self._before_process_shutdown_funcs, *args, **kwargs)
async def after_shutdown(self) -> None:
"""
停止后钩子 未实现
"""
logger.debug("运行 after_shutdown 函数")
await self.run_funcs(self._after_shutdown_funcs)
async def before_process_restart(self, *args, **kwargs) -> None:
"""
重启前钩子
"""
logger.debug("运行 before_restart 函数")
await self.run_funcs(self._before_process_restart_funcs, *args, **kwargs)
async def after_restart(self) -> None:
"""
重启后钩子 未实现
"""
logger.debug("运行 after_restart 函数")
await self.run_funcs(self._after_restart_funcs)