2021-05-31 01:06:56 +08:00
|
|
|
|
r"""
Rules
=====

Every event responder ``Matcher`` owns a matching rule ``Rule``, which is a collection of ``RuleChecker`` callables. The ``Matcher`` only keeps running when every ``RuleChecker`` returns ``True``.

\:\:\:tip Tip
A ``RuleChecker`` may be either an async function or a sync function.
\:\:\:
"""
|
2020-06-30 10:13:58 +08:00
|
|
|
|
|
2020-05-02 20:03:36 +08:00
|
|
|
|
import re
|
2021-02-02 11:59:14 +08:00
|
|
|
|
import shlex
|
2020-08-14 17:41:24 +08:00
|
|
|
|
import asyncio
|
2020-08-17 16:09:41 +08:00
|
|
|
|
from itertools import product
|
2021-09-27 00:19:30 +08:00
|
|
|
|
from argparse import Namespace
|
2021-11-19 18:18:53 +08:00
|
|
|
|
from contextlib import AsyncExitStack
|
2021-11-17 00:27:58 +08:00
|
|
|
|
from typing_extensions import TypedDict
|
2021-09-27 00:19:30 +08:00
|
|
|
|
from argparse import ArgumentParser as ArgParser
|
2021-12-12 18:19:08 +08:00
|
|
|
|
from typing import Any, Set, List, Tuple, Union, NoReturn, Optional, Sequence
|
2020-05-05 16:11:05 +08:00
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
from pygtrie import CharTrie
|
2020-08-14 17:41:24 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
from nonebot import get_driver
|
2020-08-17 16:09:41 +08:00
|
|
|
|
from nonebot.log import logger
|
2021-12-12 18:19:08 +08:00
|
|
|
|
from nonebot.dependencies import Dependent
|
2021-12-01 19:21:31 +08:00
|
|
|
|
from nonebot.exception import ParserExit, SkippedException
|
2021-12-14 01:08:48 +08:00
|
|
|
|
from nonebot.adapters import Bot, Event, Message, MessageSegment
|
2021-12-23 17:50:59 +08:00
|
|
|
|
from nonebot.typing import T_State, T_RuleChecker, T_DependencyCache
|
2021-12-14 01:08:48 +08:00
|
|
|
|
from nonebot.consts import (
|
|
|
|
|
CMD_KEY,
|
|
|
|
|
PREFIX_KEY,
|
|
|
|
|
REGEX_DICT,
|
|
|
|
|
SHELL_ARGS,
|
|
|
|
|
SHELL_ARGV,
|
|
|
|
|
CMD_ARG_KEY,
|
|
|
|
|
RAW_CMD_KEY,
|
|
|
|
|
REGEX_GROUP,
|
|
|
|
|
REGEX_MATCHED,
|
|
|
|
|
)
|
2021-12-23 17:50:59 +08:00
|
|
|
|
from nonebot.params import (
|
|
|
|
|
State,
|
|
|
|
|
Command,
|
|
|
|
|
BotParam,
|
|
|
|
|
EventToMe,
|
|
|
|
|
EventType,
|
|
|
|
|
EventParam,
|
|
|
|
|
StateParam,
|
|
|
|
|
DependParam,
|
|
|
|
|
DefaultParam,
|
|
|
|
|
EventMessage,
|
|
|
|
|
EventPlainText,
|
|
|
|
|
)
|
2021-11-17 00:27:58 +08:00
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
class CMD_RESULT(TypedDict):
    """Result of matching a message against the registered command prefixes.

    ``TrieRule.get_value`` creates one of these with every field set to
    ``None`` and fills the fields in when a prefix matches.
    """

    # Value registered for the matched prefix (the command tuple).
    command: Optional[Tuple[str, ...]]
    # The prefix text that actually matched in the message.
    raw_command: Optional[str]
    # The message with the matched prefix removed.
    command_arg: Optional[Message[MessageSegment]]
