Support shell-like command

This commit is contained in:
AkiraXie 2021-02-01 22:28:48 +08:00
parent 68945d2f20
commit f8fb36a1f7
3 changed files with 132 additions and 2 deletions

View File

@ -9,6 +9,7 @@ import re
import sys import sys
import pkgutil import pkgutil
import importlib import importlib
from argparse import ArgumentParser
from types import ModuleType from types import ModuleType
from dataclasses import dataclass from dataclasses import dataclass
from importlib._bootstrap import _load from importlib._bootstrap import _load
@ -19,7 +20,7 @@ from nonebot.log import logger
from nonebot.matcher import Matcher from nonebot.matcher import Matcher
from nonebot.permission import Permission from nonebot.permission import Permission
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
from nonebot.rule import Rule, startswith, endswith, keyword, command, regex from nonebot.rule import Rule, shell_like_command, startswith, endswith, keyword, command, regex
if TYPE_CHECKING: if TYPE_CHECKING:
from nonebot.adapters import Bot, Event from nonebot.adapters import Bot, Event
@ -436,6 +437,56 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
return on_message(command(*commands) & rule, handlers=handlers, **kwargs) return on_message(command(*commands) & rule, handlers=handlers, **kwargs)
def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str,
Tuple[str, ...]]]] = None,
shell_like_argsparser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``shell_like_argsparser`` 参数时, 响应器会自动处理消息,
并将 ``shell_like_argsparser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``shell_like_argsparser:Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象,是一个类 ``shell`` ``argsparser``
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
str(segment)
[len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return on_message(shell_like_command(shell_like_argsparser, *commands) & rule, handlers=handlers, **kwargs)
def on_regex(pattern: str, def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0, flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = None, rule: Optional[Rule] = None,
@ -917,7 +968,8 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
m.module = name m.module = name
plugin = Plugin(name, module, _tmp_matchers.get(), _export.get()) plugin = Plugin(name, module, _tmp_matchers.get(), _export.get())
plugins[name] = plugin plugins[name] = plugin
logger.opt(colors=True).info(f'Succeeded to import "<y>{name}</y>"') logger.opt(colors=True).info(
f'Succeeded to import "<y>{name}</y>"')
return plugin return plugin
except Exception as e: except Exception as e:
logger.opt(colors=True, exception=e).error( logger.opt(colors=True, exception=e).error(

View File

@ -1,4 +1,5 @@
import re import re
from argparse import ArgumentParser
from types import ModuleType from types import ModuleType
from contextvars import ContextVar from contextvars import ContextVar
from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional
@ -146,6 +147,15 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
... ...
def on_shell_like_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str,
Tuple[str, ...]]]] = None,
shell_like_argsparser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
...
def on_regex(pattern: str, def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0, flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = ..., rule: Optional[Rule] = ...,

View File

@ -9,10 +9,12 @@
\:\:\: \:\:\:
""" """
from argparse import ArgumentParser
import re import re
import asyncio import asyncio
from itertools import product from itertools import product
from typing import Any, Dict, Union, Tuple, Optional, Callable, NoReturn, Awaitable, TYPE_CHECKING from typing import Any, Dict, Union, Tuple, Optional, Callable, NoReturn, Awaitable, TYPE_CHECKING
from loguru import logger
from pygtrie import CharTrie from pygtrie import CharTrie
@ -276,6 +278,72 @@ def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
return Rule(_command) return Rule(_command)
def shell_like_command(shell_like_argsparser: Optional[ArgumentParser] = None, *cmds: Union[str, Tuple[str, ...]]) -> Rule:
"""
:说明:
支持 ``shell_like`` 解析参数的命令形式匹配根据配置里提供的 ``command_start``, ``command_sep`` 判断消息是否为命令
可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令``("test",)``通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本``"/test"``
添加 ``shell_like_argpsarser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]``
:参数:
* ``shell_like_argsparser: Optional[ArgumentParser]``: ``argparse.ArgumentParser`` 对象, 是一个类 ``shell`` ``argsparser``
* ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
:示例:
使用默认 ``command_start``, ``command_sep`` 配置
命令 ``("test",)`` 可以匹配``/test`` 开头的消息
命令 ``("test", "sub")`` 可以匹配``/test.sub`` 开头的消息
``shell_like_argsparser`` ``argument`` ``-a`` 时且 ``action`` ``store_true`` ``state["args"]["a"]`` 将会记录 ``True``
\:\:\:tip 提示
命令内容与后续消息间无需空格
\:\:\:
"""
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
commands = list(cmds)
for index, command in enumerate(commands):
if isinstance(command, str):
commands[index] = command = (command,)
if len(command) == 1:
for start in command_start:
TrieRule.add_prefix(f"{start}{command[0]}", command)
else:
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
async def _shell_like_command(bot: "Bot", event: "Event", state: T_State) -> bool:
if state["_prefix"]["command"] in commands:
if shell_like_argsparser:
message = str(event.get_message())
strip_message = message[len(
state["_prefix"]["raw_command"]):].lstrip()
try:
args = shell_like_argsparser.parse_args(
strip_message.split())
state["args"]=dict()
state["args"].update(**args.__dict__)
except:
pass
return True
else:
return False
return Rule(_shell_like_command)
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule: def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
""" """
:说明: :说明: