🚧 refactor plugin loading

This commit is contained in:
yanyongyu 2021-11-08 01:02:35 +08:00
parent d10f557212
commit 72f5aeea54
9 changed files with 1195 additions and 1270 deletions

View File

@ -10,8 +10,3 @@ NoneBot.plugin 模块
:members:
:show-inheritance:
:special-members: __init__
.. automodule:: nonebot.plugin.export
:members:
:show-inheritance:
:special-members: __init__

View File

@ -24,6 +24,7 @@ from nonebot.typing import (T_State, T_Handler, T_ArgsParser, T_TypeUpdater,
T_StateFactory, T_PermissionUpdater)
if TYPE_CHECKING:
from nonebot.plugin import Plugin
from nonebot.adapters import Bot, Event, Message, MessageSegment
matchers: Dict[int, List[Type["Matcher"]]] = defaultdict(list)
@ -62,28 +63,25 @@ class MatcherMeta(type):
class Matcher(metaclass=MatcherMeta):
"""事件响应器类"""
plugin: Optional["Plugin"] = None
"""
:类型: ``Optional[Plugin]``
:说明: 事件响应器所在插件
"""
module: Optional[ModuleType] = None
"""
:类型: ``Optional[ModuleType]``
:说明: 事件响应器所在模块
:说明: 事件响应器所在插件模块
"""
plugin_name: Optional[str] = module and getattr(module, "__plugin_name__",
None)
plugin_name: Optional[str] = None
"""
:类型: ``Optional[str]``
:说明: 事件响应器所在插件名
"""
module_name: Optional[str] = module and getattr(module, "__module_name__",
None)
module_name: Optional[str] = None
"""
:类型: ``Optional[str]``
:说明: 事件响应器所在模块名
"""
module_prefix: Optional[str] = module and getattr(module,
"__module_prefix__", None)
"""
:类型: ``Optional[str]``
:说明: 事件响应器所在模块前缀
:说明: 事件响应器所在点分割插件模块路径
"""
type: str = ""
@ -179,7 +177,7 @@ class Matcher(metaclass=MatcherMeta):
priority: int = 1,
block: bool = False,
*,
module: Optional[ModuleType] = None,
plugin: Optional["Plugin"] = None,
expire_time: Optional[datetime] = None,
default_state: Optional[T_State] = None,
default_state_factory: Optional[T_StateFactory] = None,
@ -201,7 +199,7 @@ class Matcher(metaclass=MatcherMeta):
* ``temp: bool``: 是否为临时事件响应器即触发一次后删除
* ``priority: int``: 响应优先级
* ``block: bool``: 是否阻止事件向更低优先级的响应器传播
* ``module: Optional[str]``: 事件响应器所在模块名称
* ``plugin: Optional[Plugin]``: 事件响应器所在插件
* ``default_state: Optional[T_State]``: 默认状态 ``state``
* ``default_state_factory: Optional[T_StateFactory]``: 默认状态 ``state`` 的工厂函数
* ``expire_time: Optional[datetime]``: 事件响应器最终有效时间点过时即被删除
@ -212,15 +210,18 @@ class Matcher(metaclass=MatcherMeta):
"""
NewMatcher = type(
"Matcher", (Matcher,), {
"Matcher",
(Matcher,),
{
"plugin":
plugin,
"module":
module,
plugin and plugin.
module, # FIXME: matcher module may different from plugin module
"plugin_name":
module and getattr(module, "__plugin_name__", None),
plugin and plugin.name,
"module_name":
module and getattr(module, "__module_name__", None),
"module_prefix":
module and getattr(module, "__module_prefix__", None),
plugin and plugin.module_name,
"type":
type_,
"rule":
@ -626,7 +627,7 @@ class Matcher(metaclass=MatcherMeta):
temp=True,
priority=0,
block=True,
module=self.module,
plugin=self.plugin,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=self.state,
default_parser=self.__class__._default_parser,
@ -662,7 +663,7 @@ class Matcher(metaclass=MatcherMeta):
temp=True,
priority=0,
block=True,
module=self.module,
plugin=self.plugin,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=self.state,
default_parser=self.__class__._default_parser,

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,4 @@
from contextvars import ContextVar
_export: ContextVar["Export"] = ContextVar("_export")
from . import _current_plugin
class Export(dict):
@ -57,4 +55,7 @@ def export() -> Export:
- ``Export``
"""
return _export.get()
plugin = _current_plugin.get()
if not plugin:
raise RuntimeError("Export outside of the plugin!")
return plugin.export

186
nonebot/plugin/load.py Normal file
View File

@ -0,0 +1,186 @@
from typing import Set, Optional
from .export import Export
from .manager import PluginManager
from .plugin import Plugin, get_plugin
# TODO
def _load_plugin(manager: PluginManager, plugin_name: str) -> Optional[Plugin]:
if plugin_name.startswith("_"):
return None
if plugin_name in plugins:
return None
try:
module = manager.load_plugin(plugin_name)
plugin = Plugin(plugin_name, module)
plugins[plugin_name] = plugin
logger.opt(colors=True).success(
f'Succeeded to import "<y>{escape_tag(plugin_name)}</y>"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{escape_tag(plugin_name)}"</bg #f8bbd0></r>'
)
return None
def load_plugin(module_path: str) -> Optional[Plugin]:
"""
:说明:
使用 ``PluginManager`` 加载单个插件可以是本地插件或是通过 ``pip`` 安装的插件
:参数:
* ``module_path: str``: 插件名称 ``path.to.your.plugin``
:返回:
- ``Optional[Plugin]``
"""
context: Context = copy_context()
manager = PluginManager(PLUGIN_NAMESPACE, plugins=[module_path])
return context.run(_load_plugin, manager, module_path)
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
"""
:说明:
导入目录下多个插件 ``_`` 开头的插件不会被导入
:参数:
- ``*plugin_dir: str``: 插件路径
:返回:
- ``Set[Plugin]``
"""
loaded_plugins = set()
manager = PluginManager(PLUGIN_NAMESPACE, search_path=plugin_dir)
for plugin_name in manager.list_plugins():
context: Context = copy_context()
result = context.run(_load_plugin, manager, plugin_name)
if result:
loaded_plugins.add(result)
return loaded_plugins
def load_all_plugins(module_path: Set[str],
plugin_dir: Set[str]) -> Set[Plugin]:
"""
:说明:
导入指定列表中的插件以及指定目录下多个插件 ``_`` 开头的插件不会被导入
:参数:
- ``module_path: Set[str]``: 指定插件集合
- ``plugin_dir: Set[str]``: 指定插件路径集合
:返回:
- ``Set[Plugin]``
"""
loaded_plugins = set()
manager = PluginManager(PLUGIN_NAMESPACE, module_path, plugin_dir)
for plugin_name in manager.list_plugins():
context: Context = copy_context()
result = context.run(_load_plugin, manager, plugin_name)
if result:
loaded_plugins.add(result)
return loaded_plugins
def load_from_json(file_path: str, encoding: str = "utf-8") -> Set[Plugin]:
"""
:说明:
导入指定 json 文件中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件 ``_`` 开头的插件不会被导入
:参数:
- ``file_path: str``: 指定 json 文件路径
- ``encoding: str``: 指定 json 文件编码
:返回:
- ``Set[Plugin]``
"""
with open(file_path, "r", encoding=encoding) as f:
data = json.load(f)
plugins = data.get("plugins")
plugin_dirs = data.get("plugin_dirs")
assert isinstance(plugins, list), "plugins must be a list of plugin name"
assert isinstance(plugin_dirs,
list), "plugin_dirs must be a list of directories"
return load_all_plugins(set(plugins), set(plugin_dirs))
def load_from_toml(file_path: str, encoding: str = "utf-8") -> Set[Plugin]:
"""
:说明:
导入指定 toml 文件 ``[nonebot.plugins]`` 中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件
``_`` 开头的插件不会被导入
:参数:
- ``file_path: str``: 指定 toml 文件路径
- ``encoding: str``: 指定 toml 文件编码
:返回:
- ``Set[Plugin]``
"""
with open(file_path, "r", encoding=encoding) as f:
data = tomlkit.parse(f.read()) # type: ignore
nonebot_data = data.get("nonebot", {}).get("plugins")
if not nonebot_data:
raise ValueError("Cannot find '[nonebot.plugins]' in given toml file!")
plugins = nonebot_data.get("plugins", [])
plugin_dirs = nonebot_data.get("plugin_dirs", [])
assert isinstance(plugins, list), "plugins must be a list of plugin name"
assert isinstance(plugin_dirs,
list), "plugin_dirs must be a list of directories"
return load_all_plugins(set(plugins), set(plugin_dirs))
def load_builtin_plugins(name: str = "echo") -> Optional[Plugin]:
"""
:说明:
导入 NoneBot 内置插件
:返回:
- ``Plugin``
"""
return load_plugin(f"nonebot.plugins.{name}")
def require(name: str) -> Optional[Export]:
"""
:说明:
获取一个插件的导出内容
:参数:
* ``name: str``: 插件名 ``load_plugin`` 参数一致如果为 ``load_plugins`` 导入的插件则为文件()
:返回:
- ``Optional[Export]``
"""
plugin = get_plugin(name) or load_plugin(name)
if not plugin:
raise RuntimeError(f"Cannot load plugin \"{name}\"!")
return plugin.export