|
2021-11-17 00:27:58 +08:00
|
|
|
|
|
2020-08-14 17:41:24 +08:00
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
class Rule:
    """
    Rule attached to a ``Matcher``. It is checked when an event is delivered,
    before the ``Matcher`` runs.

    Example:

    .. code-block:: python

        Rule(async_function) & sync_function
        # equivalent to
        from nonebot.utils import run_sync
        Rule(async_function, run_sync(sync_function))
    """

    __slots__ = ("checkers",)

    # Parameter kinds a checker callable may declare; passed to
    # ``Dependent.parse`` as ``allow_types``.
    HANDLER_PARAM_TYPES = [
        DependParam,
        BotParam,
        EventParam,
        StateParam,
        DefaultParam,
    ]

    def __init__(self, *checkers: Union[T_RuleChecker, Dependent[bool]]) -> None:
        """
        Build a rule from any number of checkers.

        :param checkers: ``RuleChecker`` callables or ready-made
            ``Dependent[bool]`` objects. Plain callables are wrapped with
            ``Dependent[bool].parse``; ``Dependent`` instances are stored
            as they are.
        """
        parsed: Set[Dependent[bool]] = set()
        for item in checkers:
            if not isinstance(item, Dependent):
                item = Dependent[bool].parse(
                    call=item, allow_types=self.HANDLER_PARAM_TYPES
                )
            parsed.add(item)
        # The checkers of this rule, stored as a ``Set[Dependent[bool]]``.
        self.checkers: Set[Dependent[bool]] = parsed

    async def __call__(
        self,
        bot: Bot,
        event: Event,
        state: T_State,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ) -> bool:
        """
        Check whether the event satisfies every checker of this rule.

        All checkers are awaited together through ``asyncio.gather``.

        :param bot: Bot object
        :param event: Event object
        :param state: current state
        :param stack: async exit stack forwarded to every checker
        :param dependency_cache: dependency cache forwarded to every checker
        :return: ``True`` when the rule has no checkers or when every checker
            result is truthy; ``False`` when any result is falsy or a
            ``SkippedException`` is raised.
        """
        if not self.checkers:
            # An empty rule accepts everything.
            return True

        try:
            pending = [
                checker(
                    bot=bot,
                    event=event,
                    state=state,
                    stack=stack,
                    dependency_cache=dependency_cache,
                )
                for checker in self.checkers
            ]
            outcomes = await asyncio.gather(*pending)
        except SkippedException:
            return False
        return all(outcomes)

    def __and__(self, other: Optional[Union["Rule", T_RuleChecker]]) -> "Rule":
        """
        Combine this rule with another rule or a single checker.

        ``None`` leaves the rule unchanged (the same object is returned);
        otherwise a new ``Rule`` holding the checkers of both operands is
        returned.
        """
        if other is None:
            return self
        extra = other.checkers if isinstance(other, Rule) else (other,)
        return Rule(*self.checkers, *extra)

    def __or__(self, other) -> NoReturn:
        """Reject ``|`` between rules: always raises ``RuntimeError``."""
        raise RuntimeError("Or operation between rules is not allowed.")
|
2020-08-14 17:41:24 +08:00
|
|
|
|
|
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
class TrieRule:
    """Class-level registry of command prefixes backed by a ``CharTrie``.

    ``add_prefix`` registers prefixes and ``get_value`` matches the start of
    an incoming message against them.
    """

    # Shared by the whole class: every registered prefix lives in this trie.
    prefix: CharTrie = CharTrie()

    @classmethod
    def add_prefix(cls, prefix: str, value: Any) -> None:
        """
        Register ``value`` under ``prefix``.

        A prefix that is already registered is kept as is; a warning is
        logged and the new value is ignored.
        """
        if prefix in cls.prefix:
            logger.warning(f'Duplicated prefix rule "{prefix}"')
            return
        cls.prefix[prefix] = value

    @classmethod
    def get_value(cls, bot: Bot, event: Event, state: T_State) -> CMD_RESULT:
        """
        Match the beginning of the event message against the registered
        prefixes and record the outcome in ``state[PREFIX_KEY]``.

        :param bot: Bot object (not used by this method)
        :param event: Event object; only events whose ``get_type()`` is
            ``"message"`` are inspected
        :param state: current state; ``state[PREFIX_KEY]`` is always set to
            the returned result
        :return: the ``CMD_RESULT``; all fields stay ``None`` for non-message
            events, empty messages and messages whose first segment is not
            text
        """
        prefix = CMD_RESULT(command=None, raw_command=None, command_arg=None)
        state[PREFIX_KEY] = prefix
        if event.get_type() != "message":
            return prefix

        message = event.get_message()
        if not message:
            # An empty message has no first segment; indexing it would raise
            # IndexError, so report "no command" instead.
            return prefix

        message_seg: MessageSegment = message[0]
        if message_seg.is_text():
            # Leading whitespace is ignored when looking for the prefix.
            segment_text = str(message_seg).lstrip()
            pf = cls.prefix.longest_prefix(segment_text)
            # NOTE(review): presumably ``key``/``value`` are ``None`` when no
            # prefix matches (pygtrie behaviour) - the ``if pf.key`` guard
            # below relies on that.
            prefix[RAW_CMD_KEY] = pf.key
            prefix[CMD_KEY] = pf.value
            if pf.key:
                # Rebuild the message without the matched prefix: drop the
                # first segment and re-insert whatever text followed the
                # prefix, parsed back into segments, in its original order.
                msg = message.copy()
                msg.pop(0)
                new_message = msg.__class__(segment_text[len(pf.key) :].lstrip())
                for new_segment in reversed(new_message):
                    msg.insert(0, new_segment)
                prefix[CMD_ARG_KEY] = msg

        return prefix
