nonebot2/nonebot/rule.py

312 lines
8.1 KiB
Python
Raw Normal View History

2020-09-13 13:01:23 +08:00
"""
规则
====
每个事件响应器 ``Matcher`` 拥有一个匹配规则 ``Rule`` 其中是 **异步** ``RuleChecker`` 的集合只有当所有 ``RuleChecker`` 检查结果为 ``True`` 时继续运行
2020-09-13 13:01:23 +08:00
\:\:\:tip 提示
``RuleChecker`` 既可以是 async function 也可以是 sync function但在最终会被 ``nonebot.utils.run_sync`` 转换为 async function
2020-09-13 13:01:23 +08:00
\:\:\:
"""
2020-06-30 10:13:58 +08:00
2020-05-02 20:03:36 +08:00
import re
2020-08-14 17:41:24 +08:00
import asyncio
2020-08-17 16:09:41 +08:00
from itertools import product
2020-05-05 16:11:05 +08:00
2020-08-17 16:09:41 +08:00
from pygtrie import CharTrie
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
from nonebot import get_driver
from nonebot.log import logger
from nonebot.utils import run_sync
2020-09-27 18:05:13 +08:00
from nonebot.typing import Bot, Any, Dict, Event, Union, Tuple, NoReturn, Optional, Callable, Awaitable, RuleChecker
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
class Rule:
2020-09-13 13:01:23 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
``Matcher`` 规则类当事件传递时 ``Matcher`` 运行前进行检查
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:示例:
.. code-block:: python
Rule(async_function) & sync_function
# 等价于
from nonebot.utils import run_sync
Rule(async_function, run_sync(sync_function))
"""
2020-08-17 16:09:41 +08:00
__slots__ = ("checkers",)
2020-08-14 17:41:24 +08:00
2020-09-13 13:01:23 +08:00
def __init__(
self, *checkers: Callable[[Bot, Event, dict],
Awaitable[bool]]) -> None:
"""
:参数:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
* ``*checkers: Callable[[Bot, Event, dict], Awaitable[bool]]``: **异步** RuleChecker
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
"""
self.checkers = set(checkers)
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
存储 ``RuleChecker``
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:类型:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
* ``Set[Callable[[Bot, Event, dict], Awaitable[bool]]]``
"""
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
async def __call__(self, bot: Bot, event: Event, state: dict) -> bool:
2020-09-13 13:01:23 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
检查是否符合所有规则
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
* ``bot: Bot``: Bot 对象
* ``event: Event``: Event 对象
* ``state: dict``: 当前 State
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
:返回:
2020-11-30 11:08:00 +08:00
2020-09-13 13:01:23 +08:00
- ``bool``
"""
2020-08-17 16:09:41 +08:00
results = await asyncio.gather(
*map(lambda c: c(bot, event, state), self.checkers))
return all(results)
2020-08-14 17:41:24 +08:00
2020-09-27 18:05:13 +08:00
def __and__(self, other: Optional[Union["Rule", RuleChecker]]) -> "Rule":
2020-09-13 13:01:23 +08:00
checkers = self.checkers.copy()
2020-09-27 18:05:13 +08:00
if other is None:
return self
elif isinstance(other, Rule):
2020-09-13 13:01:23 +08:00
checkers |= other.checkers
2020-08-17 16:09:41 +08:00
elif asyncio.iscoroutinefunction(other):
2020-09-13 13:01:23 +08:00
checkers.add(other) # type: ignore
2020-08-14 17:41:24 +08:00
else:
2020-09-13 13:01:23 +08:00
checkers.add(run_sync(other))
2020-08-17 16:09:41 +08:00
return Rule(*checkers)
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
def __or__(self, other) -> NoReturn:
raise RuntimeError("Or operation between rules is not allowed.")
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
class TrieRule:
prefix: CharTrie = CharTrie()
suffix: CharTrie = CharTrie()
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
@classmethod
def add_prefix(cls, prefix: str, value: Any):
if prefix in cls.prefix:
logger.warning(f'Duplicated prefix rule "{prefix}"')
return
cls.prefix[prefix] = value
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
@classmethod
def add_suffix(cls, suffix: str, value: Any):
if suffix[::-1] in cls.suffix:
logger.warning(f'Duplicated suffix rule "{suffix}"')
return
cls.suffix[suffix[::-1]] = value
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
@classmethod
def get_value(cls, bot: Bot, event: Event,
state: dict) -> Tuple[Dict[str, Any], Dict[str, Any]]:
2020-08-24 17:59:36 +08:00
if event.type != "message":
state["_prefix"] = {"raw_command": None, "command": None}
state["_suffix"] = {"raw_command": None, "command": None}
return {
"raw_command": None,
"command": None
}, {
"raw_command": None,
"command": None
}
2020-08-24 17:59:36 +08:00
2020-08-17 16:09:41 +08:00
prefix = None
suffix = None
message = event.message[0]
if message.type == "text":
prefix = cls.prefix.longest_prefix(str(message).lstrip())
2020-08-17 16:09:41 +08:00
message_r = event.message[-1]
if message_r.type == "text":
suffix = cls.suffix.longest_prefix(str(message_r).rstrip()[::-1])
2020-08-14 17:41:24 +08:00
state["_prefix"] = {
"raw_command": prefix.key,
"command": prefix.value
} if prefix else {
"raw_command": None,
"command": None
}
state["_suffix"] = {
"raw_command": suffix.key,
"command": suffix.value
} if suffix else {
"raw_command": None,
"command": None
}
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
return ({
"raw_command": prefix.key,
"command": prefix.value
} if prefix else {
"raw_command": None,
"command": None
}, {
"raw_command": suffix.key,
"command": suffix.value
} if suffix else {
"raw_command": None,
"command": None
})
2020-07-25 12:28:30 +08:00
2020-08-17 16:09:41 +08:00
def startswith(msg: str) -> Rule:
2020-09-13 22:36:40 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
匹配消息开头
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
* ``msg: str``: 消息开头字符串
"""
2020-07-25 12:28:30 +08:00
2020-08-17 16:09:41 +08:00
async def _startswith(bot: Bot, event: Event, state: dict) -> bool:
2020-09-13 22:36:40 +08:00
return event.plain_text.startswith(msg)
2020-07-25 12:28:30 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_startswith)
2020-07-25 12:28:30 +08:00
2020-08-17 16:09:41 +08:00
def endswith(msg: str) -> Rule:
2020-09-13 22:36:40 +08:00
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
匹配消息结尾
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-09-13 22:36:40 +08:00
* ``msg: str``: 消息结尾字符串
"""
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
async def _endswith(bot: Bot, event: Event, state: dict) -> bool:
2020-09-13 22:36:40 +08:00
return event.plain_text.endswith(msg)
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_endswith)
2020-05-02 20:03:36 +08:00
def keyword(*keywords: str) -> Rule:
"""
:说明:
2020-11-30 11:08:00 +08:00
匹配消息关键词
2020-11-30 11:08:00 +08:00
:参数:
2020-11-30 11:08:00 +08:00
* ``*keywords: str``: 关键词
"""
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
async def _keyword(bot: Bot, event: Event, state: dict) -> bool:
return bool(event.plain_text and
any(keyword in event.plain_text for keyword in keywords))
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_keyword)
2020-05-02 20:03:36 +08:00
2020-10-22 22:08:19 +08:00
def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
"""
:说明:
2020-11-30 11:08:00 +08:00
命令形式匹配根据配置里提供的 ``command_start``, ``command_sep`` 判断消息是否为命令
2020-10-28 13:17:41 +08:00
可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令``("test",)``通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本``"/test"``
2020-11-30 11:08:00 +08:00
:参数:
2020-11-30 11:08:00 +08:00
2020-10-22 22:08:19 +08:00
* ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
2020-11-30 11:08:00 +08:00
:示例:
2020-11-30 11:08:00 +08:00
使用默认 ``command_start``, ``command_sep`` 配置
命令 ``("test",)`` 可以匹配``/test`` 开头的消息
命令 ``("test", "sub")`` 可以匹配``/test.sub`` 开头的消息
\:\:\:tip 提示
命令内容与后续消息间无需空格
\:\:\:
"""
2020-08-17 16:09:41 +08:00
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
2020-10-22 22:08:19 +08:00
commands = list(cmds)
for index, command in enumerate(commands):
if isinstance(command, str):
commands[index] = command = (command,)
if len(command) == 1:
for start in command_start:
TrieRule.add_prefix(f"{start}{command[0]}", command)
else:
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
async def _command(bot: Bot, event: Event, state: dict) -> bool:
2020-10-22 22:08:19 +08:00
return state["_prefix"]["command"] in commands
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
return Rule(_command)
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
"""
:说明:
2020-11-30 11:08:00 +08:00
2020-10-28 13:23:48 +08:00
根据正则表达式进行匹配
2020-10-28 13:45:54 +08:00
可以通过 ``state["_matched"]`` 获取正则表达式匹配成功的文本
2020-11-30 11:08:00 +08:00
:参数:
2020-11-30 11:08:00 +08:00
* ``regex: str``: 正则表达式
* ``flags: Union[int, re.RegexFlag]``: 正则标志
2020-10-18 15:04:45 +08:00
\:\:\:tip 提示
正则表达式匹配使用 search 而非 match如需从头匹配请使用 ``r"^xxx"`` 来确保匹配开头
\:\:\:
"""
2020-08-17 16:09:41 +08:00
pattern = re.compile(regex, flags)
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
async def _regex(bot: Bot, event: Event, state: dict) -> bool:
2020-10-28 13:23:48 +08:00
matched = pattern.search(str(event.message))
if matched:
2020-10-28 13:45:54 +08:00
state["_matched"] = matched.group()
return True
else:
2020-10-28 13:45:54 +08:00
state["_matched"] = None
return False
2020-08-17 16:09:41 +08:00
return Rule(_regex)
2020-08-23 10:45:26 +08:00
def to_me() -> Rule:
"""
:说明:
2020-11-30 11:08:00 +08:00
通过 ``event.to_me`` 判断消息是否是发送给机器人
2020-11-30 11:08:00 +08:00
:参数:
2020-11-30 11:08:00 +08:00
*
"""
2020-08-23 10:45:26 +08:00
async def _to_me(bot: Bot, event: Event, state: dict) -> bool:
return bool(event.to_me)
return Rule(_to_me)