nonebot2/nonebot/plugin/manager.py

258 lines
8.6 KiB
Python
Raw Normal View History

2022-01-22 15:23:07 +08:00
"""本模块实现插件加载流程。
参考: [import hooks](https://docs.python.org/3/reference/import.html#import-hooks), [PEP302](https://www.python.org/dev/peps/pep-0302/)
FrontMatter:
sidebar_position: 5
description: nonebot.plugin.manager 模块
"""
2021-02-19 14:58:26 +08:00
import sys
import pkgutil
import importlib
from pathlib import Path
2021-11-11 17:33:30 +08:00
from itertools import chain
2021-02-19 14:58:26 +08:00
from types import ModuleType
from importlib.abc import MetaPathFinder
2021-03-31 20:38:00 +08:00
from importlib.machinery import PathFinder, SourceFileLoader
from typing import Set, Dict, List, Iterable, Optional, Sequence
2021-03-31 20:38:00 +08:00
2021-11-11 17:33:30 +08:00
from nonebot.log import logger
from nonebot.utils import escape_tag, path_to_module_name
2022-01-15 21:27:43 +08:00
from .plugin import Plugin, PluginMetadata
2022-05-26 16:35:47 +08:00
from . import (
_managers,
_new_plugin,
_revert_plugin,
_current_plugin_chain,
2022-05-26 16:35:47 +08:00
_module_name_to_plugin_name,
)
2021-02-19 14:58:26 +08:00
2021-11-09 00:57:59 +08:00
2021-02-19 14:58:26 +08:00
class PluginManager:
2022-05-26 16:35:47 +08:00
"""插件管理器。
参数:
plugins: 独立插件模块名集合
search_path: 插件搜索路径文件夹
"""
2021-11-09 00:57:59 +08:00
def __init__(
self,
plugins: Optional[Iterable[str]] = None,
search_path: Optional[Iterable[str]] = None,
):
2021-02-19 14:58:26 +08:00
# simple plugin not in search path
self.plugins: Set[str] = set(plugins or [])
self.search_path: Set[str] = set(search_path or [])
2022-05-26 16:35:47 +08:00
2021-11-11 17:33:30 +08:00
# cache plugins
2022-05-26 16:35:47 +08:00
self._third_party_plugin_names: Dict[str, str] = {}
self._searched_plugin_names: Dict[str, Path] = {}
self.prepare_plugins()
def __repr__(self) -> str:
return f"PluginManager(plugins={self.plugins}, search_path={self.search_path})"
2022-05-26 16:35:47 +08:00
@property
def third_party_plugins(self) -> Set[str]:
"""返回所有独立插件名称。"""
return set(self._third_party_plugin_names.keys())
@property
def searched_plugins(self) -> Set[str]:
"""返回已搜索到的插件名称。"""
return set(self._searched_plugin_names.keys())
@property
def available_plugins(self) -> Set[str]:
"""返回当前插件管理器中可用的插件名称。"""
return self.third_party_plugins | self.searched_plugins
2021-02-19 14:58:26 +08:00
2022-05-26 16:35:47 +08:00
def _previous_plugins(self) -> Set[str]:
2021-11-11 17:33:30 +08:00
_pre_managers: List[PluginManager]
if self in _managers:
_pre_managers = _managers[: _managers.index(self)]
2021-11-11 17:33:30 +08:00
else:
_pre_managers = _managers[:]
2022-05-26 16:35:47 +08:00
return {
*chain.from_iterable(manager.available_plugins for manager in _pre_managers)
}
def prepare_plugins(self) -> Set[str]:
"""搜索插件并缓存插件名称。"""
2021-11-11 17:33:30 +08:00
# get all previous ready to load plugins
previous_plugins = self._previous_plugins()
searched_plugins: Dict[str, Path] = {}
2022-05-26 16:35:47 +08:00
third_party_plugins: Dict[str, str] = {}
2021-12-21 11:18:34 +08:00
2022-05-26 16:35:47 +08:00
# check third party plugins
2021-12-21 11:18:34 +08:00
for plugin in self.plugins:
2022-05-26 16:35:47 +08:00
name = _module_name_to_plugin_name(plugin)
2021-12-21 11:18:34 +08:00
if name in third_party_plugins or name in previous_plugins:
raise RuntimeError(
f"Plugin already exists: {name}! Check your plugin name"
)
2022-05-26 16:35:47 +08:00
third_party_plugins[name] = plugin
2021-11-11 17:33:30 +08:00
2022-05-26 16:35:47 +08:00
self._third_party_plugin_names = third_party_plugins
# check plugins in search path
2021-11-11 17:33:30 +08:00
for module_info in pkgutil.iter_modules(self.search_path):
2022-05-26 16:35:47 +08:00
# ignore if startswith "_"
2021-11-11 17:33:30 +08:00
if module_info.name.startswith("_"):
continue
2022-05-26 16:35:47 +08:00
if (
2022-05-26 16:35:47 +08:00
module_info.name in searched_plugins
or module_info.name in previous_plugins
2021-12-21 11:18:34 +08:00
or module_info.name in third_party_plugins
):
2021-11-11 17:33:30 +08:00
raise RuntimeError(
f"Plugin already exists: {module_info.name}! Check your plugin name"
)
2022-05-26 16:35:47 +08:00
if not (
module_spec := module_info.module_finder.find_spec(
module_info.name, None
)
):
2021-11-11 17:33:30 +08:00
continue
if not (module_path := module_spec.origin):
2021-11-11 17:33:30 +08:00
continue
searched_plugins[module_info.name] = Path(module_path).resolve()
2022-05-26 16:35:47 +08:00
self._searched_plugin_names = searched_plugins
2021-11-11 17:33:30 +08:00
2022-05-26 16:35:47 +08:00
return self.available_plugins
2021-11-11 17:33:30 +08:00
2022-01-26 15:37:35 +08:00
def load_plugin(self, name: str) -> Optional[Plugin]:
2022-05-26 16:35:47 +08:00
"""加载指定插件。
对于独立插件可以使用完整插件模块名或者插件名称
参数:
name: 插件名称
"""
2021-11-11 17:33:30 +08:00
try:
if name in self.plugins:
module = importlib.import_module(name)
2022-05-26 16:35:47 +08:00
elif name in self._third_party_plugin_names:
module = importlib.import_module(self._third_party_plugin_names[name])
elif name in self._searched_plugin_names:
2021-11-11 17:33:30 +08:00
module = importlib.import_module(
path_to_module_name(self._searched_plugin_names[name])
)
2022-01-26 15:06:53 +08:00
else:
raise RuntimeError(f"Plugin not found: {name}! Check your plugin name")
2021-11-11 17:33:30 +08:00
if (
plugin := getattr(module, "__plugin__", None)
) is None or not isinstance(plugin, Plugin):
2022-01-26 15:06:53 +08:00
raise RuntimeError(
f"Module {module.__name__} is not loaded as a plugin! "
"Make sure not to import it before loading."
)
logger.opt(colors=True).success(
f'Succeeded to load plugin "<y>{escape_tag(plugin.name)}</y>"'
+ (
f' from "<m>{escape_tag(plugin.module_name)}</m>"'
if plugin.module_name != plugin.name
else ""
)
)
2022-01-26 15:06:53 +08:00
return plugin
2021-11-11 17:33:30 +08:00
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{escape_tag(name)}"</bg #f8bbd0></r>'
)
def load_all_plugins(self) -> Set[Plugin]:
2022-05-26 16:35:47 +08:00
"""加载所有可用插件。"""
2021-11-11 17:33:30 +08:00
return set(
2022-05-26 16:35:47 +08:00
filter(None, (self.load_plugin(name) for name in self.available_plugins))
)
2021-02-19 14:58:26 +08:00
class PluginFinder(MetaPathFinder):
def find_spec(
self,
fullname: str,
path: Optional[Sequence[str]],
target: Optional[ModuleType] = None,
):
2021-11-11 17:33:30 +08:00
if _managers:
module_spec = PathFinder.find_spec(fullname, path, target)
if not module_spec:
return
module_origin = module_spec.origin
if not module_origin:
return
module_path = Path(module_origin).resolve()
2022-01-26 20:55:23 +08:00
for manager in reversed(_managers):
2022-05-26 16:35:47 +08:00
# use path instead of name in case of submodule name conflict
if (
fullname in manager.plugins
2022-05-26 16:35:47 +08:00
or module_path in manager._searched_plugin_names.values()
2021-11-11 17:33:30 +08:00
):
module_spec.loader = PluginLoader(manager, fullname, module_origin)
2021-11-11 17:33:30 +08:00
return module_spec
return
2021-02-19 14:58:26 +08:00
2021-03-13 18:21:56 +08:00
class PluginLoader(SourceFileLoader):
2021-03-19 14:59:59 +08:00
def __init__(self, manager: PluginManager, fullname: str, path) -> None:
self.manager = manager
2021-03-13 18:21:56 +08:00
self.loaded = False
super().__init__(fullname, path)
def create_module(self, spec) -> Optional[ModuleType]:
if self.name in sys.modules:
self.loaded = True
return sys.modules[self.name]
2021-03-19 14:59:59 +08:00
# return None to use default module creation
2021-03-13 18:21:56 +08:00
return super().create_module(spec)
def exec_module(self, module: ModuleType) -> None:
if self.loaded:
return
2021-03-31 20:38:00 +08:00
2022-05-26 16:35:47 +08:00
# create plugin before executing
2021-12-20 00:28:17 +08:00
plugin = _new_plugin(self.name, module, self.manager)
2022-05-26 16:35:47 +08:00
setattr(module, "__plugin__", plugin)
# detect parent plugin before entering current plugin context
parent_plugins = _current_plugin_chain.get()
for pre_plugin in reversed(parent_plugins):
# ensure parent plugin is declared before current plugin
if _managers.index(pre_plugin.manager) < _managers.index(self.manager):
plugin.parent_plugin = pre_plugin
pre_plugin.sub_plugins.add(plugin)
break
2022-05-26 16:35:47 +08:00
# enter plugin context
_plugin_token = _current_plugin_chain.set(parent_plugins + (plugin,))
2022-05-26 16:35:47 +08:00
try:
super().exec_module(module)
except Exception:
_revert_plugin(plugin)
raise
finally:
# leave plugin context
_current_plugin_chain.reset(_plugin_token)
2022-01-09 23:15:33 +08:00
# get plugin metadata
metadata: Optional[PluginMetadata] = getattr(module, "__plugin_meta__", None)
plugin.metadata = metadata
2021-03-19 14:59:59 +08:00
return
2021-03-13 18:21:56 +08:00
2021-02-19 14:58:26 +08:00
sys.meta_path.insert(0, PluginFinder())