nonebot2/nonebot/plugin.py

267 lines
9.1 KiB
Python
Raw Normal View History

2020-06-30 10:13:58 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2020-08-17 16:09:41 +08:00
import re
2020-08-24 17:59:36 +08:00
import sys
2020-08-15 17:22:10 +08:00
import pkgutil
2020-06-30 10:13:58 +08:00
import importlib
from dataclasses import dataclass
2020-08-24 17:59:36 +08:00
from importlib._bootstrap import _load
2020-06-30 10:13:58 +08:00
2020-07-05 20:39:34 +08:00
from nonebot.log import logger
2020-08-20 17:15:05 +08:00
from nonebot.permission import Permission
from nonebot.typing import Handler, RuleChecker
from nonebot.matcher import Matcher, MatcherGroup
2020-08-17 16:09:41 +08:00
from nonebot.rule import Rule, startswith, endswith, command, regex
from nonebot.typing import Set, List, Dict, Type, Tuple, Union, Optional, ModuleType
2020-06-30 10:13:58 +08:00
# Registry of loaded plugins. load_plugin() keys entries by the dotted module
# path it was given; load_plugins() keys entries by the discovered module name.
plugins: Dict[str, "Plugin"] = {}
# Scratch set that the on*() helpers add every new matcher to. The loaders
# clear it before importing a plugin module and copy it into the resulting
# Plugin afterwards, which is how matchers get attributed to a plugin.
_tmp_matchers: Set[Type[Matcher]] = set()
@dataclass(eq=False)
class Plugin:
    """Record describing one loaded plugin.

    ``eq=False`` stops the dataclass from generating ``__eq__``, so instances
    keep the default identity-based equality and hashing and can be stored
    in sets (as ``load_plugins`` and ``get_loaded_plugins`` do).
    """
    # Key the plugin is registered under in the module-level ``plugins`` dict.
    name: str
    # The imported module object backing this plugin.
    module: ModuleType
    # Matchers that were registered while the module was being imported.
    matcher: Set[Type[Matcher]]
2020-06-30 10:13:58 +08:00
def on(rule: Optional[Union[Rule, RuleChecker]] = None,
       permission: Permission = Permission(),
       *,
       handlers: Optional[List[Handler]] = None,
       temp: bool = False,
       priority: int = 1,
       block: bool = False,
       state: Optional[dict] = None) -> Type[Matcher]:
    """Create a matcher with an empty type string and record it.

    The matcher is built by ``Matcher.new`` from ``Rule() & rule``, the given
    ``permission`` and the keyword options, then added to ``_tmp_matchers``
    so the plugin loader can attribute it to the plugin being imported.

    Args:
        rule: Optional extra rule combined with an empty ``Rule()`` via ``&``.
        permission: Permission passed through to ``Matcher.new``.
        handlers: Optional handler list passed through unchanged.
        temp: Forwarded as ``temp``.
        priority: Forwarded as ``priority``.
        block: Forwarded as ``block``.
        state: Forwarded as ``default_state``.

    Returns:
        The newly created matcher class.
    """
    options = dict(temp=temp,
                   priority=priority,
                   block=block,
                   handlers=handlers,
                   default_state=state)
    new_matcher = Matcher.new("", Rule() & rule, permission, **options)
    _tmp_matchers.add(new_matcher)
    return new_matcher
def on_metaevent(rule: Optional[Union[Rule, RuleChecker]] = None,
                 *,
                 handlers: Optional[List[Handler]] = None,
                 temp: bool = False,
                 priority: int = 1,
                 block: bool = False,
                 state: Optional[dict] = None) -> Type[Matcher]:
    """Create a ``"meta_event"`` matcher and record it.

    The matcher always uses a fresh default ``Permission()``; there is no
    ``permission`` parameter on this helper.

    Args:
        rule: Optional extra rule combined with an empty ``Rule()`` via ``&``.
        handlers: Optional handler list passed through unchanged.
        temp: Forwarded as ``temp``.
        priority: Forwarded as ``priority``.
        block: Forwarded as ``block``.
        state: Forwarded as ``default_state``.

    Returns:
        The newly created matcher class (also added to ``_tmp_matchers``).
    """
    options = dict(temp=temp,
                   priority=priority,
                   block=block,
                   handlers=handlers,
                   default_state=state)
    new_matcher = Matcher.new("meta_event", Rule() & rule, Permission(),
                              **options)
    _tmp_matchers.add(new_matcher)
    return new_matcher
