nonebot2/nonebot/rule.py

474 lines
13 KiB
Python
Raw Normal View History

r"""
2020-09-13 13:01:23 +08:00
规则
====
每个事件响应器 ``Matcher`` 拥有一个匹配规则 ``Rule`` 其中是 ``RuleChecker`` 的集合只有当所有 ``RuleChecker`` 检查结果为 ``True`` 时继续运行
2020-09-13 13:01:23 +08:00
\:\:\:tip 提示
``RuleChecker`` 既可以是 async function 也可以是 sync function
2020-09-13 13:01:23 +08:00
\:\:\:
"""
2020-06-30 10:13:58 +08:00
2020-05-02 20:03:36 +08:00
import re
2021-02-02 11:59:14 +08:00
import shlex
2020-08-14 17:41:24 +08:00
import asyncio
2020-08-17 16:09:41 +08:00
from itertools import product
2021-09-27 00:19:30 +08:00
from argparse import Namespace
2021-11-19 18:18:53 +08:00
from contextlib import AsyncExitStack
from typing_extensions import TypedDict
2021-09-27 00:19:30 +08:00
from argparse import ArgumentParser as ArgParser
from typing import (
Any,
Dict,
List,
Type,
Tuple,
Union,
Callable,
NoReturn,
Optional,
Sequence,
)
2020-05-05 16:11:05 +08:00
2020-08-17 16:09:41 +08:00
from pygtrie import CharTrie
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
from nonebot.log import logger
2021-11-19 18:18:53 +08:00
from nonebot.handler import Handler
from nonebot import params, get_driver
2021-02-02 11:59:14 +08:00
from nonebot.exception import ParserExit
2020-12-17 21:09:30 +08:00
from nonebot.typing import T_State, T_RuleChecker
from nonebot.adapters import Bot, Event, MessageSegment
PREFIX_KEY = "_prefix"
SUFFIX_KEY = "_suffix"
CMD_KEY = "command"
RAW_CMD_KEY = "raw_command"
CMD_RESULT = TypedDict(
"CMD_RESULT", {"command": Optional[Tuple[str, ...]], "raw_command": Optional[str]}
)
SHELL_ARGS = "_args"
SHELL_ARGV = "_argv"
REGEX_MATCHED = "_matched"
REGEX_GROUP = "_matched_groups"
REGEX_DICT = "_matched_dict"
2020-12-06 02:30:19 +08:00
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
class Rule:
2020-09-13 13:01:23 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
``Matcher`` 规则类当事件传递时 ``Matcher`` 运行前进行检查
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:示例:
.. code-block:: python
Rule(async_function) & sync_function
# 等价于
from nonebot.utils import run_sync
Rule(async_function, run_sync(sync_function))
"""
2020-08-17 16:09:41 +08:00
__slots__ = ("checkers",)
2020-08-14 17:41:24 +08:00
HANDLER_PARAM_TYPES = [
params.BotParam,
params.EventParam,
params.StateParam,
params.DefaultParam,
2021-11-19 18:18:53 +08:00
]
def __init__(self, *checkers: Union[T_RuleChecker, Handler]) -> None:
2020-09-13 13:01:23 +08:00
"""
:参数:
2020-11-30 11:08:00 +08:00
2021-11-21 15:46:48 +08:00
* ``*checkers: Union[T_RuleChecker, Handler]``: RuleChecker
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
"""
2021-11-19 18:18:53 +08:00
self.checkers = set(
checker
if isinstance(checker, Handler)
else Handler(checker, allow_types=self.HANDLER_PARAM_TYPES)
for checker in checkers
)
2020-09-13 13:01:23 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
存储 ``RuleChecker``
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:类型:
2020-11-30 11:08:00 +08:00
2021-11-19 18:18:53 +08:00
* ``Set[Handler]``
2020-09-13 13:01:23 +08:00
"""
2020-08-14 17:41:24 +08:00
2021-11-19 18:18:53 +08:00
async def __call__(
self,
bot: Bot,
event: Event,
state: T_State,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[Dict[Callable[..., Any], Any]] = None,
) -> bool:
2020-09-13 13:01:23 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
检查是否符合所有规则
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
* ``bot: Bot``: Bot 对象
* ``event: Event``: Event 对象
2020-12-17 21:09:30 +08:00
* ``state: T_State``: 当前 State
* ``stack: Optional[AsyncExitStack]``: 异步上下文栈
* ``dependency_cache: Optional[Dict[Callable[..., Any], Any]]``: 依赖缓存
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:返回:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
- ``bool``
"""
if not self.checkers:
return True
2020-08-17 16:09:41 +08:00
results = await asyncio.gather(
*(
checker(
bot=bot,
event=event,
state=state,
_stack=stack,
_dependency_cache=dependency_cache,
)
for checker in self.checkers
)
)
2020-08-17 16:09:41 +08:00
return all(results)
2020-08-14 17:41:24 +08:00
2020-12-17 21:09:30 +08:00
def __and__(self, other: Optional[Union["Rule", T_RuleChecker]]) -> "Rule":
2020-09-27 18:05:13 +08:00
if other is None:
return self
elif isinstance(other, Rule):
return Rule(*self.checkers, *other.checkers)
2020-08-14 17:41:24 +08:00
else:
return Rule(*self.checkers, other)
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
def __or__(self, other) -> NoReturn:
raise RuntimeError("Or operation between rules is not allowed.")
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
class TrieRule:
prefix: CharTrie = CharTrie()
suffix: CharTrie = CharTrie()
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
@classmethod
def add_prefix(cls, prefix: str, value: Any):
if prefix in cls.prefix:
logger.warning(f'Duplicated prefix rule "{prefix}"')
return
cls.prefix[prefix] = value
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
@classmethod
def add_suffix(cls, suffix: str, value: Any):
if suffix[::-1] in cls.suffix:
logger.warning(f'Duplicated suffix rule "{suffix}"')
return
cls.suffix[suffix[::-1]] = value
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
@classmethod
def get_value(
cls, bot: Bot, event: Event, state: T_State
) -> Tuple[CMD_RESULT, CMD_RESULT]:
prefix = CMD_RESULT(command=None, raw_command=None)
suffix = CMD_RESULT(command=None, raw_command=None)
state[PREFIX_KEY] = prefix
state[SUFFIX_KEY] = suffix
2020-12-09 19:57:49 +08:00
if event.get_type() != "message":
return prefix, suffix
2020-12-09 19:57:49 +08:00
message = event.get_message()
message_seg: MessageSegment = message[0]
2021-02-07 20:57:08 +08:00
if message_seg.is_text():
pf = cls.prefix.longest_prefix(str(message_seg).lstrip())
prefix[RAW_CMD_KEY] = pf.key
prefix[CMD_KEY] = pf.value
message_seg_r: MessageSegment = message[-1]
2020-12-10 02:13:25 +08:00
if message_seg_r.is_text():
sf = cls.suffix.longest_prefix(str(message_seg_r).rstrip()[::-1])
suffix[RAW_CMD_KEY] = sf.key
suffix[CMD_KEY] = sf.value
return prefix, suffix
2020-07-25 12:28:30 +08:00
def startswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
2020-09-13 22:36:40 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
匹配消息开头
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
* ``msg: str``: 消息开头字符串
"""
if isinstance(msg, str):
msg = (msg,)
2020-07-25 12:28:30 +08:00
pattern = re.compile(
f"^(?:{'|'.join(re.escape(prefix) for prefix in msg)})",
re.IGNORECASE if ignorecase else 0,
)
2021-11-14 18:51:23 +08:00
async def _startswith(bot: Bot, event: Event, state: T_State) -> bool:
2020-12-09 19:57:49 +08:00
if event.get_type() != "message":
return False
text = event.get_plaintext()
return bool(pattern.match(text))
2020-07-25 12:28:30 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_startswith)
2020-07-25 12:28:30 +08:00
def endswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
2020-09-13 22:36:40 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
匹配消息结尾
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
* ``msg: str``: 消息结尾字符串
"""
if isinstance(msg, str):
msg = (msg,)
pattern = re.compile(
f"(?:{'|'.join(re.escape(prefix) for prefix in msg)})$",
re.IGNORECASE if ignorecase else 0,
)
2020-05-02 20:03:36 +08:00
2021-11-14 18:51:23 +08:00
async def _endswith(bot: Bot, event: Event, state: T_State) -> bool:
2020-12-09 19:57:49 +08:00
if event.get_type() != "message":
return False
text = event.get_plaintext()
2021-08-29 19:18:36 +08:00
return bool(pattern.search(text))
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_endswith)
2020-05-02 20:03:36 +08:00
def keyword(*keywords: str) -> Rule:
"""
:说明:
2020-11-30 11:08:00 +08:00
匹配消息关键词
2020-11-30 11:08:00 +08:00
:参数:
2020-11-30 11:08:00 +08:00
* ``*keywords: str``: 关键词
"""
2020-05-02 20:03:36 +08:00
2021-11-19 18:18:53 +08:00
async def _keyword(event: Event) -> bool:
2020-12-09 19:57:49 +08:00
if event.get_type() != "message":
return False
text = event.get_plaintext()
return bool(text and any(keyword in text for keyword in keywords))
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_keyword)
2020-05-02 20:03:36 +08:00
2020-10-22 22:08:19 +08:00
def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
r"""
:说明:
2020-11-30 11:08:00 +08:00
命令形式匹配根据配置里提供的 ``command_start``, ``command_sep`` 判断消息是否为命令
2020-10-28 13:17:41 +08:00
可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令``("test",)``通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本``"/test"``
2020-11-30 11:08:00 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-10-22 22:08:19 +08:00
* ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
2020-11-30 11:08:00 +08:00
:示例:
2020-11-30 11:08:00 +08:00
使用默认 ``command_start``, ``command_sep`` 配置
命令 ``("test",)`` 可以匹配``/test`` 开头的消息
命令 ``("test", "sub")`` 可以匹配``/test.sub`` 开头的消息
\:\:\:tip 提示
命令内容与后续消息间无需空格
\:\:\:
"""
2020-08-17 16:09:41 +08:00
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
2020-10-22 22:08:19 +08:00
commands = list(cmds)
for index, command in enumerate(commands):
if isinstance(command, str):
commands[index] = command = (command,)
if len(command) == 1:
for start in command_start:
TrieRule.add_prefix(f"{start}{command[0]}", command)
else:
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
2020-05-02 20:03:36 +08:00
2021-11-19 18:18:53 +08:00
async def _command(state: T_State) -> bool:
return state[PREFIX_KEY][CMD_KEY] in commands
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_command)
2020-05-02 20:03:36 +08:00
2021-02-02 11:59:14 +08:00
class ArgumentParser(ArgParser):
2021-02-02 12:15:20 +08:00
"""
:说明:
``shell_like`` 命令参数解析器解析出错时不会退出程序
"""
2021-02-02 11:59:14 +08:00
def _print_message(self, message, file=None):
2021-03-25 18:22:56 +08:00
old_message: str = getattr(self, "message", "")
if old_message:
old_message += "\n"
old_message += message
setattr(self, "message", old_message)
2021-02-02 11:59:14 +08:00
2021-11-19 18:18:53 +08:00
def exit(self, status: int = 0, message: Optional[str] = None):
raise ParserExit(
status=status, message=message or getattr(self, "message", None)
)
2021-02-02 11:59:14 +08:00
def parse_args(
self,
args: Optional[Sequence[str]] = None,
namespace: Optional[Namespace] = None,
) -> Namespace:
2021-06-17 10:14:36 +08:00
setattr(self, "message", "")
return super().parse_args(args=args, namespace=namespace) # type: ignore
2021-02-02 11:59:14 +08:00
def shell_command(
*cmds: Union[str, Tuple[str, ...]], parser: Optional[ArgumentParser] = None
) -> Rule:
r"""
2021-02-01 22:28:48 +08:00
:说明:
支持 ``shell_like`` 解析参数的命令形式匹配根据配置里提供的 ``command_start``, ``command_sep`` 判断消息是否为命令
可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令``("test",)``通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本``"/test"``
2021-02-02 11:59:14 +08:00
可以通过 ``state["argv"]`` 获取用户输入的原始参数列表
2021-02-01 22:28:48 +08:00
2021-02-02 11:59:14 +08:00
添加 ``parser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]``
2021-02-01 22:28:48 +08:00
:参数:
* ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
2021-02-02 11:59:14 +08:00
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
2021-02-01 22:28:48 +08:00
:示例:
2021-02-02 11:59:14 +08:00
使用默认 ``command_start``, ``command_sep`` 配置更多示例参考 ``argparse`` 标准库文档
2021-02-01 22:28:48 +08:00
2021-02-02 11:59:14 +08:00
.. code-block:: python
from nonebot.rule import ArgumentParser
parser = ArgumentParser()
2021-02-02 12:39:23 +08:00
parser.add_argument("-a", action="store_true")
2021-02-02 11:59:14 +08:00
rule = shell_command("ls", parser=parser)
2021-02-01 22:28:48 +08:00
\:\:\:tip 提示
命令内容与后续消息间无需空格
\:\:\:
"""
2021-02-02 11:59:14 +08:00
if not isinstance(parser, ArgumentParser):
raise TypeError("`parser` must be an instance of nonebot.rule.ArgumentParser")
2021-02-02 11:59:14 +08:00
2021-02-01 22:28:48 +08:00
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
commands = list(cmds)
for index, command in enumerate(commands):
if isinstance(command, str):
commands[index] = command = (command,)
if len(command) == 1:
for start in command_start:
TrieRule.add_prefix(f"{start}{command[0]}", command)
else:
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
2021-11-19 18:18:53 +08:00
async def _shell_command(event: Event, state: T_State) -> bool:
if state[PREFIX_KEY][CMD_KEY] in commands:
2021-02-02 11:59:14 +08:00
message = str(event.get_message())
strip_message = message[len(state[PREFIX_KEY][RAW_CMD_KEY]) :].lstrip()
state[SHELL_ARGV] = shlex.split(strip_message)
2021-02-02 11:59:14 +08:00
if parser:
2021-02-02 12:15:20 +08:00
try:
args = parser.parse_args(state[SHELL_ARGV])
state[SHELL_ARGS] = args
2021-02-02 12:15:20 +08:00
except ParserExit as e:
state[SHELL_ARGS] = e
2021-02-01 22:28:48 +08:00
return True
else:
return False
2021-02-02 11:59:14 +08:00
return Rule(_shell_command)
2021-02-01 22:28:48 +08:00
2020-08-17 16:09:41 +08:00
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
r"""
:说明:
2020-11-30 11:08:00 +08:00
2020-10-28 13:23:48 +08:00
根据正则表达式进行匹配
2021-02-01 11:42:05 +08:00
可以通过 ``state["_matched"]`` ``state["_matched_groups"]`` ``state["_matched_dict"]``
获取正则表达式匹配成功的文本
2020-11-30 11:08:00 +08:00
:参数:
2020-11-30 11:08:00 +08:00
* ``regex: str``: 正则表达式
* ``flags: Union[int, re.RegexFlag]``: 正则标志
2020-10-18 15:04:45 +08:00
\:\:\:tip 提示
正则表达式匹配使用 search 而非 match如需从头匹配请使用 ``r"^xxx"`` 来确保匹配开头
\:\:\:
"""
2020-08-17 16:09:41 +08:00
pattern = re.compile(regex, flags)
2020-05-02 20:03:36 +08:00
2021-11-19 18:18:53 +08:00
async def _regex(event: Event, state: T_State) -> bool:
2020-12-09 19:57:49 +08:00
if event.get_type() != "message":
return False
matched = pattern.search(str(event.get_message()))
2020-10-28 13:23:48 +08:00
if matched:
state[REGEX_MATCHED] = matched.group()
state[REGEX_GROUP] = matched.groups()
state[REGEX_DICT] = matched.groupdict()
return True
else:
return False
2020-08-17 16:09:41 +08:00
return Rule(_regex)
2020-08-23 10:45:26 +08:00
2021-11-19 18:18:53 +08:00
async def _to_me(event: Event) -> bool:
return event.is_tome()
2020-12-10 02:13:25 +08:00
def to_me() -> Rule:
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-12-31 17:58:09 +08:00
通过 ``event.is_tome()`` 判断事件是否与机器人有关
2020-11-30 11:08:00 +08:00
2020-12-10 02:13:25 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-12-10 02:13:25 +08:00
*
"""
2020-08-23 10:45:26 +08:00
2020-12-10 02:13:25 +08:00
return Rule(_to_me)