nonebot2/nonebot/rule.py

493 lines
14 KiB
Python
Raw Normal View History

r"""
2020-09-13 05:01:23 +00:00
规则
====
2022-01-12 10:19:21 +00:00
每个事件响应器 `Matcher` 拥有一个匹配规则 `Rule` 其中是 `RuleChecker` 的集合只有当所有 `RuleChecker` 检查结果为 `True` 时继续运行
2020-09-13 05:01:23 +00:00
\:\:\:tip 提示
2022-01-12 10:19:21 +00:00
`RuleChecker` 既可以是 async function 也可以是 sync function
2020-09-13 05:01:23 +00:00
\:\:\:
"""
2020-06-30 02:13:58 +00:00
2020-05-02 12:03:36 +00:00
import re
2021-02-02 03:59:14 +00:00
import shlex
2020-08-14 09:41:24 +00:00
import asyncio
2020-08-17 08:09:41 +00:00
from itertools import product
2021-09-26 16:19:30 +00:00
from argparse import Namespace
2021-11-19 10:18:53 +00:00
from contextlib import AsyncExitStack
from typing_extensions import TypedDict
2021-09-26 16:19:30 +00:00
from argparse import ArgumentParser as ArgParser
from typing import Any, Set, List, Tuple, Union, NoReturn, Optional, Sequence
2020-05-05 08:11:05 +00:00
2020-08-17 08:09:41 +00:00
from pygtrie import CharTrie
2020-08-14 09:41:24 +00:00
from nonebot import get_driver
2020-08-17 08:09:41 +00:00
from nonebot.log import logger
from nonebot.dependencies import Dependent
from nonebot.exception import ParserExit, SkippedException
2021-12-13 17:08:48 +00:00
from nonebot.adapters import Bot, Event, Message, MessageSegment
from nonebot.typing import T_State, T_RuleChecker, T_DependencyCache
2021-12-13 17:08:48 +00:00
from nonebot.consts import (
CMD_KEY,
PREFIX_KEY,
REGEX_DICT,
SHELL_ARGS,
SHELL_ARGV,
CMD_ARG_KEY,
RAW_CMD_KEY,
REGEX_GROUP,
REGEX_MATCHED,
)
from nonebot.params import (
Command,
BotParam,
EventToMe,
EventType,
EventParam,
StateParam,
DependParam,
DefaultParam,
EventMessage,
EventPlainText,
)
CMD_RESULT = TypedDict(
2021-12-13 17:08:48 +00:00
"CMD_RESULT",
{
"command": Optional[Tuple[str, ...]],
"raw_command": Optional[str],
"command_arg": Optional[Message[MessageSegment]],
},
)
2020-08-14 09:41:24 +00:00
2020-08-17 08:09:41 +00:00
class Rule:
2020-09-13 05:01:23 +00:00
"""
2022-01-12 10:19:21 +00:00
`Matcher` 规则类当事件传递时 `Matcher` 运行前进行检查
2020-11-30 03:08:00 +00:00
2020-09-13 05:01:23 +00:00
:示例:
.. code-block:: python
Rule(async_function) & sync_function
# 等价于
from nonebot.utils import run_sync
Rule(async_function, run_sync(sync_function))
"""
2020-08-17 08:09:41 +00:00
__slots__ = ("checkers",)
2020-08-14 09:41:24 +00:00
HANDLER_PARAM_TYPES = [
DependParam,
BotParam,
EventParam,
StateParam,
DefaultParam,
2021-11-19 10:18:53 +00:00
]
def __init__(self, *checkers: Union[T_RuleChecker, Dependent[bool]]) -> None:
2020-09-13 05:01:23 +00:00
"""
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
*checkers: RuleChecker
2020-11-30 03:08:00 +00:00
2020-09-13 05:01:23 +00:00
"""
self.checkers: Set[Dependent[bool]] = set(
checker
if isinstance(checker, Dependent)
2021-12-23 11:36:29 +00:00
else Dependent[bool].parse(
call=checker, allow_types=self.HANDLER_PARAM_TYPES
)
for checker in checkers
)
2020-09-13 05:01:23 +00:00
"""
2022-01-12 10:19:21 +00:00
存储 `RuleChecker`
2020-09-13 05:01:23 +00:00
"""
2020-08-14 09:41:24 +00:00
2021-11-19 10:18:53 +00:00
async def __call__(
self,
bot: Bot,
event: Event,
state: T_State,
stack: Optional[AsyncExitStack] = None,
2021-12-16 15:22:25 +00:00
dependency_cache: Optional[T_DependencyCache] = None,
) -> bool:
2020-09-13 05:01:23 +00:00
"""
2022-01-12 10:16:05 +00:00
检查是否符合所有规则
2020-11-30 03:08:00 +00:00
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
bot: Bot 对象
event: Event 对象
state: 当前 State
stack: 异步上下文栈
dependency_cache: 依赖缓存
2020-09-13 05:01:23 +00:00
"""
if not self.checkers:
return True
try:
results = await asyncio.gather(
*(
checker(
bot=bot,
event=event,
state=state,
stack=stack,
dependency_cache=dependency_cache,
)
for checker in self.checkers
)
)
except SkippedException:
return False
2020-08-17 08:09:41 +00:00
return all(results)
2020-08-14 09:41:24 +00:00
2020-12-17 13:09:30 +00:00
def __and__(self, other: Optional[Union["Rule", T_RuleChecker]]) -> "Rule":
2020-09-27 10:05:13 +00:00
if other is None:
return self
elif isinstance(other, Rule):
return Rule(*self.checkers, *other.checkers)
2020-08-14 09:41:24 +00:00
else:
return Rule(*self.checkers, other)
2020-08-14 09:41:24 +00:00
2020-08-17 08:09:41 +00:00
def __or__(self, other) -> NoReturn:
raise RuntimeError("Or operation between rules is not allowed.")
2020-08-14 09:41:24 +00:00
2020-08-17 08:09:41 +00:00
class TrieRule:
prefix: CharTrie = CharTrie()
2020-08-14 09:41:24 +00:00
2020-08-17 08:09:41 +00:00
@classmethod
def add_prefix(cls, prefix: str, value: Any):
if prefix in cls.prefix:
logger.warning(f'Duplicated prefix rule "{prefix}"')
return
cls.prefix[prefix] = value
2020-08-14 09:41:24 +00:00
2020-08-17 08:09:41 +00:00
@classmethod
2021-12-13 17:08:48 +00:00
def get_value(cls, bot: Bot, event: Event, state: T_State) -> CMD_RESULT:
prefix = CMD_RESULT(command=None, raw_command=None, command_arg=None)
state[PREFIX_KEY] = prefix
2020-12-09 11:57:49 +00:00
if event.get_type() != "message":
2021-12-13 17:08:48 +00:00
return prefix
2020-12-09 11:57:49 +00:00
message = event.get_message()
message_seg: MessageSegment = message[0]
2021-02-07 12:57:08 +00:00
if message_seg.is_text():
2021-12-13 17:08:48 +00:00
segment_text = str(message_seg).lstrip()
pf = cls.prefix.longest_prefix(segment_text)
prefix[RAW_CMD_KEY] = pf.key
prefix[CMD_KEY] = pf.value
2021-12-13 17:08:48 +00:00
if pf.key:
msg = message.copy()
msg.pop(0)
new_message = msg.__class__(segment_text[len(pf.key) :].lstrip())
for new_segment in reversed(new_message):
msg.insert(0, new_segment)
prefix[CMD_ARG_KEY] = msg
return prefix
2020-07-25 04:28:30 +00:00
class StartswithRule:
def __init__(self, msg: Tuple[str, ...], ignorecase: bool = False):
self.msg = msg
self.ignorecase = ignorecase
async def __call__(
self, type: str = EventType(), text: str = EventPlainText()
) -> Any:
if type != "message":
return False
return bool(
re.match(
f"^(?:{'|'.join(re.escape(prefix) for prefix in self.msg)})",
text,
re.IGNORECASE if self.ignorecase else 0,
)
)
def startswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
2020-09-13 14:36:40 +00:00
"""
2022-01-12 10:16:05 +00:00
匹配消息开头
2020-11-30 03:08:00 +00:00
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
msg: 消息开头字符串
2020-09-13 14:36:40 +00:00
"""
if isinstance(msg, str):
msg = (msg,)
2020-07-25 04:28:30 +00:00
return Rule(StartswithRule(msg, ignorecase))
class EndswithRule:
def __init__(self, msg: Tuple[str, ...], ignorecase: bool = False):
self.msg = msg
self.ignorecase = ignorecase
async def __call__(
self, type: str = EventType(), text: str = EventPlainText()
) -> Any:
if type != "message":
2020-12-09 11:57:49 +00:00
return False
return bool(
re.search(
f"(?:{'|'.join(re.escape(prefix) for prefix in self.msg)})$",
text,
re.IGNORECASE if self.ignorecase else 0,
)
)
2020-07-25 04:28:30 +00:00
def endswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
2020-09-13 14:36:40 +00:00
"""
2022-01-12 10:16:05 +00:00
匹配消息结尾
2020-11-30 03:08:00 +00:00
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
msg: 消息结尾字符串
2020-09-13 14:36:40 +00:00
"""
if isinstance(msg, str):
msg = (msg,)
return Rule(EndswithRule(msg, ignorecase))
2020-05-02 12:03:36 +00:00
class KeywordsRule:
def __init__(self, *keywords: str):
self.keywords = keywords
async def __call__(
self, type: str = EventType(), text: str = EventPlainText()
) -> bool:
if type != "message":
2020-12-09 11:57:49 +00:00
return False
return bool(text and any(keyword in text for keyword in self.keywords))
2020-05-02 12:03:36 +00:00
def keyword(*keywords: str) -> Rule:
"""
2022-01-12 10:16:05 +00:00
匹配消息关键词
2020-11-30 03:08:00 +00:00
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
*keywords: 关键词
"""
2020-05-02 12:03:36 +00:00
return Rule(KeywordsRule(*keywords))
class CommandRule:
def __init__(self, cmds: List[Tuple[str, ...]]):
self.cmds = cmds
2020-05-02 12:03:36 +00:00
async def __call__(self, cmd: Optional[Tuple[str, ...]] = Command()) -> bool:
return cmd in self.cmds
def __repr__(self):
return f"<Command {self.cmds}>"
2020-05-02 12:03:36 +00:00
2020-10-22 14:08:19 +00:00
def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
r"""
2022-01-12 10:19:21 +00:00
命令形式匹配根据配置里提供的 `command_start`, `command_sep` 判断消息是否为命令
2020-10-28 05:17:41 +00:00
2022-01-12 10:19:21 +00:00
可以通过 `state["_prefix"]["command"]` 获取匹配成功的命令`("test",)`通过 `state["_prefix"]["raw_command"]` 获取匹配成功的原始命令文本`"/test"`
2020-11-30 03:08:00 +00:00
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
*cmds: 命令内容
2020-11-30 03:08:00 +00:00
:示例:
2020-11-30 03:08:00 +00:00
2022-01-12 10:19:21 +00:00
使用默认 `command_start`, `command_sep` 配置
2022-01-12 10:19:21 +00:00
命令 `("test",)` 可以匹配`/test` 开头的消息
命令 `("test", "sub")` 可以匹配`/test.sub` 开头的消息
\:\:\:tip 提示
命令内容与后续消息间无需空格
\:\:\:
"""
2020-08-17 08:09:41 +00:00
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
commands: List[Tuple[str, ...]] = []
for command in cmds:
2020-10-22 14:08:19 +00:00
if isinstance(command, str):
command = (command,)
commands.append(command)
2020-10-22 14:08:19 +00:00
if len(command) == 1:
for start in command_start:
TrieRule.add_prefix(f"{start}{command[0]}", command)
else:
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
2020-05-02 12:03:36 +00:00
return Rule(CommandRule(commands))
2020-05-02 12:03:36 +00:00
2021-02-02 03:59:14 +00:00
class ArgumentParser(ArgParser):
2021-02-02 04:15:20 +00:00
"""
2022-01-12 10:19:21 +00:00
`shell_like` 命令参数解析器解析出错时不会退出程序
2021-02-02 04:15:20 +00:00
"""
2021-02-02 03:59:14 +00:00
def _print_message(self, message, file=None):
2021-03-25 10:22:56 +00:00
old_message: str = getattr(self, "message", "")
if old_message:
old_message += "\n"
old_message += message
setattr(self, "message", old_message)
2021-02-02 03:59:14 +00:00
2021-11-19 10:18:53 +00:00
def exit(self, status: int = 0, message: Optional[str] = None):
raise ParserExit(
status=status, message=message or getattr(self, "message", None)
)
2021-02-02 03:59:14 +00:00
def parse_args(
self,
args: Optional[Sequence[str]] = None,
namespace: Optional[Namespace] = None,
) -> Namespace:
2021-06-17 02:14:36 +00:00
setattr(self, "message", "")
return super().parse_args(args=args, namespace=namespace) # type: ignore
2021-02-02 03:59:14 +00:00
class ShellCommandRule:
def __init__(self, cmds: List[Tuple[str, ...]], parser: Optional[ArgumentParser]):
self.cmds = cmds
self.parser = parser
async def __call__(
self,
2022-01-10 03:20:06 +00:00
state: T_State,
cmd: Optional[Tuple[str, ...]] = Command(),
msg: Message = EventMessage(),
) -> bool:
if cmd in self.cmds:
message = str(msg)
strip_message = message[len(state[PREFIX_KEY][RAW_CMD_KEY]) :].lstrip()
state[SHELL_ARGV] = shlex.split(strip_message)
if self.parser:
try:
args = self.parser.parse_args(state[SHELL_ARGV])
state[SHELL_ARGS] = args
except ParserExit as e:
state[SHELL_ARGS] = e
return True
else:
return False
def shell_command(
*cmds: Union[str, Tuple[str, ...]], parser: Optional[ArgumentParser] = None
) -> Rule:
r"""
2022-01-12 10:19:21 +00:00
支持 `shell_like` 解析参数的命令形式匹配根据配置里提供的 `command_start`, `command_sep` 判断消息是否为命令
2021-02-01 14:28:48 +00:00
2022-01-12 10:19:21 +00:00
可以通过 `state["_prefix"]["command"]` 获取匹配成功的命令`("test",)`通过 `state["_prefix"]["raw_command"]` 获取匹配成功的原始命令文本`"/test"`
2021-02-01 14:28:48 +00:00
2022-01-12 10:19:21 +00:00
可以通过 `state["argv"]` 获取用户输入的原始参数列表
2021-02-01 14:28:48 +00:00
2022-01-12 10:19:21 +00:00
添加 `parser` 参数后, 可以自动处理消息并将结果保存在 `state["args"]`
2021-02-01 14:28:48 +00:00
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
*cmds: 命令内容
parser: `nonebot.rule.ArgumentParser` 对象
2021-02-01 14:28:48 +00:00
:示例:
2022-01-12 10:19:21 +00:00
使用默认 `command_start`, `command_sep` 配置更多示例参考 `argparse` 标准库文档
2021-02-01 14:28:48 +00:00
2021-02-02 03:59:14 +00:00
.. code-block:: python
from nonebot.rule import ArgumentParser
parser = ArgumentParser()
2021-02-02 04:39:23 +00:00
parser.add_argument("-a", action="store_true")
2021-02-02 03:59:14 +00:00
rule = shell_command("ls", parser=parser)
2021-02-01 14:28:48 +00:00
\:\:\:tip 提示
命令内容与后续消息间无需空格
\:\:\:
"""
if parser is not None and not isinstance(parser, ArgumentParser):
raise TypeError("`parser` must be an instance of nonebot.rule.ArgumentParser")
2021-02-02 03:59:14 +00:00
2021-02-01 14:28:48 +00:00
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
commands: List[Tuple[str, ...]] = []
for command in cmds:
2021-02-01 14:28:48 +00:00
if isinstance(command, str):
command = (command,)
commands.append(command)
2021-02-01 14:28:48 +00:00
if len(command) == 1:
for start in command_start:
TrieRule.add_prefix(f"{start}{command[0]}", command)
else:
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
return Rule(ShellCommandRule(commands, parser))
class RegexRule:
def __init__(self, regex: str, flags: int = 0):
self.regex = regex
self.flags = flags
async def __call__(
self,
2022-01-10 03:20:06 +00:00
state: T_State,
type: str = EventType(),
msg: Message = EventMessage(),
) -> bool:
if type != "message":
return False
matched = re.search(self.regex, str(msg), self.flags)
if matched:
state[REGEX_MATCHED] = matched.group()
state[REGEX_GROUP] = matched.groups()
state[REGEX_DICT] = matched.groupdict()
2021-02-01 14:28:48 +00:00
return True
else:
return False
2020-08-17 08:09:41 +00:00
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
r"""
2022-01-12 10:16:05 +00:00
根据正则表达式进行匹配
2022-01-12 10:19:21 +00:00
可以通过 `state["_matched"]` `state["_matched_groups"]` `state["_matched_dict"]`
2021-02-01 03:42:05 +00:00
获取正则表达式匹配成功的文本
2020-11-30 03:08:00 +00:00
2022-01-12 10:31:12 +00:00
参数:
2022-01-12 10:25:25 +00:00
regex: 正则表达式
flags: 正则标志
2020-10-18 07:04:45 +00:00
\:\:\:tip 提示
2022-01-12 10:19:21 +00:00
正则表达式匹配使用 search 而非 match如需从头匹配请使用 `r"^xxx"` 来确保匹配开头
2020-10-18 07:04:45 +00:00
\:\:\:
"""
return Rule(RegexRule(regex, flags))
2020-08-23 02:45:26 +00:00
class ToMeRule:
async def __call__(self, to_me: bool = EventToMe()) -> bool:
return to_me
2021-11-19 10:18:53 +00:00
2020-12-09 18:13:25 +00:00
def to_me() -> Rule:
"""
2022-01-12 10:19:21 +00:00
通过 `event.is_tome()` 判断事件是否与机器人有关
2020-11-30 03:08:00 +00:00
2022-01-12 10:31:12 +00:00
参数:
2020-12-09 18:13:25 +00:00
*
"""
2020-08-23 02:45:26 +00:00
return Rule(ToMeRule())