Feature: 为事件响应器添加更多源码信息 (#2351)

This commit is contained in:
Ju4tCode 2023-09-09 13:46:09 +08:00 committed by GitHub
parent 3601a33f20
commit c4716e3e17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 199 additions and 35 deletions

View File

@ -6,6 +6,7 @@ matchers = MatcherManager()
from .matcher import Matcher as Matcher
from .matcher import current_bot as current_bot
from .matcher import MatcherSource as MatcherSource
from .matcher import current_event as current_event
from .matcher import current_handler as current_handler
from .matcher import current_matcher as current_matcher

View File

@ -1,4 +1,9 @@
import sys
import inspect
import warnings
from pathlib import Path
from types import ModuleType
from dataclasses import dataclass
from contextvars import ContextVar
from typing_extensions import Self
from datetime import datetime, timedelta
@ -20,6 +25,7 @@ from typing import (
from nonebot.log import logger
from nonebot.internal.rule import Rule
from nonebot.utils import classproperty
from nonebot.dependencies import Dependent
from nonebot.internal.permission import User, Permission
from nonebot.internal.adapter import (
@ -74,15 +80,51 @@ current_matcher: ContextVar["Matcher"] = ContextVar("current_matcher")
current_handler: ContextVar[Dependent] = ContextVar("current_handler")
@dataclass
class MatcherSource:
"""Matcher 源代码上下文信息"""
plugin_name: Optional[str] = None
"""事件响应器所在插件名称"""
module_name: Optional[str] = None
"""事件响应器所在插件模块的路径名"""
lineno: Optional[int] = None
"""事件响应器所在行号"""
@property
def plugin(self) -> Optional["Plugin"]:
"""事件响应器所在插件"""
from nonebot.plugin import get_plugin
if self.plugin_name is not None:
return get_plugin(self.plugin_name)
@property
def module(self) -> Optional[ModuleType]:
if self.module_name is not None:
return sys.modules.get(self.module_name)
@property
def file(self) -> Optional[Path]:
if self.module is not None and (file := inspect.getsourcefile(self.module)):
return Path(file).absolute()
class MatcherMeta(type):
if TYPE_CHECKING:
module_name: Optional[str]
type: str
_source: Optional[MatcherSource]
module_name: Optional[str]
def __repr__(self) -> str:
return (
f"{self.__name__}(type={self.type!r}"
+ (f", module={self.module_name}" if self.module_name else "")
+ (
f", lineno={self._source.lineno}"
if self._source and self._source.lineno is not None
else ""
)
+ ")"
)
@ -90,14 +132,7 @@ class MatcherMeta(type):
class Matcher(metaclass=MatcherMeta):
"""事件响应器类"""
plugin: ClassVar[Optional["Plugin"]] = None
"""事件响应器所在插件"""
module: ClassVar[Optional[ModuleType]] = None
"""事件响应器所在插件模块"""
plugin_name: ClassVar[Optional[str]] = None
"""事件响应器所在插件名"""
module_name: ClassVar[Optional[str]] = None
"""事件响应器所在点分割插件模块路径"""
_source: ClassVar[Optional[MatcherSource]] = None
type: ClassVar[str] = ""
"""事件响应器类型"""
@ -142,6 +177,11 @@ class Matcher(metaclass=MatcherMeta):
return (
f"{self.__class__.__name__}(type={self.type!r}"
+ (f", module={self.module_name}" if self.module_name else "")
+ (
f", lineno={self._source.lineno}"
if self._source and self._source.lineno is not None
else ""
)
+ ")"
)
@ -158,6 +198,7 @@ class Matcher(metaclass=MatcherMeta):
*,
plugin: Optional["Plugin"] = None,
module: Optional[ModuleType] = None,
source: Optional[MatcherSource] = None,
expire_time: Optional[Union[datetime, timedelta]] = None,
default_state: Optional[T_State] = None,
default_type_updater: Optional[Union[T_TypeUpdater, Dependent[str]]] = None,
@ -176,22 +217,47 @@ class Matcher(metaclass=MatcherMeta):
temp: 是否为临时事件响应器即触发一次后删除
priority: 响应优先级
block: 是否阻止事件向更低优先级的响应器传播
plugin: 事件响应器所在插件
module: 事件响应器所在模块
default_state: 默认状态 `state`
plugin: **Deprecated.** 事件响应器所在插件
module: **Deprecated.** 事件响应器所在模块
source: 事件响应器源代码上下文信息
expire_time: 事件响应器最终有效时间点过时即被删除
default_state: 默认状态 `state`
default_type_updater: 默认事件类型更新函数
default_permission_updater: 默认会话权限更新函数
返回:
Type[Matcher]: 新的事件响应器类
"""
if plugin is not None:
warnings.warn(
(
"Pass `plugin` context info to create Matcher is deprecated. "
"Use `source` instead."
),
DeprecationWarning,
)
if module is not None:
warnings.warn(
(
"Pass `module` context info to create Matcher is deprecated. "
"Use `source` instead."
),
DeprecationWarning,
)
source = source or (
MatcherSource(
plugin_name=plugin and plugin.name,
module_name=module and module.__name__,
)
if plugin is not None or module is not None
else None
)
NewMatcher = type(
cls.__name__,
(cls,),
{
"plugin": plugin,
"module": module,
"plugin_name": plugin and plugin.name,
"module_name": module and module.__name__,
"_source": source,
"type": type_,
"rule": rule or Rule(),
"permission": permission or Permission(),
@ -253,6 +319,26 @@ class Matcher(metaclass=MatcherMeta):
"""销毁当前的事件响应器"""
matchers[cls.priority].remove(cls)
@classproperty
def plugin(cls) -> Optional["Plugin"]:
"""事件响应器所在插件"""
return cls._source and cls._source.plugin
@classproperty
def module(cls) -> Optional[ModuleType]:
"""事件响应器所在插件模块"""
return cls._source and cls._source.module
@classproperty
def plugin_name(cls) -> Optional[str]:
"""事件响应器所在插件名"""
return cls._source and cls._source.plugin_name
@classproperty
def module_name(cls) -> Optional[str]:
"""事件响应器所在插件模块路径"""
return cls._source and cls._source.module_name
@classmethod
async def check_perm(
cls,
@ -773,8 +859,7 @@ class Matcher(metaclass=MatcherMeta):
temp=True,
priority=0,
block=True,
plugin=self.plugin,
module=self.module,
source=self.__class__._source,
expire_time=bot.config.session_expire_timeout,
default_state=self.state,
default_type_updater=self.__class__._default_type_updater,
@ -794,8 +879,7 @@ class Matcher(metaclass=MatcherMeta):
temp=True,
priority=0,
block=True,
plugin=self.plugin,
module=self.module,
source=self.__class__._source,
expire_time=bot.config.session_expire_timeout,
default_state=self.state,
default_type_updater=self.__class__._default_type_updater,

View File

@ -8,6 +8,7 @@ FrontMatter:
from nonebot.internal.matcher import Matcher as Matcher
from nonebot.internal.matcher import matchers as matchers
from nonebot.internal.matcher import current_bot as current_bot
from nonebot.internal.matcher import MatcherSource as MatcherSource
from nonebot.internal.matcher import current_event as current_event
from nonebot.internal.matcher import MatcherManager as MatcherManager
from nonebot.internal.matcher import MatcherProvider as MatcherProvider

View File

@ -7,14 +7,15 @@ FrontMatter:
import re
import inspect
import warnings
from types import ModuleType
from datetime import datetime, timedelta
from typing import Any, Set, Dict, List, Type, Tuple, Union, Optional
from nonebot.adapters import Event
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.dependencies import Dependent
from nonebot.matcher import Matcher, MatcherSource
from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_PermissionChecker
from nonebot.rule import (
Rule,
@ -45,25 +46,39 @@ def store_matcher(matcher: Type[Matcher]) -> None:
plugin_chain[-1].matcher.add(matcher)
def get_matcher_plugin(depth: int = 1) -> Optional[Plugin]:
def get_matcher_plugin(depth: int = 1) -> Optional[Plugin]: # pragma: no cover
"""获取事件响应器定义所在插件。
**Deprecated**, 请使用 {ref}`nonebot.plugin.on.get_matcher_source` 获取信息
参数:
depth: 调用栈深度
"""
# matcher defined when plugin loading
if plugin_chain := _current_plugin_chain.get():
return plugin_chain[-1]
# matcher defined when plugin running
if module := get_matcher_module(depth + 1):
if plugin := get_plugin_by_module_name(module.__name__):
return plugin
warnings.warn(
"`get_matcher_plugin` is deprecated, please use `get_matcher_source` instead",
DeprecationWarning,
)
return (source := get_matcher_source(depth + 1)) and source.plugin
def get_matcher_module(depth: int = 1) -> Optional[ModuleType]:
def get_matcher_module(depth: int = 1) -> Optional[ModuleType]: # pragma: no cover
"""获取事件响应器定义所在模块。
**Deprecated**, 请使用 {ref}`nonebot.plugin.on.get_matcher_source` 获取信息
参数:
depth: 调用栈深度
"""
warnings.warn(
"`get_matcher_module` is deprecated, please use `get_matcher_source` instead",
DeprecationWarning,
)
return (source := get_matcher_source(depth + 1)) and source.module
def get_matcher_source(depth: int = 1) -> Optional[MatcherSource]:
"""获取事件响应器定义所在源码信息。
参数:
depth: 调用栈深度
"""
@ -71,7 +86,22 @@ def get_matcher_module(depth: int = 1) -> Optional[ModuleType]:
if current_frame is None:
return None
frame = inspect.getouterframes(current_frame)[depth + 1].frame
return inspect.getmodule(frame)
module_name = (module := inspect.getmodule(frame)) and module.__name__
plugin: Optional["Plugin"] = None
# matcher defined when plugin loading
if plugin_chain := _current_plugin_chain.get():
plugin = plugin_chain[-1]
# matcher defined when plugin running
elif module_name:
plugin = get_plugin_by_module_name(module_name)
return MatcherSource(
plugin_name=plugin and plugin.name,
module_name=module_name,
lineno=frame.f_lineno,
)
def on(
@ -109,8 +139,7 @@ def on(
priority=priority,
block=block,
handlers=handlers,
plugin=get_matcher_plugin(_depth + 1),
module=get_matcher_module(_depth + 1),
source=get_matcher_source(_depth + 1),
default_state=state,
)
store_matcher(matcher)

View File

@ -4,10 +4,10 @@ from types import ModuleType
from datetime import datetime, timedelta
from nonebot.adapters import Event
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.dependencies import Dependent
from nonebot.rule import Rule, ArgumentParser
from nonebot.matcher import Matcher, MatcherSource
from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_PermissionChecker
from .plugin import Plugin
@ -15,6 +15,7 @@ from .plugin import Plugin
def store_matcher(matcher: type[Matcher]) -> None: ...
def get_matcher_plugin(depth: int = ...) -> Plugin | None: ...
def get_matcher_module(depth: int = ...) -> ModuleType | None: ...
def get_matcher_source(depth: int = ...) -> MatcherSource | None: ...
def on(
type: str = "",
rule: Rule | T_RuleChecker | None = ...,

View File

@ -21,6 +21,7 @@ from typing import (
Type,
Tuple,
Union,
Generic,
TypeVar,
Callable,
Optional,
@ -220,6 +221,16 @@ def resolve_dot_notation(
return instance
class classproperty(Generic[T]):
"""类属性装饰器"""
def __init__(self, func: Callable[[Any], T]) -> None:
self.func = func
def __get__(self, instance: Any, owner: Optional[Type[Any]] = None) -> T:
return self.func(type(instance) if owner is None else owner)
class DataclassEncoder(json.JSONEncoder):
"""可以序列化 {ref}`nonebot.adapters.Message`(List[Dataclass]) 的 `JSONEncoder`"""

View File

@ -11,6 +11,7 @@ exclude_lines =
if (typing\.)?TYPE_CHECKING( is True)?:
@(abc\.)?abstractmethod
raise NotImplementedError
warnings\.warn
\.\.\.
pass
if __name__ == .__main__.:

View File

@ -0,0 +1,3 @@
from nonebot import on
matcher = on("message", temp=False, expire_time=None, priority=1, block=True)

View File

@ -1,12 +1,45 @@
import sys
from pathlib import Path
import pytest
from nonebug import App
from nonebot import get_plugin
from nonebot.permission import User
from nonebot.matcher import Matcher, matchers
from utils import FakeMessage, make_fake_event
from nonebot.message import check_and_run_matcher
@pytest.mark.asyncio
async def test_matcher_info(app: App):
from plugins.matcher.matcher_info import matcher
assert issubclass(matcher, Matcher)
assert matcher.type == "message"
assert matcher.priority == 1
assert matcher.temp is False
assert matcher.expire_time is None
assert matcher.block is True
assert matcher._source
assert matcher._source.module_name == "plugins.matcher.matcher_info"
assert matcher.module is sys.modules["plugins.matcher.matcher_info"]
assert matcher.module_name == "plugins.matcher.matcher_info"
assert matcher._source.plugin_name == "matcher_info"
assert matcher.plugin is get_plugin("matcher_info")
assert matcher.plugin_name == "matcher_info"
assert (
matcher._source.file
== (Path(__file__).parent.parent / "plugins/matcher/matcher_info.py").absolute()
)
assert matcher._source.lineno == 3
@pytest.mark.asyncio
async def test_matcher_handle(app: App):
from plugins.matcher.matcher_process import test_handle