diff --git a/docs/api/matcher.md b/docs/api/matcher.md index 5ebb8d3f..5cd6b03f 100644 --- a/docs/api/matcher.md +++ b/docs/api/matcher.md @@ -32,6 +32,21 @@ sidebarDepth: 0 事件响应器类 +### `plugin` + + +* **类型** + + `Optional[Plugin]` + + + +* **说明** + + 事件响应器所在插件 + + + ### `module` @@ -43,7 +58,7 @@ sidebarDepth: 0 * **说明** - 事件响应器所在模块 + 事件响应器所在插件模块 @@ -73,22 +88,7 @@ sidebarDepth: 0 * **说明** - 事件响应器所在模块名 - - - -### `module_prefix` - - -* **类型** - - `Optional[str]` - - - -* **说明** - - 事件响应器所在模块前缀 + 事件响应器所在点分割插件模块路径 @@ -292,7 +292,7 @@ sidebarDepth: 0 -### _classmethod_ `new(type_='', rule=None, permission=None, handlers=None, temp=False, priority=1, block=False, *, module=None, expire_time=None, default_state=None, default_state_factory=None, default_parser=None, default_type_updater=None, default_permission_updater=None)` +### _classmethod_ `new(type_='', rule=None, permission=None, handlers=None, temp=False, priority=1, block=False, *, plugin=None, module=None, expire_time=None, default_state=None, default_state_factory=None, default_parser=None, default_type_updater=None, default_permission_updater=None)` * **说明** @@ -325,7 +325,10 @@ sidebarDepth: 0 * `block: bool`: 是否阻止事件向更低优先级的响应器传播 - * `module: Optional[str]`: 事件响应器所在模块名称 + * `plugin: Optional[Plugin]`: 事件响应器所在插件 + + + * `module: Optional[ModuleType]`: 事件响应器所在模块 * `default_state: Optional[T_State]`: 默认状态 `state` diff --git a/docs/api/plugin.md b/docs/api/plugin.md index c51c93b2..7575692e 100644 --- a/docs/api/plugin.md +++ b/docs/api/plugin.md @@ -50,7 +50,16 @@ sidebarDepth: 0 * **说明**: 插件模块对象 -### _property_ `export` +### `module_name` + + +* **类型**: `str` + + +* **说明**: 点分割模块路径 + + +### `export` * **类型**: `Export` @@ -59,7 +68,7 @@ sidebarDepth: 0 * **说明**: 插件内定义的导出内容 -### _property_ `matcher` +### `matcher` * **类型**: `Set[Type[Matcher]]` @@ -68,7 +77,64 @@ sidebarDepth: 0 * **说明**: 插件内定义的 `Matcher` -## `on(type='', rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)` +### `parent_plugin` + + +* **类型**: `Optional[Plugin]` + + +* **说明**: 父插件 + + +### `sub_plugins` + + +* **类型**: `Set[Plugin]` + + +* **说明**: 子插件集合 + + +## `get_plugin(name)` + + +* **说明** + + 获取当前导入的某个插件。 + + + +* **参数** + + + * `name: str`: 插件名,与 `load_plugin` 参数一致。如果为 `load_plugins` 导入的插件,则为文件(夹)名。 + + + +* **返回** + + + * `Optional[Plugin]` + + + +## `get_loaded_plugins()` + + +* **说明** + + 获取当前已导入的所有插件。 + + + +* **返回** + + + * `Set[Plugin]` + + + +## `on(type='', rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)` * **说明** @@ -115,7 +181,7 @@ sidebarDepth: 0 -## `on_metaevent(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)` +## `on_metaevent(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)` * **说明** @@ -156,7 +222,7 @@ sidebarDepth: 0 -## `on_message(rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=True, state=None, state_factory=None)` +## `on_message(rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=True, state=None, state_factory=None, _depth=0)` * **说明** @@ -200,7 +266,7 @@ sidebarDepth: 0 -## `on_notice(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)` +## `on_notice(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)` * **说明** @@ -241,7 +307,7 @@ sidebarDepth: 0 -## `on_request(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)` +## `on_request(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)` * **说明** @@ -282,7 +348,7 @@ sidebarDepth: 0 -## `on_startswith(msg, rule=None, ignorecase=False, **kwargs)` +## `on_startswith(msg, rule=None, ignorecase=False, _depth=0, **kwargs)` * **说明** @@ -332,7 +398,7 @@ sidebarDepth: 0 -## `on_endswith(msg, rule=None, ignorecase=False, **kwargs)` +## `on_endswith(msg, rule=None, ignorecase=False, _depth=0, **kwargs)` * **说明** @@ -382,7 +448,7 @@ sidebarDepth: 0 -## `on_keyword(keywords, rule=None, **kwargs)` +## `on_keyword(keywords, rule=None, _depth=0, **kwargs)` * **说明** @@ -429,7 +495,7 @@ sidebarDepth: 0 -## `on_command(cmd, rule=None, aliases=None, **kwargs)` +## `on_command(cmd, rule=None, aliases=None, _depth=0, **kwargs)` * **说明** @@ -481,7 +547,7 @@ sidebarDepth: 0 -## `on_shell_command(cmd, rule=None, aliases=None, parser=None, **kwargs)` +## `on_shell_command(cmd, rule=None, aliases=None, parser=None, _depth=0, **kwargs)` * **说明** @@ -538,7 +604,7 @@ sidebarDepth: 0 -## `on_regex(pattern, flags=0, rule=None, **kwargs)` +## `on_regex(pattern, flags=0, rule=None, _depth=0, **kwargs)` * **说明** @@ -1300,10 +1366,10 @@ sidebarDepth: 0 * **参数** - * `module_path: Set[str]`: 指定插件集合 + * `module_path: Iterable[str]`: 指定插件集合 - * `plugin_dir: Set[str]`: 指定插件路径集合 + * `plugin_dir: Iterable[str]`: 指定插件路径集合 @@ -1345,7 +1411,7 @@ sidebarDepth: 0 * **说明** - 导入指定 toml 文件 `[nonebot.plugins]` 中的 `plugins` 以及 `plugin_dirs` 下多个插件, + 导入指定 toml 文件 `[tool.nonebot]` 中的 `plugins` 以及 `plugin_dirs` 下多个插件, 以 `_` 开头的插件不会被导入! @@ -1372,7 +1438,7 @@ sidebarDepth: 0 * **说明** - 导入 NoneBot 内置插件 + 导入 NoneBot 内置插件, 默认导入 `echo` 插件 @@ -1383,45 +1449,6 @@ sidebarDepth: 0 -## `get_plugin(name)` - - -* **说明** - - 获取当前导入的某个插件。 - - - -* **参数** - - - * `name: str`: 插件名,与 `load_plugin` 参数一致。如果为 `load_plugins` 导入的插件,则为文件(夹)名。 - - - -* **返回** - - - * `Optional[Plugin]` - - - -## `get_loaded_plugins()` - - -* **说明** - - 获取当前已导入的所有插件。 - - - -* **返回** - - - * `Set[Plugin]` - - - ## `require(name)` @@ -1441,7 +1468,14 @@ sidebarDepth: 0 * **返回** - * `Optional[Export]` + * `Export` + + + +* **异常** + + + * `RuntimeError`: 插件无法加载 diff --git a/docs_build/plugin.rst b/docs_build/plugin.rst index 09d2a6d1..1456f15f 100644 --- a/docs_build/plugin.rst +++ b/docs_build/plugin.rst @@ -11,6 +11,21 @@ NoneBot.plugin 模块 :show-inheritance: :special-members: __init__ +.. automodule:: nonebot.plugin.plugin + :members: + :show-inheritance: + :special-members: __init__ + +.. automodule:: nonebot.plugin.on + :members: + :show-inheritance: + :special-members: __init__ + +.. automodule:: nonebot.plugin.load + :members: + :show-inheritance: + :special-members: __init__ + .. automodule:: nonebot.plugin.export :members: :show-inheritance: diff --git a/nonebot/__init__.py b/nonebot/__init__.py index e5af0caf..a9c6d350 100644 --- a/nonebot/__init__.py +++ b/nonebot/__init__.py @@ -37,7 +37,7 @@ from nonebot.adapters import Bot from nonebot.utils import escape_tag from nonebot.config import Env, Config from nonebot.log import logger, default_filter -from nonebot.drivers import Driver, ForwardDriver, ReverseDriver +from nonebot.drivers import Driver, ReverseDriver try: _dist: pkg_resources.Distribution = pkg_resources.get_distribution( diff --git a/nonebot/matcher.py b/nonebot/matcher.py index 291d56cf..b8a874f2 100644 --- a/nonebot/matcher.py +++ b/nonebot/matcher.py @@ -24,6 +24,7 @@ from nonebot.typing import (T_State, T_Handler, T_ArgsParser, T_TypeUpdater, T_StateFactory, T_PermissionUpdater) if TYPE_CHECKING: + from nonebot.plugin import Plugin from nonebot.adapters import Bot, Event, Message, MessageSegment matchers: Dict[int, List[Type["Matcher"]]] = defaultdict(list) @@ -62,28 +63,25 @@ class MatcherMeta(type): class Matcher(metaclass=MatcherMeta): """事件响应器类""" + plugin: Optional["Plugin"] = None + """ + :类型: ``Optional[Plugin]`` + :说明: 事件响应器所在插件 + """ module: Optional[ModuleType] = None """ :类型: ``Optional[ModuleType]`` - :说明: 事件响应器所在模块 + :说明: 事件响应器所在插件模块 """ - plugin_name: Optional[str] = module and getattr(module, "__plugin_name__", - None) + plugin_name: Optional[str] = None """ :类型: ``Optional[str]`` :说明: 事件响应器所在插件名 """ - module_name: Optional[str] = module and getattr(module, "__module_name__", - None) + module_name: Optional[str] = None """ :类型: ``Optional[str]`` - :说明: 事件响应器所在模块名 - """ - module_prefix: Optional[str] = module and getattr(module, - "__module_prefix__", None) - """ - :类型: ``Optional[str]`` - :说明: 事件响应器所在模块前缀 + :说明: 事件响应器所在点分割插件模块路径 """ type: str = "" @@ -179,6 +177,7 @@ class Matcher(metaclass=MatcherMeta): priority: int = 1, block: bool = False, *, + plugin: Optional["Plugin"] = None, module: Optional[ModuleType] = None, expire_time: Optional[datetime] = None, default_state: Optional[T_State] = None, @@ -201,7 +200,8 @@ class Matcher(metaclass=MatcherMeta): * ``temp: bool``: 是否为临时事件响应器,即触发一次后删除 * ``priority: int``: 响应优先级 * ``block: bool``: 是否阻止事件向更低优先级的响应器传播 - * ``module: Optional[str]``: 事件响应器所在模块名称 + * ``plugin: Optional[Plugin]``: 事件响应器所在插件 + * ``module: Optional[ModuleType]``: 事件响应器所在模块 * ``default_state: Optional[T_State]``: 默认状态 ``state`` * ``default_state_factory: Optional[T_StateFactory]``: 默认状态 ``state`` 的工厂函数 * ``expire_time: Optional[datetime]``: 事件响应器最终有效时间点,过时即被删除 @@ -213,14 +213,14 @@ class Matcher(metaclass=MatcherMeta): NewMatcher = type( "Matcher", (Matcher,), { + "plugin": + plugin, "module": module, "plugin_name": - module and getattr(module, "__plugin_name__", None), + plugin and plugin.name, "module_name": - module and getattr(module, "__module_name__", None), - "module_prefix": - module and getattr(module, "__module_prefix__", None), + module and module.__name__, "type": type_, "rule": @@ -626,6 +626,7 @@ class Matcher(metaclass=MatcherMeta): temp=True, priority=0, block=True, + plugin=self.plugin, module=self.module, expire_time=datetime.now() + bot.config.session_expire_timeout, default_state=self.state, @@ -662,6 +663,7 @@ class Matcher(metaclass=MatcherMeta): temp=True, priority=0, block=True, + plugin=self.plugin, module=self.module, expire_time=datetime.now() + bot.config.session_expire_timeout, default_state=self.state, diff --git a/nonebot/plugin/__init__.py b/nonebot/plugin/__init__.py index 719fa246..69a34e63 100644 --- a/nonebot/plugin/__init__.py +++ b/nonebot/plugin/__init__.py @@ -4,1147 +4,37 @@ 为 NoneBot 插件开发提供便携的定义函数。 """ -import re -import json -from types import ModuleType -from dataclasses import dataclass -from collections import defaultdict -from contextvars import Context, copy_context -from typing import (TYPE_CHECKING, Any, Set, Dict, List, Type, Tuple, Union, - Optional) -import tomlkit +from typing import List, Optional +from contextvars import ContextVar -from nonebot.log import logger -from nonebot.handler import Handler -from nonebot.matcher import Matcher -from nonebot.utils import escape_tag -from nonebot.permission import Permission -from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_StateFactory -from nonebot.rule import (Rule, ArgumentParser, regex, command, keyword, - endswith, startswith, shell_command) +_managers: List["PluginManager"] = [] +_current_plugin: ContextVar[Optional["Plugin"]] = ContextVar("_current_plugin", + default=None) -from .export import Export +from .on import on as on +from .manager import PluginManager +from .export import Export as Export from .export import export as export -from .manager import PluginManager, _current_plugin - -if TYPE_CHECKING: - from nonebot.adapters import Bot, Event - -plugins: Dict[str, "Plugin"] = {} -""" -:类型: ``Dict[str, Plugin]`` -:说明: 已加载的插件 -""" -PLUGIN_NAMESPACE = "nonebot.loaded_plugins" - -_plugin_matchers: Dict[str, Set[Type[Matcher]]] = defaultdict(set) - - -@dataclass(eq=False) -class Plugin(object): - """存储插件信息""" - name: str - """ - - **类型**: ``str`` - - **说明**: 插件名称,使用 文件/文件夹 名称作为插件名 - """ - module: ModuleType - """ - - **类型**: ``ModuleType`` - - **说明**: 插件模块对象 - """ - - @property - def export(self) -> Export: - """ - - **类型**: ``Export`` - - **说明**: 插件内定义的导出内容 - """ - return getattr(self.module, "__export__", Export()) - - @property - def matcher(self) -> Set[Type[Matcher]]: - """ - - **类型**: ``Set[Type[Matcher]]`` - - **说明**: 插件内定义的 ``Matcher`` - """ - # return reduce( - # lambda x, y: x | _plugin_matchers[y], - # filter(lambda x: x.startswith(self.name), _plugin_matchers.keys()), - # set()) - return _plugin_matchers.get(self.name, set()) - - -def _store_matcher(matcher: Type[Matcher]): - if matcher.plugin_name: - _plugin_matchers[matcher.plugin_name].add(matcher) - - -def on(type: str = "", - rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - *, - handlers: Optional[List[Union[T_Handler, Handler]]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]: - """ - :说明: - - 注册一个基础事件响应器,可自定义类型。 - - :参数: - - * ``type: str``: 事件响应器类型 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - matcher = Matcher.new(type, - Rule() & rule, - permission or Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - module=_current_plugin.get(), - default_state=state, - default_state_factory=state_factory) - _store_matcher(matcher) - return matcher - - -def on_metaevent( - rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[Union[T_Handler, Handler]]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]: - """ - :说明: - - 注册一个元事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - matcher = Matcher.new("meta_event", - Rule() & rule, - Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - module=_current_plugin.get(), - default_state=state, - default_state_factory=state_factory) - _store_matcher(matcher) - return matcher - - -def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None, - permission: Optional[Permission] = None, - *, - handlers: Optional[List[Union[T_Handler, Handler]]] = None, - temp: bool = False, - priority: int = 1, - block: bool = True, - state: Optional[T_State] = None, - state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - matcher = Matcher.new("message", - Rule() & rule, - permission or Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - module=_current_plugin.get(), - default_state=state, - default_state_factory=state_factory) - _store_matcher(matcher) - return matcher - - -def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[Union[T_Handler, Handler]]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]: - """ - :说明: - - 注册一个通知事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - matcher = Matcher.new("notice", - Rule() & rule, - Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - module=_current_plugin.get(), - default_state=state, - default_state_factory=state_factory) - _store_matcher(matcher) - return matcher - - -def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None, - *, - handlers: Optional[List[Union[T_Handler, Handler]]] = None, - temp: bool = False, - priority: int = 1, - block: bool = False, - state: Optional[T_State] = None, - state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]: - """ - :说明: - - 注册一个请求事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - matcher = Matcher.new("request", - Rule() & rule, - Permission(), - temp=temp, - priority=priority, - block=block, - handlers=handlers, - module=_current_plugin.get(), - default_state=state, - default_state_factory=state_factory) - _store_matcher(matcher) - return matcher - - -def on_startswith(msg: Union[str, Tuple[str, ...]], - rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None, - ignorecase: bool = False, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容开头时响应。 - - :参数: - - * ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``ignorecase: bool``: 是否忽略大小写 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - return on_message(startswith(msg, ignorecase) & rule, **kwargs) - - -def on_endswith(msg: Union[str, Tuple[str, ...]], - rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None, - ignorecase: bool = False, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容结尾时响应。 - - :参数: - - * ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``ignorecase: bool``: 是否忽略大小写 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - return on_message(endswith(msg, ignorecase) & rule, **kwargs) - - -def on_keyword(keywords: Set[str], - rule: Optional[Union[Rule, T_RuleChecker]] = None, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息纯文本部分包含关键词时响应。 - - :参数: - - * ``keywords: Set[str]``: 关键词列表 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - return on_message(keyword(*keywords) & rule, **kwargs) - - -def on_command(cmd: Union[str, Tuple[str, ...]], - rule: Optional[Union[Rule, T_RuleChecker]] = None, - aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息以指定命令开头时响应。 - - 命令匹配规则参考: `命令形式匹配 `_ - - :参数: - - * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - - async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): - message = event.get_message() - if len(message) < 1: - return - segment = message.pop(0) - segment_text = str(segment).lstrip() - if not segment_text.startswith(state["_prefix"]["raw_command"]): - return - new_message = message.__class__( - segment_text[len(state["_prefix"]["raw_command"]):].lstrip()) - for new_segment in reversed(new_message): - message.insert(0, new_segment) - - handlers = kwargs.pop("handlers", []) - handlers.insert(0, _strip_cmd) - - commands = set([cmd]) | (aliases or set()) - return on_message(command(*commands) & rule, handlers=handlers, **kwargs) - - -def on_shell_command(cmd: Union[str, Tuple[str, ...]], - rule: Optional[Union[Rule, T_RuleChecker]] = None, - aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, - parser: Optional[ArgumentParser] = None, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。 - - 与普通的 ``on_command`` 不同的是,在添加 ``parser`` 参数时, 响应器会自动处理消息。 - - 并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]`` 中 - - :参数: - - * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 - * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - - async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): - message = event.get_message() - segment = message.pop(0) - new_message = message.__class__( - str(segment) - [len(state["_prefix"]["raw_command"]):].strip()) # type: ignore - for new_segment in reversed(new_message): - message.insert(0, new_segment) - - handlers = kwargs.pop("handlers", []) - handlers.insert(0, _strip_cmd) - - commands = set([cmd]) | (aliases or set()) - return on_message(shell_command(*commands, parser=parser) & rule, - handlers=handlers, - **kwargs) - - -def on_regex(pattern: str, - flags: Union[int, re.RegexFlag] = 0, - rule: Optional[Union[Rule, T_RuleChecker]] = None, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息匹配正则表达式时响应。 - - 命令匹配规则参考: `正则匹配 `_ - - :参数: - - * ``pattern: str``: 正则表达式 - * ``flags: Union[int, re.RegexFlag]``: 正则匹配标志 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - return on_message(regex(pattern, flags) & rule, **kwargs) - - -class CommandGroup: - """命令组,用于声明一组有相同名称前缀的命令。""" - - def __init__(self, cmd: Union[str, Tuple[str, ...]], **kwargs): - """ - :参数: - - * ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀 - * ``**kwargs``: 其他传递给 ``on_command`` 的参数默认值,参考 `on_command <#on-command-cmd-rule-none-aliases-none-kwargs>`_ - """ - self.basecmd: Tuple[str, ...] = (cmd,) if isinstance(cmd, str) else cmd - """ - - **类型**: ``Tuple[str, ...]`` - - **说明**: 命令前缀 - """ - if "aliases" in kwargs: - del kwargs["aliases"] - self.base_kwargs: Dict[str, Any] = kwargs - """ - - **类型**: ``Dict[str, Any]`` - - **说明**: 其他传递给 ``on_command`` 的参数默认值 - """ - - def command(self, cmd: Union[str, Tuple[str, ...]], - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个新的命令。 - - :参数: - - * ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀 - * ``**kwargs``: 其他传递给 ``on_command`` 的参数,将会覆盖命令组默认值 - - :返回: - - - ``Type[Matcher]`` - """ - sub_cmd = (cmd,) if isinstance(cmd, str) else cmd - cmd = self.basecmd + sub_cmd - - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - return on_command(cmd, **final_kwargs) - - def shell_command(self, cmd: Union[str, Tuple[str, ...]], - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个新的命令。 - - :参数: - - * ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀 - * ``**kwargs``: 其他传递给 ``on_shell_command`` 的参数,将会覆盖命令组默认值 - - :返回: - - - ``Type[Matcher]`` - """ - sub_cmd = (cmd,) if isinstance(cmd, str) else cmd - cmd = self.basecmd + sub_cmd - - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - return on_shell_command(cmd, **final_kwargs) - - -class MatcherGroup: - """事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。""" - - def __init__(self, **kwargs): - """ - :说明: - - 创建一个事件响应器组合,参数为默认值,与 ``on`` 一致 - """ - self.matchers: List[Type[Matcher]] = [] - """ - :类型: ``List[Type[Matcher]]`` - :说明: 组内事件响应器列表 - """ - self.base_kwargs: Dict[str, Any] = kwargs - """ - - **类型**: ``Dict[str, Any]`` - - **说明**: 其他传递给 ``on`` 的参数默认值 - """ - - def on(self, **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个基础事件响应器,可自定义类型。 - - :参数: - - * ``type: str``: 事件响应器类型 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - matcher = on(**final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_metaevent(self, **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个元事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - final_kwargs.pop("permission", None) - matcher = on_metaevent(**final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_message(self, **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_message(**final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_notice(self, **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个通知事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_notice(**final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_request(self, **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个请求事件响应器。 - - :参数: - - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_request(**final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_startswith(self, msg: Union[str, Tuple[str, ...]], - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容开头时响应。 - - :参数: - - * ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容 - * ``ignorecase: bool``: 是否忽略大小写 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_startswith(msg, **final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_endswith(self, msg: Union[str, Tuple[str, ...]], - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容结尾时响应。 - - :参数: - - * ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容 - * ``ignorecase: bool``: 是否忽略大小写 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_endswith(msg, **final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_keyword(self, keywords: Set[str], **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息纯文本部分包含关键词时响应。 - - :参数: - - * ``keywords: Set[str]``: 关键词列表 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_keyword(keywords, **final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_command(self, - cmd: Union[str, Tuple[str, ...]], - aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息以指定命令开头时响应。 - - 命令匹配规则参考: `命令形式匹配 `_ - - :参数: - - * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 - * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_command(cmd, aliases=aliases, **final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_shell_command(self, - cmd: Union[str, Tuple[str, ...]], - aliases: Optional[Set[Union[str, Tuple[str, - ...]]]] = None, - parser: Optional[ArgumentParser] = None, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。 - - 与普通的 ``on_command`` 不同的是,在添加 ``parser`` 参数时, 响应器会自动处理消息。 - - 并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]`` 中 - - :参数: - - * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 - * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 - * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_shell_command(cmd, - aliases=aliases, - parser=parser, - **final_kwargs) - self.matchers.append(matcher) - return matcher - - def on_regex(self, - pattern: str, - flags: Union[int, re.RegexFlag] = 0, - **kwargs) -> Type[Matcher]: - """ - :说明: - - 注册一个消息事件响应器,并且当消息匹配正则表达式时响应。 - - 命令匹配规则参考: `正则匹配 `_ - - :参数: - - * ``pattern: str``: 正则表达式 - * ``flags: Union[int, re.RegexFlag]``: 正则匹配标志 - * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 - * ``permission: Optional[Permission]``: 事件响应权限 - * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 - * ``temp: bool``: 是否为临时事件响应器(仅执行一次) - * ``priority: int``: 事件响应器优先级 - * ``block: bool``: 是否阻止事件向更低优先级传递 - * ``state: Optional[T_State]``: 默认 state - * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 - - :返回: - - - ``Type[Matcher]`` - """ - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - final_kwargs.pop("type", None) - matcher = on_regex(pattern, flags=flags, **final_kwargs) - self.matchers.append(matcher) - return matcher - - -def _load_plugin(manager: PluginManager, plugin_name: str) -> Optional[Plugin]: - if plugin_name.startswith("_"): - return None - - if plugin_name in plugins: - return None - - try: - module = manager.load_plugin(plugin_name) - - plugin = Plugin(plugin_name, module) - plugins[plugin_name] = plugin - logger.opt(colors=True).success( - f'Succeeded to import "{escape_tag(plugin_name)}"') - return plugin - except Exception as e: - logger.opt(colors=True, exception=e).error( - f'Failed to import "{escape_tag(plugin_name)}"' - ) - return None - - -def load_plugin(module_path: str) -> Optional[Plugin]: - """ - :说明: - - 使用 ``PluginManager`` 加载单个插件,可以是本地插件或是通过 ``pip`` 安装的插件。 - - :参数: - - * ``module_path: str``: 插件名称 ``path.to.your.plugin`` - - :返回: - - - ``Optional[Plugin]`` - """ - - context: Context = copy_context() - manager = PluginManager(PLUGIN_NAMESPACE, plugins=[module_path]) - return context.run(_load_plugin, manager, module_path) - - -def load_plugins(*plugin_dir: str) -> Set[Plugin]: - """ - :说明: - - 导入目录下多个插件,以 ``_`` 开头的插件不会被导入! - - :参数: - - - ``*plugin_dir: str``: 插件路径 - - :返回: - - - ``Set[Plugin]`` - """ - loaded_plugins = set() - manager = PluginManager(PLUGIN_NAMESPACE, search_path=plugin_dir) - for plugin_name in manager.list_plugins(): - context: Context = copy_context() - result = context.run(_load_plugin, manager, plugin_name) - if result: - loaded_plugins.add(result) - return loaded_plugins - - -def load_all_plugins(module_path: Set[str], - plugin_dir: Set[str]) -> Set[Plugin]: - """ - :说明: - - 导入指定列表中的插件以及指定目录下多个插件,以 ``_`` 开头的插件不会被导入! - - :参数: - - - ``module_path: Set[str]``: 指定插件集合 - - ``plugin_dir: Set[str]``: 指定插件路径集合 - - :返回: - - - ``Set[Plugin]`` - """ - loaded_plugins = set() - manager = PluginManager(PLUGIN_NAMESPACE, module_path, plugin_dir) - for plugin_name in manager.list_plugins(): - context: Context = copy_context() - result = context.run(_load_plugin, manager, plugin_name) - if result: - loaded_plugins.add(result) - return loaded_plugins - - -def load_from_json(file_path: str, encoding: str = "utf-8") -> Set[Plugin]: - """ - :说明: - - 导入指定 json 文件中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件,以 ``_`` 开头的插件不会被导入! - - :参数: - - - ``file_path: str``: 指定 json 文件路径 - - ``encoding: str``: 指定 json 文件编码 - - :返回: - - - ``Set[Plugin]`` - """ - with open(file_path, "r", encoding=encoding) as f: - data = json.load(f) - plugins = data.get("plugins") - plugin_dirs = data.get("plugin_dirs") - assert isinstance(plugins, list), "plugins must be a list of plugin name" - assert isinstance(plugin_dirs, - list), "plugin_dirs must be a list of directories" - return load_all_plugins(set(plugins), set(plugin_dirs)) - - -def load_from_toml(file_path: str, encoding: str = "utf-8") -> Set[Plugin]: - """ - :说明: - - 导入指定 toml 文件 ``[nonebot.plugins]`` 中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件, - 以 ``_`` 开头的插件不会被导入! - - :参数: - - - ``file_path: str``: 指定 toml 文件路径 - - ``encoding: str``: 指定 toml 文件编码 - - :返回: - - - ``Set[Plugin]`` - """ - with open(file_path, "r", encoding=encoding) as f: - data = tomlkit.parse(f.read()) # type: ignore - - nonebot_data = data.get("nonebot", {}).get("plugins") - if not nonebot_data: - raise ValueError("Cannot find '[nonebot.plugins]' in given toml file!") - plugins = nonebot_data.get("plugins", []) - plugin_dirs = nonebot_data.get("plugin_dirs", []) - assert isinstance(plugins, list), "plugins must be a list of plugin name" - assert isinstance(plugin_dirs, - list), "plugin_dirs must be a list of directories" - return load_all_plugins(set(plugins), set(plugin_dirs)) - - -def load_builtin_plugins(name: str = "echo") -> Optional[Plugin]: - """ - :说明: - - 导入 NoneBot 内置插件 - - :返回: - - - ``Plugin`` - """ - return load_plugin(f"nonebot.plugins.{name}") - - -def get_plugin(name: str) -> Optional[Plugin]: - """ - :说明: - - 获取当前导入的某个插件。 - - :参数: - - * ``name: str``: 插件名,与 ``load_plugin`` 参数一致。如果为 ``load_plugins`` 导入的插件,则为文件(夹)名。 - - :返回: - - - ``Optional[Plugin]`` - """ - return plugins.get(name) - - -def get_loaded_plugins() -> Set[Plugin]: - """ - :说明: - - 获取当前已导入的所有插件。 - - :返回: - - - ``Set[Plugin]`` - """ - return set(plugins.values()) - - -def require(name: str) -> Optional[Export]: - """ - :说明: - - 获取一个插件的导出内容 - - :参数: - - * ``name: str``: 插件名,与 ``load_plugin`` 参数一致。如果为 ``load_plugins`` 导入的插件,则为文件(夹)名。 - - :返回: - - - ``Optional[Export]`` - """ - plugin = get_plugin(name) or load_plugin(name) - return plugin.export if plugin else None +from .load import require as require +from .on import on_regex as on_regex +from .plugin import Plugin as Plugin +from .on import on_notice as on_notice +from .on import on_command as on_command +from .on import on_keyword as on_keyword +from .on import on_message as on_message +from .on import on_request as on_request +from .on import on_endswith as on_endswith +from .load import load_plugin as load_plugin +from .on import CommandGroup as CommandGroup +from .on import MatcherGroup as MatcherGroup +from .on import on_metaevent as on_metaevent +from .plugin import get_plugin as get_plugin +from .load import load_plugins as load_plugins +from .on import on_startswith as on_startswith +from .load import load_from_json as load_from_json +from .load import load_from_toml as load_from_toml +from .on import on_shell_command as on_shell_command +from .load import load_all_plugins as load_all_plugins +from .plugin import get_loaded_plugins as get_loaded_plugins +from .load import load_builtin_plugins as load_builtin_plugins diff --git a/nonebot/plugin/export.py b/nonebot/plugin/export.py index 20ad4d72..9c7c3f57 100644 --- a/nonebot/plugin/export.py +++ b/nonebot/plugin/export.py @@ -1,6 +1,4 @@ -from contextvars import ContextVar - -_export: ContextVar["Export"] = ContextVar("_export") +from . import _current_plugin class Export(dict): @@ -57,4 +55,7 @@ def export() -> Export: - ``Export`` """ - return _export.get() + plugin = _current_plugin.get() + if not plugin: + raise RuntimeError("Export outside of the plugin!") + return plugin.export diff --git a/nonebot/plugin/load.py b/nonebot/plugin/load.py new file mode 100644 index 00000000..5e1002dc --- /dev/null +++ b/nonebot/plugin/load.py @@ -0,0 +1,161 @@ +import json +from typing import Set, Iterable, Optional + +import tomlkit + +from . import _managers +from .export import Export +from .manager import PluginManager +from .plugin import Plugin, get_plugin + + +def load_plugin(module_path: str) -> Optional[Plugin]: + """ + :说明: + + 使用 ``PluginManager`` 加载单个插件,可以是本地插件或是通过 ``pip`` 安装的插件。 + + :参数: + + * ``module_path: str``: 插件名称 ``path.to.your.plugin`` + + :返回: + + - ``Optional[Plugin]`` + """ + + manager = PluginManager([module_path]) + _managers.append(manager) + return manager.load_plugin(module_path) + + +def load_plugins(*plugin_dir: str) -> Set[Plugin]: + """ + :说明: + + 导入目录下多个插件,以 ``_`` 开头的插件不会被导入! + + :参数: + + - ``*plugin_dir: str``: 插件路径 + + :返回: + + - ``Set[Plugin]`` + """ + manager = PluginManager(search_path=plugin_dir) + _managers.append(manager) + return manager.load_all_plugins() + + +def load_all_plugins(module_path: Iterable[str], + plugin_dir: Iterable[str]) -> Set[Plugin]: + """ + :说明: + + 导入指定列表中的插件以及指定目录下多个插件,以 ``_`` 开头的插件不会被导入! + + :参数: + + - ``module_path: Iterable[str]``: 指定插件集合 + - ``plugin_dir: Iterable[str]``: 指定插件路径集合 + + :返回: + + - ``Set[Plugin]`` + """ + manager = PluginManager(module_path, plugin_dir) + _managers.append(manager) + return manager.load_all_plugins() + + +def load_from_json(file_path: str, encoding: str = "utf-8") -> Set[Plugin]: + """ + :说明: + + 导入指定 json 文件中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件,以 ``_`` 开头的插件不会被导入! + + :参数: + + - ``file_path: str``: 指定 json 文件路径 + - ``encoding: str``: 指定 json 文件编码 + + :返回: + + - ``Set[Plugin]`` + """ + with open(file_path, "r", encoding=encoding) as f: + data = json.load(f) + plugins = data.get("plugins") + plugin_dirs = data.get("plugin_dirs") + assert isinstance(plugins, list), "plugins must be a list of plugin name" + assert isinstance(plugin_dirs, + list), "plugin_dirs must be a list of directories" + return load_all_plugins(set(plugins), set(plugin_dirs)) + + +def load_from_toml(file_path: str, encoding: str = "utf-8") -> Set[Plugin]: + """ + :说明: + + 导入指定 toml 文件 ``[tool.nonebot]`` 中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件, + 以 ``_`` 开头的插件不会被导入! + + :参数: + + - ``file_path: str``: 指定 toml 文件路径 + - ``encoding: str``: 指定 toml 文件编码 + + :返回: + + - ``Set[Plugin]`` + """ + with open(file_path, "r", encoding=encoding) as f: + data = tomlkit.parse(f.read()) # type: ignore + + nonebot_data = data.get("tool", {}).get("nonebot") or data.get( + "nonebot", {}).get("plugins") + if not nonebot_data: + raise ValueError("Cannot find '[tool.nonebot]' in given toml file!") + plugins = nonebot_data.get("plugins", []) + plugin_dirs = nonebot_data.get("plugin_dirs", []) + assert isinstance(plugins, list), "plugins must be a list of plugin name" + assert isinstance(plugin_dirs, + list), "plugin_dirs must be a list of directories" + return load_all_plugins(plugins, plugin_dirs) + + +def load_builtin_plugins(name: str = "echo") -> Optional[Plugin]: + """ + :说明: + + 导入 NoneBot 内置插件, 默认导入 ``echo`` 插件 + + :返回: + + - ``Plugin`` + """ + return load_plugin(f"nonebot.plugins.{name}") + + +def require(name: str) -> Export: + """ + :说明: + + 获取一个插件的导出内容 + + :参数: + + * ``name: str``: 插件名,与 ``load_plugin`` 参数一致。如果为 ``load_plugins`` 导入的插件,则为文件(夹)名。 + + :返回: + + - ``Export`` + + :异常: + - ``RuntimeError``: 插件无法加载 + """ + plugin = get_plugin(name) or load_plugin(name) + if not plugin: + raise RuntimeError(f"Cannot load plugin \"{name}\"!") + return plugin.export diff --git a/nonebot/plugin/manager.py b/nonebot/plugin/manager.py index d7485c72..e98495d0 100644 --- a/nonebot/plugin/manager.py +++ b/nonebot/plugin/manager.py @@ -1,201 +1,103 @@ import sys -import uuid import pkgutil import importlib -from hashlib import md5 from pathlib import Path +from itertools import chain from types import ModuleType -from collections import Counter -from contextvars import ContextVar from importlib.abc import MetaPathFinder from importlib.machinery import PathFinder, SourceFileLoader -from typing import Set, List, Union, Iterable, Optional, Sequence +from typing import Set, Dict, List, Union, Iterable, Optional, Sequence -from .export import Export, _export - -_current_plugin: ContextVar[Optional[ModuleType]] = ContextVar( - "_current_plugin", default=None) - -_internal_space = ModuleType(__name__ + "._internal") -_internal_space.__path__ = [] # type: ignore -sys.modules[_internal_space.__name__] = _internal_space - -_manager_stack: List["PluginManager"] = [] - - -class _NamespaceModule(ModuleType): - """Simple namespace module to store plugins.""" - - @property - def __path__(self): - return [] - - def __getattr__(self, name: str): - try: - return super().__getattr__(name) # type: ignore - except AttributeError: - if name.startswith("__"): - raise - raise RuntimeError("Plugin manager not activated!") - - -class _InternalModule(ModuleType): - """Internal module for each plugin manager.""" - - def __init__(self, prefix: str, plugin_manager: "PluginManager"): - super().__init__(f"{prefix}.{plugin_manager.internal_id}") - self.__plugin_manager__ = plugin_manager - - @property - def __path__(self) -> List[str]: - return list(self.__plugin_manager__.search_path) +from nonebot.log import logger +from nonebot.utils import escape_tag +from .plugin import Plugin, _new_plugin +from . import _managers, _current_plugin class PluginManager: - def __init__(self, - namespace: str, - plugins: Optional[Iterable[str]] = None, - search_path: Optional[Iterable[str]] = None, - *, - id: Optional[str] = None): - self.namespace: str = namespace - self.namespace_module: ModuleType = self._setup_namespace(namespace) - - self.id: str = id or str(uuid.uuid4()) - self.internal_id: str = md5( - ((self.namespace or "") + self.id).encode()).hexdigest() - self.internal_module = self._setup_internal_module(self.internal_id) + def __init__( + self, + plugins: Optional[Iterable[str]] = None, + search_path: Optional[Iterable[str]] = None, + ): # simple plugin not in search path self.plugins: Set[str] = set(plugins or []) self.search_path: Set[str] = set(search_path or []) - # ensure can be loaded + # cache plugins + self.searched_plugins: Dict[str, Path] = {} self.list_plugins() - def _setup_namespace(self, namespace: str) -> ModuleType: - try: - module = importlib.import_module(namespace) - except ImportError: - module = _NamespaceModule(namespace) - if "." in namespace: - parent = importlib.import_module(namespace.rsplit(".", 1)[0]) - setattr(parent, namespace.rsplit(".", 1)[1], module) + def _path_to_module_name(self, path: Path) -> str: + rel_path = path.resolve().relative_to(Path(".").resolve()) + if rel_path.stem == "__init__": + return ".".join(rel_path.parts[:-1]) + else: + return ".".join(rel_path.parts[:-1] + (rel_path.stem,)) - sys.modules[namespace] = module - return module + def _previous_plugins(self) -> List[str]: + _pre_managers: List[PluginManager] + if self in _managers: + _pre_managers = _managers[:_managers.index(self)] + else: + _pre_managers = _managers[:] - def _setup_internal_module(self, internal_id: str) -> ModuleType: - if hasattr(_internal_space, internal_id): - raise RuntimeError("Plugin manager already exists!") - - index = 2 - prefix: str = _internal_space.__name__ - while True: - try: - frame = sys._getframe(index) - except ValueError: - break - # check if is called in plugin - if "__plugin_name__" not in frame.f_globals: - index += 1 - continue - prefix = frame.f_globals.get("__name__", _internal_space.__name__) - break - - if not prefix.startswith(_internal_space.__name__): - prefix = _internal_space.__name__ - module = _InternalModule(prefix, self) - sys.modules[module.__name__] = module # type: ignore - setattr(_internal_space, internal_id, module) - return module - - def __enter__(self): - if self in _manager_stack: - raise RuntimeError("Plugin manager already activated!") - _manager_stack.append(self) - return self - - def __exit__(self, exc_type, exc_value, traceback): - try: - _manager_stack.pop() - except IndexError: - pass - - def search_plugins(self) -> List[str]: return [ - module_info.name - for module_info in pkgutil.iter_modules(self.search_path) + *chain.from_iterable( + [*manager.plugins, *manager.searched_plugins.keys()] + for manager in _pre_managers) ] def list_plugins(self) -> Set[str]: - _pre_managers: List[PluginManager] - if self in _manager_stack: - _pre_managers = _manager_stack[:_manager_stack.index(self)] - else: - _pre_managers = _manager_stack[:] + # get all previous ready to load plugins + previous_plugins = self._previous_plugins() + searched_plugins: Dict[str, Path] = {} - _search_path: Set[str] = set() - for manager in _pre_managers: - _search_path |= manager.search_path - if _search_path & self.search_path: - raise RuntimeError("Duplicate plugin search path!") - - _search_plugins = self.search_plugins() - c = Counter([*_search_plugins, *self.plugins]) - conflict = [name for name, num in c.items() if num > 1] - if conflict: - raise RuntimeError( - f"More than one plugin named {' / '.join(conflict)}!") - return set(_search_plugins) | self.plugins - - def load_plugin(self, name) -> ModuleType: - if name in self.plugins: - with self: - return importlib.import_module(name) - - if "." in name: - raise ValueError("Plugin name cannot contain '.'") - - with self: - return importlib.import_module(f"{self.namespace}.{name}") - - def load_all_plugins(self) -> List[ModuleType]: - return [self.load_plugin(name) for name in self.list_plugins()] - - def _rewrite_module_name(self, module_name: str) -> Optional[str]: - prefix = f"{self.internal_module.__name__}." - raw_name = module_name[len(self.namespace) + - 1:] if module_name.startswith(self.namespace + - ".") else None - # dir plugins - if raw_name and raw_name.split(".")[0] in self.search_plugins(): - return f"{prefix}{raw_name}" - # third party plugin or renamed dir plugins - elif module_name in self.plugins or module_name.startswith(prefix): - return module_name - # dir plugins - elif module_name in self.search_plugins(): - return f"{prefix}{module_name}" - return None - - def _check_absolute_import(self, origin_path: str) -> Optional[str]: - if not self.search_path: - return - paths = set([ - *self.search_path, - *(str(Path(path).resolve()) for path in self.search_path) - ]) - for path in paths: - try: - rel_path = Path(origin_path).relative_to(path) - if rel_path.stem == "__init__": - return f"{self.internal_module.__name__}." + ".".join( - rel_path.parts[:-1]) - return f"{self.internal_module.__name__}." + ".".join( - rel_path.parts[:-1] + (rel_path.stem,)) - except ValueError: + for module_info in pkgutil.iter_modules(self.search_path): + if module_info.name.startswith("_"): continue + if module_info.name in searched_plugins.keys( + ) or module_info.name in previous_plugins: + raise RuntimeError( + f"Plugin already exists: {module_info.name}! Check your plugin name" + ) + module_spec = module_info.module_finder.find_spec( + module_info.name, None) + if not module_spec: + continue + module_path = module_spec.origin + if not module_path: + continue + searched_plugins[module_info.name] = Path(module_path).resolve() + + self.searched_plugins = searched_plugins + + return self.plugins | set(self.searched_plugins.keys()) + + def load_plugin(self, name) -> Optional[Plugin]: + try: + if name in self.plugins: + module = importlib.import_module(name) + elif name not in self.searched_plugins: + raise RuntimeError( + f"Plugin not found: {name}! Check your plugin name") + else: + module = importlib.import_module( + self._path_to_module_name(self.searched_plugins[name])) + + logger.opt(colors=True).success( + f'Succeeded to import "{escape_tag(name)}"') + return getattr(module, "__plugin__", None) + except Exception as e: + logger.opt(colors=True, exception=e).error( + f'Failed to import "{escape_tag(name)}"' + ) + + def load_all_plugins(self) -> Set[Plugin]: + return set( + filter(None, + (self.load_plugin(name) for name in self.list_plugins()))) class PluginFinder(MetaPathFinder): @@ -204,28 +106,27 @@ class PluginFinder(MetaPathFinder): fullname: str, path: Optional[Sequence[Union[bytes, str]]], target: Optional[ModuleType] = None): - if _manager_stack: + if _managers: index = -1 - origin_spec = PathFinder.find_spec(fullname, path, target) - while -index <= len(_manager_stack): - manager = _manager_stack[index] + module_spec = PathFinder.find_spec(fullname, path, target) + if not module_spec: + return + module_origin = module_spec.origin + if not module_origin: + return + module_path = Path(module_origin).resolve() - rel_name = None - if origin_spec and origin_spec.origin: - rel_name = manager._check_absolute_import( - origin_spec.origin) + while -index <= len(_managers): + manager = _managers[index] + + if fullname in manager.plugins or module_path in manager.searched_plugins.values( + ): + module_spec.loader = PluginLoader(manager, fullname, + module_origin) + return module_spec - newname = manager._rewrite_module_name(rel_name or fullname) - if newname: - spec = PathFinder.find_spec( - newname, path or [*manager.search_path, *sys.path], - target) - if spec: - spec.loader = PluginLoader( # type: ignore - manager, newname, spec.origin) - return spec index -= 1 - return None + return class PluginLoader(SourceFileLoader): @@ -246,20 +147,15 @@ class PluginLoader(SourceFileLoader): if self.loaded: return - export = Export() - _export_token = _export.set(export) + plugin = _new_plugin(self.name, module) + parent_plugin = _current_plugin.get() + if parent_plugin: + plugin.parent_plugin = parent_plugin + parent_plugin.sub_plugins.add(plugin) - prefix = self.manager.internal_module.__name__ - is_dir_plugin = self.name.startswith(prefix + ".") - module_name = self.name[len(prefix) + - 1:] if is_dir_plugin else self.name - _plugin_token = _current_plugin.set(module) + _plugin_token = _current_plugin.set(plugin) - setattr(module, "__export__", export) - setattr(module, "__plugin_name__", - module_name.split(".")[0] if is_dir_plugin else module_name) - setattr(module, "__module_name__", module_name) - setattr(module, "__module_prefix__", prefix if is_dir_plugin else "") + setattr(module, "__plugin__", plugin) # try: # super().exec_module(module) @@ -270,7 +166,6 @@ class PluginLoader(SourceFileLoader): super().exec_module(module) _current_plugin.reset(_plugin_token) - _export.reset(_export_token) return diff --git a/nonebot/plugin/on.py b/nonebot/plugin/on.py new file mode 100644 index 00000000..e4da8345 --- /dev/null +++ b/nonebot/plugin/on.py @@ -0,0 +1,918 @@ +import re +import sys +import inspect +from types import ModuleType +from typing import (TYPE_CHECKING, Any, Set, Dict, List, Type, Tuple, Union, + Optional) + +from nonebot.handler import Handler +from nonebot.matcher import Matcher +from .manager import _current_plugin +from nonebot.permission import Permission +from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_StateFactory +from nonebot.rule import (Rule, ArgumentParser, regex, command, keyword, + endswith, startswith, shell_command) + +if TYPE_CHECKING: + from nonebot.adapters import Bot, Event + + +def _store_matcher(matcher: Type[Matcher]) -> None: + plugin = _current_plugin.get() + # only store the matcher defined in the plugin + if plugin: + plugin.matcher.add(matcher) + + +def _get_matcher_module(depth: int = 1) -> Optional[ModuleType]: + current_frame = inspect.currentframe() + if current_frame is None: + return None + frame = inspect.getouterframes(current_frame)[depth + 1].frame + module_name = frame.f_globals["__name__"] + return sys.modules.get(module_name) + + +def on(type: str = "", + rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + *, + handlers: Optional[List[Union[T_Handler, Handler]]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None, + state_factory: Optional[T_StateFactory] = None, + _depth: int = 0) -> Type[Matcher]: + """ + :说明: + + 注册一个基础事件响应器,可自定义类型。 + + :参数: + + * ``type: str``: 事件响应器类型 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + matcher = Matcher.new(type, + Rule() & rule, + permission or Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + plugin=_current_plugin.get(), + module=_get_matcher_module(_depth + 1), + default_state=state, + default_state_factory=state_factory) + _store_matcher(matcher) + return matcher + + +def on_metaevent(rule: Optional[Union[Rule, T_RuleChecker]] = None, + *, + handlers: Optional[List[Union[T_Handler, Handler]]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None, + state_factory: Optional[T_StateFactory] = None, + _depth: int = 0) -> Type[Matcher]: + """ + :说明: + + 注册一个元事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + matcher = Matcher.new("meta_event", + Rule() & rule, + Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + plugin=_current_plugin.get(), + module=_get_matcher_module(_depth + 1), + default_state=state, + default_state_factory=state_factory) + _store_matcher(matcher) + return matcher + + +def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None, + permission: Optional[Permission] = None, + *, + handlers: Optional[List[Union[T_Handler, Handler]]] = None, + temp: bool = False, + priority: int = 1, + block: bool = True, + state: Optional[T_State] = None, + state_factory: Optional[T_StateFactory] = None, + _depth: int = 0) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + matcher = Matcher.new("message", + Rule() & rule, + permission or Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + plugin=_current_plugin.get(), + module=_get_matcher_module(_depth + 1), + default_state=state, + default_state_factory=state_factory) + _store_matcher(matcher) + return matcher + + +def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None, + *, + handlers: Optional[List[Union[T_Handler, Handler]]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None, + state_factory: Optional[T_StateFactory] = None, + _depth: int = 0) -> Type[Matcher]: + """ + :说明: + + 注册一个通知事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + matcher = Matcher.new("notice", + Rule() & rule, + Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + plugin=_current_plugin.get(), + module=_get_matcher_module(_depth + 1), + default_state=state, + default_state_factory=state_factory) + _store_matcher(matcher) + return matcher + + +def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None, + *, + handlers: Optional[List[Union[T_Handler, Handler]]] = None, + temp: bool = False, + priority: int = 1, + block: bool = False, + state: Optional[T_State] = None, + state_factory: Optional[T_StateFactory] = None, + _depth: int = 0) -> Type[Matcher]: + """ + :说明: + + 注册一个请求事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + matcher = Matcher.new("request", + Rule() & rule, + Permission(), + temp=temp, + priority=priority, + block=block, + handlers=handlers, + plugin=_current_plugin.get(), + module=_get_matcher_module(_depth + 1), + default_state=state, + default_state_factory=state_factory) + _store_matcher(matcher) + return matcher + + +def on_startswith(msg: Union[str, Tuple[str, ...]], + rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None, + ignorecase: bool = False, + _depth: int = 0, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容开头时响应。 + + :参数: + + * ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``ignorecase: bool``: 是否忽略大小写 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + return on_message(startswith(msg, ignorecase) & rule, + **kwargs, + _depth=_depth + 1) + + +def on_endswith(msg: Union[str, Tuple[str, ...]], + rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None, + ignorecase: bool = False, + _depth: int = 0, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容结尾时响应。 + + :参数: + + * ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``ignorecase: bool``: 是否忽略大小写 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + return on_message(endswith(msg, ignorecase) & rule, + **kwargs, + _depth=_depth + 1) + + +def on_keyword(keywords: Set[str], + rule: Optional[Union[Rule, T_RuleChecker]] = None, + _depth: int = 0, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息纯文本部分包含关键词时响应。 + + :参数: + + * ``keywords: Set[str]``: 关键词列表 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + return on_message(keyword(*keywords) & rule, **kwargs, _depth=_depth + 1) + + +def on_command(cmd: Union[str, Tuple[str, ...]], + rule: Optional[Union[Rule, T_RuleChecker]] = None, + aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, + _depth: int = 0, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息以指定命令开头时响应。 + + 命令匹配规则参考: `命令形式匹配 `_ + + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): + message = event.get_message() + if len(message) < 1: + return + segment = message.pop(0) + segment_text = str(segment).lstrip() + if not segment_text.startswith(state["_prefix"]["raw_command"]): + return + new_message = message.__class__( + segment_text[len(state["_prefix"]["raw_command"]):].lstrip()) + for new_segment in reversed(new_message): + message.insert(0, new_segment) + + handlers = kwargs.pop("handlers", []) + handlers.insert(0, _strip_cmd) + + commands = set([cmd]) | (aliases or set()) + return on_message(command(*commands) & rule, + handlers=handlers, + **kwargs, + _depth=_depth + 1) + + +def on_shell_command(cmd: Union[str, Tuple[str, ...]], + rule: Optional[Union[Rule, T_RuleChecker]] = None, + aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, + parser: Optional[ArgumentParser] = None, + _depth: int = 0, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。 + + 与普通的 ``on_command`` 不同的是,在添加 ``parser`` 参数时, 响应器会自动处理消息。 + + 并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]`` 中 + + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 + * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): + message = event.get_message() + segment = message.pop(0) + new_message = message.__class__( + str(segment) + [len(state["_prefix"]["raw_command"]):].strip()) # type: ignore + for new_segment in reversed(new_message): + message.insert(0, new_segment) + + handlers = kwargs.pop("handlers", []) + handlers.insert(0, _strip_cmd) + + commands = set([cmd]) | (aliases or set()) + return on_message(shell_command(*commands, parser=parser) & rule, + handlers=handlers, + **kwargs, + _depth=_depth + 1) + + +def on_regex(pattern: str, + flags: Union[int, re.RegexFlag] = 0, + rule: Optional[Union[Rule, T_RuleChecker]] = None, + _depth: int = 0, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息匹配正则表达式时响应。 + + 命令匹配规则参考: `正则匹配 `_ + + :参数: + + * ``pattern: str``: 正则表达式 + * ``flags: Union[int, re.RegexFlag]``: 正则匹配标志 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + return on_message(regex(pattern, flags) & rule, **kwargs, _depth=_depth + 1) + + +class CommandGroup: + """命令组,用于声明一组有相同名称前缀的命令。""" + + def __init__(self, cmd: Union[str, Tuple[str, ...]], **kwargs): + """ + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀 + * ``**kwargs``: 其他传递给 ``on_command`` 的参数默认值,参考 `on_command <#on-command-cmd-rule-none-aliases-none-kwargs>`_ + """ + self.basecmd: Tuple[str, ...] = (cmd,) if isinstance(cmd, str) else cmd + """ + - **类型**: ``Tuple[str, ...]`` + - **说明**: 命令前缀 + """ + if "aliases" in kwargs: + del kwargs["aliases"] + self.base_kwargs: Dict[str, Any] = kwargs + """ + - **类型**: ``Dict[str, Any]`` + - **说明**: 其他传递给 ``on_command`` 的参数默认值 + """ + + def command(self, cmd: Union[str, Tuple[str, ...]], + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个新的命令。 + + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀 + * ``**kwargs``: 其他传递给 ``on_command`` 的参数,将会覆盖命令组默认值 + + :返回: + + - ``Type[Matcher]`` + """ + sub_cmd = (cmd,) if isinstance(cmd, str) else cmd + cmd = self.basecmd + sub_cmd + + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + return on_command(cmd, **final_kwargs, _depth=1) + + def shell_command(self, cmd: Union[str, Tuple[str, ...]], + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个新的命令。 + + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀 + * ``**kwargs``: 其他传递给 ``on_shell_command`` 的参数,将会覆盖命令组默认值 + + :返回: + + - ``Type[Matcher]`` + """ + sub_cmd = (cmd,) if isinstance(cmd, str) else cmd + cmd = self.basecmd + sub_cmd + + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + return on_shell_command(cmd, **final_kwargs, _depth=1) + + +class MatcherGroup: + """事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。""" + + def __init__(self, **kwargs): + """ + :说明: + + 创建一个事件响应器组合,参数为默认值,与 ``on`` 一致 + """ + self.matchers: List[Type[Matcher]] = [] + """ + :类型: ``List[Type[Matcher]]`` + :说明: 组内事件响应器列表 + """ + self.base_kwargs: Dict[str, Any] = kwargs + """ + - **类型**: ``Dict[str, Any]`` + - **说明**: 其他传递给 ``on`` 的参数默认值 + """ + + def on(self, **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个基础事件响应器,可自定义类型。 + + :参数: + + * ``type: str``: 事件响应器类型 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + matcher = on(**final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_metaevent(self, **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个元事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + final_kwargs.pop("permission", None) + matcher = on_metaevent(**final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_message(self, **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_message(**final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_notice(self, **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个通知事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_notice(**final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_request(self, **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个请求事件响应器。 + + :参数: + + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_request(**final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_startswith(self, msg: Union[str, Tuple[str, ...]], + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容开头时响应。 + + :参数: + + * ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容 + * ``ignorecase: bool``: 是否忽略大小写 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_startswith(msg, **final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_endswith(self, msg: Union[str, Tuple[str, ...]], + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息的**文本部分**以指定内容结尾时响应。 + + :参数: + + * ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容 + * ``ignorecase: bool``: 是否忽略大小写 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_endswith(msg, **final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_keyword(self, keywords: Set[str], **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息纯文本部分包含关键词时响应。 + + :参数: + + * ``keywords: Set[str]``: 关键词列表 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_keyword(keywords, **final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_command(self, + cmd: Union[str, Tuple[str, ...]], + aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息以指定命令开头时响应。 + + 命令匹配规则参考: `命令形式匹配 `_ + + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 + * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_command(cmd, aliases=aliases, **final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher + + def on_shell_command(self, + cmd: Union[str, Tuple[str, ...]], + aliases: Optional[Set[Union[str, Tuple[str, + ...]]]] = None, + parser: Optional[ArgumentParser] = None, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。 + + 与普通的 ``on_command`` 不同的是,在添加 ``parser`` 参数时, 响应器会自动处理消息。 + + 并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]`` 中 + + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 + * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 + * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_shell_command(cmd, + aliases=aliases, + parser=parser, + **final_kwargs, + _depth=1) + self.matchers.append(matcher) + return matcher + + def on_regex(self, + pattern: str, + flags: Union[int, re.RegexFlag] = 0, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个消息事件响应器,并且当消息匹配正则表达式时响应。 + + 命令匹配规则参考: `正则匹配 `_ + + :参数: + + * ``pattern: str``: 正则表达式 + * ``flags: Union[int, re.RegexFlag]``: 正则匹配标志 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + final_kwargs = self.base_kwargs.copy() + final_kwargs.update(kwargs) + final_kwargs.pop("type", None) + matcher = on_regex(pattern, flags=flags, **final_kwargs, _depth=1) + self.matchers.append(matcher) + return matcher diff --git a/nonebot/plugin/__init__.pyi b/nonebot/plugin/on.pyi similarity index 92% rename from nonebot/plugin/__init__.pyi rename to nonebot/plugin/on.pyi index bf8b071f..68e8ad62 100644 --- a/nonebot/plugin/__init__.pyi +++ b/nonebot/plugin/on.pyi @@ -1,7 +1,5 @@ import re -from types import ModuleType -from dataclasses import dataclass -from typing import Set, Dict, List, Type, Tuple, Union, Optional +from typing import Set, List, Type, Tuple, Union, Optional from nonebot.handler import Handler from nonebot.matcher import Matcher @@ -9,26 +7,6 @@ from nonebot.permission import Permission from nonebot.rule import Rule, ArgumentParser from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_StateFactory -from .export import Export -from .export import export as export - -plugins: Dict[str, "Plugin"] = ... -PLUGIN_NAMESPACE: str = ... - - -@dataclass(eq=False) -class Plugin(object): - name: str - module: ModuleType - - @property - def export(self) -> Export: - ... - - @property - def matcher(self) -> Set[Type[Matcher]]: - ... - def on(type: str = "", rule: Optional[Union[Rule, T_RuleChecker]] = ..., @@ -387,40 +365,3 @@ class MatcherGroup: state: Optional[T_State] = ..., state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]: ... - - -def load_plugin(module_path: str) -> Optional[Plugin]: - ... - - -def load_plugins(*plugin_dir: str) -> Set[Plugin]: - ... - - -def load_all_plugins(module_path: Set[str], - plugin_dir: Set[str]) -> Set[Plugin]: - ... - - -def load_from_json(file_path: str, encoding: str = ...) -> Set[Plugin]: - ... - - -def load_from_toml(file_path: str, encoding: str = ...) -> Set[Plugin]: - ... - - -def load_builtin_plugins(name: str = ...) -> Optional[Plugin]: - ... - - -def get_plugin(name: str) -> Optional[Plugin]: - ... - - -def get_loaded_plugins() -> Set[Plugin]: - ... - - -def require(name: str) -> Optional[Export]: - ... diff --git a/nonebot/plugin/plugin.py b/nonebot/plugin/plugin.py new file mode 100644 index 00000000..ee5a7c51 --- /dev/null +++ b/nonebot/plugin/plugin.py @@ -0,0 +1,91 @@ +from types import ModuleType +from dataclasses import field, dataclass +from typing import Set, Dict, Type, Optional + +from .export import Export +from nonebot.matcher import Matcher + +plugins: Dict[str, "Plugin"] = {} +""" +:类型: ``Dict[str, Plugin]`` +:说明: 已加载的插件 +""" + + +@dataclass(eq=False) +class Plugin(object): + """存储插件信息""" + name: str + """ + - **类型**: ``str`` + - **说明**: 插件名称,使用 文件/文件夹 名称作为插件名 + """ + module: ModuleType + """ + - **类型**: ``ModuleType`` + - **说明**: 插件模块对象 + """ + module_name: str + """ + - **类型**: ``str`` + - **说明**: 点分割模块路径 + """ + export: Export = field(default_factory=Export) + """ + - **类型**: ``Export`` + - **说明**: 插件内定义的导出内容 + """ + matcher: Set[Type[Matcher]] = field(default_factory=set) + """ + - **类型**: ``Set[Type[Matcher]]`` + - **说明**: 插件内定义的 ``Matcher`` + """ + parent_plugin: Optional["Plugin"] = None + """ + - **类型**: ``Optional[Plugin]`` + - **说明**: 父插件 + """ + sub_plugins: Set["Plugin"] = field(default_factory=set) + """ + - **类型**: ``Set[Plugin]`` + - **说明**: 子插件集合 + """ + + +def get_plugin(name: str) -> Optional[Plugin]: + """ + :说明: + + 获取当前导入的某个插件。 + + :参数: + + * ``name: str``: 插件名,与 ``load_plugin`` 参数一致。如果为 ``load_plugins`` 导入的插件,则为文件(夹)名。 + + :返回: + + - ``Optional[Plugin]`` + """ + return plugins.get(name) + + +def get_loaded_plugins() -> Set[Plugin]: + """ + :说明: + + 获取当前已导入的所有插件。 + + :返回: + + - ``Set[Plugin]`` + """ + return set(plugins.values()) + + +def _new_plugin(fullname: str, module: ModuleType) -> Plugin: + name = fullname.rsplit(".", 1)[-1] if "." in fullname else fullname + if name in plugins: + raise RuntimeError("Plugin already exists! Check your plugin name.") + plugin = Plugin(name, module, fullname) + plugins[name] = plugin + return plugin diff --git a/pages/changelog.md b/pages/changelog.md index b5012389..76c9f1c5 100644 --- a/pages/changelog.md +++ b/pages/changelog.md @@ -7,6 +7,8 @@ sidebar: auto ## v2.0.0a17 - 新增 `MessageTemplate` 对于 `str` 普通模板的支持 +- 移除插件加载的 `NameSpace` 模式 +- 修改 toml 加载插件时的键名为 `tool.nonebot` 以符合规范 ## v2.0.0a16 diff --git a/pyproject.toml b/pyproject.toml index 07ed326a..6d9a8330 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ line_length = 80 length_sort = true skip_gitignore = true force_sort_within_sections = true +known_local_folder = "nonebot" extra_standard_library = "typing_extensions" [build-system] diff --git a/tests/plugins.toml b/tests/plugins.toml index 85eefe30..a905f3dc 100644 --- a/tests/plugins.toml +++ b/tests/plugins.toml @@ -1,3 +1,3 @@ -[nonebot.plugins] +[tool.nonebot] plugins = ["nonebot_plugin_test"] plugin_dirs = ["test_plugins"]