def on_message(rule: Optional[Union[Rule, RuleChecker]] = None,
               permission: Permission = Permission(),
               *,
               handlers: Optional[List[Handler]] = None,
               temp: bool = False,
               priority: int = 1,
               block: bool = True,
               state: Optional[dict] = None) -> Type[Matcher]:
    """Create a ``"message"`` matcher and record it.

    Unlike the other ``on_*`` helpers in this file, ``block`` defaults to
    ``True`` here.

    Args:
        rule: Optional extra rule combined with an empty ``Rule()`` via ``&``.
        permission: Permission passed through to ``Matcher.new``.
        handlers: Optional handler list passed through unchanged.
        temp: Forwarded as ``temp``.
        priority: Forwarded as ``priority``.
        block: Forwarded as ``block``.
        state: Forwarded as ``default_state``.

    Returns:
        The newly created matcher class (also added to ``_tmp_matchers``).
    """
    options = dict(temp=temp,
                   priority=priority,
                   block=block,
                   handlers=handlers,
                   default_state=state)
    new_matcher = Matcher.new("message", Rule() & rule, permission, **options)
    _tmp_matchers.add(new_matcher)
    return new_matcher
def on_notice(rule: Optional[Union[Rule, RuleChecker]] = None,
              *,
              handlers: Optional[List[Handler]] = None,
              temp: bool = False,
              priority: int = 1,
              block: bool = False,
              state: Optional[dict] = None) -> Type[Matcher]:
    """Create a ``"notice"`` matcher and record it.

    The matcher always uses a fresh default ``Permission()``; there is no
    ``permission`` parameter on this helper.

    Args:
        rule: Optional extra rule combined with an empty ``Rule()`` via ``&``.
        handlers: Optional handler list passed through unchanged.
        temp: Forwarded as ``temp``.
        priority: Forwarded as ``priority``.
        block: Forwarded as ``block``.
        state: Forwarded as ``default_state``.

    Returns:
        The newly created matcher class (also added to ``_tmp_matchers``).
    """
    options = dict(temp=temp,
                   priority=priority,
                   block=block,
                   handlers=handlers,
                   default_state=state)
    new_matcher = Matcher.new("notice", Rule() & rule, Permission(), **options)
    _tmp_matchers.add(new_matcher)
    return new_matcher
def on_request(rule: Optional[Union[Rule, RuleChecker]] = None,
               *,
               handlers: Optional[List[Handler]] = None,
               temp: bool = False,
               priority: int = 1,
               block: bool = False,
               state: Optional[dict] = None) -> Type[Matcher]:
    """Create a ``"request"`` matcher and record it.

    The matcher always uses a fresh default ``Permission()``; there is no
    ``permission`` parameter on this helper.

    Args:
        rule: Optional extra rule combined with an empty ``Rule()`` via ``&``.
        handlers: Optional handler list passed through unchanged.
        temp: Forwarded as ``temp``.
        priority: Forwarded as ``priority``.
        block: Forwarded as ``block``.
        state: Forwarded as ``default_state``.

    Returns:
        The newly created matcher class (also added to ``_tmp_matchers``).
    """
    options = dict(temp=temp,
                   priority=priority,
                   block=block,
                   handlers=handlers,
                   default_state=state)
    new_matcher = Matcher.new("request", Rule() & rule, Permission(),
                              **options)
    _tmp_matchers.add(new_matcher)
    return new_matcher
2020-08-17 16:09:41 +08:00
def on_startswith(msg: str,
                  rule: Optional[Optional[Union[Rule, RuleChecker]]] = None,
                  permission: Permission = Permission(),
                  **kwargs) -> Type[Matcher]:
    """Register a message matcher whose rule is ``startswith(msg)``.

    Args:
        msg: Text handed to the ``startswith`` rule factory.
        rule: Optional extra rule; when truthy it is combined with the
            ``startswith`` rule via ``&``.
        permission: Permission forwarded to ``on_message``.
        **kwargs: Remaining keyword options forwarded to ``on_message``.

    Returns:
        The matcher created by ``on_message``.
    """
    prefix_rule = startswith(msg)
    if rule:
        prefix_rule = prefix_rule & rule
    return on_message(prefix_rule, permission, **kwargs)
def on_endswith(msg: str,
                rule: Optional[Union[Rule, RuleChecker]] = None,
                permission: Permission = Permission(),
                **kwargs) -> Type[Matcher]:
    """Register a message matcher whose rule is ``endswith(msg)``.

    Args:
        msg: Text handed to the ``endswith`` rule factory.
        rule: Optional extra rule; when truthy it is combined with the
            ``endswith`` rule via ``&``.
        permission: Permission forwarded to ``on_message``.
        **kwargs: Remaining keyword options forwarded to ``on_message``.

    Returns:
        The matcher created by ``on_message``.
    """
    # Build the suffix rule once so both paths (with and without an extra
    # rule) use endswith(); the no-extra-rule path used to fall back to
    # startswith(msg) by mistake.
    suffix_rule = endswith(msg)
    if rule:
        suffix_rule = suffix_rule & rule
    return on_message(suffix_rule, permission, **kwargs)
