diff --git a/nonebot/plugin.py b/nonebot/plugin.py
index de4073c9..562811fd 100644
--- a/nonebot/plugin.py
+++ b/nonebot/plugin.py
@@ -9,6 +9,7 @@ import re
import sys
import pkgutil
import importlib
+from argparse import ArgumentParser
from types import ModuleType
from dataclasses import dataclass
from importlib._bootstrap import _load
@@ -19,7 +20,7 @@ from nonebot.log import logger
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
-from nonebot.rule import Rule, startswith, endswith, keyword, command, regex
+from nonebot.rule import Rule, shell_like_command, startswith, endswith, keyword, command, regex
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
@@ -436,6 +437,56 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
return on_message(command(*commands) & rule, handlers=handlers, **kwargs)
+def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str,
+ Tuple[str, ...]]]] = None,
+ shell_like_argsparser: Optional[ArgumentParser] = None,
+ **kwargs) -> Type[Matcher]:
+ """
+ :说明:
+
+ 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。
+
+ 与普通的 ``on_command`` 不同的是,在添加 ``shell_like_argsparser`` 参数时, 响应器会自动处理消息,
+
+ 并将 ``shell_like_argsparser`` 处理的参数保存在 ``state["args"]`` 中
+
+ :参数:
+
+ * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
+ * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
+ * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
+ * ``shell_like_argsparser:Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象,是一个类 ``shell`` 的 ``argsparser``
+ * ``permission: Optional[Permission]``: 事件响应权限
+ * ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
+ * ``temp: bool``: 是否为临时事件响应器(仅执行一次)
+ * ``priority: int``: 事件响应器优先级
+ * ``block: bool``: 是否阻止事件向更低优先级传递
+ * ``state: Optional[T_State]``: 默认 state
+ * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
+
+ :返回:
+
+ - ``Type[Matcher]``
+ """
+
+ async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
+ message = event.get_message()
+ segment = message.pop(0)
+ new_message = message.__class__(
+ str(segment)
+ [len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
+ for new_segment in reversed(new_message):
+ message.insert(0, new_segment)
+
+ handlers = kwargs.pop("handlers", [])
+ handlers.insert(0, _strip_cmd)
+
+ commands = set([cmd]) | (aliases or set())
+ return on_message(shell_like_command(shell_like_argsparser, *commands) & rule, handlers=handlers, **kwargs)
+
+
def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = None,
@@ -917,7 +968,8 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
m.module = name
plugin = Plugin(name, module, _tmp_matchers.get(), _export.get())
plugins[name] = plugin
- logger.opt(colors=True).info(f'Succeeded to import "{name}"')
+ logger.opt(colors=True).info(
+ f'Succeeded to import "{name}"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
diff --git a/nonebot/plugin.pyi b/nonebot/plugin.pyi
index db31ad3d..1e8cb810 100644
--- a/nonebot/plugin.pyi
+++ b/nonebot/plugin.pyi
@@ -1,4 +1,5 @@
import re
+from argparse import ArgumentParser
from types import ModuleType
from contextvars import ContextVar
from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional
@@ -146,6 +147,15 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
...
+def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str,
+ Tuple[str, ...]]]] = None,
+ shell_like_argsparser: Optional[ArgumentParser] = None,
+ **kwargs) -> Type[Matcher]:
+ ...
+
+
def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = ...,
diff --git a/nonebot/rule.py b/nonebot/rule.py
index d2b8f1fd..0c0effc4 100644
--- a/nonebot/rule.py
+++ b/nonebot/rule.py
@@ -9,10 +9,12 @@
\:\:\:
"""
+from argparse import ArgumentParser
import re
import asyncio
from itertools import product
from typing import Any, Dict, Union, Tuple, Optional, Callable, NoReturn, Awaitable, TYPE_CHECKING
+from loguru import logger
from pygtrie import CharTrie
@@ -276,6 +278,72 @@ def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
return Rule(_command)
+def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *cmds: Union[str, Tuple[str, ...]]) -> Rule:
+ """
+ :说明:
+
+ 支持 ``shell_like`` 解析参数的命令形式匹配,根据配置里提供的 ``command_start``, ``command_sep`` 判断消息是否为命令。
+
+ 可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令(例:``("test",)``),通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本(例:``"/test"``)。
+
+ 添加 ``shell_like_argpsarser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]`` 中。
+
+
+
+
+ :参数:
+ * ``shell_like_argsparser: Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象, 是一个类 ``shell`` 的 ``argsparser``
+
+ * ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
+
+ :示例:
+
+ 使用默认 ``command_start``, ``command_sep`` 配置
+
+ 命令 ``("test",)`` 可以匹配:``/test`` 开头的消息
+ 命令 ``("test", "sub")`` 可以匹配”``/test.sub`` 开头的消息
+
+ 当 ``shell_like_argsparser`` 的 ``argument`` 为 ``-a`` 时且 ``action`` 为 ``store_true`` , ``state["args"]["a"]`` 将会记录 ``True``
+
+ \:\:\:tip 提示
+ 命令内容与后续消息间无需空格!
+ \:\:\:
+ """
+ config = get_driver().config
+ command_start = config.command_start
+ command_sep = config.command_sep
+ commands = list(cmds)
+ for index, command in enumerate(commands):
+ if isinstance(command, str):
+ commands[index] = command = (command,)
+
+ if len(command) == 1:
+ for start in command_start:
+ TrieRule.add_prefix(f"{start}{command[0]}", command)
+ else:
+ for start, sep in product(command_start, command_sep):
+ TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
+
+ async def _shell_like_command(bot: "Bot", event: "Event", state: T_State) -> bool:
+ if state["_prefix"]["command"] in commands:
+ if shell_like_argsparser:
+ message = str(event.get_message())
+ strip_message = message[len(
+ state["_prefix"]["raw_command"]):].lstrip()
+ try:
+ args = shell_like_argsparser.parse_args(
+ strip_message.split())
+ state["args"]=dict()
+ state["args"].update(**args.__dict__)
+ except:
+ pass
+ return True
+ else:
+ return False
+
+ return Rule(_shell_like_command)
+
+
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
"""
:说明: