2022-01-19 16:16:56 +08:00
|
|
|
|
"""本模块是 {ref}`nonebot.matcher.Matcher.rule` 的类型定义。
|
2020-09-13 13:01:23 +08:00
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
每个事件响应器 {ref}`nonebot.matcher.Matcher` 拥有一个匹配规则 {ref}`nonebot.rule.Rule`
|
|
|
|
|
其中是 `RuleChecker` 的集合,只有当所有 `RuleChecker` 检查结果为 `True` 时继续运行。
|
2020-09-13 13:01:23 +08:00
|
|
|
|
|
2022-01-16 11:30:09 +08:00
|
|
|
|
FrontMatter:
|
|
|
|
|
sidebar_position: 5
|
|
|
|
|
description: nonebot.rule 模块
|
2020-09-13 13:01:23 +08:00
|
|
|
|
"""
|
2020-06-30 10:13:58 +08:00
|
|
|
|
|
2020-05-02 20:03:36 +08:00
|
|
|
|
import re
|
2021-02-02 11:59:14 +08:00
|
|
|
|
import shlex
|
2022-08-24 09:54:08 +08:00
|
|
|
|
from argparse import Action
|
|
|
|
|
from argparse import ArgumentError
|
2022-11-29 12:00:09 +08:00
|
|
|
|
from contextvars import ContextVar
|
2022-08-24 09:54:08 +08:00
|
|
|
|
from itertools import chain, product
|
|
|
|
|
from argparse import Namespace as Namespace
|
2021-09-27 00:19:30 +08:00
|
|
|
|
from argparse import ArgumentParser as ArgParser
|
2022-08-24 09:54:08 +08:00
|
|
|
|
from typing import (
|
|
|
|
|
IO,
|
|
|
|
|
TYPE_CHECKING,
|
|
|
|
|
List,
|
2022-08-30 09:54:09 +08:00
|
|
|
|
Type,
|
2022-08-24 09:54:08 +08:00
|
|
|
|
Tuple,
|
|
|
|
|
Union,
|
|
|
|
|
TypeVar,
|
|
|
|
|
Optional,
|
|
|
|
|
Sequence,
|
|
|
|
|
TypedDict,
|
|
|
|
|
NamedTuple,
|
|
|
|
|
cast,
|
|
|
|
|
overload,
|
|
|
|
|
)
|
2020-05-05 16:11:05 +08:00
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
from pygtrie import CharTrie
|
2020-08-14 17:41:24 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
from nonebot import get_driver
|
2020-08-17 16:09:41 +08:00
|
|
|
|
from nonebot.log import logger
|
2022-02-06 14:52:50 +08:00
|
|
|
|
from nonebot.typing import T_State
|
|
|
|
|
from nonebot.exception import ParserExit
|
|
|
|
|
from nonebot.internal.rule import Rule as Rule
|
2022-09-29 16:56:06 +08:00
|
|
|
|
from nonebot.params import Command, EventToMe, CommandArg
|
2021-12-14 01:08:48 +08:00
|
|
|
|
from nonebot.adapters import Bot, Event, Message, MessageSegment
|
|
|
|
|
from nonebot.consts import (
|
|
|
|
|
CMD_KEY,
|
|
|
|
|
PREFIX_KEY,
|
|
|
|
|
REGEX_DICT,
|
|
|
|
|
SHELL_ARGS,
|
|
|
|
|
SHELL_ARGV,
|
|
|
|
|
CMD_ARG_KEY,
|
2022-10-12 13:41:28 +08:00
|
|
|
|
KEYWORD_KEY,
|
2021-12-14 01:08:48 +08:00
|
|
|
|
RAW_CMD_KEY,
|
|
|
|
|
REGEX_GROUP,
|
2022-10-12 13:41:28 +08:00
|
|
|
|
ENDSWITH_KEY,
|
2022-04-20 14:43:29 +08:00
|
|
|
|
CMD_START_KEY,
|
2022-10-12 13:41:28 +08:00
|
|
|
|
FULLMATCH_KEY,
|
2021-12-14 01:08:48 +08:00
|
|
|
|
REGEX_MATCHED,
|
2022-10-12 13:41:28 +08:00
|
|
|
|
STARTSWITH_KEY,
|
2021-12-14 01:08:48 +08:00
|
|
|
|
)
|
2021-11-17 00:27:58 +08:00
|
|
|
|
|
2022-08-24 09:54:08 +08:00
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
CMD_RESULT = TypedDict(
|
2021-12-14 01:08:48 +08:00
|
|
|
|
"CMD_RESULT",
|
|
|
|
|
{
|
|
|
|
|
"command": Optional[Tuple[str, ...]],
|
|
|
|
|
"raw_command": Optional[str],
|
|
|
|
|
"command_arg": Optional[Message[MessageSegment]],
|
2022-04-20 14:43:29 +08:00
|
|
|
|
"command_start": Optional[str],
|
2021-12-14 01:08:48 +08:00
|
|
|
|
},
|
2021-11-22 23:21:26 +08:00
|
|
|
|
)
|
2021-11-17 00:27:58 +08:00
|
|
|
|
|
2022-04-20 14:43:29 +08:00
|
|
|
|
TRIE_VALUE = NamedTuple(
|
|
|
|
|
"TRIE_VALUE", [("command_start", str), ("command", Tuple[str, ...])]
|
|
|
|
|
)
|
|
|
|
|
|
2022-11-29 12:00:09 +08:00
|
|
|
|
parser_message: ContextVar[str] = ContextVar("parser_message")
|
|
|
|
|
|
2020-08-14 17:41:24 +08:00
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
class TrieRule:
|
|
|
|
|
prefix: CharTrie = CharTrie()
|
2020-08-14 17:41:24 +08:00
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
@classmethod
|
2022-04-20 14:43:29 +08:00
|
|
|
|
def add_prefix(cls, prefix: str, value: TRIE_VALUE) -> None:
|
2020-08-17 16:09:41 +08:00
|
|
|
|
if prefix in cls.prefix:
|
|
|
|
|
logger.warning(f'Duplicated prefix rule "{prefix}"')
|
|
|
|
|
return
|
|
|
|
|
cls.prefix[prefix] = value
|
2020-08-14 17:41:24 +08:00
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
@classmethod
|
2021-12-14 01:08:48 +08:00
|
|
|
|
def get_value(cls, bot: Bot, event: Event, state: T_State) -> CMD_RESULT:
|
2022-04-20 14:43:29 +08:00
|
|
|
|
prefix = CMD_RESULT(
|
|
|
|
|
command=None, raw_command=None, command_arg=None, command_start=None
|
|
|
|
|
)
|
2021-11-17 00:27:58 +08:00
|
|
|
|
state[PREFIX_KEY] = prefix
|
2020-12-09 19:57:49 +08:00
|
|
|
|
if event.get_type() != "message":
|
2021-12-14 01:08:48 +08:00
|
|
|
|
return prefix
|
2021-11-17 00:27:58 +08:00
|
|
|
|
|
2020-12-09 19:57:49 +08:00
|
|
|
|
message = event.get_message()
|
2021-11-17 00:27:58 +08:00
|
|
|
|
message_seg: MessageSegment = message[0]
|
2021-02-07 20:57:08 +08:00
|
|
|
|
if message_seg.is_text():
|
2021-12-14 01:08:48 +08:00
|
|
|
|
segment_text = str(message_seg).lstrip()
|
2022-08-14 19:41:00 +08:00
|
|
|
|
if pf := cls.prefix.longest_prefix(segment_text):
|
2022-04-20 14:43:29 +08:00
|
|
|
|
value: TRIE_VALUE = pf.value
|
|
|
|
|
prefix[RAW_CMD_KEY] = pf.key
|
|
|
|
|
prefix[CMD_START_KEY] = value.command_start
|
|
|
|
|
prefix[CMD_KEY] = value.command
|
2021-12-14 01:08:48 +08:00
|
|
|
|
msg = message.copy()
|
|
|
|
|
msg.pop(0)
|
|
|
|
|
new_message = msg.__class__(segment_text[len(pf.key) :].lstrip())
|
|
|
|
|
for new_segment in reversed(new_message):
|
|
|
|
|
msg.insert(0, new_segment)
|
|
|
|
|
prefix[CMD_ARG_KEY] = msg
|
|
|
|
|
|
|
|
|
|
return prefix
|
2020-07-25 12:28:30 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class StartswithRule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""检查消息纯文本是否以指定字符串开头。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
msg: 指定消息开头字符串元组
|
|
|
|
|
ignorecase: 是否忽略大小写
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("msg", "ignorecase")
|
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
def __init__(self, msg: Tuple[str, ...], ignorecase: bool = False):
|
|
|
|
|
self.msg = msg
|
|
|
|
|
self.ignorecase = ignorecase
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"Startswith(msg={self.msg}, ignorecase={self.ignorecase})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return (
|
|
|
|
|
isinstance(other, StartswithRule)
|
|
|
|
|
and frozenset(self.msg) == frozenset(other.msg)
|
|
|
|
|
and self.ignorecase == other.ignorecase
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((frozenset(self.msg), self.ignorecase))
|
|
|
|
|
|
2022-10-12 13:41:28 +08:00
|
|
|
|
async def __call__(self, event: Event, state: T_State) -> bool:
|
2022-09-29 16:56:06 +08:00
|
|
|
|
if event.get_type() != "message":
|
|
|
|
|
return False
|
|
|
|
|
try:
|
|
|
|
|
text = event.get_plaintext()
|
|
|
|
|
except Exception:
|
2021-12-06 10:10:51 +08:00
|
|
|
|
return False
|
2022-10-12 13:41:28 +08:00
|
|
|
|
if match := re.match(
|
|
|
|
|
f"^(?:{'|'.join(re.escape(prefix) for prefix in self.msg)})",
|
|
|
|
|
text,
|
|
|
|
|
re.IGNORECASE if self.ignorecase else 0,
|
|
|
|
|
):
|
|
|
|
|
state[STARTSWITH_KEY] = match.group()
|
|
|
|
|
return True
|
|
|
|
|
return False
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
def startswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""匹配消息纯文本开头。
|
2020-11-30 11:08:00 +08:00
|
|
|
|
|
2022-01-12 18:31:12 +08:00
|
|
|
|
参数:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
msg: 指定消息开头字符串元组
|
|
|
|
|
ignorecase: 是否忽略大小写
|
2020-09-13 22:36:40 +08:00
|
|
|
|
"""
|
2021-04-04 12:28:10 +08:00
|
|
|
|
if isinstance(msg, str):
|
|
|
|
|
msg = (msg,)
|
2020-07-25 12:28:30 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return Rule(StartswithRule(msg, ignorecase))
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
2021-04-04 12:19:03 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class EndswithRule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""检查消息纯文本是否以指定字符串结尾。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
msg: 指定消息结尾字符串元组
|
|
|
|
|
ignorecase: 是否忽略大小写
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("msg", "ignorecase")
|
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
def __init__(self, msg: Tuple[str, ...], ignorecase: bool = False):
|
|
|
|
|
self.msg = msg
|
|
|
|
|
self.ignorecase = ignorecase
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"Endswith(msg={self.msg}, ignorecase={self.ignorecase})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return (
|
|
|
|
|
isinstance(other, EndswithRule)
|
|
|
|
|
and frozenset(self.msg) == frozenset(other.msg)
|
|
|
|
|
and self.ignorecase == other.ignorecase
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((frozenset(self.msg), self.ignorecase))
|
|
|
|
|
|
2022-10-12 13:41:28 +08:00
|
|
|
|
async def __call__(self, event: Event, state: T_State) -> bool:
|
2022-09-29 16:56:06 +08:00
|
|
|
|
if event.get_type() != "message":
|
|
|
|
|
return False
|
|
|
|
|
try:
|
|
|
|
|
text = event.get_plaintext()
|
|
|
|
|
except Exception:
|
2020-12-09 19:57:49 +08:00
|
|
|
|
return False
|
2022-10-12 13:41:28 +08:00
|
|
|
|
if match := re.search(
|
|
|
|
|
f"(?:{'|'.join(re.escape(suffix) for suffix in self.msg)})$",
|
|
|
|
|
text,
|
|
|
|
|
re.IGNORECASE if self.ignorecase else 0,
|
|
|
|
|
):
|
|
|
|
|
state[ENDSWITH_KEY] = match.group()
|
|
|
|
|
return True
|
|
|
|
|
return False
|
2020-07-25 12:28:30 +08:00
|
|
|
|
|
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
def endswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""匹配消息纯文本结尾。
|
2020-11-30 11:08:00 +08:00
|
|
|
|
|
2022-01-12 18:31:12 +08:00
|
|
|
|
参数:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
msg: 指定消息开头字符串元组
|
|
|
|
|
ignorecase: 是否忽略大小写
|
2020-09-13 22:36:40 +08:00
|
|
|
|
"""
|
2021-04-04 12:28:10 +08:00
|
|
|
|
if isinstance(msg, str):
|
|
|
|
|
msg = (msg,)
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return Rule(EndswithRule(msg, ignorecase))
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
2022-02-15 08:20:29 +08:00
|
|
|
|
class FullmatchRule:
|
|
|
|
|
"""检查消息纯文本是否与指定字符串全匹配。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
msg: 指定消息全匹配字符串元组
|
|
|
|
|
ignorecase: 是否忽略大小写
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("msg", "ignorecase")
|
|
|
|
|
|
|
|
|
|
def __init__(self, msg: Tuple[str, ...], ignorecase: bool = False):
|
2022-08-30 09:54:09 +08:00
|
|
|
|
self.msg = tuple(map(str.casefold, msg) if ignorecase else msg)
|
2022-02-15 08:20:29 +08:00
|
|
|
|
self.ignorecase = ignorecase
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"Fullmatch(msg={self.msg}, ignorecase={self.ignorecase})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return (
|
|
|
|
|
isinstance(other, FullmatchRule)
|
|
|
|
|
and frozenset(self.msg) == frozenset(other.msg)
|
|
|
|
|
and self.ignorecase == other.ignorecase
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((frozenset(self.msg), self.ignorecase))
|
|
|
|
|
|
2022-10-12 13:41:28 +08:00
|
|
|
|
async def __call__(self, event: Event, state: T_State) -> bool:
|
2022-09-29 16:56:06 +08:00
|
|
|
|
if event.get_type() != "message":
|
|
|
|
|
return False
|
|
|
|
|
try:
|
|
|
|
|
text = event.get_plaintext()
|
|
|
|
|
except Exception:
|
|
|
|
|
return False
|
2022-10-12 13:41:28 +08:00
|
|
|
|
if not text:
|
|
|
|
|
return False
|
|
|
|
|
text = text.casefold() if self.ignorecase else text
|
|
|
|
|
if text in self.msg:
|
|
|
|
|
state[FULLMATCH_KEY] = text
|
|
|
|
|
return True
|
|
|
|
|
return False
|
2022-02-15 08:20:29 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fullmatch(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
|
|
|
|
|
"""完全匹配消息。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
msg: 指定消息全匹配字符串元组
|
|
|
|
|
ignorecase: 是否忽略大小写
|
|
|
|
|
"""
|
|
|
|
|
if isinstance(msg, str):
|
|
|
|
|
msg = (msg,)
|
|
|
|
|
|
|
|
|
|
return Rule(FullmatchRule(msg, ignorecase))
|
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class KeywordsRule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""检查消息纯文本是否包含指定关键字。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
keywords: 指定关键字元组
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("keywords",)
|
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
def __init__(self, *keywords: str):
|
|
|
|
|
self.keywords = keywords
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"Keywords(keywords={self.keywords})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return isinstance(other, KeywordsRule) and frozenset(
|
|
|
|
|
self.keywords
|
|
|
|
|
) == frozenset(other.keywords)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash(frozenset(self.keywords))
|
|
|
|
|
|
2022-10-12 13:41:28 +08:00
|
|
|
|
async def __call__(self, event: Event, state: T_State) -> bool:
|
2022-09-29 16:56:06 +08:00
|
|
|
|
if event.get_type() != "message":
|
|
|
|
|
return False
|
|
|
|
|
try:
|
|
|
|
|
text = event.get_plaintext()
|
|
|
|
|
except Exception:
|
2020-12-09 19:57:49 +08:00
|
|
|
|
return False
|
2022-10-12 13:41:28 +08:00
|
|
|
|
if not text:
|
|
|
|
|
return False
|
|
|
|
|
if key := next((k for k in self.keywords if k in text), None):
|
|
|
|
|
state[KEYWORD_KEY] = key
|
|
|
|
|
return True
|
|
|
|
|
return False
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
|
|
|
|
|
2020-10-30 16:26:04 +08:00
|
|
|
|
def keyword(*keywords: str) -> Rule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""匹配消息纯文本关键词。
|
2020-11-30 11:08:00 +08:00
|
|
|
|
|
2022-01-12 18:31:12 +08:00
|
|
|
|
参数:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
keywords: 指定关键字元组
|
2020-10-06 02:08:48 +08:00
|
|
|
|
"""
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return Rule(KeywordsRule(*keywords))
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class CommandRule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""检查消息是否为指定命令。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
cmds: 指定命令元组列表
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("cmds",)
|
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
def __init__(self, cmds: List[Tuple[str, ...]]):
|
2022-08-30 09:54:09 +08:00
|
|
|
|
self.cmds = tuple(cmds)
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"Command(cmds={self.cmds})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return isinstance(other, CommandRule) and frozenset(self.cmds) == frozenset(
|
|
|
|
|
other.cmds
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((frozenset(self.cmds),))
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
2021-12-25 20:17:12 +08:00
|
|
|
|
async def __call__(self, cmd: Optional[Tuple[str, ...]] = Command()) -> bool:
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return cmd in self.cmds
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
2020-10-22 22:08:19 +08:00
|
|
|
|
def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""匹配消息命令。
|
2020-10-28 13:17:41 +08:00
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
根据配置里提供的 {ref}``command_start` <nonebot.config.Config.command_start>`,
|
|
|
|
|
{ref}``command_sep` <nonebot.config.Config.command_sep>` 判断消息是否为命令。
|
|
|
|
|
|
|
|
|
|
可以通过 {ref}`nonebot.params.Command` 获取匹配成功的命令(例: `("test",)`),
|
|
|
|
|
通过 {ref}`nonebot.params.RawCommand` 获取匹配成功的原始命令文本(例: `"/test"`),
|
|
|
|
|
通过 {ref}`nonebot.params.CommandArg` 获取匹配成功的命令参数。
|
2020-11-30 11:08:00 +08:00
|
|
|
|
|
2022-01-12 18:31:12 +08:00
|
|
|
|
参数:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
cmds: 命令文本或命令元组
|
2020-11-30 11:08:00 +08:00
|
|
|
|
|
2022-01-12 18:53:30 +08:00
|
|
|
|
用法:
|
2022-01-12 19:20:59 +08:00
|
|
|
|
使用默认 `command_start`, `command_sep` 配置
|
2020-10-06 02:08:48 +08:00
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
命令 `("test",)` 可以匹配: `/test` 开头的消息
|
|
|
|
|
命令 `("test", "sub")` 可以匹配: `/test.sub` 开头的消息
|
2020-10-06 02:08:48 +08:00
|
|
|
|
|
2022-01-15 09:35:30 +08:00
|
|
|
|
:::tip 提示
|
2022-01-19 16:16:56 +08:00
|
|
|
|
命令内容与后续消息间无需空格!
|
2022-01-12 19:20:59 +08:00
|
|
|
|
:::
|
2020-10-06 02:08:48 +08:00
|
|
|
|
"""
|
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
config = get_driver().config
|
|
|
|
|
command_start = config.command_start
|
|
|
|
|
command_sep = config.command_sep
|
2021-12-06 10:10:51 +08:00
|
|
|
|
commands: List[Tuple[str, ...]] = []
|
|
|
|
|
for command in cmds:
|
2020-10-22 22:08:19 +08:00
|
|
|
|
if isinstance(command, str):
|
2021-12-06 10:10:51 +08:00
|
|
|
|
command = (command,)
|
|
|
|
|
|
|
|
|
|
commands.append(command)
|
2020-10-22 22:08:19 +08:00
|
|
|
|
|
|
|
|
|
if len(command) == 1:
|
|
|
|
|
for start in command_start:
|
2022-04-20 14:43:29 +08:00
|
|
|
|
TrieRule.add_prefix(f"{start}{command[0]}", TRIE_VALUE(start, command))
|
2020-10-22 22:08:19 +08:00
|
|
|
|
else:
|
|
|
|
|
for start, sep in product(command_start, command_sep):
|
2022-04-20 14:43:29 +08:00
|
|
|
|
TrieRule.add_prefix(
|
|
|
|
|
f"{start}{sep.join(command)}", TRIE_VALUE(start, command)
|
|
|
|
|
)
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return Rule(CommandRule(commands))
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
|
|
|
|
|
2021-02-02 11:59:14 +08:00
|
|
|
|
class ArgumentParser(ArgParser):
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""`shell_like` 命令参数解析器,解析出错时不会退出程序。
|
|
|
|
|
|
|
|
|
|
用法:
|
|
|
|
|
用法与 `argparse.ArgumentParser` 相同,
|
|
|
|
|
参考文档: [argparse](https://docs.python.org/3/library/argparse.html)
|
2021-02-02 12:15:20 +08:00
|
|
|
|
"""
|
2021-02-02 11:59:14 +08:00
|
|
|
|
|
2022-08-24 09:54:08 +08:00
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
|
|
|
|
|
@overload
|
|
|
|
|
def parse_args(
|
|
|
|
|
self, args: Optional[Sequence[Union[str, MessageSegment]]] = ...
|
|
|
|
|
) -> Namespace:
|
|
|
|
|
...
|
|
|
|
|
|
|
|
|
|
@overload
|
|
|
|
|
def parse_args(
|
|
|
|
|
self, args: Optional[Sequence[Union[str, MessageSegment]]], namespace: None
|
|
|
|
|
) -> Namespace:
|
|
|
|
|
... # type: ignore[misc]
|
|
|
|
|
|
|
|
|
|
@overload
|
|
|
|
|
def parse_args(
|
|
|
|
|
self, args: Optional[Sequence[Union[str, MessageSegment]]], namespace: T
|
|
|
|
|
) -> T:
|
|
|
|
|
...
|
|
|
|
|
|
|
|
|
|
def parse_args(
|
|
|
|
|
self,
|
|
|
|
|
args: Optional[Sequence[Union[str, MessageSegment]]] = None,
|
|
|
|
|
namespace: Optional[T] = None,
|
|
|
|
|
) -> Union[Namespace, T]:
|
|
|
|
|
...
|
|
|
|
|
|
|
|
|
|
def _parse_optional(
|
|
|
|
|
self, arg_string: Union[str, MessageSegment]
|
|
|
|
|
) -> Optional[Tuple[Optional[Action], str, Optional[str]]]:
|
|
|
|
|
return (
|
|
|
|
|
super()._parse_optional(arg_string) if isinstance(arg_string, str) else None
|
2021-11-22 23:21:26 +08:00
|
|
|
|
)
|
2021-02-02 11:59:14 +08:00
|
|
|
|
|
2022-08-24 09:54:08 +08:00
|
|
|
|
def _print_message(self, message: str, file: Optional[IO[str]] = None):
|
2022-11-29 12:00:09 +08:00
|
|
|
|
if (msg := parser_message.get(None)) is not None:
|
|
|
|
|
parser_message.set(msg + message)
|
|
|
|
|
else:
|
|
|
|
|
super()._print_message(message, file)
|
2022-08-24 09:54:08 +08:00
|
|
|
|
|
|
|
|
|
def exit(self, status: int = 0, message: Optional[str] = None):
|
|
|
|
|
if message:
|
|
|
|
|
self._print_message(message)
|
2022-11-29 12:00:09 +08:00
|
|
|
|
raise ParserExit(status=status, message=parser_message.get(None))
|
2021-02-02 11:59:14 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class ShellCommandRule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""检查消息是否为指定 shell 命令。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
cmds: 指定命令元组列表
|
|
|
|
|
parser: 可选参数解析器
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("cmds", "parser")
|
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
def __init__(self, cmds: List[Tuple[str, ...]], parser: Optional[ArgumentParser]):
|
2022-08-30 09:54:09 +08:00
|
|
|
|
self.cmds = tuple(cmds)
|
2021-12-06 10:10:51 +08:00
|
|
|
|
self.parser = parser
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"ShellCommand(cmds={self.cmds}, parser={self.parser})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return (
|
|
|
|
|
isinstance(other, ShellCommandRule)
|
|
|
|
|
and frozenset(self.cmds) == frozenset(other.cmds)
|
|
|
|
|
and self.parser is other.parser
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((frozenset(self.cmds), self.parser))
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
async def __call__(
|
|
|
|
|
self,
|
2022-01-10 11:20:06 +08:00
|
|
|
|
state: T_State,
|
2021-12-25 20:17:12 +08:00
|
|
|
|
cmd: Optional[Tuple[str, ...]] = Command(),
|
2022-01-19 16:16:56 +08:00
|
|
|
|
msg: Optional[Message] = CommandArg(),
|
2021-12-23 17:50:59 +08:00
|
|
|
|
) -> bool:
|
2022-08-24 09:54:08 +08:00
|
|
|
|
if cmd not in self.cmds or msg is None:
|
2021-12-06 10:10:51 +08:00
|
|
|
|
return False
|
|
|
|
|
|
2022-08-24 09:54:08 +08:00
|
|
|
|
state[SHELL_ARGV] = list(
|
|
|
|
|
chain.from_iterable(
|
|
|
|
|
shlex.split(str(seg)) if cast(MessageSegment, seg).is_text() else (seg,)
|
|
|
|
|
for seg in msg
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if self.parser:
|
2022-11-29 12:00:09 +08:00
|
|
|
|
t = parser_message.set("")
|
2022-08-24 09:54:08 +08:00
|
|
|
|
try:
|
|
|
|
|
args = self.parser.parse_args(state[SHELL_ARGV])
|
|
|
|
|
state[SHELL_ARGS] = args
|
|
|
|
|
except ArgumentError as e:
|
|
|
|
|
state[SHELL_ARGS] = ParserExit(status=2, message=str(e))
|
|
|
|
|
except ParserExit as e:
|
|
|
|
|
state[SHELL_ARGS] = e
|
2022-11-29 12:00:09 +08:00
|
|
|
|
finally:
|
|
|
|
|
parser_message.reset(t)
|
2022-08-24 09:54:08 +08:00
|
|
|
|
return True
|
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
def shell_command(
|
|
|
|
|
*cmds: Union[str, Tuple[str, ...]], parser: Optional[ArgumentParser] = None
|
|
|
|
|
) -> Rule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""匹配 `shell_like` 形式的消息命令。
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
根据配置里提供的 {ref}``command_start` <nonebot.config.Config.command_start>`,
|
|
|
|
|
{ref}``command_sep` <nonebot.config.Config.command_sep>` 判断消息是否为命令。
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
可以通过 {ref}`nonebot.params.Command` 获取匹配成功的命令(例: `("test",)`),
|
|
|
|
|
通过 {ref}`nonebot.params.RawCommand` 获取匹配成功的原始命令文本(例: `"/test"`),
|
|
|
|
|
通过 {ref}`nonebot.params.ShellCommandArgv` 获取解析前的参数列表(例: `["arg", "-h"]`),
|
|
|
|
|
通过 {ref}`nonebot.params.ShellCommandArgs` 获取解析后的参数字典(例: `{"arg": "arg", "h": True}`)。
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
:::warning 警告
|
|
|
|
|
如果参数解析失败,则通过 {ref}`nonebot.params.ShellCommandArgs`
|
|
|
|
|
获取的将是 {ref}`nonebot.exception.ParserExit` 异常。
|
|
|
|
|
:::
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
2022-01-12 18:31:12 +08:00
|
|
|
|
参数:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
cmds: 命令文本或命令元组
|
|
|
|
|
parser: {ref}`nonebot.rule.ArgumentParser` 对象
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
2022-01-12 18:53:30 +08:00
|
|
|
|
用法:
|
|
|
|
|
使用默认 `command_start`, `command_sep` 配置,更多示例参考 `argparse` 标准库文档。
|
2021-02-02 11:59:14 +08:00
|
|
|
|
|
2022-01-12 18:53:30 +08:00
|
|
|
|
```python
|
2021-02-02 11:59:14 +08:00
|
|
|
|
from nonebot.rule import ArgumentParser
|
|
|
|
|
|
|
|
|
|
parser = ArgumentParser()
|
2021-02-02 12:39:23 +08:00
|
|
|
|
parser.add_argument("-a", action="store_true")
|
2021-02-02 11:59:14 +08:00
|
|
|
|
|
|
|
|
|
rule = shell_command("ls", parser=parser)
|
2022-01-12 18:53:30 +08:00
|
|
|
|
```
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
2022-01-15 09:35:30 +08:00
|
|
|
|
:::tip 提示
|
2022-01-19 16:16:56 +08:00
|
|
|
|
命令内容与后续消息间无需空格!
|
2022-01-12 19:20:59 +08:00
|
|
|
|
:::
|
2021-02-01 22:28:48 +08:00
|
|
|
|
"""
|
2021-12-23 17:20:26 +08:00
|
|
|
|
if parser is not None and not isinstance(parser, ArgumentParser):
|
2021-11-22 23:21:26 +08:00
|
|
|
|
raise TypeError("`parser` must be an instance of nonebot.rule.ArgumentParser")
|
2021-02-02 11:59:14 +08:00
|
|
|
|
|
2021-02-01 22:28:48 +08:00
|
|
|
|
config = get_driver().config
|
|
|
|
|
command_start = config.command_start
|
|
|
|
|
command_sep = config.command_sep
|
2021-12-06 10:10:51 +08:00
|
|
|
|
commands: List[Tuple[str, ...]] = []
|
|
|
|
|
for command in cmds:
|
2021-02-01 22:28:48 +08:00
|
|
|
|
if isinstance(command, str):
|
2021-12-06 10:10:51 +08:00
|
|
|
|
command = (command,)
|
|
|
|
|
|
|
|
|
|
commands.append(command)
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
|
|
|
|
if len(command) == 1:
|
|
|
|
|
for start in command_start:
|
2022-04-20 14:43:29 +08:00
|
|
|
|
TrieRule.add_prefix(f"{start}{command[0]}", TRIE_VALUE(start, command))
|
2021-02-01 22:28:48 +08:00
|
|
|
|
else:
|
|
|
|
|
for start, sep in product(command_start, command_sep):
|
2022-04-20 14:43:29 +08:00
|
|
|
|
TrieRule.add_prefix(
|
|
|
|
|
f"{start}{sep.join(command)}", TRIE_VALUE(start, command)
|
|
|
|
|
)
|
2021-02-01 22:28:48 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return Rule(ShellCommandRule(commands, parser))
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class RegexRule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""检查消息字符串是否符合指定正则表达式。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
regex: 正则表达式
|
|
|
|
|
flags: 正则表达式标记
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("regex", "flags")
|
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
def __init__(self, regex: str, flags: int = 0):
|
|
|
|
|
self.regex = regex
|
|
|
|
|
self.flags = flags
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"Regex(regex={self.regex!r}, flags={self.flags})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return (
|
|
|
|
|
isinstance(other, RegexRule)
|
|
|
|
|
and self.regex == other.regex
|
|
|
|
|
and self.flags == other.flags
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((self.regex, self.flags))
|
|
|
|
|
|
2022-09-29 16:56:06 +08:00
|
|
|
|
async def __call__(self, event: Event, state: T_State) -> bool:
|
|
|
|
|
if event.get_type() != "message":
|
|
|
|
|
return False
|
|
|
|
|
try:
|
|
|
|
|
msg = event.get_message()
|
|
|
|
|
except Exception:
|
2021-12-06 10:10:51 +08:00
|
|
|
|
return False
|
2022-09-29 16:56:06 +08:00
|
|
|
|
if matched := re.search(self.regex, str(msg), self.flags):
|
2021-12-06 10:10:51 +08:00
|
|
|
|
state[REGEX_MATCHED] = matched.group()
|
|
|
|
|
state[REGEX_GROUP] = matched.groups()
|
|
|
|
|
state[REGEX_DICT] = matched.groupdict()
|
2021-02-01 22:28:48 +08:00
|
|
|
|
return True
|
|
|
|
|
else:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""匹配符合正则表达式的消息字符串。
|
2020-10-30 16:26:04 +08:00
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
可以通过 {ref}`nonebot.params.RegexMatched` 获取匹配成功的字符串,
|
|
|
|
|
通过 {ref}`nonebot.params.RegexGroup` 获取匹配成功的 group 元组,
|
|
|
|
|
通过 {ref}`nonebot.params.RegexDict` 获取匹配成功的 group 字典。
|
2020-11-30 11:08:00 +08:00
|
|
|
|
|
2022-01-12 18:31:12 +08:00
|
|
|
|
参数:
|
2022-01-12 19:10:29 +08:00
|
|
|
|
regex: 正则表达式
|
2022-01-19 16:16:56 +08:00
|
|
|
|
flags: 正则表达式标记
|
2020-10-18 15:04:45 +08:00
|
|
|
|
|
2022-01-15 09:35:30 +08:00
|
|
|
|
:::tip 提示
|
2022-01-12 18:19:21 +08:00
|
|
|
|
正则表达式匹配使用 search 而非 match,如需从头匹配请使用 `r"^xxx"` 来确保匹配开头
|
2022-01-12 19:20:59 +08:00
|
|
|
|
:::
|
2022-01-19 16:16:56 +08:00
|
|
|
|
|
|
|
|
|
:::tip 提示
|
|
|
|
|
正则表达式匹配使用 `EventMessage` 的 `str` 字符串,而非 `EventMessage` 的 `PlainText` 纯文本字符串
|
|
|
|
|
:::
|
2020-10-06 02:08:48 +08:00
|
|
|
|
"""
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return Rule(RegexRule(regex, flags))
|
2020-08-23 10:45:26 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class ToMeRule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""检查事件是否与机器人有关。"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ()
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return "ToMe()"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return isinstance(other, ToMeRule)
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((self.__class__,))
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
async def __call__(self, to_me: bool = EventToMe()) -> bool:
|
|
|
|
|
return to_me
|
2021-11-19 18:18:53 +08:00
|
|
|
|
|
|
|
|
|
|
2020-12-10 02:13:25 +08:00
|
|
|
|
def to_me() -> Rule:
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"""匹配与机器人有关的事件。"""
|
2020-08-23 10:45:26 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
return Rule(ToMeRule())
|
2022-01-19 16:16:56 +08:00
|
|
|
|
|
|
|
|
|
|
2022-08-30 09:54:09 +08:00
|
|
|
|
class IsTypeRule:
|
|
|
|
|
"""检查事件类型是否为指定类型。"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("types",)
|
|
|
|
|
|
|
|
|
|
def __init__(self, *types: Type[Event]):
|
|
|
|
|
self.types = types
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
2022-09-09 11:52:57 +08:00
|
|
|
|
return f"IsType(types={tuple(type.__name__ for type in self.types)})"
|
2022-08-30 09:54:09 +08:00
|
|
|
|
|
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
|
|
|
return isinstance(other, IsTypeRule) and self.types == other.types
|
|
|
|
|
|
|
|
|
|
def __hash__(self) -> int:
|
|
|
|
|
return hash((self.types,))
|
|
|
|
|
|
|
|
|
|
async def __call__(self, event: Event) -> bool:
|
|
|
|
|
return isinstance(event, self.types)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_type(*types: Type[Event]) -> Rule:
|
|
|
|
|
"""匹配事件类型。
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
types: 事件类型
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
return Rule(IsTypeRule(*types))
|
|
|
|
|
|
|
|
|
|
|
2022-01-19 16:16:56 +08:00
|
|
|
|
__autodoc__ = {
|
2022-02-06 14:52:50 +08:00
|
|
|
|
"Rule": True,
|
2022-01-19 16:16:56 +08:00
|
|
|
|
"Rule.__call__": True,
|
|
|
|
|
"TrieRule": False,
|
|
|
|
|
"ArgumentParser.exit": False,
|
|
|
|
|
"ArgumentParser.parse_args": False,
|
|
|
|
|
}
|