2020-08-23 20:01:58 +08:00
def on_command(cmd: Union[str, Tuple[str, ...]],
               alias: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
               rule: Optional[Union[Rule, RuleChecker]] = None,
               permission: Permission = Permission(),
               **kwargs) -> Union[Type[Matcher], MatcherGroup]:
    """Register a command matcher, or a group of matchers when aliases exist.

    A ``_strip_cmd`` handler is always placed first in the handler list. It
    replaces ``event.message`` with a new instance of the same message class,
    built from the message text with the leading
    ``len(state["_prefix"]["raw_command"])`` characters removed and the
    surrounding whitespace stripped.

    Args:
        cmd: Command name; a plain string is wrapped into a 1-tuple.
        alias: Optional extra command names; string entries are wrapped into
            1-tuples as well.
        rule: Optional extra rule.
        permission: Permission forwarded to the matcher / group.
        **kwargs: Remaining options. ``handlers`` (if given) is read from
            here; everything else is forwarded unchanged.

    Returns:
        A ``MatcherGroup`` holding one matcher per command name when ``alias``
        is non-empty, otherwise the single matcher made by ``on_message``.
    """
    if isinstance(cmd, str):
        cmd = (cmd,)

    async def _strip_cmd(bot, event, state: dict):
        message = event.message
        event.message = message.__class__(
            str(message)[len(state["_prefix"]["raw_command"]):].strip())

    # Build a fresh list instead of inserting into the caller's one: a list
    # reused across calls (e.g. through CommandGroup's shared kwargs) must not
    # accumulate extra _strip_cmd entries. `or []` also tolerates an explicit
    # handlers=None.
    handlers = [_strip_cmd, *(kwargs.pop("handlers", None) or [])]

    if alias:
        alias = {(x,) if isinstance(x, str) else x for x in alias}
        group = MatcherGroup("message",
                             Rule() & rule,
                             permission,
                             handlers=handlers,
                             **kwargs)
        for cmd_ in [cmd, *alias]:
            group.new(rule=command(cmd_))
        return group

    cmd_rule = command(cmd)
    if rule:
        cmd_rule = cmd_rule & rule
    return on_message(cmd_rule, permission, handlers=handlers, **kwargs)
2020-08-17 16:09:41 +08:00
def on_regex(pattern: str,
             flags: Union[int, re.RegexFlag] = 0,
             rule: Optional[Rule] = None,
             permission: Permission = Permission(),
             **kwargs) -> Type[Matcher]:
    """Register a message matcher whose rule is ``regex(pattern, flags)``.

    Args:
        pattern: Pattern string handed to the ``regex`` rule factory.
        flags: Flags handed to the ``regex`` rule factory.
        rule: Optional extra rule; when truthy it is combined with the
            regex rule via ``&``.
        permission: Permission forwarded to ``on_message``.
        **kwargs: Remaining keyword options forwarded to ``on_message``.

    Returns:
        The matcher created by ``on_message``.
    """
    regex_rule = regex(pattern, flags)
    if rule:
        regex_rule = regex_rule & rule
    return on_message(regex_rule, permission, **kwargs)
2020-06-30 10:13:58 +08:00
def load_plugin(module_path: str) -> Optional[Plugin]:
    """Import one module as a plugin and register it.

    Args:
        module_path: Dotted module path passed to ``importlib.import_module``.

    Returns:
        The already-registered ``Plugin`` if ``module_path`` is in
        ``plugins``; the newly created ``Plugin`` on a successful import;
        ``None`` when the module is already in ``sys.modules`` without being
        a registered plugin, or when anything raises during loading (the
        exception is logged, not propagated).
    """
    try:
        # Start from an empty scratch set so that only matchers created by
        # this import end up attributed to the plugin.
        _tmp_matchers.clear()

        if module_path in plugins:
            return plugins[module_path]
        if module_path in sys.modules:
            logger.warning(
                f"Module {module_path} has been loaded by other plugins! Ignored"
            )
            return None

        imported = importlib.import_module(module_path)

        # Tag every matcher created during the import with its plugin.
        for registered in _tmp_matchers:
            registered.module = module_path

        loaded = Plugin(name=module_path,
                        module=imported,
                        matcher=set(_tmp_matchers))
        plugins[module_path] = loaded
        logger.opt(
            colors=True).info(f'Succeeded to import "<y>{module_path}</y>"')
        return loaded
    except Exception as exc:
        logger.opt(colors=True, exception=exc).error(
            f'<r><bg #f8bbd0>Failed to import "{module_path}"</bg #f8bbd0></r>')
        return None
