🔀 Merge pull request #587

Feature: remove namespace
This commit is contained in:
Ju4tCode 2021-11-11 22:52:56 +08:00 committed by GitHub
commit 512d77de0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1456 additions and 1502 deletions

View File

@ -32,6 +32,21 @@ sidebarDepth: 0
事件响应器类
### `plugin`
* **类型**
`Optional[Plugin]`
* **说明**
事件响应器所在插件
### `module`
@ -43,7 +58,7 @@ sidebarDepth: 0
* **说明**
事件响应器所在模块
事件响应器所在插件模块
@ -73,22 +88,7 @@ sidebarDepth: 0
* **说明**
事件响应器所在模块名
### `module_prefix`
* **类型**
`Optional[str]`
* **说明**
事件响应器所在模块前缀
事件响应器所在点分割插件模块路径
@ -292,7 +292,7 @@ sidebarDepth: 0
### _classmethod_ `new(type_='', rule=None, permission=None, handlers=None, temp=False, priority=1, block=False, *, module=None, expire_time=None, default_state=None, default_state_factory=None, default_parser=None, default_type_updater=None, default_permission_updater=None)`
### _classmethod_ `new(type_='', rule=None, permission=None, handlers=None, temp=False, priority=1, block=False, *, plugin=None, module=None, expire_time=None, default_state=None, default_state_factory=None, default_parser=None, default_type_updater=None, default_permission_updater=None)`
* **说明**
@ -325,7 +325,10 @@ sidebarDepth: 0
* `block: bool`: 是否阻止事件向更低优先级的响应器传播
* `module: Optional[str]`: 事件响应器所在模块名称
* `plugin: Optional[Plugin]`: 事件响应器所在插件
* `module: Optional[ModuleType]`: 事件响应器所在模块
* `default_state: Optional[T_State]`: 默认状态 `state`

View File

@ -50,7 +50,16 @@ sidebarDepth: 0
* **说明**: 插件模块对象
### _property_ `export`
### `module_name`
* **类型**: `str`
* **说明**: 点分割模块路径
### `export`
* **类型**: `Export`
@ -59,7 +68,7 @@ sidebarDepth: 0
* **说明**: 插件内定义的导出内容
### _property_ `matcher`
### `matcher`
* **类型**: `Set[Type[Matcher]]`
@ -68,7 +77,64 @@ sidebarDepth: 0
* **说明**: 插件内定义的 `Matcher`
## `on(type='', rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)`
### `parent_plugin`
* **类型**: `Optional[Plugin]`
* **说明**: 父插件
### `sub_plugins`
* **类型**: `Set[Plugin]`
* **说明**: 子插件集合
## `get_plugin(name)`
* **说明**
获取当前导入的某个插件。
* **参数**
* `name: str`: 插件名,与 `load_plugin` 参数一致。如果为 `load_plugins` 导入的插件,则为文件(夹)名。
* **返回**
* `Optional[Plugin]`
## `get_loaded_plugins()`
* **说明**
获取当前已导入的所有插件。
* **返回**
* `Set[Plugin]`
## `on(type='', rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)`
* **说明**
@ -115,7 +181,7 @@ sidebarDepth: 0
## `on_metaevent(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)`
## `on_metaevent(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)`
* **说明**
@ -156,7 +222,7 @@ sidebarDepth: 0
## `on_message(rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=True, state=None, state_factory=None)`
## `on_message(rule=None, permission=None, *, handlers=None, temp=False, priority=1, block=True, state=None, state_factory=None, _depth=0)`
* **说明**
@ -200,7 +266,7 @@ sidebarDepth: 0
## `on_notice(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)`
## `on_notice(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)`
* **说明**
@ -241,7 +307,7 @@ sidebarDepth: 0
## `on_request(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None)`
## `on_request(rule=None, *, handlers=None, temp=False, priority=1, block=False, state=None, state_factory=None, _depth=0)`
* **说明**
@ -282,7 +348,7 @@ sidebarDepth: 0
## `on_startswith(msg, rule=None, ignorecase=False, **kwargs)`
## `on_startswith(msg, rule=None, ignorecase=False, _depth=0, **kwargs)`
* **说明**
@ -332,7 +398,7 @@ sidebarDepth: 0
## `on_endswith(msg, rule=None, ignorecase=False, **kwargs)`
## `on_endswith(msg, rule=None, ignorecase=False, _depth=0, **kwargs)`
* **说明**
@ -382,7 +448,7 @@ sidebarDepth: 0
## `on_keyword(keywords, rule=None, **kwargs)`
## `on_keyword(keywords, rule=None, _depth=0, **kwargs)`
* **说明**
@ -429,7 +495,7 @@ sidebarDepth: 0
## `on_command(cmd, rule=None, aliases=None, **kwargs)`
## `on_command(cmd, rule=None, aliases=None, _depth=0, **kwargs)`
* **说明**
@ -481,7 +547,7 @@ sidebarDepth: 0
## `on_shell_command(cmd, rule=None, aliases=None, parser=None, **kwargs)`
## `on_shell_command(cmd, rule=None, aliases=None, parser=None, _depth=0, **kwargs)`
* **说明**
@ -538,7 +604,7 @@ sidebarDepth: 0
## `on_regex(pattern, flags=0, rule=None, **kwargs)`
## `on_regex(pattern, flags=0, rule=None, _depth=0, **kwargs)`
* **说明**
@ -1300,10 +1366,10 @@ sidebarDepth: 0
* **参数**
* `module_path: Set[str]`: 指定插件集合
* `module_path: Iterable[str]`: 指定插件集合
* `plugin_dir: Set[str]`: 指定插件路径集合
* `plugin_dir: Iterable[str]`: 指定插件路径集合
@ -1345,7 +1411,7 @@ sidebarDepth: 0
* **说明**
导入指定 toml 文件 `[nonebot.plugins]` 中的 `plugins` 以及 `plugin_dirs` 下多个插件,
导入指定 toml 文件 `[tool.nonebot]` 中的 `plugins` 以及 `plugin_dirs` 下多个插件,
`_` 开头的插件不会被导入!
@ -1372,7 +1438,7 @@ sidebarDepth: 0
* **说明**
导入 NoneBot 内置插件
导入 NoneBot 内置插件, 默认导入 `echo` 插件
@ -1383,45 +1449,6 @@ sidebarDepth: 0
## `get_plugin(name)`
* **说明**
获取当前导入的某个插件。
* **参数**
* `name: str`: 插件名,与 `load_plugin` 参数一致。如果为 `load_plugins` 导入的插件,则为文件(夹)名。
* **返回**
* `Optional[Plugin]`
## `get_loaded_plugins()`
* **说明**
获取当前已导入的所有插件。
* **返回**
* `Set[Plugin]`
## `require(name)`
@ -1441,7 +1468,14 @@ sidebarDepth: 0
* **返回**
* `Optional[Export]`
* `Export`
* **异常**
* `RuntimeError`: 插件无法加载

View File

@ -11,6 +11,21 @@ NoneBot.plugin 模块
:show-inheritance:
:special-members: __init__
.. automodule:: nonebot.plugin.plugin
:members:
:show-inheritance:
:special-members: __init__
.. automodule:: nonebot.plugin.on
:members:
:show-inheritance:
:special-members: __init__
.. automodule:: nonebot.plugin.load
:members:
:show-inheritance:
:special-members: __init__
.. automodule:: nonebot.plugin.export
:members:
:show-inheritance:

View File

@ -37,7 +37,7 @@ from nonebot.adapters import Bot
from nonebot.utils import escape_tag
from nonebot.config import Env, Config
from nonebot.log import logger, default_filter
from nonebot.drivers import Driver, ForwardDriver, ReverseDriver
from nonebot.drivers import Driver, ReverseDriver
try:
_dist: pkg_resources.Distribution = pkg_resources.get_distribution(

View File

@ -24,6 +24,7 @@ from nonebot.typing import (T_State, T_Handler, T_ArgsParser, T_TypeUpdater,
T_StateFactory, T_PermissionUpdater)
if TYPE_CHECKING:
from nonebot.plugin import Plugin
from nonebot.adapters import Bot, Event, Message, MessageSegment
matchers: Dict[int, List[Type["Matcher"]]] = defaultdict(list)
@ -62,28 +63,25 @@ class MatcherMeta(type):
class Matcher(metaclass=MatcherMeta):
"""事件响应器类"""
plugin: Optional["Plugin"] = None
"""
:类型: ``Optional[Plugin]``
:说明: 事件响应器所在插件
"""
module: Optional[ModuleType] = None
"""
:类型: ``Optional[ModuleType]``
:说明: 事件响应器所在模块
:说明: 事件响应器所在插件模块
"""
plugin_name: Optional[str] = module and getattr(module, "__plugin_name__",
None)
plugin_name: Optional[str] = None
"""
:类型: ``Optional[str]``
:说明: 事件响应器所在插件名
"""
module_name: Optional[str] = module and getattr(module, "__module_name__",
None)
module_name: Optional[str] = None
"""
:类型: ``Optional[str]``
:说明: 事件响应器所在模块名
"""
module_prefix: Optional[str] = module and getattr(module,
"__module_prefix__", None)
"""
:类型: ``Optional[str]``
:说明: 事件响应器所在模块前缀
:说明: 事件响应器所在点分割插件模块路径
"""
type: str = ""
@ -179,6 +177,7 @@ class Matcher(metaclass=MatcherMeta):
priority: int = 1,
block: bool = False,
*,
plugin: Optional["Plugin"] = None,
module: Optional[ModuleType] = None,
expire_time: Optional[datetime] = None,
default_state: Optional[T_State] = None,
@ -201,7 +200,8 @@ class Matcher(metaclass=MatcherMeta):
* ``temp: bool``: 是否为临时事件响应器即触发一次后删除
* ``priority: int``: 响应优先级
* ``block: bool``: 是否阻止事件向更低优先级的响应器传播
* ``module: Optional[str]``: 事件响应器所在模块名称
* ``plugin: Optional[Plugin]``: 事件响应器所在插件
* ``module: Optional[ModuleType]``: 事件响应器所在模块
* ``default_state: Optional[T_State]``: 默认状态 ``state``
* ``default_state_factory: Optional[T_StateFactory]``: 默认状态 ``state`` 的工厂函数
* ``expire_time: Optional[datetime]``: 事件响应器最终有效时间点过时即被删除
@ -213,14 +213,14 @@ class Matcher(metaclass=MatcherMeta):
NewMatcher = type(
"Matcher", (Matcher,), {
"plugin":
plugin,
"module":
module,
"plugin_name":
module and getattr(module, "__plugin_name__", None),
plugin and plugin.name,
"module_name":
module and getattr(module, "__module_name__", None),
"module_prefix":
module and getattr(module, "__module_prefix__", None),
module and module.__name__,
"type":
type_,
"rule":
@ -626,6 +626,7 @@ class Matcher(metaclass=MatcherMeta):
temp=True,
priority=0,
block=True,
plugin=self.plugin,
module=self.module,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=self.state,
@ -662,6 +663,7 @@ class Matcher(metaclass=MatcherMeta):
temp=True,
priority=0,
block=True,
plugin=self.plugin,
module=self.module,
expire_time=datetime.now() + bot.config.session_expire_timeout,
default_state=self.state,

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,4 @@
from contextvars import ContextVar
_export: ContextVar["Export"] = ContextVar("_export")
from . import _current_plugin
class Export(dict):
@ -57,4 +55,7 @@ def export() -> Export:
- ``Export``
"""
return _export.get()
plugin = _current_plugin.get()
if not plugin:
raise RuntimeError("Export outside of the plugin!")
return plugin.export

161
nonebot/plugin/load.py Normal file
View File

@ -0,0 +1,161 @@
import json
from typing import Set, Iterable, Optional
import tomlkit
from . import _managers
from .export import Export
from .manager import PluginManager
from .plugin import Plugin, get_plugin
def load_plugin(module_path: str) -> Optional[Plugin]:
"""
:说明:
使用 ``PluginManager`` 加载单个插件可以是本地插件或是通过 ``pip`` 安装的插件
:参数:
* ``module_path: str``: 插件名称 ``path.to.your.plugin``
:返回:
- ``Optional[Plugin]``
"""
manager = PluginManager([module_path])
_managers.append(manager)
return manager.load_plugin(module_path)
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
"""
:说明:
导入目录下多个插件 ``_`` 开头的插件不会被导入
:参数:
- ``*plugin_dir: str``: 插件路径
:返回:
- ``Set[Plugin]``
"""
manager = PluginManager(search_path=plugin_dir)
_managers.append(manager)
return manager.load_all_plugins()
def load_all_plugins(module_path: Iterable[str],
plugin_dir: Iterable[str]) -> Set[Plugin]:
"""
:说明:
导入指定列表中的插件以及指定目录下多个插件 ``_`` 开头的插件不会被导入
:参数:
- ``module_path: Iterable[str]``: 指定插件集合
- ``plugin_dir: Iterable[str]``: 指定插件路径集合
:返回:
- ``Set[Plugin]``
"""
manager = PluginManager(module_path, plugin_dir)
_managers.append(manager)
return manager.load_all_plugins()
def load_from_json(file_path: str, encoding: str = "utf-8") -> Set[Plugin]:
"""
:说明:
导入指定 json 文件中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件 ``_`` 开头的插件不会被导入
:参数:
- ``file_path: str``: 指定 json 文件路径
- ``encoding: str``: 指定 json 文件编码
:返回:
- ``Set[Plugin]``
"""
with open(file_path, "r", encoding=encoding) as f:
data = json.load(f)
plugins = data.get("plugins")
plugin_dirs = data.get("plugin_dirs")
assert isinstance(plugins, list), "plugins must be a list of plugin name"
assert isinstance(plugin_dirs,
list), "plugin_dirs must be a list of directories"
return load_all_plugins(set(plugins), set(plugin_dirs))
def load_from_toml(file_path: str, encoding: str = "utf-8") -> Set[Plugin]:
"""
:说明:
导入指定 toml 文件 ``[tool.nonebot]`` 中的 ``plugins`` 以及 ``plugin_dirs`` 下多个插件
``_`` 开头的插件不会被导入
:参数:
- ``file_path: str``: 指定 toml 文件路径
- ``encoding: str``: 指定 toml 文件编码
:返回:
- ``Set[Plugin]``
"""
with open(file_path, "r", encoding=encoding) as f:
data = tomlkit.parse(f.read()) # type: ignore
nonebot_data = data.get("tool", {}).get("nonebot") or data.get(
"nonebot", {}).get("plugins")
if not nonebot_data:
raise ValueError("Cannot find '[tool.nonebot]' in given toml file!")
plugins = nonebot_data.get("plugins", [])
plugin_dirs = nonebot_data.get("plugin_dirs", [])
assert isinstance(plugins, list), "plugins must be a list of plugin name"
assert isinstance(plugin_dirs,
list), "plugin_dirs must be a list of directories"
return load_all_plugins(plugins, plugin_dirs)
def load_builtin_plugins(name: str = "echo") -> Optional[Plugin]:
"""
:说明:
导入 NoneBot 内置插件, 默认导入 ``echo`` 插件
:返回:
- ``Plugin``
"""
return load_plugin(f"nonebot.plugins.{name}")
def require(name: str) -> Export:
"""
:说明:
获取一个插件的导出内容
:参数:
* ``name: str``: 插件名 ``load_plugin`` 参数一致如果为 ``load_plugins`` 导入的插件则为文件()
:返回:
- ``Export``
:异常:
- ``RuntimeError``: 插件无法加载
"""
plugin = get_plugin(name) or load_plugin(name)
if not plugin:
raise RuntimeError(f"Cannot load plugin \"{name}\"!")
return plugin.export

View File

@ -1,201 +1,103 @@
import sys
import uuid
import pkgutil
import importlib
from hashlib import md5
from pathlib import Path
from itertools import chain
from types import ModuleType
from collections import Counter
from contextvars import ContextVar
from importlib.abc import MetaPathFinder
from importlib.machinery import PathFinder, SourceFileLoader
from typing import Set, List, Union, Iterable, Optional, Sequence
from typing import Set, Dict, List, Union, Iterable, Optional, Sequence
from .export import Export, _export
_current_plugin: ContextVar[Optional[ModuleType]] = ContextVar(
"_current_plugin", default=None)
_internal_space = ModuleType(__name__ + "._internal")
_internal_space.__path__ = [] # type: ignore
sys.modules[_internal_space.__name__] = _internal_space
_manager_stack: List["PluginManager"] = []
class _NamespaceModule(ModuleType):
"""Simple namespace module to store plugins."""
@property
def __path__(self):
return []
def __getattr__(self, name: str):
try:
return super().__getattr__(name) # type: ignore
except AttributeError:
if name.startswith("__"):
raise
raise RuntimeError("Plugin manager not activated!")
class _InternalModule(ModuleType):
"""Internal module for each plugin manager."""
def __init__(self, prefix: str, plugin_manager: "PluginManager"):
super().__init__(f"{prefix}.{plugin_manager.internal_id}")
self.__plugin_manager__ = plugin_manager
@property
def __path__(self) -> List[str]:
return list(self.__plugin_manager__.search_path)
from nonebot.log import logger
from nonebot.utils import escape_tag
from .plugin import Plugin, _new_plugin
from . import _managers, _current_plugin
class PluginManager:
def __init__(self,
namespace: str,
plugins: Optional[Iterable[str]] = None,
search_path: Optional[Iterable[str]] = None,
*,
id: Optional[str] = None):
self.namespace: str = namespace
self.namespace_module: ModuleType = self._setup_namespace(namespace)
self.id: str = id or str(uuid.uuid4())
self.internal_id: str = md5(
((self.namespace or "") + self.id).encode()).hexdigest()
self.internal_module = self._setup_internal_module(self.internal_id)
def __init__(
self,
plugins: Optional[Iterable[str]] = None,
search_path: Optional[Iterable[str]] = None,
):
# simple plugin not in search path
self.plugins: Set[str] = set(plugins or [])
self.search_path: Set[str] = set(search_path or [])
# ensure can be loaded
# cache plugins
self.searched_plugins: Dict[str, Path] = {}
self.list_plugins()
def _setup_namespace(self, namespace: str) -> ModuleType:
try:
module = importlib.import_module(namespace)
except ImportError:
module = _NamespaceModule(namespace)
if "." in namespace:
parent = importlib.import_module(namespace.rsplit(".", 1)[0])
setattr(parent, namespace.rsplit(".", 1)[1], module)
def _path_to_module_name(self, path: Path) -> str:
rel_path = path.resolve().relative_to(Path(".").resolve())
if rel_path.stem == "__init__":
return ".".join(rel_path.parts[:-1])
else:
return ".".join(rel_path.parts[:-1] + (rel_path.stem,))
sys.modules[namespace] = module
return module
def _previous_plugins(self) -> List[str]:
_pre_managers: List[PluginManager]
if self in _managers:
_pre_managers = _managers[:_managers.index(self)]
else:
_pre_managers = _managers[:]
def _setup_internal_module(self, internal_id: str) -> ModuleType:
if hasattr(_internal_space, internal_id):
raise RuntimeError("Plugin manager already exists!")
index = 2
prefix: str = _internal_space.__name__
while True:
try:
frame = sys._getframe(index)
except ValueError:
break
# check if is called in plugin
if "__plugin_name__" not in frame.f_globals:
index += 1
continue
prefix = frame.f_globals.get("__name__", _internal_space.__name__)
break
if not prefix.startswith(_internal_space.__name__):
prefix = _internal_space.__name__
module = _InternalModule(prefix, self)
sys.modules[module.__name__] = module # type: ignore
setattr(_internal_space, internal_id, module)
return module
def __enter__(self):
if self in _manager_stack:
raise RuntimeError("Plugin manager already activated!")
_manager_stack.append(self)
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
_manager_stack.pop()
except IndexError:
pass
def search_plugins(self) -> List[str]:
return [
module_info.name
for module_info in pkgutil.iter_modules(self.search_path)
*chain.from_iterable(
[*manager.plugins, *manager.searched_plugins.keys()]
for manager in _pre_managers)
]
def list_plugins(self) -> Set[str]:
_pre_managers: List[PluginManager]
if self in _manager_stack:
_pre_managers = _manager_stack[:_manager_stack.index(self)]
else:
_pre_managers = _manager_stack[:]
# get all previous ready to load plugins
previous_plugins = self._previous_plugins()
searched_plugins: Dict[str, Path] = {}
_search_path: Set[str] = set()
for manager in _pre_managers:
_search_path |= manager.search_path
if _search_path & self.search_path:
raise RuntimeError("Duplicate plugin search path!")
_search_plugins = self.search_plugins()
c = Counter([*_search_plugins, *self.plugins])
conflict = [name for name, num in c.items() if num > 1]
if conflict:
raise RuntimeError(
f"More than one plugin named {' / '.join(conflict)}!")
return set(_search_plugins) | self.plugins
def load_plugin(self, name) -> ModuleType:
if name in self.plugins:
with self:
return importlib.import_module(name)
if "." in name:
raise ValueError("Plugin name cannot contain '.'")
with self:
return importlib.import_module(f"{self.namespace}.{name}")
def load_all_plugins(self) -> List[ModuleType]:
return [self.load_plugin(name) for name in self.list_plugins()]
def _rewrite_module_name(self, module_name: str) -> Optional[str]:
prefix = f"{self.internal_module.__name__}."
raw_name = module_name[len(self.namespace) +
1:] if module_name.startswith(self.namespace +
".") else None
# dir plugins
if raw_name and raw_name.split(".")[0] in self.search_plugins():
return f"{prefix}{raw_name}"
# third party plugin or renamed dir plugins
elif module_name in self.plugins or module_name.startswith(prefix):
return module_name
# dir plugins
elif module_name in self.search_plugins():
return f"{prefix}{module_name}"
return None
def _check_absolute_import(self, origin_path: str) -> Optional[str]:
if not self.search_path:
return
paths = set([
*self.search_path,
*(str(Path(path).resolve()) for path in self.search_path)
])
for path in paths:
try:
rel_path = Path(origin_path).relative_to(path)
if rel_path.stem == "__init__":
return f"{self.internal_module.__name__}." + ".".join(
rel_path.parts[:-1])
return f"{self.internal_module.__name__}." + ".".join(
rel_path.parts[:-1] + (rel_path.stem,))
except ValueError:
for module_info in pkgutil.iter_modules(self.search_path):
if module_info.name.startswith("_"):
continue
if module_info.name in searched_plugins.keys(
) or module_info.name in previous_plugins:
raise RuntimeError(
f"Plugin already exists: {module_info.name}! Check your plugin name"
)
module_spec = module_info.module_finder.find_spec(
module_info.name, None)
if not module_spec:
continue
module_path = module_spec.origin
if not module_path:
continue
searched_plugins[module_info.name] = Path(module_path).resolve()
self.searched_plugins = searched_plugins
return self.plugins | set(self.searched_plugins.keys())
def load_plugin(self, name) -> Optional[Plugin]:
try:
if name in self.plugins:
module = importlib.import_module(name)
elif name not in self.searched_plugins:
raise RuntimeError(
f"Plugin not found: {name}! Check your plugin name")
else:
module = importlib.import_module(
self._path_to_module_name(self.searched_plugins[name]))
logger.opt(colors=True).success(
f'Succeeded to import "<y>{escape_tag(name)}</y>"')
return getattr(module, "__plugin__", None)
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{escape_tag(name)}"</bg #f8bbd0></r>'
)
def load_all_plugins(self) -> Set[Plugin]:
return set(
filter(None,
(self.load_plugin(name) for name in self.list_plugins())))
class PluginFinder(MetaPathFinder):
@ -204,28 +106,27 @@ class PluginFinder(MetaPathFinder):
fullname: str,
path: Optional[Sequence[Union[bytes, str]]],
target: Optional[ModuleType] = None):
if _manager_stack:
if _managers:
index = -1
origin_spec = PathFinder.find_spec(fullname, path, target)
while -index <= len(_manager_stack):
manager = _manager_stack[index]
module_spec = PathFinder.find_spec(fullname, path, target)
if not module_spec:
return
module_origin = module_spec.origin
if not module_origin:
return
module_path = Path(module_origin).resolve()
rel_name = None
if origin_spec and origin_spec.origin:
rel_name = manager._check_absolute_import(
origin_spec.origin)
while -index <= len(_managers):
manager = _managers[index]
if fullname in manager.plugins or module_path in manager.searched_plugins.values(
):
module_spec.loader = PluginLoader(manager, fullname,
module_origin)
return module_spec
newname = manager._rewrite_module_name(rel_name or fullname)
if newname:
spec = PathFinder.find_spec(
newname, path or [*manager.search_path, *sys.path],
target)
if spec:
spec.loader = PluginLoader( # type: ignore
manager, newname, spec.origin)
return spec
index -= 1
return None
return
class PluginLoader(SourceFileLoader):
@ -246,20 +147,15 @@ class PluginLoader(SourceFileLoader):
if self.loaded:
return
export = Export()
_export_token = _export.set(export)
plugin = _new_plugin(self.name, module)
parent_plugin = _current_plugin.get()
if parent_plugin:
plugin.parent_plugin = parent_plugin
parent_plugin.sub_plugins.add(plugin)
prefix = self.manager.internal_module.__name__
is_dir_plugin = self.name.startswith(prefix + ".")
module_name = self.name[len(prefix) +
1:] if is_dir_plugin else self.name
_plugin_token = _current_plugin.set(module)
_plugin_token = _current_plugin.set(plugin)
setattr(module, "__export__", export)
setattr(module, "__plugin_name__",
module_name.split(".")[0] if is_dir_plugin else module_name)
setattr(module, "__module_name__", module_name)
setattr(module, "__module_prefix__", prefix if is_dir_plugin else "")
setattr(module, "__plugin__", plugin)
# try:
# super().exec_module(module)
@ -270,7 +166,6 @@ class PluginLoader(SourceFileLoader):
super().exec_module(module)
_current_plugin.reset(_plugin_token)
_export.reset(_export_token)
return

918
nonebot/plugin/on.py Normal file
View File

@ -0,0 +1,918 @@
import re
import sys
import inspect
from types import ModuleType
from typing import (TYPE_CHECKING, Any, Set, Dict, List, Type, Tuple, Union,
Optional)
from nonebot.handler import Handler
from nonebot.matcher import Matcher
from .manager import _current_plugin
from nonebot.permission import Permission
from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_StateFactory
from nonebot.rule import (Rule, ArgumentParser, regex, command, keyword,
endswith, startswith, shell_command)
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
def _store_matcher(matcher: Type[Matcher]) -> None:
plugin = _current_plugin.get()
# only store the matcher defined in the plugin
if plugin:
plugin.matcher.add(matcher)
def _get_matcher_module(depth: int = 1) -> Optional[ModuleType]:
current_frame = inspect.currentframe()
if current_frame is None:
return None
frame = inspect.getouterframes(current_frame)[depth + 1].frame
module_name = frame.f_globals["__name__"]
return sys.modules.get(module_name)
def on(type: str = "",
rule: Optional[Union[Rule, T_RuleChecker]] = None,
permission: Optional[Permission] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None,
_depth: int = 0) -> Type[Matcher]:
"""
:说明:
注册一个基础事件响应器可自定义类型
:参数:
* ``type: str``: 事件响应器类型
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new(type,
Rule() & rule,
permission or Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
module=_get_matcher_module(_depth + 1),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_metaevent(rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None,
_depth: int = 0) -> Type[Matcher]:
"""
:说明:
注册一个元事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("meta_event",
Rule() & rule,
Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
module=_get_matcher_module(_depth + 1),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_message(rule: Optional[Union[Rule, T_RuleChecker]] = None,
permission: Optional[Permission] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = True,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None,
_depth: int = 0) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("message",
Rule() & rule,
permission or Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
module=_get_matcher_module(_depth + 1),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_notice(rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None,
_depth: int = 0) -> Type[Matcher]:
"""
:说明:
注册一个通知事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("notice",
Rule() & rule,
Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
module=_get_matcher_module(_depth + 1),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_request(rule: Optional[Union[Rule, T_RuleChecker]] = None,
*,
handlers: Optional[List[Union[T_Handler, Handler]]] = None,
temp: bool = False,
priority: int = 1,
block: bool = False,
state: Optional[T_State] = None,
state_factory: Optional[T_StateFactory] = None,
_depth: int = 0) -> Type[Matcher]:
"""
:说明:
注册一个请求事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
matcher = Matcher.new("request",
Rule() & rule,
Permission(),
temp=temp,
priority=priority,
block=block,
handlers=handlers,
plugin=_current_plugin.get(),
module=_get_matcher_module(_depth + 1),
default_state=state,
default_state_factory=state_factory)
_store_matcher(matcher)
return matcher
def on_startswith(msg: Union[str, Tuple[str, ...]],
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None,
ignorecase: bool = False,
_depth: int = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容开头时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``ignorecase: bool``: 是否忽略大小写
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(startswith(msg, ignorecase) & rule,
**kwargs,
_depth=_depth + 1)
def on_endswith(msg: Union[str, Tuple[str, ...]],
rule: Optional[Optional[Union[Rule, T_RuleChecker]]] = None,
ignorecase: bool = False,
_depth: int = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容结尾时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``ignorecase: bool``: 是否忽略大小写
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(endswith(msg, ignorecase) & rule,
**kwargs,
_depth=_depth + 1)
def on_keyword(keywords: Set[str],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
_depth: int = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息纯文本部分包含关键词时响应
:参数:
* ``keywords: Set[str]``: 关键词列表
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(keyword(*keywords) & rule, **kwargs, _depth=_depth + 1)
def on_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
_depth: int = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息以指定命令开头时响应
命令匹配规则参考: `命令形式匹配 <rule.html#command-command>`_
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
if len(message) < 1:
return
segment = message.pop(0)
segment_text = str(segment).lstrip()
if not segment_text.startswith(state["_prefix"]["raw_command"]):
return
new_message = message.__class__(
segment_text[len(state["_prefix"]["raw_command"]):].lstrip())
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return on_message(command(*commands) & rule,
handlers=handlers,
**kwargs,
_depth=_depth + 1)
def on_shell_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
parser: Optional[ArgumentParser] = None,
_depth: int = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
str(segment)
[len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return on_message(shell_command(*commands, parser=parser) & rule,
handlers=handlers,
**kwargs,
_depth=_depth + 1)
def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Union[Rule, T_RuleChecker]] = None,
_depth: int = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息匹配正则表达式时响应
命令匹配规则参考: `正则匹配 <rule.html#regex-regex-flags-0>`_
:参数:
* ``pattern: str``: 正则表达式
* ``flags: Union[int, re.RegexFlag]``: 正则匹配标志
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
return on_message(regex(pattern, flags) & rule, **kwargs, _depth=_depth + 1)
class CommandGroup:
"""命令组,用于声明一组有相同名称前缀的命令。"""
def __init__(self, cmd: Union[str, Tuple[str, ...]], **kwargs):
"""
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_command`` 的参数默认值参考 `on_command <#on-command-cmd-rule-none-aliases-none-kwargs>`_
"""
self.basecmd: Tuple[str, ...] = (cmd,) if isinstance(cmd, str) else cmd
"""
- **类型**: ``Tuple[str, ...]``
- **说明**: 命令前缀
"""
if "aliases" in kwargs:
del kwargs["aliases"]
self.base_kwargs: Dict[str, Any] = kwargs
"""
- **类型**: ``Dict[str, Any]``
- **说明**: 其他传递给 ``on_command`` 的参数默认值
"""
def command(self, cmd: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个新的命令
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_command`` 的参数将会覆盖命令组默认值
:返回:
- ``Type[Matcher]``
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_command(cmd, **final_kwargs, _depth=1)
def shell_command(self, cmd: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个新的命令
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_shell_command`` 的参数将会覆盖命令组默认值
:返回:
- ``Type[Matcher]``
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_shell_command(cmd, **final_kwargs, _depth=1)
class MatcherGroup:
"""事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。"""
def __init__(self, **kwargs):
"""
:说明:
创建一个事件响应器组合参数为默认值 ``on`` 一致
"""
self.matchers: List[Type[Matcher]] = []
"""
:类型: ``List[Type[Matcher]]``
:说明: 组内事件响应器列表
"""
self.base_kwargs: Dict[str, Any] = kwargs
"""
- **类型**: ``Dict[str, Any]``
- **说明**: 其他传递给 ``on`` 的参数默认值
"""
def on(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个基础事件响应器可自定义类型
:参数:
* ``type: str``: 事件响应器类型
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
matcher = on(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_metaevent(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个元事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
final_kwargs.pop("permission", None)
matcher = on_metaevent(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_message(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_message(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_notice(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个通知事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_notice(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_request(self, **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个请求事件响应器
:参数:
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_request(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_startswith(self, msg: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容开头时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息开头内容
* ``ignorecase: bool``: 是否忽略大小写
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_startswith(msg, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_endswith(self, msg: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息的**文本部分**以指定内容结尾时响应
:参数:
* ``msg: Union[str, Tuple[str, ...]]``: 指定消息结尾内容
* ``ignorecase: bool``: 是否忽略大小写
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_endswith(msg, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_keyword(self, keywords: Set[str], **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息纯文本部分包含关键词时响应
:参数:
* ``keywords: Set[str]``: 关键词列表
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_keyword(keywords, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_command(self,
cmd: Union[str, Tuple[str, ...]],
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息以指定命令开头时响应
命令匹配规则参考: `命令形式匹配 <rule.html#command-command>`_
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_command(cmd, aliases=aliases, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
def on_shell_command(self,
cmd: Union[str, Tuple[str, ...]],
aliases: Optional[Set[Union[str, Tuple[str,
...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_shell_command(cmd,
aliases=aliases,
parser=parser,
**final_kwargs,
_depth=1)
self.matchers.append(matcher)
return matcher
def on_regex(self,
pattern: str,
flags: Union[int, re.RegexFlag] = 0,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个消息事件响应器并且当消息匹配正则表达式时响应
命令匹配规则参考: `正则匹配 <rule.html#regex-regex-flags-0>`_
:参数:
* ``pattern: str``: 正则表达式
* ``flags: Union[int, re.RegexFlag]``: 正则匹配标志
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[Union[T_Handler, Handler]]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_regex(pattern, flags=flags, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher

View File

@ -1,7 +1,5 @@
import re
from types import ModuleType
from dataclasses import dataclass
from typing import Set, Dict, List, Type, Tuple, Union, Optional
from typing import Set, List, Type, Tuple, Union, Optional
from nonebot.handler import Handler
from nonebot.matcher import Matcher
@ -9,26 +7,6 @@ from nonebot.permission import Permission
from nonebot.rule import Rule, ArgumentParser
from nonebot.typing import T_State, T_Handler, T_RuleChecker, T_StateFactory
from .export import Export
from .export import export as export
plugins: Dict[str, "Plugin"] = ...
PLUGIN_NAMESPACE: str = ...
@dataclass(eq=False)
class Plugin(object):
name: str
module: ModuleType
@property
def export(self) -> Export:
...
@property
def matcher(self) -> Set[Type[Matcher]]:
...
def on(type: str = "",
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
@ -387,40 +365,3 @@ class MatcherGroup:
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def load_plugin(module_path: str) -> Optional[Plugin]:
...
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
...
def load_all_plugins(module_path: Set[str],
plugin_dir: Set[str]) -> Set[Plugin]:
...
def load_from_json(file_path: str, encoding: str = ...) -> Set[Plugin]:
...
def load_from_toml(file_path: str, encoding: str = ...) -> Set[Plugin]:
...
def load_builtin_plugins(name: str = ...) -> Optional[Plugin]:
...
def get_plugin(name: str) -> Optional[Plugin]:
...
def get_loaded_plugins() -> Set[Plugin]:
...
def require(name: str) -> Optional[Export]:
...

91
nonebot/plugin/plugin.py Normal file
View File

@ -0,0 +1,91 @@
from types import ModuleType
from dataclasses import field, dataclass
from typing import Set, Dict, Type, Optional
from .export import Export
from nonebot.matcher import Matcher
plugins: Dict[str, "Plugin"] = {}
"""
:类型: ``Dict[str, Plugin]``
:说明: 已加载的插件
"""
@dataclass(eq=False)
class Plugin(object):
"""存储插件信息"""
name: str
"""
- **类型**: ``str``
- **说明**: 插件名称使用 文件/文件夹 名称作为插件名
"""
module: ModuleType
"""
- **类型**: ``ModuleType``
- **说明**: 插件模块对象
"""
module_name: str
"""
- **类型**: ``str``
- **说明**: 点分割模块路径
"""
export: Export = field(default_factory=Export)
"""
- **类型**: ``Export``
- **说明**: 插件内定义的导出内容
"""
matcher: Set[Type[Matcher]] = field(default_factory=set)
"""
- **类型**: ``Set[Type[Matcher]]``
- **说明**: 插件内定义的 ``Matcher``
"""
parent_plugin: Optional["Plugin"] = None
"""
- **类型**: ``Optional[Plugin]``
- **说明**: 父插件
"""
sub_plugins: Set["Plugin"] = field(default_factory=set)
"""
- **类型**: ``Set[Plugin]``
- **说明**: 子插件集合
"""
def get_plugin(name: str) -> Optional[Plugin]:
"""
:说明:
获取当前导入的某个插件
:参数:
* ``name: str``: 插件名 ``load_plugin`` 参数一致如果为 ``load_plugins`` 导入的插件则为文件()
:返回:
- ``Optional[Plugin]``
"""
return plugins.get(name)
def get_loaded_plugins() -> Set[Plugin]:
"""
:说明:
获取当前已导入的所有插件
:返回:
- ``Set[Plugin]``
"""
return set(plugins.values())
def _new_plugin(fullname: str, module: ModuleType) -> Plugin:
name = fullname.rsplit(".", 1)[-1] if "." in fullname else fullname
if name in plugins:
raise RuntimeError("Plugin already exists! Check your plugin name.")
plugin = Plugin(name, module, fullname)
plugins[name] = plugin
return plugin

View File

@ -7,6 +7,8 @@ sidebar: auto
## v2.0.0a17
- 新增 `MessageTemplate` 对于 `str` 普通模板的支持
- 移除插件加载的 `NameSpace` 模式
- 修改 toml 加载插件时的键名为 `tool.nonebot` 以符合规范
## v2.0.0a16

View File

@ -59,6 +59,7 @@ line_length = 80
length_sort = true
skip_gitignore = true
force_sort_within_sections = true
known_local_folder = "nonebot"
extra_standard_library = "typing_extensions"
[build-system]

View File

@ -1,3 +1,3 @@
[nonebot.plugins]
[tool.nonebot]
plugins = ["nonebot_plugin_test"]
plugin_dirs = ["test_plugins"]