|
2020-07-25 12:28:30 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class StartswithRule:
    """Checker that passes when the text of a message event starts with one
    of the configured strings."""

    def __init__(self, msg: Tuple[str, ...], ignorecase: bool = False):
        """
        :param msg: candidate leading strings
        :param ignorecase: match case-insensitively when ``True``
        """
        self.msg, self.ignorecase = msg, ignorecase

    async def __call__(
        self, type: str = EventType(), text: str = EventPlainText()
    ) -> Any:
        """
        :param type: event type; anything other than ``"message"`` fails
        :param text: text to test (presumably the event's plain text,
            supplied through ``EventPlainText()``)
        :return: ``True`` when ``text`` begins with any configured string
        """
        if type != "message":
            return False

        # Each candidate is escaped so it is matched literally.
        alternatives = "|".join(re.escape(prefix) for prefix in self.msg)
        flags = re.IGNORECASE if self.ignorecase else 0
        return re.match(f"^(?:{alternatives})", text, flags) is not None
|
|
|
|
|
|
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
def startswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
    """
    Match the beginning of a message.

    :param msg: leading string, or a tuple of alternative leading strings
    :param ignorecase: match case-insensitively when ``True``
    :return: a ``Rule`` wrapping a ``StartswithRule``
    """
    # A single string is treated as a one-element tuple.
    candidates = (msg,) if isinstance(msg, str) else msg
    return Rule(StartswithRule(candidates, ignorecase))
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
2021-04-04 12:19:03 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class EndswithRule:
    """Checker that passes when the text of a message event ends with one of
    the configured strings."""

    def __init__(self, msg: Tuple[str, ...], ignorecase: bool = False):
        """
        :param msg: candidate trailing strings
        :param ignorecase: match case-insensitively when ``True``
        """
        self.msg, self.ignorecase = msg, ignorecase

    async def __call__(
        self, type: str = EventType(), text: str = EventPlainText()
    ) -> Any:
        """
        :param type: event type; anything other than ``"message"`` fails
        :param text: text to test (presumably the event's plain text,
            supplied through ``EventPlainText()``)
        :return: ``True`` when the pattern anchored with ``$`` finds any
            configured string at the end of ``text``
        """
        if type != "message":
            return False

        # Each candidate is escaped so it is matched literally.
        alternatives = "|".join(re.escape(prefix) for prefix in self.msg)
        flags = re.IGNORECASE if self.ignorecase else 0
        return re.search(f"(?:{alternatives})$", text, flags) is not None