View File

@ -6,51 +6,16 @@ from hashlib import md5
from pathlib import Path
from types import ModuleType
from collections import Counter
from contextvars import ContextVar
from importlib.abc import MetaPathFinder
from importlib.machinery import PathFinder, SourceFileLoader
from typing import Set, List, Union, Iterable, Optional, Sequence
from .export import Export, _export
_current_plugin: ContextVar[Optional[ModuleType]] = ContextVar(
"_current_plugin", default=None)
_internal_space = ModuleType(__name__ + "._internal")
_internal_space.__path__ = [] # type: ignore
sys.modules[_internal_space.__name__] = _internal_space
_manager_stack: List["PluginManager"] = []
class _NamespaceModule(ModuleType):
"""Simple namespace module to store plugins."""
@property
def __path__(self):
return []
def __getattr__(self, name: str):
try:
return super().__getattr__(name) # type: ignore
except AttributeError:
if name.startswith("__"):
raise
raise RuntimeError("Plugin manager not activated!")
class _InternalModule(ModuleType):
"""Internal module for each plugin manager."""
def __init__(self, prefix: str, plugin_manager: "PluginManager"):
super().__init__(f"{prefix}.{plugin_manager.internal_id}")
self.__plugin_manager__ = plugin_manager
@property
def __path__(self) -> List[str]:
return list(self.__plugin_manager__.search_path)
from .export import Export
from . import _current_plugin
from .plugin import Plugin, _new_plugin
# TODO
class PluginManager:
def __init__(self,

883
nonebot/plugin/on.py Normal file
View File

@ -0,0 +1,883 @@
import re
from typing import (TYPE_CHECKING, Any, Set, Dict, List, Type, Tuple, Union,
Optional)
from nonebot.handler import Handler
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_StateFactory
from nonebot.rule import (Rule, ArgumentParser, regex, command, keyword,
endswith, startswith, shell_command)
from .manager import _current_plugin
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
def _store_matcher(matcher: Type[Matcher]) -> None:
plugin = _current_plugin.get()
# only store the matcher defined in the plugin
if plugin:
plugin.matcher.add(matcher)
def on(type: str = "",
rule: Optional[Union[Rule, T_RuleChecker]] = None,
permission: Optional[Permission] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]:
"""
:说明:
注册一个基础事件响应器可自定义类型
:参数:
* ``type: str``: 事件响应器类型
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new(type,
Rule() & rule,
permission or Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_metaevent(
rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]:
"""
:说明:
注册一个元事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("meta_event",
Rule() & rule,
Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None,
permission: Optional[Permission] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = True,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("message",
Rule() & rule,
permission or Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]:
"""
:说明:
注册一个通知事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("notice",
Rule() & rule,
Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None) -> Type[Matcher]:
"""
:说明:
注册一个请求事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("request",
Rule() & rule,
Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_startswith(msg: Union[str, Tuple[str, ...]],
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None,
ignorecase: bool = False,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容开头时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``ignorecase: bool``: 是否忽略大小写
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(startswith(msg, ignorecase) & rule, **kwargs)
def on_endswith(msg: Union[str, Tuple[str, ...]],
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None,
ignorecase: bool = False,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容结尾时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``ignorecase: bool``: 是否忽略大小写
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(endswith(msg, ignorecase) & rule, **kwargs)
def on_keyword(keywords: Set[str],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息纯文本部分包含关键词时响应
:参数:
* ``keywords: Set[str]``: 关键词列表
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(keyword(*keywords) & rule, **kwargs)
def on_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息以指定命令开头时响应
命令匹配规则参考: `命令形式匹配 <rule.html#command-command>`_
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
if len(message) < 1:
return
segment = message.pop(0)
segment_text = str(segment).lstrip()
if not segment_text.startswith(state["_prefix"]["raw_command"]):
return
new_message = message.__class__(
segment_text[len(state["_prefix"]["raw_command"]):].lstrip())
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return on_message(command(*commands) & rule, handlers=handlers, **kwargs)
def on_shell_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
str(segment)
[len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return on_message(shell_command(*commands, parser=parser) & rule,
handlers=handlers,
**kwargs)
def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息匹配正则表达式时响应
命令匹配规则参考: `正则匹配 <rule.html#regex-regex-flags-0>`_
:参数:
* ``pattern: str``: 正则表达式
* ``flags: Union[int, re.RegexFlag]``: 正则匹配标志
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(regex(pattern, flags) & rule, **kwargs)
class CommandGroup:
"""命令组,用于声明一组有相同名称前缀的命令。"""
def __init__(self, cmd: Union[str, Tuple[str, ...]], **kwargs):
"""
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_command`` 的参数默认值参考 `on_command <#on-command-cmd-rule-none-aliases-none-kwargs>`_
"""
self.basecmd: Tuple[str, ...] = (cmd,) if isinstance(cmd, str) else cmd
"""
- **类型**: ``Tuple[str, ...]``
- **说明**: 命令前缀
"""
if "aliases" in kwargs:
del kwargs["aliases"]
self.base_kwargs: Dict[str, Any] = kwargs
"""
- **类型**: ``Dict[str, Any]``
- **说明**: 其他传递给 ``on_command`` 的参数默认值
"""
def command(self, cmd: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个新的命令
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_command`` 的参数将会覆盖命令组默认值
:返回:
- ``Type[Matcher]``
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_command(cmd, **final_kwargs)
def shell_command(self, cmd: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个新的命令
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_shell_command`` 的参数将会覆盖命令组默认值
:返回:
- ``Type[Matcher]``
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_shell_command(cmd, **final_kwargs)
class MatcherGroup:
"""事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。"""
def __init__(self, **kwargs):
"""
:说明:
创建一个事件响应器组合参数为默认值 ``on`` 一致
"""
self.matchers: List[Type[Matcher]] = []
"""
:类型: ``List[Type[Matcher]]``
:说明: 组内事件响应器列表
"""
self.base_kwargs: Dict[str, Any] = kwargs
"""
- **类型**: ``Dict[str, Any]``
- **说明**: 其他传递给 ``on`` 的参数默认值
"""
def on(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个基础事件响应器可自定义类型
:参数:
* ``type: str``: 事件响应器类型
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
matcher = on(**final_kwargs)
self.matchers.append(matcher)
return matcher
def on_metaevent(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个元事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
final_kwargs.pop("permission", None)
matcher = on_metaevent(**final_kwargs)
self.matchers.append(matcher)
return matcher
def on_message(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_message(**final_kwargs)
self.matchers.append(matcher)
return matcher
def on_notice(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个通知事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_notice(**final_kwargs)
self.matchers.append(matcher)
return matcher
def on_request(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个请求事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_request(**final_kwargs)
self.matchers.append(matcher)
return matcher
def on_startswith(self, msg: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容开头时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容
* ``ignorecase: bool``: 是否忽略大小写
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_startswith(msg, **final_kwargs)
self.matchers.append(matcher)
return matcher
def on_endswith(self, msg: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容结尾时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容
* ``ignorecase: bool``: 是否忽略大小写
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_endswith(msg, **final_kwargs)
self.matchers.append(matcher)
return matcher
def on_keyword(self, keywords: Set[str], **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息纯文本部分包含关键词时响应
:参数:
* ``keywords: Set[str]``: 关键词列表
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_keyword(keywords, **final_kwargs)
self.matchers.append(matcher)
return matcher
def on_command(self,
cmd: Union[str, Tuple[str, ...]],
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息以指定命令开头时响应
命令匹配规则参考: `命令形式匹配 <rule.html#command-command>`_
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_command(cmd, aliases=aliases, **final_kwargs)
self.matchers.append(matcher)
return matcher
def on_shell_command(self,
cmd: Union[str, Tuple[str, ...]],
aliases: Optional[Set[Union[str, Tuple[str,
...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_shell_command(cmd,
aliases=aliases,
parser=parser,
**final_kwargs)
self.matchers.append(matcher)
return matcher
def on_regex(self,
pattern: str,
flags: Union[int, re.RegexFlag] = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息匹配正则表达式时响应
命令匹配规则参考: `正则匹配 <rule.html#regex-regex-flags-0>`_
:参数:
* ``pattern: str``: 正则表达式
* ``flags: Union[int, re.RegexFlag]``: 正则匹配标志
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_regex(pattern, flags=flags, **final_kwargs)
self.matchers.append(matcher)
return matcher

View File

@ -1,7 +1,5 @@
import re
from types import ModuleType
from dataclasses import dataclass
from typing import Set, Dict, List, Type, Tuple, Union, Optional
from typing import Set, List, Type, Tuple, Union, Optional
from nonebot.handler import Handler
from nonebot.matcher import Matcher
@ -9,26 +7,6 @@ from nonebot.permission import Permission
from nonebot.rule import Rule, ArgumentParser
from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_StateFactory
from .export import Export
from .export import export as export
plugins: Dict[str, "Plugin"] = ...
PLUGIN_NAMESPACE: str = ...
@dataclass(eq=False)
class Plugin(object):
name: str
module: ModuleType
@property
def export(self) -> Export:
...
@property
def matcher(self) -> Set[Type[Matcher]]:
...
def on(type: str = "",
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
@ -387,40 +365,3 @@ class MatcherGroup:
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def load_plugin(module_path: str) -> Optional[Plugin]:
...
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
...
def load_all_plugins(module_path: Set[str],
plugin_dir: Set[str]) -> Set[Plugin]:
...
def load_from_json(file_path: str, encoding: str = ...) -> Set[Plugin]:
...
def load_from_toml(file_path: str, encoding: str = ...) -> Set[Plugin]:
...
def load_builtin_plugins(name: str = ...) -> Optional[Plugin]:
...
def get_plugin(name: str) -> Optional[Plugin]:
...
def get_loaded_plugins() -> Set[Plugin]:
...
def require(name: str) -> Optional[Export]:
...

85
nonebot/plugin/plugin.py Normal file
View File

@ -0,0 +1,85 @@
from types import ModuleType
from dataclasses import field, dataclass
from typing import Set, Dict, Type, Optional
from nonebot.matcher import Matcher
from .export import Export
plugins: Dict[str, "Plugin"] = {}
"""
:类型: ``Dict[str, Plugin]``
:说明: 已加载的插件
"""
@dataclass(eq=False)
class Plugin(object):
"""存储插件信息"""
name: str
"""
- **类型**: ``str``
- **说明**: 插件名称使用 文件/文件夹 名称作为插件名
"""
module: ModuleType
"""
- **类型**: ``ModuleType``
- **说明**: 插件模块对象
"""
module_name: str
"""
- **类型**: ``str``
- **说明**: 点分割模块路径
"""
export: Export = field(default_factory=Export)
"""
- **类型**: ``Export``
- **说明**: 插件内定义的导出内容
"""
matcher: Set[Type[Matcher]] = field(default_factory=set)
"""
- **类型**: ``Set[Type[Matcher]]``
- **说明**: 插件内定义的 ``Matcher``
"""
# TODO
parent_plugin: Optional["Plugin"] = None
sub_plugins: Set["Plugin"] = field(default_factory=set)
def get_plugin(name: str) -> Optional[Plugin]:
"""
:说明:
获取当前导入的某个插件
:参数:
* ``name: str``: 插件名 ``load_plugin`` 参数一致如果为 ``load_plugins`` 导入的插件则为文件()
:返回:
- ``Optional[Plugin]``
"""
return plugins.get(name)
def get_loaded_plugins() -> Set[Plugin]:
"""
:说明:
获取当前已导入的所有插件
:返回:
- ``Set[Plugin]``
"""
return set(plugins.values())
def _new_plugin(fullname: str, module: ModuleType) -> Plugin:
_, name = fullname.rsplit(".", 1)
if name in plugins:
raise RuntimeError("Plugin already exists! Check your plugin name.")
plugin = Plugin(name, module, fullname)
plugins[name] = plugin
return plugin