diff --git a/nonebot/__init__.py b/nonebot/__init__.py
index 11dad7cd..dce9c9ec 100644
--- a/nonebot/__init__.py
+++ b/nonebot/__init__.py
@@ -218,6 +218,6 @@ def run(host: Optional[str] = None,
from nonebot.plugin import on_message, on_notice, on_request, on_metaevent, CommandGroup, MatcherGroup
-from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_regex
+from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_shell_command, on_regex
from nonebot.plugin import load_plugin, load_plugins, load_builtin_plugins
from nonebot.plugin import export, require, get_plugin, get_loaded_plugins
diff --git a/nonebot/exception.py b/nonebot/exception.py
index 815ac714..b8a42aa4 100644
--- a/nonebot/exception.py
+++ b/nonebot/exception.py
@@ -37,6 +37,23 @@ class IgnoredException(NoneBotException):
return self.__repr__()
+class ParserExit(NoneBotException):
+ """
+ :说明:
+
+ ``shell command`` 处理消息失败时返回的异常
+
+ :参数:
+
+ * ``status``
+ * ``message``
+ """
+
+ def __init__(self, status=0, message=None):
+ self.status = status
+ self.message = message
+
+
class PausedException(NoneBotException):
"""
:说明:
diff --git a/nonebot/plugin.py b/nonebot/plugin.py
index 562811fd..1e198179 100644
--- a/nonebot/plugin.py
+++ b/nonebot/plugin.py
@@ -9,7 +9,6 @@ import re
import sys
import pkgutil
import importlib
-from argparse import ArgumentParser
from types import ModuleType
from dataclasses import dataclass
from importlib._bootstrap import _load
@@ -20,7 +19,7 @@ from nonebot.log import logger
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
-from nonebot.rule import Rule, shell_like_command, startswith, endswith, keyword, command, regex
+from nonebot.rule import Rule, startswith, endswith, keyword, command, shell_command, ArgumentParser, regex
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
@@ -437,27 +436,26 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
return on_message(command(*commands) & rule, handlers=handlers, **kwargs)
-def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- aliases: Optional[Set[Union[str,
- Tuple[str, ...]]]] = None,
- shell_like_argsparser: Optional[ArgumentParser] = None,
- **kwargs) -> Type[Matcher]:
+def on_shell_command(cmd: Union[str, Tuple[str, ...]],
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
+ parser: Optional[ArgumentParser] = None,
+ **kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。
- 与普通的 ``on_command`` 不同的是,在添加 ``shell_like_argsparser`` 参数时, 响应器会自动处理消息,
+ 与普通的 ``on_command`` 不同的是,在添加 ``parser`` 参数时, 响应器会自动处理消息。
- 并将 ``shell_like_argsparser`` 处理的参数保存在 ``state["args"]`` 中
+ 并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]`` 中
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
- * ``shell_like_argsparser:Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象,是一个类 ``shell`` 的 ``argsparser``
+ * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器(仅执行一次)
@@ -484,7 +482,9 @@ def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
- return on_message(shell_like_command(shell_like_argsparser, *commands) & rule, handlers=handlers, **kwargs)
+ return on_message(shell_command(*commands, parser=parser) & rule,
+ handlers=handlers,
+ **kwargs)
def on_regex(pattern: str,
@@ -564,6 +564,29 @@ class CommandGroup:
final_kwargs.update(kwargs)
return on_command(cmd, **final_kwargs)
+ def shell_command(self, cmd: Union[str, Tuple[str, ...]],
+ **kwargs) -> Type[Matcher]:
+ """
+ :说明:
+
+ 注册一个新的命令。
+
+ :参数:
+
+ * ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
+ * ``**kwargs``: 其他传递给 ``on_command`` 的参数,将会覆盖命令组默认值
+
+ :返回:
+
+ - ``Type[Matcher]``
+ """
+ sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
+ cmd = self.basecmd + sub_cmd
+
+ final_kwargs = self.base_kwargs.copy()
+ final_kwargs.update(kwargs)
+ return on_shell_command(cmd, **final_kwargs)
+
class MatcherGroup:
"""事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。"""
@@ -851,6 +874,59 @@ class MatcherGroup:
handlers=handlers,
**kwargs)
+ def on_shell_command(self,
+ cmd: Union[str, Tuple[str, ...]],
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str, Tuple[str,
+ ...]]]] = None,
+ parser: Optional[ArgumentParser] = None,
+ **kwargs) -> Type[Matcher]:
+ """
+ :说明:
+
+ 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。
+
+ 与普通的 ``on_command`` 不同的是,在添加 ``parser`` 参数时, 响应器会自动处理消息。
+
+ 并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]`` 中
+
+ :参数:
+
+ * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
+ * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
+ * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
+ * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
+ * ``permission: Optional[Permission]``: 事件响应权限
+ * ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
+ * ``temp: bool``: 是否为临时事件响应器(仅执行一次)
+ * ``priority: int``: 事件响应器优先级
+ * ``block: bool``: 是否阻止事件向更低优先级传递
+ * ``state: Optional[T_State]``: 默认 state
+ * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
+
+ :返回:
+
+ - ``Type[Matcher]``
+ """
+
+ async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
+ message = event.get_message()
+ segment = message.pop(0)
+ new_message = message.__class__(
+ str(segment)
+ [len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
+ for new_segment in reversed(new_message):
+ message.insert(0, new_segment)
+
+ handlers = kwargs.pop("handlers", [])
+ handlers.insert(0, _strip_cmd)
+
+ commands = set([cmd]) | (aliases or set())
+ return self.on_message(rule=shell_command(*commands, parser=parser) &
+ rule,
+ handlers=handlers,
+ **kwargs)
+
def on_regex(self,
pattern: str,
flags: Union[int, re.RegexFlag] = 0,
@@ -968,8 +1044,7 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
m.module = name
plugin = Plugin(name, module, _tmp_matchers.get(), _export.get())
plugins[name] = plugin
- logger.opt(colors=True).info(
- f'Succeeded to import "{name}"')
+ logger.opt(colors=True).info(f'Succeeded to import "{name}"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
diff --git a/nonebot/plugin.pyi b/nonebot/plugin.pyi
index 1e8cb810..5fbefa8a 100644
--- a/nonebot/plugin.pyi
+++ b/nonebot/plugin.pyi
@@ -1,12 +1,11 @@
import re
-from argparse import ArgumentParser
from types import ModuleType
from contextvars import ContextVar
from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional
-from nonebot.rule import Rule
from nonebot.matcher import Matcher
from nonebot.permission import Permission
+from nonebot.rule import Rule, ArgumentParser
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
plugins: Dict[str, "Plugin"] = ...
@@ -147,12 +146,11 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
...
-def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
- rule: Optional[Union[Rule, T_RuleChecker]] = None,
- aliases: Optional[Set[Union[str,
- Tuple[str, ...]]]] = None,
- shell_like_argsparser: Optional[ArgumentParser] = None,
- **kwargs) -> Type[Matcher]:
+def on_shell_command(cmd: Union[str, Tuple[str, ...]],
+ rule: Optional[Union[Rule, T_RuleChecker]] = None,
+ aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
+ parser: Optional[ArgumentParser] = None,
+ **kwargs) -> Type[Matcher]:
...
@@ -227,6 +225,22 @@ class CommandGroup:
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
+ def shell_command(
+ self,
+ cmd: Union[str, Tuple[str, ...]],
+ *,
+ rule: Optional[Union[Rule, T_RuleChecker]] = ...,
+ aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
+ parser: Optional[ArgumentParser] = ...,
+ permission: Optional[Permission] = ...,
+ handlers: Optional[List[T_Handler]] = ...,
+ temp: bool = ...,
+ priority: int = ...,
+ block: bool = ...,
+ state: Optional[T_State] = ...,
+ state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
+ ...
+
class MatcherGroup:
@@ -361,6 +375,22 @@ class MatcherGroup:
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
+ def on_shell_command(
+ self,
+ *,
+ cmd: Union[str, Tuple[str, ...]],
+ rule: Optional[Union[Rule, T_RuleChecker]] = ...,
+ aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
+ parser: Optional[ArgumentParser] = ...,
+ permission: Optional[Permission] = ...,
+ handlers: Optional[List[T_Handler]] = ...,
+ temp: bool = ...,
+ priority: int = ...,
+ block: bool = ...,
+ state: Optional[T_State] = ...,
+ state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
+ ...
+
def on_regex(
self,
*,
diff --git a/nonebot/rule.py b/nonebot/rule.py
index 0c0effc4..09a8ff70 100644
--- a/nonebot/rule.py
+++ b/nonebot/rule.py
@@ -9,18 +9,19 @@
\:\:\:
"""
-from argparse import ArgumentParser
import re
+import shlex
import asyncio
from itertools import product
-from typing import Any, Dict, Union, Tuple, Optional, Callable, NoReturn, Awaitable, TYPE_CHECKING
-from loguru import logger
+from argparse import Namespace, ArgumentParser as ArgParser
+from typing import Any, Dict, Union, Tuple, Optional, Callable, Sequence, NoReturn, Awaitable, TYPE_CHECKING
from pygtrie import CharTrie
from nonebot import get_driver
from nonebot.log import logger
from nonebot.utils import run_sync
+from nonebot.exception import ParserExit
from nonebot.typing import T_State, T_RuleChecker
if TYPE_CHECKING:
@@ -278,7 +279,28 @@ def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
return Rule(_command)
-def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *cmds: Union[str, Tuple[str, ...]]) -> Rule:
+class ArgumentParser(ArgParser):
+
+ def _print_message(self, message, file=None):
+ pass
+
+ def exit(self, status=0, message=None):
+ raise ParserExit(status=status, message=message)
+
+ def parse_args(
+ self,
+ args: Optional[Sequence[str]] = None,
+ namespace: Optional[Namespace] = None
+ ) -> Union[ParserExit, Namespace]:
+ try:
+ return super().parse_args(args=args,
+ namespace=namespace) # type: ignore
+ except ParserExit as e:
+ return e
+
+
+def shell_command(*cmds: Union[str, Tuple[str, ...]],
+ parser: Optional[ArgumentParser] = None) -> Rule:
"""
:说明:
@@ -286,29 +308,36 @@ def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *
可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令(例:``("test",)``),通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本(例:``"/test"``)。
- 添加 ``shell_like_argpsarser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]`` 中。
-
-
+ 可以通过 ``state["argv"]`` 获取用户输入的原始参数列表
+ 添加 ``parser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]`` 中。
:参数:
- * ``shell_like_argsparser: Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象, 是一个类 ``shell`` 的 ``argsparser``
* ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
+ * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
:示例:
- 使用默认 ``command_start``, ``command_sep`` 配置
+ 使用默认 ``command_start``, ``command_sep`` 配置,更多示例参考 ``argparse`` 标准库文档。
- 命令 ``("test",)`` 可以匹配:``/test`` 开头的消息
- 命令 ``("test", "sub")`` 可以匹配”``/test.sub`` 开头的消息
-
- 当 ``shell_like_argsparser`` 的 ``argument`` 为 ``-a`` 时且 ``action`` 为 ``store_true`` , ``state["args"]["a"]`` 将会记录 ``True``
+ .. code-block:: python
+
+ from nonebot.rule import ArgumentParser
+
+ parser = ArgumentParser()
+ parser.add_argument("-a", type=bool)
+
+ rule = shell_command("ls", parser=parser)
\:\:\:tip 提示
命令内容与后续消息间无需空格!
\:\:\:
"""
+ if not isinstance(parser, ArgumentParser):
+ raise TypeError(
+ "`parser` must be an instance of nonebot.rule.ArgumentParser")
+
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
@@ -324,24 +353,21 @@ def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
- async def _shell_like_command(bot: "Bot", event: "Event", state: T_State) -> bool:
+ async def _shell_command(bot: "Bot", event: "Event",
+ state: T_State) -> bool:
if state["_prefix"]["command"] in commands:
- if shell_like_argsparser:
- message = str(event.get_message())
- strip_message = message[len(
- state["_prefix"]["raw_command"]):].lstrip()
- try:
- args = shell_like_argsparser.parse_args(
- strip_message.split())
- state["args"]=dict()
- state["args"].update(**args.__dict__)
- except:
- pass
+ message = str(event.get_message())
+ strip_message = message[len(state["_prefix"]["raw_command"]
+ ):].lstrip()
+ state["argv"] = shlex.split(strip_message)
+ if parser:
+ args = parser.parse_args(state["argv"])
+ state["args"] = args
return True
else:
return False
- return Rule(_shell_like_command)
+ return Rule(_shell_command)
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
@@ -372,7 +398,6 @@ def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
state["_matched"] = matched.group()
return True
else:
- state["_matched"] = None
return False
return Rule(_regex)
diff --git a/tests/test_plugins/test_shell.py b/tests/test_plugins/test_shell.py
new file mode 100644
index 00000000..5afebd6d
--- /dev/null
+++ b/tests/test_plugins/test_shell.py
@@ -0,0 +1,15 @@
+from nonebot.adapters import Bot
+from nonebot.typing import T_State
+from nonebot import on_shell_command
+from nonebot.rule import to_me, ArgumentParser
+
+parser = ArgumentParser()
+parser.add_argument("-a", type=bool)
+
+shell = on_shell_command("ls", to_me(), parser=parser)
+
+
+@shell.handle()
+async def _(bot: Bot, state: T_State):
+ print(state["argv"])
+ print(state["args"])