|
2020-07-25 12:28:30 +08:00
|
|
|
|
|
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
def endswith(msg: Union[str, Tuple[str, ...]], ignorecase: bool = False) -> Rule:
    """
    Match the end of a message.

    :param msg: trailing string, or a tuple of alternative trailing strings
    :param ignorecase: match case-insensitively when ``True``
    :return: a ``Rule`` wrapping an ``EndswithRule``
    """
    # A single string is treated as a one-element tuple.
    candidates = (msg,) if isinstance(msg, str) else msg
    return Rule(EndswithRule(candidates, ignorecase))
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class KeywordsRule:
    """Checker that passes when the text of a message event contains at
    least one of the configured keywords."""

    def __init__(self, *keywords: str):
        """
        :param keywords: keywords to look for
        """
        self.keywords = keywords

    async def __call__(
        self, type: str = EventType(), text: str = EventPlainText()
    ) -> bool:
        """
        :param type: event type; anything other than ``"message"`` fails
        :param text: text to search (presumably the event's plain text,
            supplied through ``EventPlainText()``)
        :return: ``True`` when ``text`` is non-empty and contains a keyword
        """
        if type != "message":
            return False
        if not text:
            # Empty text never matches, whatever the keywords are.
            return False
        return any(word in text for word in self.keywords)
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
|
|
|
|
|
2020-10-30 16:26:04 +08:00
|
|
|
|
def keyword(*keywords: str) -> Rule:
    """
    Match messages that contain any of the given keywords.

    :param keywords: keywords to look for
    :return: a ``Rule`` wrapping a ``KeywordsRule``
    """
    checker = KeywordsRule(*keywords)
    return Rule(checker)
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class CommandRule:
    """Checker that passes when the matched command is one of the commands
    this rule was created for."""

    def __init__(self, cmds: List[Tuple[str, ...]]):
        """
        :param cmds: accepted commands, each given as a tuple of parts
        """
        self.cmds = cmds

    async def __call__(self, cmd: Tuple[str, ...] = Command()) -> bool:
        """
        :param cmd: the command to test (supplied through ``Command()``)
        :return: ``True`` when ``cmd`` is among the accepted commands
        """
        accepted = self.cmds
        return cmd in accepted

    def __repr__(self):
        """Return ``<Command [...]>`` listing the accepted commands."""
        return "<Command {}>".format(self.cmds)
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
|
|
|
|
|
2020-10-22 22:08:19 +08:00
|
|
|
|
def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
    r"""
    Match messages written in command form. Whether a message is a command
    is decided from ``command_start`` and ``command_sep`` in the driver
    config.

    The matched command is available as ``state["_prefix"]["command"]``
    (e.g. ``("test",)``) and the matched raw command text as
    ``state["_prefix"]["raw_command"]`` (e.g. ``"/test"``).

    :param cmds: command contents, each a string or a tuple of parts
    :return: a ``Rule`` wrapping a ``CommandRule``

    Example, using the default ``command_start`` / ``command_sep`` config:

    command ``("test",)`` matches messages starting with ``/test``

    command ``("test", "sub")`` matches messages starting with ``/test.sub``

    \:\:\:tip Tip
    No space is needed between the command and the rest of the message!
    \:\:\:
    """
    driver_config = get_driver().config
    starts = driver_config.command_start
    seps = driver_config.command_sep

    normalized: List[Tuple[str, ...]] = []
    for cmd in cmds:
        # A bare string is a single-part command.
        parts = (cmd,) if isinstance(cmd, str) else cmd
        normalized.append(parts)

        if len(parts) == 1:
            # Single-part commands need no separator: one prefix per start.
            texts = (f"{start}{parts[0]}" for start in starts)
        else:
            # Multi-part commands get one prefix per (start, separator) pair.
            texts = (
                f"{start}{sep.join(parts)}" for start, sep in product(starts, seps)
            )
        for text in texts:
            TrieRule.add_prefix(text, parts)

    return Rule(CommandRule(normalized))
|
2020-05-02 20:03:36 +08:00
|
|
|
|
|
|
|
|
|
|
2021-02-02 11:59:14 +08:00
|
|
|
|
class ArgumentParser(ArgParser):
    """
    Argument parser for ``shell_like`` commands.

    Unlike the standard ``argparse`` parser, it does not print output or
    terminate the program: output is collected in ``self.message`` and
    ``exit`` raises ``ParserExit`` instead.
    """

    def _print_message(self, message, file=None):
        """
        Collect ``message`` in ``self.message`` instead of writing it out.

        Successive messages are joined with a newline. Empty or ``None``
        messages are ignored. ``file`` is accepted for compatibility with
        ``argparse`` and is not used.
        """
        if not message:
            # Nothing to record; also avoids ``str += None``.
            return
        old_message: str = getattr(self, "message", "")
        if old_message:
            old_message += "\n"
        old_message += message
        setattr(self, "message", old_message)

    def exit(self, status: int = 0, message: Optional[str] = None):
        """
        Raise ``ParserExit`` instead of exiting the interpreter.

        :param status: exit status carried by the exception
        :param message: final message; it is appended to whatever output was
            already collected (for example the usage line ``argparse`` prints
            before an error), so that earlier output is not lost
        :raises ParserExit: always
        """
        if message:
            self._print_message(message)
        raise ParserExit(status=status, message=getattr(self, "message", None))

    def parse_args(
        self,
        args: Optional[Sequence[str]] = None,
        namespace: Optional[Namespace] = None,
    ) -> Namespace:
        """
        Parse ``args`` like ``argparse.ArgumentParser.parse_args``.

        The collected output is reset first so that each parse starts with an
        empty ``self.message``.
        """
        setattr(self, "message", "")
        return super().parse_args(args=args, namespace=namespace)  # type: ignore
|
2021-02-02 11:59:14 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class ShellCommandRule:
    """Checker for shell-like commands: when the matched command is accepted,
    it splits the rest of the message into an argv list and optionally
    parses it."""

    def __init__(self, cmds: List[Tuple[str, ...]], parser: Optional[ArgumentParser]):
        """
        :param cmds: accepted commands, each given as a tuple of parts
        :param parser: optional parser used to parse the argv list
        """
        self.cmds = cmds
        self.parser = parser

    async def __call__(
        self,
        cmd: Tuple[str, ...] = Command(),
        msg: Message = EventMessage(),
        state: T_State = State(),
    ) -> bool:
        """
        :param cmd: the command to test (supplied through ``Command()``)
        :param msg: the message (supplied through ``EventMessage()``)
        :param state: current state; on success ``state[SHELL_ARGV]`` receives
            the argv list and, when a parser is set, ``state[SHELL_ARGS]``
            receives either the parsed namespace or the ``ParserExit`` raised
            by the parser
        :return: ``True`` when ``cmd`` is among the accepted commands
        """
        if cmd not in self.cmds:
            return False

        # ``TrieRule.get_value`` strips leading whitespace before matching the
        # prefix, so the same strip is needed here; otherwise the slice below
        # would cut at the wrong offset for messages that start with
        # whitespace.
        message = str(msg).lstrip()
        raw_command = state[PREFIX_KEY][RAW_CMD_KEY]
        strip_message = message[len(raw_command) :].lstrip()

        # NOTE: ``shlex.split`` raises ``ValueError`` on unbalanced quotes;
        # that error is not handled here and propagates to the caller.
        state[SHELL_ARGV] = shlex.split(strip_message)

        if self.parser:
            try:
                args = self.parser.parse_args(state[SHELL_ARGV])
                state[SHELL_ARGS] = args
            except ParserExit as e:
                # Keep the failure (help/usage/error text) for the handler.
                state[SHELL_ARGS] = e
        return True
