mirror of
https://github.com/LiteyukiStudio/LiteyukiBot.git
synced 2024-11-11 05:17:24 +08:00
160 lines
5.7 KiB
Python
160 lines
5.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Copyright (C) 2020-2024 LiteyukiStudio. All Rights Reserved
|
|
|
|
@Time : 2024/7/23 下午8:24
|
|
@Author : snowykami
|
|
@Email : snowykami@outlook.com
|
|
@File : lifespan.py
|
|
@Software: PyCharm
|
|
"""
|
|
import asyncio
|
|
from typing import Any, Awaitable, Callable, TypeAlias, Sequence
|
|
|
|
from liteyuki.log import logger
|
|
from liteyuki.utils import is_coroutine_callable, async_wrapper
|
|
|
|
SYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Any] # 同步生命周期函数
|
|
ASYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Awaitable[Any]] # 异步生命周期函数
|
|
LIFESPAN_FUNC: TypeAlias = SYNC_LIFESPAN_FUNC | ASYNC_LIFESPAN_FUNC # 生命周期函数
|
|
|
|
SYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Any] # 同步进程生命周期函数
|
|
ASYNC_PROCESS_LIFESPAN_FUNC: TypeAlias = Callable[[str], Awaitable[Any]] # 异步进程生命周期函数
|
|
PROCESS_LIFESPAN_FUNC: TypeAlias = SYNC_PROCESS_LIFESPAN_FUNC | ASYNC_PROCESS_LIFESPAN_FUNC # 进程函数
|
|
|
|
|
|
class Lifespan:
|
|
def __init__(self) -> None:
|
|
"""
|
|
轻雪生命周期管理,启动、停止、重启
|
|
"""
|
|
self.life_flag: int = 0
|
|
|
|
self._before_start_funcs: list[LIFESPAN_FUNC] = []
|
|
self._after_start_funcs: list[LIFESPAN_FUNC] = []
|
|
|
|
self._before_process_shutdown_funcs: list[PROCESS_LIFESPAN_FUNC] = []
|
|
self._after_shutdown_funcs: list[LIFESPAN_FUNC] = []
|
|
|
|
self._before_process_restart_funcs: list[PROCESS_LIFESPAN_FUNC] = []
|
|
self._after_restart_funcs: list[LIFESPAN_FUNC] = []
|
|
|
|
@staticmethod
|
|
async def run_funcs(funcs: Sequence[LIFESPAN_FUNC | PROCESS_LIFESPAN_FUNC], *args, **kwargs) -> None:
|
|
"""
|
|
并发运行异步函数
|
|
Args:
|
|
funcs ([`Sequence`](https%3A//docs.python.org/3/library/typing.html#typing.Sequence)[[`ASYNC_LIFESPAN_FUNC`](#var-lifespan-func) | [`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)]): 函数列表
|
|
Returns:
|
|
"""
|
|
tasks = [func(*args, **kwargs) if is_coroutine_callable(func) else async_wrapper(func)(*args, **kwargs) for func in funcs]
|
|
await asyncio.gather(*tasks)
|
|
|
|
def on_before_start(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
|
|
"""
|
|
注册启动时的函数
|
|
Args:
|
|
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
|
|
Returns:
|
|
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
|
|
"""
|
|
self._before_start_funcs.append(func)
|
|
return func
|
|
|
|
def on_after_start(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
|
|
"""
|
|
注册启动时的函数
|
|
Args:
|
|
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
|
|
Returns:
|
|
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
|
|
"""
|
|
self._after_start_funcs.append(func)
|
|
return func
|
|
|
|
def on_before_process_shutdown(self, func: PROCESS_LIFESPAN_FUNC) -> PROCESS_LIFESPAN_FUNC:
|
|
"""
|
|
注册进程停止前的函数
|
|
Args:
|
|
func ([`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)): 进程生命周期函数
|
|
Returns:
|
|
[`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func): 进程生命周期函数
|
|
"""
|
|
self._before_process_shutdown_funcs.append(func)
|
|
return func
|
|
|
|
def on_after_shutdown(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
|
|
"""
|
|
注册停止后的函数
|
|
Args:
|
|
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
|
|
Returns:
|
|
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
|
|
"""
|
|
self._after_shutdown_funcs.append(func)
|
|
return func
|
|
|
|
def on_before_process_restart(self, func: PROCESS_LIFESPAN_FUNC) -> PROCESS_LIFESPAN_FUNC:
|
|
"""
|
|
注册进程重启前的函数
|
|
Args:
|
|
func ([`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func)): 进程生命周期函数
|
|
Returns:
|
|
[`PROCESS_LIFESPAN_FUNC`](#var-process-lifespan-func): 进程生命周期函数
|
|
"""
|
|
self._before_process_restart_funcs.append(func)
|
|
return func
|
|
|
|
def on_after_restart(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
|
|
"""
|
|
注册重启后的函数
|
|
Args:
|
|
func ([`LIFESPAN_FUNC`](#var-lifespan-func)): 生命周期函数
|
|
Returns:
|
|
[`LIFESPAN_FUNC`](#var-lifespan-func): 生命周期函数
|
|
"""
|
|
self._after_restart_funcs.append(func)
|
|
return func
|
|
|
|
async def before_start(self) -> None:
|
|
"""
|
|
启动前钩子
|
|
"""
|
|
logger.debug("Running before_start functions")
|
|
await self.run_funcs(self._before_start_funcs)
|
|
|
|
async def after_start(self) -> None:
|
|
"""
|
|
启动后钩子
|
|
"""
|
|
logger.debug("Running after_start functions")
|
|
await self.run_funcs(self._after_start_funcs)
|
|
|
|
async def before_process_shutdown(self, *args, **kwargs) -> None:
|
|
"""
|
|
停止前钩子
|
|
"""
|
|
logger.debug("Running before_shutdown functions")
|
|
await self.run_funcs(self._before_process_shutdown_funcs, *args, **kwargs)
|
|
|
|
async def after_shutdown(self) -> None:
|
|
"""
|
|
停止后钩子 未实现
|
|
"""
|
|
logger.debug("Running after_shutdown functions")
|
|
await self.run_funcs(self._after_shutdown_funcs)
|
|
|
|
async def before_process_restart(self, *args, **kwargs) -> None:
|
|
"""
|
|
重启前钩子
|
|
"""
|
|
logger.debug("Running before_restart functions")
|
|
await self.run_funcs(self._before_process_restart_funcs, *args, **kwargs)
|
|
|
|
async def after_restart(self) -> None:
|
|
"""
|
|
重启后钩子 未实现
|
|
"""
|
|
logger.debug("Running after_restart functions")
|
|
await self.run_funcs(self._after_restart_funcs)
|