From f8fb36a1f788777a4eadad9fde817327d05a3491 Mon Sep 17 00:00:00 2001 From: AkiraXie Date: Mon, 1 Feb 2021 22:28:48 +0800 Subject: [PATCH] :zap:Support shell-like command --- nonebot/plugin.py | 56 ++++++++++++++++++++++++++++++++++++-- nonebot/plugin.pyi | 10 +++++++ nonebot/rule.py | 68 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 2 deletions(-) diff --git a/nonebot/plugin.py b/nonebot/plugin.py index de4073c9..562811fd 100644 --- a/nonebot/plugin.py +++ b/nonebot/plugin.py @@ -9,6 +9,7 @@ import re import sys import pkgutil import importlib +from argparse import ArgumentParser from types import ModuleType from dataclasses import dataclass from importlib._bootstrap import _load @@ -19,7 +20,7 @@ from nonebot.log import logger from nonebot.matcher import Matcher from nonebot.permission import Permission from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker -from nonebot.rule import Rule, startswith, endswith, keyword, command, regex +from nonebot.rule import Rule, shell_like_command, startswith, endswith, keyword, command, regex if TYPE_CHECKING: from nonebot.adapters import Bot, Event @@ -436,6 +437,56 @@ def on_command(cmd: Union[str, Tuple[str, ...]], return on_message(command(*commands) & rule, handlers=handlers, **kwargs) +def on_shell_like_command(cmd: Union[str, Tuple[str, ...]], + rule: Optional[Union[Rule, T_RuleChecker]] = None, + aliases: Optional[Set[Union[str, + Tuple[str, ...]]]] = None, + shell_like_argsparser: Optional[ArgumentParser] = None, + **kwargs) -> Type[Matcher]: + """ + :说明: + + 注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器。 + + 与普通的 ``on_command`` 不同的是,在添加 ``shell_like_argsparser`` 参数时, 响应器会自动处理消息, + + 并将 ``shell_like_argsparser`` 处理的参数保存在 ``state["args"]`` 中 + + :参数: + + * ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容 + * ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则 + * ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名 + * ``shell_like_argsparser:Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象,是一个类 ``shell`` 的 ``argsparser`` + * ``permission: Optional[Permission]``: 事件响应权限 + * ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表 + * ``temp: bool``: 是否为临时事件响应器(仅执行一次) + * ``priority: int``: 事件响应器优先级 + * ``block: bool``: 是否阻止事件向更低优先级传递 + * ``state: Optional[T_State]``: 默认 state + * ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数 + + :返回: + + - ``Type[Matcher]`` + """ + + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): + message = event.get_message() + segment = message.pop(0) + new_message = message.__class__( + str(segment) + [len(state["_prefix"]["raw_command"]):].strip()) # type: ignore + for new_segment in reversed(new_message): + message.insert(0, new_segment) + + handlers = kwargs.pop("handlers", []) + handlers.insert(0, _strip_cmd) + + commands = set([cmd]) | (aliases or set()) + return on_message(shell_like_command(shell_like_argsparser, *commands) & rule, handlers=handlers, **kwargs) + + def on_regex(pattern: str, flags: Union[int, re.RegexFlag] = 0, rule: Optional[Rule] = None, @@ -917,7 +968,8 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]: m.module = name plugin = Plugin(name, module, _tmp_matchers.get(), _export.get()) plugins[name] = plugin - logger.opt(colors=True).info(f'Succeeded to import "{name}"') + logger.opt(colors=True).info( + f'Succeeded to import "{name}"') return plugin except Exception as e: logger.opt(colors=True, exception=e).error( diff --git a/nonebot/plugin.pyi b/nonebot/plugin.pyi index db31ad3d..1e8cb810 100644 --- a/nonebot/plugin.pyi +++ b/nonebot/plugin.pyi @@ -1,4 +1,5 @@ import re +from argparse import ArgumentParser from types import ModuleType from contextvars import ContextVar from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional @@ -146,6 +147,15 @@ def on_command(cmd: Union[str, Tuple[str, ...]], ... +def on_shell_like_command(cmd: Union[str, Tuple[str, ...]], + rule: Optional[Union[Rule, T_RuleChecker]] = None, + aliases: Optional[Set[Union[str, + Tuple[str, ...]]]] = None, + shell_like_argsparser: Optional[ArgumentParser] = None, + **kwargs) -> Type[Matcher]: + ... + + def on_regex(pattern: str, flags: Union[int, re.RegexFlag] = 0, rule: Optional[Rule] = ..., diff --git a/nonebot/rule.py b/nonebot/rule.py index d2b8f1fd..0c0effc4 100644 --- a/nonebot/rule.py +++ b/nonebot/rule.py @@ -9,10 +9,12 @@ \:\:\: """ +from argparse import ArgumentParser import re import asyncio from itertools import product from typing import Any, Dict, Union, Tuple, Optional, Callable, NoReturn, Awaitable, TYPE_CHECKING +from loguru import logger from pygtrie import CharTrie @@ -276,6 +278,72 @@ def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule: return Rule(_command) +def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *cmds: Union[str, Tuple[str, ...]]) -> Rule: + """ + :说明: + + 支持 ``shell_like`` 解析参数的命令形式匹配,根据配置里提供的 ``command_start``, ``command_sep`` 判断消息是否为命令。 + + 可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令(例:``("test",)``),通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本(例:``"/test"``)。 + + 添加 ``shell_like_argpsarser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]`` 中。 + + + + + :参数: + * ``shell_like_argsparser: Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象, 是一个类 ``shell`` 的 ``argsparser`` + + * ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容 + + :示例: + + 使用默认 ``command_start``, ``command_sep`` 配置 + + 命令 ``("test",)`` 可以匹配:``/test`` 开头的消息 + 命令 ``("test", "sub")`` 可以匹配”``/test.sub`` 开头的消息 + + 当 ``shell_like_argsparser`` 的 ``argument`` 为 ``-a`` 时且 ``action`` 为 ``store_true`` , ``state["args"]["a"]`` 将会记录 ``True`` + + \:\:\:tip 提示 + 命令内容与后续消息间无需空格! + \:\:\: + """ + config = get_driver().config + command_start = config.command_start + command_sep = config.command_sep + commands = list(cmds) + for index, command in enumerate(commands): + if isinstance(command, str): + commands[index] = command = (command,) + + if len(command) == 1: + for start in command_start: + TrieRule.add_prefix(f"{start}{command[0]}", command) + else: + for start, sep in product(command_start, command_sep): + TrieRule.add_prefix(f"{start}{sep.join(command)}", command) + + async def _shell_like_command(bot: "Bot", event: "Event", state: T_State) -> bool: + if state["_prefix"]["command"] in commands: + if shell_like_argsparser: + message = str(event.get_message()) + strip_message = message[len( + state["_prefix"]["raw_command"]):].lstrip() + try: + args = shell_like_argsparser.parse_args( + strip_message.split()) + state["args"]=dict() + state["args"].update(**args.__dict__) + except: + pass + return True + else: + return False + + return Rule(_shell_like_command) + + def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule: """ :说明: