nonebot2/nonebot/plugin/manager.py

259 lines
8.8 KiB
Python
Raw Normal View History

2021-02-19 14:58:26 +08:00
import sys
import uuid
import pkgutil
import importlib
from hashlib import md5
from pathlib import Path
2021-02-19 14:58:26 +08:00
from types import ModuleType
from collections import Counter
from contextvars import ContextVar
2021-02-19 14:58:26 +08:00
from importlib.abc import MetaPathFinder
from typing import Set, List, Optional, Iterable
2021-03-31 20:38:00 +08:00
from importlib.machinery import PathFinder, SourceFileLoader
from .export import _export, Export
2021-02-19 14:58:26 +08:00
_current_plugin: ContextVar[Optional[ModuleType]] = ContextVar(
"_current_plugin", default=None)
2021-02-19 14:58:26 +08:00
_internal_space = ModuleType(__name__ + "._internal")
_internal_space.__path__ = [] # type: ignore
sys.modules[_internal_space.__name__] = _internal_space
_manager_stack: List["PluginManager"] = []
class _NamespaceModule(ModuleType):
"""Simple namespace module to store plugins."""
@property
def __path__(self):
return []
def __getattr__(self, name: str):
try:
return super().__getattr__(name) # type: ignore
except AttributeError:
if name.startswith("__"):
raise
raise RuntimeError("Plugin manager not activated!")
class _InternalModule(ModuleType):
"""Internal module for each plugin manager."""
2021-02-20 11:09:16 +08:00
def __init__(self, prefix: str, plugin_manager: "PluginManager"):
super().__init__(f"{prefix}.{plugin_manager.internal_id}")
2021-02-19 14:58:26 +08:00
self.__plugin_manager__ = plugin_manager
@property
def __path__(self) -> List[str]:
return list(self.__plugin_manager__.search_path)
class PluginManager:
def __init__(self,
namespace: str,
2021-02-19 14:58:26 +08:00
plugins: Optional[Iterable[str]] = None,
search_path: Optional[Iterable[str]] = None,
*,
id: Optional[str] = None):
self.namespace: str = namespace
self.namespace_module: ModuleType = self._setup_namespace(namespace)
2021-02-19 14:58:26 +08:00
self.id: str = id or str(uuid.uuid4())
self.internal_id: str = md5(
((self.namespace or "") + self.id).encode()).hexdigest()
self.internal_module = self._setup_internal_module(self.internal_id)
# simple plugin not in search path
self.plugins: Set[str] = set(plugins or [])
self.search_path: Set[str] = set(search_path or [])
# ensure can be loaded
self.list_plugins()
def _setup_namespace(self, namespace: str) -> ModuleType:
2021-02-19 14:58:26 +08:00
try:
module = importlib.import_module(namespace)
except ImportError:
module = _NamespaceModule(namespace)
if "." in namespace:
parent = importlib.import_module(namespace.rsplit(".", 1)[0])
setattr(parent, namespace.rsplit(".", 1)[1], module)
sys.modules[namespace] = module
return module
def _setup_internal_module(self, internal_id: str) -> ModuleType:
if hasattr(_internal_space, internal_id):
raise RuntimeError("Plugin manager already exists!")
2021-02-20 11:09:16 +08:00
2021-03-06 22:25:50 +08:00
prefix = sys._getframe(3).f_globals.get(
2021-02-20 11:09:16 +08:00
"__name__") or _internal_space.__name__
if not prefix.startswith(_internal_space.__name__):
prefix = _internal_space.__name__
module = _InternalModule(prefix, self)
sys.modules[module.__name__] = module # type: ignore
2021-02-19 14:58:26 +08:00
setattr(_internal_space, internal_id, module)
return module
def __enter__(self):
if self in _manager_stack:
raise RuntimeError("Plugin manager already activated!")
_manager_stack.append(self)
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
_manager_stack.pop()
except IndexError:
pass
def search_plugins(self) -> List[str]:
return [
module_info.name
for module_info in pkgutil.iter_modules(self.search_path)
]
def list_plugins(self) -> Set[str]:
_pre_managers: List[PluginManager]
if self in _manager_stack:
_pre_managers = _manager_stack[:_manager_stack.index(self)]
else:
_pre_managers = _manager_stack[:]
_search_path: Set[str] = set()
for manager in _pre_managers:
_search_path |= manager.search_path
if _search_path & self.search_path:
raise RuntimeError("Duplicate plugin search path!")
_search_plugins = self.search_plugins()
c = Counter([*_search_plugins, *self.plugins])
conflict = [name for name, num in c.items() if num > 1]
if conflict:
raise RuntimeError(
f"More than one plugin named {' / '.join(conflict)}!")
return set(_search_plugins) | self.plugins
def load_plugin(self, name) -> ModuleType:
if name in self.plugins:
with self:
return importlib.import_module(name)
2021-02-19 14:58:26 +08:00
if "." in name:
raise ValueError("Plugin name cannot contain '.'")
2021-03-13 18:21:56 +08:00
2021-02-19 14:58:26 +08:00
with self:
return importlib.import_module(f"{self.namespace}.{name}")
def load_all_plugins(self) -> List[ModuleType]:
return [self.load_plugin(name) for name in self.list_plugins()]
def _rewrite_module_name(self, module_name: str) -> Optional[str]:
prefix = f"{self.internal_module.__name__}."
raw_name = module_name[len(self.namespace) +
1:] if module_name.startswith(self.namespace +
".") else None
# dir plugins
if raw_name and raw_name.split(".")[0] in self.search_plugins():
return f"{prefix}{raw_name}"
# third party plugin or renamed dir plugins
elif module_name in self.plugins or module_name.startswith(prefix):
return module_name
# dir plugins
2021-03-31 20:38:00 +08:00
elif module_name in self.search_plugins():
return f"{prefix}{module_name}"
2021-02-19 14:58:26 +08:00
return None
def _check_absolute_import(self, origin_path: str) -> Optional[str]:
if not self.search_path:
return
paths = set([
*self.search_path,
*(str(Path(path).resolve()) for path in self.search_path)
])
for path in paths:
try:
rel_path = Path(origin_path).relative_to(path)
return ".".join(rel_path.parts[:-1] + (rel_path.stem,))
except ValueError:
continue
2021-02-19 14:58:26 +08:00
class PluginFinder(MetaPathFinder):
def find_spec(self, fullname: str, path, target):
if _manager_stack:
index = -1
origin_spec = PathFinder.find_spec(fullname, path, target)
2021-02-19 14:58:26 +08:00
while -index <= len(_manager_stack):
manager = _manager_stack[index]
rel_name = None
if origin_spec and origin_spec.origin:
rel_name = manager._check_absolute_import(
origin_spec.origin)
newname = manager._rewrite_module_name(rel_name or fullname)
2021-02-19 14:58:26 +08:00
if newname:
spec = PathFinder.find_spec(
newname, path or [*manager.search_path, *sys.path],
2021-04-02 00:05:27 +08:00
target)
2021-02-19 14:58:26 +08:00
if spec:
spec.loader = PluginLoader( # type: ignore
manager, newname, spec.origin)
2021-02-19 14:58:26 +08:00
return spec
index -= 1
return None
2021-03-13 18:21:56 +08:00
class PluginLoader(SourceFileLoader):
2021-03-19 14:59:59 +08:00
def __init__(self, manager: PluginManager, fullname: str, path) -> None:
self.manager = manager
2021-03-13 18:21:56 +08:00
self.loaded = False
super().__init__(fullname, path)
def create_module(self, spec) -> Optional[ModuleType]:
if self.name in sys.modules:
self.loaded = True
return sys.modules[self.name]
2021-03-19 14:59:59 +08:00
# return None to use default module creation
2021-03-13 18:21:56 +08:00
return super().create_module(spec)
def exec_module(self, module: ModuleType) -> None:
if self.loaded:
return
2021-03-31 20:38:00 +08:00
export = Export()
_export_token = _export.set(export)
prefix = self.manager.internal_module.__name__
is_dir_plugin = self.name.startswith(prefix + ".")
module_name = self.name[len(prefix) +
1:] if is_dir_plugin else self.name
_plugin_token = _current_plugin.set(module)
setattr(module, "__export__", export)
setattr(module, "__plugin_name__",
module_name.split(".")[0] if is_dir_plugin else module_name)
setattr(module, "__module_name__", module_name)
setattr(module, "__module_prefix__", prefix if is_dir_plugin else "")
# try:
# super().exec_module(module)
# except Exception as e:
# raise ImportError(
# f"Error when executing module {module_name} from {module.__file__}."
# ) from e
super().exec_module(module)
_current_plugin.reset(_plugin_token)
_export.reset(_export_token)
2021-03-19 14:59:59 +08:00
return
2021-03-13 18:21:56 +08:00
2021-02-19 14:58:26 +08:00
sys.meta_path.insert(0, PluginFinder())