nonebot2/nonebot/plugin.py

417 lines
14 KiB
Python
Raw Normal View History

2019-01-03 19:58:56 +08:00
import os
import re
2020-04-20 13:50:38 +08:00
import shlex
2020-04-07 21:58:10 +08:00
import warnings
import importlib
from types import ModuleType
2020-04-20 13:50:38 +08:00
from typing import Any, Set, Dict, Union, Optional, Iterable, Callable
2019-01-03 19:58:56 +08:00
from .log import logger
2020-04-20 13:50:38 +08:00
from nonebot import permission as perm
2020-04-07 21:58:10 +08:00
from .command import Command, CommandManager
from .natural_language import NLProcessor, NLPManager
from .notice_request import _bus, EventHandler
2020-04-20 13:50:38 +08:00
from .typing import CommandName_T, CommandHandler_T
_tmp_command: Set[Command] = set()
_tmp_nl_processor: Set[NLProcessor] = set()
_tmp_event_handler: Set[EventHandler] = set()
2019-01-03 19:58:56 +08:00
2019-01-03 20:03:14 +08:00
2019-01-09 21:29:10 +08:00
class Plugin:
2020-04-20 13:50:38 +08:00
__slots__ = ('module', 'name', 'usage', 'commands', 'nl_processors',
'event_handlers')
2019-01-09 21:29:10 +08:00
2020-04-20 13:50:38 +08:00
def __init__(self,
module: ModuleType,
2019-01-09 21:29:10 +08:00
name: Optional[str] = None,
2020-04-07 21:58:10 +08:00
usage: Optional[Any] = None,
commands: Set[Command] = set(),
nl_processors: Set[NLProcessor] = set(),
event_handlers: Set[EventHandler] = set()):
2019-01-09 21:29:10 +08:00
self.module = module
self.name = name
self.usage = usage
2020-04-07 21:58:10 +08:00
self.commands = commands
self.nl_processors = nl_processors
self.event_handlers = event_handlers
2020-04-20 13:50:38 +08:00
2020-04-07 21:58:10 +08:00
class PluginManager:
_plugins: Dict[str, Plugin] = {}
2020-04-20 13:50:38 +08:00
2020-04-07 21:58:10 +08:00
def __init__(self):
self.cmd_manager = CommandManager()
self.nlp_manager = NLPManager()
2020-04-20 13:50:38 +08:00
2020-04-07 21:58:10 +08:00
@classmethod
2020-04-20 13:50:38 +08:00
def add_plugin(cls, module_path: str, plugin: Plugin) -> None:
2020-04-07 21:58:10 +08:00
"""Register a plugin
Args:
2020-04-20 13:50:38 +08:00
name (str): module path
2020-04-07 21:58:10 +08:00
plugin (Plugin): Plugin object
"""
2020-04-20 13:50:38 +08:00
if module_path in cls._plugins:
warnings.warn(f"Plugin {module_path} already exists")
return
cls._plugins[module_path] = plugin
2020-04-07 21:58:10 +08:00
@classmethod
2020-04-20 13:50:38 +08:00
def get_plugin(cls, module_path: str) -> Optional[Plugin]:
"""Get plugin object by plugin path
Args:
name (str): plugin path
Returns:
Optional[Plugin]: Plugin object
"""
return cls._plugins.get(module_path, None)
2020-04-07 21:58:10 +08:00
@classmethod
2020-04-20 13:50:38 +08:00
def remove_plugin(cls, module_path: str) -> bool:
plugin = cls.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not exists")
return False
for command in plugin.commands:
CommandManager.remove_command(command.name)
for nl_processor in plugin.nl_processors:
NLPManager.remove_nl_processor(nl_processor)
for event_handler in plugin.event_handlers:
for event in event_handler.events:
_bus.unsubscribe(event, event_handler.func)
del cls._plugins[module_path]
return True
2020-04-07 21:58:10 +08:00
@classmethod
2020-04-20 13:50:38 +08:00
def switch_plugin_global(cls, name: str,
state: Optional[bool] = None) -> None:
2020-04-07 21:58:10 +08:00
"""Change plugin state globally or simply switch it if `state` is None
Args:
name (str): Plugin name
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(name)
if not plugin:
warnings.warn(f"Plugin {name} not found")
return
for command in plugin.commands:
CommandManager.switch_command_global(command.name, state)
for nl_processor in plugin.nl_processors:
2020-04-11 14:56:39 +08:00
NLPManager.switch_nlprocessor_global(nl_processor, state)
for event_handler in plugin.event_handlers:
for event in event_handler.events:
if event_handler.func in _bus._subscribers[event] and not state:
_bus.unsubscribe(event, event_handler.func)
2020-04-20 13:50:38 +08:00
elif event_handler.func not in _bus._subscribers[
event] and state != False:
2020-04-11 14:56:39 +08:00
_bus.subscribe(event, event_handler.func)
@classmethod
2020-04-20 13:50:38 +08:00
def switch_command_global(cls, name: str,
state: Optional[bool] = None) -> None:
2020-04-11 14:56:39 +08:00
"""Change plugin command state globally or simply switch it if `state` is None
Args:
name (str): Plugin name
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(name)
if not plugin:
warnings.warn(f"Plugin {name} not found")
return
for command in plugin.commands:
CommandManager.switch_command_global(command.name, state)
2020-04-20 13:50:38 +08:00
2020-04-11 14:56:39 +08:00
@classmethod
2020-04-20 13:50:38 +08:00
def switch_nlprocessor_global(cls, name: str,
state: Optional[bool] = None) -> None:
2020-04-11 14:56:39 +08:00
"""Change plugin nlprocessor state globally or simply switch it if `state` is None
Args:
name (str): Plugin name
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(name)
if not plugin:
warnings.warn(f"Plugin {name} not found")
return
for processor in plugin.nl_processors:
NLPManager.switch_nlprocessor_global(processor, state)
@classmethod
2020-04-20 13:50:38 +08:00
def switch_eventhandler_global(cls, name: str,
state: Optional[bool] = None) -> None:
2020-04-11 14:56:39 +08:00
"""Change plugin event handler state globally or simply switch it if `state` is None
Args:
name (str): Plugin name
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(name)
if not plugin:
warnings.warn(f"Plugin {name} not found")
return
2020-04-07 21:58:10 +08:00
for event_handler in plugin.event_handlers:
for event in event_handler.events:
if event_handler.func in _bus._subscribers[event] and not state:
_bus.unsubscribe(event, event_handler.func)
2020-04-20 13:50:38 +08:00
elif event_handler.func not in _bus._subscribers[
event] and state != False:
2020-04-07 21:58:10 +08:00
_bus.subscribe(event, event_handler.func)
def switch_plugin(self, name: str, state: Optional[bool] = None) -> None:
"""Change plugin state or simply switch it if `state` is None
2020-04-11 14:56:39 +08:00
Tips:
This method will only change the state of the plugin's
commands and natural language processors since change
state of the event handler partially is meaningless.
2020-04-07 21:58:10 +08:00
Args:
name (str): Plugin name
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = self.get_plugin(name)
if not plugin:
warnings.warn(f"Plugin {name} not found")
return
for command in plugin.commands:
self.cmd_manager.switch_command(command.name, state)
for nl_processor in plugin.nl_processors:
2020-04-11 14:56:39 +08:00
self.nlp_manager.switch_nlprocessor(nl_processor, state)
2020-04-20 13:50:38 +08:00
2020-04-11 14:56:39 +08:00
def switch_command(self, name: str, state: Optional[bool] = None) -> None:
"""Change plugin command state or simply switch it if `state` is None
Args:
name (str): Plugin name
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = self.get_plugin(name)
if not plugin:
warnings.warn(f"Plugin {name} not found")
return
for command in plugin.commands:
self.cmd_manager.switch_command(command.name, state)
2020-04-20 13:50:38 +08:00
def switch_nlprocessor(self, name: str,
state: Optional[bool] = None) -> None:
2020-04-11 14:56:39 +08:00
"""Change plugin nlprocessor state or simply switch it if `state` is None
Args:
name (str): Plugin name
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = self.get_plugin(name)
if not plugin:
warnings.warn(f"Plugin {name} not found")
return
for processor in plugin.nl_processors:
self.nlp_manager.switch_nlprocessor(processor, state)
2020-04-07 21:58:10 +08:00
2020-04-20 13:50:38 +08:00
def load_plugin(module_path: str) -> Optional[Plugin]:
"""Load a module as a plugin
Args:
module_path (str): path of module to import
Returns:
Optional[Plugin]: Plugin object loaded
2019-01-03 19:58:56 +08:00
"""
2020-04-20 13:50:38 +08:00
# Make sure tmp is clean
_tmp_command.clear()
_tmp_nl_processor.clear()
_tmp_event_handler.clear()
2019-01-03 19:58:56 +08:00
try:
2020-04-20 13:50:38 +08:00
module = importlib.import_module(module_path)
2019-01-03 19:58:56 +08:00
name = getattr(module, '__plugin_name__', None)
usage = getattr(module, '__plugin_usage__', None)
2020-04-20 13:50:38 +08:00
commands = _tmp_command.copy()
nl_processors = _tmp_nl_processor.copy()
event_handlers = _tmp_event_handler.copy()
plugin = Plugin(module, name, usage, commands, nl_processors,
event_handlers)
PluginManager.add_plugin(module_path, plugin)
logger.info(f'Succeeded to import "{module_path}"')
2020-04-07 21:58:10 +08:00
return plugin
2019-01-03 19:58:56 +08:00
except Exception as e:
2020-04-20 13:50:38 +08:00
logger.error(f'Failed to import "{module_path}", error: {e}')
2019-01-03 19:58:56 +08:00
logger.exception(e)
2020-04-07 21:58:10 +08:00
return None
2020-04-20 13:50:38 +08:00
def reload_plugin(module_path: str) -> Optional[Plugin]:
result = PluginManager.remove_plugin(module_path)
if not result:
return None
return load_plugin(module_path)
2019-01-03 19:58:56 +08:00
2020-04-07 21:58:10 +08:00
def load_plugins(plugin_dir: str, module_prefix: str) -> Set[Plugin]:
2019-01-03 19:58:56 +08:00
"""
Find all non-hidden modules or packages in a given directory,
and import them with the given module prefix.
:param plugin_dir: plugin directory to search
:param module_prefix: module prefix used while importing
:return: number of plugins successfully loaded
"""
2020-04-07 21:58:10 +08:00
count = set()
2019-01-03 19:58:56 +08:00
for name in os.listdir(plugin_dir):
path = os.path.join(plugin_dir, name)
if os.path.isfile(path) and \
(name.startswith('_') or not name.endswith('.py')):
continue
if os.path.isdir(path) and \
(name.startswith('_') or not os.path.exists(
os.path.join(path, '__init__.py'))):
continue
m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name)
if not m:
continue
2020-04-07 21:58:10 +08:00
result = load_plugin(f'{module_prefix}.{m.group(1)}')
if result:
count.add(result)
2019-01-03 19:58:56 +08:00
return count
2020-04-07 21:58:10 +08:00
def load_builtin_plugins() -> Set[Plugin]:
2019-01-03 19:58:56 +08:00
"""
Load built-in plugins distributed along with "nonebot" package.
"""
plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins')
return load_plugins(plugin_dir, 'nonebot.plugins')
def get_loaded_plugins() -> Set[Plugin]:
2019-01-03 20:04:31 +08:00
"""
Get all plugins loaded.
:return: a set of Plugin objects
"""
2020-04-20 13:50:38 +08:00
return set(PluginManager._plugins.values())
def on_command(name: Union[str, CommandName_T],
*,
aliases: Union[Iterable[str], str] = (),
permission: int = perm.EVERYBODY,
only_to_me: bool = True,
privileged: bool = False,
shell_like: bool = False) -> Callable:
"""
Decorator to register a function as a command.
:param name: command name (e.g. 'echo' or ('random', 'number'))
:param aliases: aliases of command name, for convenient access
:param permission: permission required by the command
:param only_to_me: only handle messages to me
:param privileged: can be run even when there is already a session
:param shell_like: use shell-like syntax to split arguments
"""
def deco(func: CommandHandler_T) -> CommandHandler_T:
if not isinstance(name, (str, tuple)):
raise TypeError('the name of a command must be a str or tuple')
if not name:
raise ValueError('the name of a command must not be empty')
cmd_name = (name,) if isinstance(name, str) else name
cmd = Command(name=cmd_name,
func=func,
permission=permission,
only_to_me=only_to_me,
privileged=privileged)
if shell_like:
async def shell_like_args_parser(session):
session.args['argv'] = shlex.split(session.current_arg)
cmd.args_parser_func = shell_like_args_parser
CommandManager.add_command(cmd_name, cmd)
CommandManager.add_aliases(aliases, cmd)
_tmp_command.add(cmd)
func.args_parser = cmd.args_parser
return func
return deco
def on_natural_language(
keywords: Union[Optional[Iterable], str, Callable] = None,
*,
permission: int = perm.EVERYBODY,
only_to_me: bool = True,
only_short_message: bool = True,
allow_empty_message: bool = False) -> Callable:
"""
Decorator to register a function as a natural language processor.
:param keywords: keywords to respond to, if None, respond to all messages
:param permission: permission required by the processor
:param only_to_me: only handle messages to me
:param only_short_message: only handle short messages
:param allow_empty_message: handle empty messages
"""
def deco(func: Callable) -> Callable:
nl_processor = NLProcessor(
func=func,
keywords=keywords, # type: ignore
permission=permission,
only_to_me=only_to_me,
only_short_message=only_short_message,
allow_empty_message=allow_empty_message)
NLPManager.add_nl_processor(nl_processor)
_tmp_nl_processor.add(nl_processor)
return func
if isinstance(keywords, Callable):
# here "keywords" is the function to be decorated
return on_natural_language()(keywords)
else:
if isinstance(keywords, str):
keywords = (keywords,)
return deco
def _make_event_deco(post_type: str) -> Callable:
def deco_deco(arg: Optional[Union[str, Callable]] = None,
*events: str) -> Callable:
def deco(func: Callable) -> Callable:
if isinstance(arg, str):
events_tmp = list(
map(lambda x: f"{post_type}.{x}", [arg] + list(events)))
for e in events_tmp:
_bus.subscribe(e, func)
handler = EventHandler(events_tmp, func)
else:
_bus.subscribe(post_type, func)
handler = EventHandler([post_type], func)
_tmp_event_handler.add(handler)
return func
if isinstance(arg, Callable):
return deco(arg) # type: ignore
return deco
return deco_deco
on_notice = _make_event_deco('notice')
on_request = _make_event_deco('request')