rewrite shell command

This commit is contained in:
yanyongyu 2021-02-02 11:59:14 +08:00
parent c8ebaf38b6
commit 9e4e9f29d1
6 changed files with 212 additions and 50 deletions

View File

@ -218,6 +218,6 @@ def run(host: Optional[str] = None,
from nonebot.plugin import on_message, on_notice, on_request, on_metaevent, CommandGroup, MatcherGroup from nonebot.plugin import on_message, on_notice, on_request, on_metaevent, CommandGroup, MatcherGroup
from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_regex from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_shell_command, on_regex
from nonebot.plugin import load_plugin, load_plugins, load_builtin_plugins from nonebot.plugin import load_plugin, load_plugins, load_builtin_plugins
from nonebot.plugin import export, require, get_plugin, get_loaded_plugins from nonebot.plugin import export, require, get_plugin, get_loaded_plugins

View File

@ -37,6 +37,23 @@ class IgnoredException(NoneBotException):
return self.__repr__() return self.__repr__()
class ParserExit(NoneBotException):
"""
:说明:
``shell command`` 处理消息失败时返回的异常
:参数:
* ``status``
* ``message``
"""
def __init__(self, status=0, message=None):
self.status = status
self.message = message
class PausedException(NoneBotException): class PausedException(NoneBotException):
""" """
:说明: :说明:

View File

@ -9,7 +9,6 @@ import re
import sys import sys
import pkgutil import pkgutil
import importlib import importlib
from argparse import ArgumentParser
from types import ModuleType from types import ModuleType
from dataclasses import dataclass from dataclasses import dataclass
from importlib._bootstrap import _load from importlib._bootstrap import _load
@ -20,7 +19,7 @@ from nonebot.log import logger
from nonebot.matcher import Matcher from nonebot.matcher import Matcher
from nonebot.permission import Permission from nonebot.permission import Permission
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
from nonebot.rule import Rule, shell_like_command, startswith, endswith, keyword, command, regex from nonebot.rule import Rule, startswith, endswith, keyword, command, shell_command, ArgumentParser, regex
if TYPE_CHECKING: if TYPE_CHECKING:
from nonebot.adapters import Bot, Event from nonebot.adapters import Bot, Event
@ -437,27 +436,26 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
return on_message(command(*commands) & rule, handlers=handlers, **kwargs) return on_message(command(*commands) & rule, handlers=handlers, **kwargs)
def on_shell_like_command(cmd: Union[str, Tuple[str, ...]], def on_shell_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None, rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
Tuple[str, ...]]]] = None, parser: Optional[ArgumentParser] = None,
shell_like_argsparser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]: **kwargs) -> Type[Matcher]:
""" """
:说明: :说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``shell_like_argsparser`` 参数时, 响应器会自动处理消息, 与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将 ``shell_like_argsparser`` 处理的参数保存在 ``state["args"]`` 并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数: :参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``shell_like_argsparser:Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象,是一个类 ``shell`` ``argsparser`` * ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``permission: Optional[Permission]``: 事件响应权限 * ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表 * ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次 * ``temp: bool``: 是否为临时事件响应器仅执行一次
@ -484,7 +482,9 @@ def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
handlers.insert(0, _strip_cmd) handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set()) commands = set([cmd]) | (aliases or set())
return on_message(shell_like_command(shell_like_argsparser, *commands) & rule, handlers=handlers, **kwargs) return on_message(shell_command(*commands, parser=parser) & rule,
handlers=handlers,
**kwargs)
def on_regex(pattern: str, def on_regex(pattern: str,
@ -564,6 +564,29 @@ class CommandGroup:
final_kwargs.update(kwargs) final_kwargs.update(kwargs)
return on_command(cmd, **final_kwargs) return on_command(cmd, **final_kwargs)
def shell_command(self, cmd: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个新的命令
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_command`` 的参数将会覆盖命令组默认值
:返回:
- ``Type[Matcher]``
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_shell_command(cmd, **final_kwargs)
class MatcherGroup: class MatcherGroup:
"""事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。""" """事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。"""
@ -851,6 +874,59 @@ class MatcherGroup:
handlers=handlers, handlers=handlers,
**kwargs) **kwargs)
def on_shell_command(self,
cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str,
...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
str(segment)
[len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return self.on_message(rule=shell_command(*commands, parser=parser) &
rule,
handlers=handlers,
**kwargs)
def on_regex(self, def on_regex(self,
pattern: str, pattern: str,
flags: Union[int, re.RegexFlag] = 0, flags: Union[int, re.RegexFlag] = 0,
@ -968,8 +1044,7 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
m.module = name m.module = name
plugin = Plugin(name, module, _tmp_matchers.get(), _export.get()) plugin = Plugin(name, module, _tmp_matchers.get(), _export.get())
plugins[name] = plugin plugins[name] = plugin
logger.opt(colors=True).info( logger.opt(colors=True).info(f'Succeeded to import "<y>{name}</y>"')
f'Succeeded to import "<y>{name}</y>"')
return plugin return plugin
except Exception as e: except Exception as e:
logger.opt(colors=True, exception=e).error( logger.opt(colors=True, exception=e).error(

View File

@ -1,12 +1,11 @@
import re import re
from argparse import ArgumentParser
from types import ModuleType from types import ModuleType
from contextvars import ContextVar from contextvars import ContextVar
from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional
from nonebot.rule import Rule
from nonebot.matcher import Matcher from nonebot.matcher import Matcher
from nonebot.permission import Permission from nonebot.permission import Permission
from nonebot.rule import Rule, ArgumentParser
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
plugins: Dict[str, "Plugin"] = ... plugins: Dict[str, "Plugin"] = ...
@ -147,11 +146,10 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
... ...
def on_shell_like_command(cmd: Union[str, Tuple[str, ...]], def on_shell_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None, rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
Tuple[str, ...]]]] = None, parser: Optional[ArgumentParser] = None,
shell_like_argsparser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]: **kwargs) -> Type[Matcher]:
... ...
@ -227,6 +225,22 @@ class CommandGroup:
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]: state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
... ...
def shell_command(
self,
cmd: Union[str, Tuple[str, ...]],
*,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
parser: Optional[ArgumentParser] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
class MatcherGroup: class MatcherGroup:
@ -361,6 +375,22 @@ class MatcherGroup:
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]: state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
... ...
def on_shell_command(
self,
*,
cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
parser: Optional[ArgumentParser] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_regex( def on_regex(
self, self,
*, *,

View File

@ -9,18 +9,19 @@
\:\:\: \:\:\:
""" """
from argparse import ArgumentParser
import re import re
import shlex
import asyncio import asyncio
from itertools import product from itertools import product
from typing import Any, Dict, Union, Tuple, Optional, Callable, NoReturn, Awaitable, TYPE_CHECKING from argparse import Namespace, ArgumentParser as ArgParser
from loguru import logger from typing import Any, Dict, Union, Tuple, Optional, Callable, Sequence, NoReturn, Awaitable, TYPE_CHECKING
from pygtrie import CharTrie from pygtrie import CharTrie
from nonebot import get_driver from nonebot import get_driver
from nonebot.log import logger from nonebot.log import logger
from nonebot.utils import run_sync from nonebot.utils import run_sync
from nonebot.exception import ParserExit
from nonebot.typing import T_State, T_RuleChecker from nonebot.typing import T_State, T_RuleChecker
if TYPE_CHECKING: if TYPE_CHECKING:
@ -278,7 +279,28 @@ def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
return Rule(_command) return Rule(_command)
def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *cmds: Union[str, Tuple[str, ...]]) -> Rule: class ArgumentParser(ArgParser):
def _print_message(self, message, file=None):
pass
def exit(self, status=0, message=None):
raise ParserExit(status=status, message=message)
def parse_args(
self,
args: Optional[Sequence[str]] = None,
namespace: Optional[Namespace] = None
) -> Union[ParserExit, Namespace]:
try:
return super().parse_args(args=args,
namespace=namespace) # type: ignore
except ParserExit as e:
return e
def shell_command(*cmds: Union[str, Tuple[str, ...]],
parser: Optional[ArgumentParser] = None) -> Rule:
""" """
:说明: :说明:
@ -286,29 +308,36 @@ def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *
可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令``("test",)``通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本``"/test"`` 可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令``("test",)``通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本``"/test"``
添加 ``shell_like_argpsarser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]`` 可以通过 ``state["argv"]`` 获取用户输入的原始参数列表
添加 ``parser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]``
:参数: :参数:
* ``shell_like_argsparser: Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象, 是一个类 ``shell`` ``argsparser``
* ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容 * ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
:示例: :示例:
使用默认 ``command_start``, ``command_sep`` 配置 使用默认 ``command_start``, ``command_sep`` 配置更多示例参考 ``argparse`` 标准库文档
命令 ``("test",)`` 可以匹配``/test`` 开头的消息 .. code-block:: python
命令 ``("test", "sub")`` 可以匹配``/test.sub`` 开头的消息
``shell_like_argsparser`` ``argument`` ``-a`` 时且 ``action`` ``store_true`` ``state["args"]["a"]`` 将会记录 ``True`` from nonebot.rule import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-a", type=bool)
rule = shell_command("ls", parser=parser)
\:\:\:tip 提示 \:\:\:tip 提示
命令内容与后续消息间无需空格 命令内容与后续消息间无需空格
\:\:\: \:\:\:
""" """
if not isinstance(parser, ArgumentParser):
raise TypeError(
"`parser` must be an instance of nonebot.rule.ArgumentParser")
config = get_driver().config config = get_driver().config
command_start = config.command_start command_start = config.command_start
command_sep = config.command_sep command_sep = config.command_sep
@ -324,24 +353,21 @@ def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *
for start, sep in product(command_start, command_sep): for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command) TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
async def _shell_like_command(bot: "Bot", event: "Event", state: T_State) -> bool: async def _shell_command(bot: "Bot", event: "Event",
state: T_State) -> bool:
if state["_prefix"]["command"] in commands: if state["_prefix"]["command"] in commands:
if shell_like_argsparser:
message = str(event.get_message()) message = str(event.get_message())
strip_message = message[len( strip_message = message[len(state["_prefix"]["raw_command"]
state["_prefix"]["raw_command"]):].lstrip() ):].lstrip()
try: state["argv"] = shlex.split(strip_message)
args = shell_like_argsparser.parse_args( if parser:
strip_message.split()) args = parser.parse_args(state["argv"])
state["args"]=dict() state["args"] = args
state["args"].update(**args.__dict__)
except:
pass
return True return True
else: else:
return False return False
return Rule(_shell_like_command) return Rule(_shell_command)
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule: def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
@ -372,7 +398,6 @@ def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
state["_matched"] = matched.group() state["_matched"] = matched.group()
return True return True
else: else:
state["_matched"] = None
return False return False
return Rule(_regex) return Rule(_regex)

View File

@ -0,0 +1,15 @@
from nonebot.adapters import Bot
from nonebot.typing import T_State
from nonebot import on_shell_command
from nonebot.rule import to_me, ArgumentParser
parser = ArgumentParser()
parser.add_argument("-a", type=bool)
shell = on_shell_command("ls", to_me(), parser=parser)
@shell.handle()
async def _(bot: Bot, state: T_State):
print(state["argv"])
print(state["args"])