|
|
|
|
|
|
|
|
|
|
|
2021-11-22 23:21:26 +08:00
|
|
|
|
def shell_command(
    *cmds: Union[str, Tuple[str, ...]], parser: Optional[ArgumentParser] = None
) -> Rule:
    r"""
    Match messages in command form with ``shell_like`` argument parsing.
    Whether a message is a command is decided from ``command_start`` and
    ``command_sep`` in the driver config.

    The matched command is available as ``state["_prefix"]["command"]``
    (e.g. ``("test",)``) and the matched raw command text as
    ``state["_prefix"]["raw_command"]`` (e.g. ``"/test"``).

    The raw argument list typed by the user is available as
    ``state["argv"]``.

    When ``parser`` is given, the message is parsed automatically and the
    result is stored in ``state["args"]``.

    :param cmds: command contents, each a string or a tuple of parts
    :param parser: optional ``nonebot.rule.ArgumentParser`` object
    :return: a ``Rule`` wrapping a ``ShellCommandRule``
    :raises TypeError: when ``parser`` is neither ``None`` nor an
        ``ArgumentParser`` instance

    Example, using the default ``command_start`` / ``command_sep`` config
    (see the ``argparse`` standard library documentation for more):

    .. code-block:: python

        from nonebot.rule import ArgumentParser

        parser = ArgumentParser()
        parser.add_argument("-a", action="store_true")

        rule = shell_command("ls", parser=parser)

    \:\:\:tip Tip
    No space is needed between the command and the rest of the message!
    \:\:\:
    """
    if not (parser is None or isinstance(parser, ArgumentParser)):
        raise TypeError("`parser` must be an instance of nonebot.rule.ArgumentParser")

    driver_config = get_driver().config
    starts = driver_config.command_start
    seps = driver_config.command_sep

    normalized: List[Tuple[str, ...]] = []
    for cmd in cmds:
        # A bare string is a single-part command.
        parts = (cmd,) if isinstance(cmd, str) else cmd
        normalized.append(parts)

        if len(parts) == 1:
            # Single-part commands need no separator: one prefix per start.
            texts = (f"{start}{parts[0]}" for start in starts)
        else:
            # Multi-part commands get one prefix per (start, separator) pair.
            texts = (
                f"{start}{sep.join(parts)}" for start, sep in product(starts, seps)
            )
        for text in texts:
            TrieRule.add_prefix(text, parts)

    return Rule(ShellCommandRule(normalized, parser))
|
2021-12-06 10:10:51 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class RegexRule:
    """Checker that passes when a regular expression is found in the text of
    a message event."""

    def __init__(self, regex: str, flags: int = 0):
        """
        :param regex: regular expression pattern
        :param flags: flags passed to ``re.search``
        """
        self.regex, self.flags = regex, flags

    async def __call__(
        self,
        type: str = EventType(),
        msg: Message = EventMessage(),
        state: T_State = State(),
    ) -> bool:
        """
        :param type: event type; anything other than ``"message"`` fails
        :param msg: the message; it is searched in its ``str()`` form
        :param state: current state; on a match it receives the matched text
            (``REGEX_MATCHED``), the groups (``REGEX_GROUP``) and the named
            groups (``REGEX_DICT``)
        :return: ``True`` when the pattern is found
        """
        if type != "message":
            return False

        found = re.search(self.regex, str(msg), self.flags)
        if found is None:
            return False

        state[REGEX_MATCHED] = found.group()
        state[REGEX_GROUP] = found.groups()
        state[REGEX_DICT] = found.groupdict()
        return True
|
|
|
|
|
|
|
|
|
|
|
2020-08-17 16:09:41 +08:00
|
|
|
|
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
    r"""
    Match messages against a regular expression.

    After a successful match, ``state["_matched"]``,
    ``state["_matched_groups"]`` and ``state["_matched_dict"]`` hold the
    matched text, the groups and the named groups.

    :param regex: regular expression
    :param flags: regular expression flags
    :return: a ``Rule`` wrapping a ``RegexRule``

    \:\:\:tip Tip
    Matching uses ``search``, not ``match``. To match from the beginning of
    the message, anchor the pattern yourself, e.g. ``r"^xxx"``.
    \:\:\:
    """
    checker = RegexRule(regex, flags)
    return Rule(checker)
|
2020-08-23 10:45:26 +08:00
|
|
|
|
|
|
|
|
|
|
2021-12-23 17:50:59 +08:00
|
|
|
|
class ToMeRule:
    """Checker that passes when the event is addressed to the bot."""

    async def __call__(self, to_me: bool = EventToMe()) -> bool:
        """
        :param to_me: flag supplied through ``EventToMe()``
        :return: the flag, unchanged
        """
        addressed = to_me
        return addressed
|
2021-11-19 18:18:53 +08:00
|
|
|
|
|
|
|
|
|
|
2020-12-10 02:13:25 +08:00
|
|
|
|
def to_me() -> Rule:
    """
    Match events that are related to the bot.

    According to the original documentation of this function, this is
    decided through ``event.is_tome()``.

    :return: a ``Rule`` wrapping a ``ToMeRule``
    """
    checker = ToMeRule()
    return Rule(checker)
|