2020-08-15 17:22:10 +08:00
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
    """Load every module found in the given directories as a plugin.

    Modules are discovered with ``pkgutil.iter_modules``. Names starting with
    ``_`` are skipped, as are modules already registered in ``plugins`` or
    already present in ``sys.modules`` (the latter with a warning). A module
    whose loading raises is logged and skipped; it does not abort the scan.

    Args:
        *plugin_dir: Directory paths to search.

    Returns:
        The set of ``Plugin`` objects newly loaded by this call.
    """
    loaded_plugins = set()
    for found in pkgutil.iter_modules(plugin_dir):
        # Reset the scratch set per candidate so matchers never leak from one
        # plugin's import into the next.
        _tmp_matchers.clear()

        plugin_name = found.name
        if plugin_name.startswith("_"):
            continue

        spec = found.module_finder.find_spec(plugin_name)
        if spec.name in plugins:
            continue
        if spec.name in sys.modules:
            logger.warning(
                f"Module {spec.name} has been loaded by other plugin! Ignored")
            continue

        try:
            loaded_module = _load(spec)

            # Tag every matcher created during the import with its plugin.
            for registered in _tmp_matchers:
                registered.module = plugin_name

            new_plugin = Plugin(name=plugin_name,
                                module=loaded_module,
                                matcher=set(_tmp_matchers))
            plugins[plugin_name] = new_plugin
            loaded_plugins.add(new_plugin)
            logger.opt(
                colors=True).info(f'Succeeded to import "<y>{plugin_name}</y>"')
        except Exception as exc:
            logger.opt(colors=True, exception=exc).error(
                f'<r><bg #f8bbd0>Failed to import "{plugin_name}"</bg #f8bbd0></r>')
    return loaded_plugins
2020-06-30 10:13:58 +08:00
def load_builtin_plugins():
    """Load the ``nonebot.plugins.base`` module through ``load_plugin``.

    Returns:
        Whatever ``load_plugin`` returns for that module path.
    """
    builtin_module_path = "nonebot.plugins.base"
    return load_plugin(builtin_module_path)
2020-06-30 10:13:58 +08:00
def get_loaded_plugins() -> Set[Plugin]:
    """Return a new set containing every ``Plugin`` currently registered.

    The result is built from the values of the module-level ``plugins`` dict,
    so changing the returned set does not affect the registry.
    """
    return {*plugins.values()}
class CommandGroup:
    """Factory for commands that share a name prefix and default options.

    Attributes are set in ``__init__``:
        basename: Tuple prefix prepended to every sub-command name.
        base_kwargs: Default keyword options forwarded to ``on_command``.
    """

    def __init__(self, name: Union[str, Tuple[str, ...]], **kwargs):
        """Store the common prefix and the shared ``on_command`` options.

        Args:
            name: Prefix for all sub-commands; a string becomes a 1-tuple.
            **kwargs: Default options for every sub-command. Group-level
                ``alias`` / ``aliases`` entries are discarded.
        """
        self.basename = (name,) if isinstance(name, str) else name
        # on_command's parameter is spelled `alias`; drop that spelling as
        # well as `aliases`, otherwise a group-level alias set would be
        # applied to every sub-command.
        kwargs.pop("alias", None)
        kwargs.pop("aliases", None)
        self.base_kwargs = kwargs

    def command(self, name: Union[str, Tuple[str, ...]],
                **kwargs) -> "Union[Type[Matcher], MatcherGroup]":
        """Register the sub-command ``basename + name`` via ``on_command``.

        Args:
            name: Sub-command name; a string becomes a 1-tuple.
            **kwargs: Per-command options; they override the group defaults.

        Returns:
            Whatever ``on_command`` returns for the combined name.
        """
        sub_name = (name,) if isinstance(name, str) else name
        full_name = self.basename + sub_name
        final_kwargs = {**self.base_kwargs, **kwargs}
        # The merge above is shallow: give each sub-command its own handler
        # list so that a list stored in the group defaults is never shared
        # (and mutated) across sub-commands.
        handlers = final_kwargs.get("handlers")
        if handlers is not None:
            final_kwargs["handlers"] = list(handlers)
        return on_command(full_name, **final_kwargs)