🔀 Merge pull request #186

New shell like command support
This commit is contained in:
Ju4tCode 2021-02-02 12:20:53 +08:00 committed by GitHub
commit 0d8e51ff3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 576 additions and 7 deletions

View File

@ -78,5 +78,61 @@
"desc": "多语种翻译插件",
"name": "translator",
"repo": "Lancercmd/nonebot_plugin_translator"
},
{
"id": "nonebot_plugin_simdraw",
"link": "nonebot-plugin-simdraw",
"author": "abrahum",
"desc": "根据配置的手游卡池信息模拟抽卡",
"name": "模拟抽卡",
"repo": "abrahum/nonebot_plugin_simdraw"
},
{
"id": "nonebot-plugin-wordbank",
"link": "nonebot-plugin-wordbank",
"author": "Joenothing-lst",
"desc": "无数据库的轻量问答插件,支持模糊问答",
"name": "nonebot-plugin-wordbank",
"repo": "Joenothing-lst/word-bank"
},
{
"id": "nonebot_plugin_cooldown",
"link": "nonebot-plugin-cooldown",
"author": "jks15satoshi",
"desc": "为用户调用功能添加冷却时间(调用频率限制)功能",
"name": "冷却事件",
"repo": "jks15satoshi/nonebot-plugin-cooldown"
},
{
"id": "nonebot_plugin_mqtt",
"link": "nonebot-plugin-mqtt",
"author": "synodriver",
"desc": "接入mqtt网络,订阅和发布消息",
"name": "mqtt接入",
"repo": "synodriver/nonebot_plugin_mqtt"
},
{
"id": "nonebot-plugin-ipypreter",
"link": "nonebot-plugin-ipypreter",
"author": "iyume",
"desc": "消息交互式 python 解释器",
"name": "消息交互式 python 解释器",
"repo": "iyume/nonebot-plugin-ipypreter"
},
{
"id": "nonebot_plugin_songpicker2",
"link": "nonebot-plugin-songpicker2",
"author": "maxesisn",
"desc": "点播歌曲,支持候选菜单、热评显示,数据源为网易云",
"name": "songpicker2",
"repo": "maxesisn/nonebot_plugin_songpicker2"
},
{
"id": "nonebot_plugin_styledstr",
"link": "nonebot-plugin-styledstr",
"author": "jks15satoshi",
"desc": "通过字符串标签管理字符串资源",
"name": "风格化字符串管理",
"repo": "jks15satoshi/nonebot_plugin_styledstr"
}
]

View File

@ -40,6 +40,27 @@ sidebarDepth: 0
## _exception_ `ParserExit`
基类:`nonebot.exception.NoneBotException`
* **说明**
`shell command` 处理消息失败时返回的异常
* **参数**
* `status`
* `message`
## _exception_ `PausedException`
基类:`nonebot.exception.NoneBotException`

View File

@ -34,6 +34,9 @@ sidebarDepth: 0
* `on_command` => `nonebot.plugin.on_command`
* `on_shell_command` => `nonebot.plugin.on_shell_command`
* `on_regex` => `nonebot.plugin.on_regex`

View File

@ -507,6 +507,63 @@ def something_else():
## `on_shell_command(cmd, rule=None, aliases=None, parser=None, **kwargs)`
* **说明**
注册一个支持 `shell_like` 解析参数的命令消息事件响应器。
与普通的 `on_command` 不同的是,在添加 `parser` 参数时, 响应器会自动处理消息。
并将用户输入的原始参数列表保存在 `state["argv"]`, `parser` 处理的参数保存在 `state["args"]`
* **参数**
* `cmd: Union[str, Tuple[str, ...]]`: 指定命令内容
* `rule: Optional[Union[Rule, T_RuleChecker]]`: 事件响应规则
* `aliases: Optional[Set[Union[str, Tuple[str, ...]]]]`: 命令别名
* `parser: Optional[ArgumentParser]`: `nonebot.rule.ArgumentParser` 对象
* `permission: Optional[Permission]`: 事件响应权限
* `handlers: Optional[List[T_Handler]]`: 事件处理函数列表
* `temp: bool`: 是否为临时事件响应器(仅执行一次)
* `priority: int`: 事件响应器优先级
* `block: bool`: 是否阻止事件向更低优先级传递
* `state: Optional[T_State]`: 默认 state
* `state_factory: Optional[T_StateFactory]`: 默认 state 的工厂函数
* **返回**
* `Type[Matcher]`
## `on_regex(pattern, flags=0, rule=None, **kwargs)`
@ -623,6 +680,32 @@ def something_else():
### `shell_command(cmd, **kwargs)`
* **说明**
注册一个新的命令。
* **参数**
* `cmd: Union[str, Tuple[str, ...]]`: 命令前缀
* `**kwargs`: 其他传递给 `on_command` 的参数,将会覆盖命令组默认值
* **返回**
* `Type[Matcher]`
## _class_ `MatcherGroup`
基类:`object`
@ -1070,6 +1153,63 @@ def something_else():
### `on_shell_command(cmd, rule=None, aliases=None, parser=None, **kwargs)`
* **说明**
注册一个支持 `shell_like` 解析参数的命令消息事件响应器。
与普通的 `on_command` 不同的是,在添加 `parser` 参数时, 响应器会自动处理消息。
并将用户输入的原始参数列表保存在 `state["argv"]`, `parser` 处理的参数保存在 `state["args"]`
* **参数**
* `cmd: Union[str, Tuple[str, ...]]`: 指定命令内容
* `rule: Optional[Union[Rule, T_RuleChecker]]`: 事件响应规则
* `aliases: Optional[Set[Union[str, Tuple[str, ...]]]]`: 命令别名
* `parser: Optional[ArgumentParser]`: `nonebot.rule.ArgumentParser` 对象
* `permission: Optional[Permission]`: 事件响应权限
* `handlers: Optional[List[T_Handler]]`: 事件处理函数列表
* `temp: bool`: 是否为临时事件响应器(仅执行一次)
* `priority: int`: 事件响应器优先级
* `block: bool`: 是否阻止事件向更低优先级传递
* `state: Optional[T_State]`: 默认 state
* `state_factory: Optional[T_StateFactory]`: 默认 state 的工厂函数
* **返回**
* `Type[Matcher]`
### `on_regex(pattern, flags=0, rule=None, **kwargs)`

View File

@ -170,6 +170,61 @@ Rule(async_function, run_sync(sync_function))
:::
## _class_ `ArgumentParser`
基类:`argparse.ArgumentParser`
* **说明**
`shell_like` 命令参数解析器,解析出错时不会退出程序。
## `shell_command(*cmds, parser=None)`
* **说明**
支持 `shell_like` 解析参数的命令形式匹配,根据配置里提供的 `command_start`, `command_sep` 判断消息是否为命令。
可以通过 `state["_prefix"]["command"]` 获取匹配成功的命令(例:`("test",)`),通过 `state["_prefix"]["raw_command"]` 获取匹配成功的原始命令文本(例:`"/test"`)。
可以通过 `state["argv"]` 获取用户输入的原始参数列表
添加 `parser` 参数后, 可以自动处理消息并将结果保存在 `state["args"]` 中。
* **参数**
* `*cmds: Union[str, Tuple[str, ...]]`: 命令内容
* `parser: Optional[ArgumentParser]`: `nonebot.rule.ArgumentParser` 对象
* **示例**
使用默认 `command_start`, `command_sep` 配置,更多示例参考 `argparse` 标准库文档。
```python
from nonebot.rule import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-a", type=bool)
rule = shell_command("ls", parser=parser)
```
:::tip 提示
命令内容与后续消息间无需空格!
:::
## `regex(regex, flags=0)`

View File

@ -12,6 +12,7 @@
- ``on_endswith`` => ``nonebot.plugin.on_endswith``
- ``on_keyword`` => ``nonebot.plugin.on_keyword``
- ``on_command`` => ``nonebot.plugin.on_command``
- ``on_shell_command`` => ``nonebot.plugin.on_shell_command``
- ``on_regex`` => ``nonebot.plugin.on_regex``
- ``CommandGroup`` => ``nonebot.plugin.CommandGroup``
- ``Matchergroup`` => ``nonebot.plugin.MatcherGroup``
@ -219,6 +220,6 @@ def run(host: Optional[str] = None,
from nonebot.plugin import on_message, on_notice, on_request, on_metaevent, CommandGroup, MatcherGroup
from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_regex
from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_shell_command, on_regex
from nonebot.plugin import load_plugin, load_plugins, load_builtin_plugins
from nonebot.plugin import export, require, get_plugin, get_loaded_plugins

View File

@ -37,6 +37,23 @@ class IgnoredException(NoneBotException):
return self.__repr__()
class ParserExit(NoneBotException):
"""
:说明:
``shell command`` 处理消息失败时返回的异常
:参数:
* ``status``
* ``message``
"""
def __init__(self, status=0, message=None):
self.status = status
self.message = message
class PausedException(NoneBotException):
"""
:说明:

View File

@ -19,7 +19,7 @@ from nonebot.log import logger
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
from nonebot.rule import Rule, startswith, endswith, keyword, command, regex
from nonebot.rule import Rule, startswith, endswith, keyword, command, shell_command, ArgumentParser, regex
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event
@ -436,6 +436,57 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
return on_message(command(*commands) & rule, handlers=handlers, **kwargs)
def on_shell_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
str(segment)
[len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return on_message(shell_command(*commands, parser=parser) & rule,
handlers=handlers,
**kwargs)
def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = None,
@ -513,6 +564,29 @@ class CommandGroup:
final_kwargs.update(kwargs)
return on_command(cmd, **final_kwargs)
def shell_command(self, cmd: Union[str, Tuple[str, ...]],
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个新的命令
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 命令前缀
* ``**kwargs``: 其他传递给 ``on_command`` 的参数将会覆盖命令组默认值
:返回:
- ``Type[Matcher]``
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_shell_command(cmd, **final_kwargs)
class MatcherGroup:
"""事件响应器组合,统一管理。为 ``Matcher`` 创建提供默认属性。"""
@ -800,6 +874,59 @@ class MatcherGroup:
handlers=handlers,
**kwargs)
def on_shell_command(self,
cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str,
...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
"""
:说明:
注册一个支持 ``shell_like`` 解析参数的命令消息事件响应器
与普通的 ``on_command`` 不同的是在添加 ``parser`` 参数时, 响应器会自动处理消息
并将用户输入的原始参数列表保存在 ``state["argv"]``, ``parser`` 处理的参数保存在 ``state["args"]``
:参数:
* ``cmd: Union[str, Tuple[str, ...]]``: 指定命令内容
* ``rule: Optional[Union[Rule, T_RuleChecker]]``: 事件响应规则
* ``aliases: Optional[Set[Union[str, Tuple[str, ...]]]]``: 命令别名
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
* ``permission: Optional[Permission]``: 事件响应权限
* ``handlers: Optional[List[T_Handler]]``: 事件处理函数列表
* ``temp: bool``: 是否为临时事件响应器仅执行一次
* ``priority: int``: 事件响应器优先级
* ``block: bool``: 是否阻止事件向更低优先级传递
* ``state: Optional[T_State]``: 默认 state
* ``state_factory: Optional[T_StateFactory]``: 默认 state 的工厂函数
:返回:
- ``Type[Matcher]``
"""
async def _strip_cmd(bot: "Bot", event: "Event", state: T_State):
message = event.get_message()
segment = message.pop(0)
new_message = message.__class__(
str(segment)
[len(state["_prefix"]["raw_command"]):].strip()) # type: ignore
for new_segment in reversed(new_message):
message.insert(0, new_segment)
handlers = kwargs.pop("handlers", [])
handlers.insert(0, _strip_cmd)
commands = set([cmd]) | (aliases or set())
return self.on_message(rule=shell_command(*commands, parser=parser) &
rule,
handlers=handlers,
**kwargs)
def on_regex(self,
pattern: str,
flags: Union[int, re.RegexFlag] = 0,

View File

@ -3,9 +3,9 @@ from types import ModuleType
from contextvars import ContextVar
from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional
from nonebot.rule import Rule
from nonebot.matcher import Matcher
from nonebot.permission import Permission
from nonebot.rule import Rule, ArgumentParser
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
plugins: Dict[str, "Plugin"] = ...
@ -146,6 +146,14 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
...
def on_shell_command(cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = None,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = None,
parser: Optional[ArgumentParser] = None,
**kwargs) -> Type[Matcher]:
...
def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = ...,
@ -217,6 +225,22 @@ class CommandGroup:
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def shell_command(
self,
cmd: Union[str, Tuple[str, ...]],
*,
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
parser: Optional[ArgumentParser] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
class MatcherGroup:
@ -351,6 +375,22 @@ class MatcherGroup:
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_shell_command(
self,
*,
cmd: Union[str, Tuple[str, ...]],
rule: Optional[Union[Rule, T_RuleChecker]] = ...,
aliases: Optional[Set[Union[str, Tuple[str, ...]]]] = ...,
parser: Optional[ArgumentParser] = ...,
permission: Optional[Permission] = ...,
handlers: Optional[List[T_Handler]] = ...,
temp: bool = ...,
priority: int = ...,
block: bool = ...,
state: Optional[T_State] = ...,
state_factory: Optional[T_StateFactory] = ...) -> Type[Matcher]:
...
def on_regex(
self,
*,

View File

@ -10,15 +10,18 @@
"""
import re
import shlex
import asyncio
from itertools import product
from typing import Any, Dict, Union, Tuple, Optional, Callable, NoReturn, Awaitable, TYPE_CHECKING
from argparse import Namespace, ArgumentParser as ArgParser
from typing import Any, Dict, Union, Tuple, Optional, Callable, Sequence, NoReturn, Awaitable, TYPE_CHECKING
from pygtrie import CharTrie
from nonebot import get_driver
from nonebot.log import logger
from nonebot.utils import run_sync
from nonebot.exception import ParserExit
from nonebot.typing import T_State, T_RuleChecker
if TYPE_CHECKING:
@ -276,6 +279,100 @@ def command(*cmds: Union[str, Tuple[str, ...]]) -> Rule:
return Rule(_command)
class ArgumentParser(ArgParser):
"""
:说明:
``shell_like`` 命令参数解析器解析出错时不会退出程序
"""
def _print_message(self, message, file=None):
pass
def exit(self, status=0, message=None):
raise ParserExit(status=status, message=message)
def parse_args(self,
args: Optional[Sequence[str]] = None,
namespace: Optional[Namespace] = None) -> Namespace:
return super().parse_args(args=args,
namespace=namespace) # type: ignore
def shell_command(*cmds: Union[str, Tuple[str, ...]],
parser: Optional[ArgumentParser] = None) -> Rule:
"""
:说明:
支持 ``shell_like`` 解析参数的命令形式匹配根据配置里提供的 ``command_start``, ``command_sep`` 判断消息是否为命令
可以通过 ``state["_prefix"]["command"]`` 获取匹配成功的命令``("test",)``通过 ``state["_prefix"]["raw_command"]`` 获取匹配成功的原始命令文本``"/test"``
可以通过 ``state["argv"]`` 获取用户输入的原始参数列表
添加 ``parser`` 参数后, 可以自动处理消息并将结果保存在 ``state["args"]``
:参数:
* ``*cmds: Union[str, Tuple[str, ...]]``: 命令内容
* ``parser: Optional[ArgumentParser]``: ``nonebot.rule.ArgumentParser`` 对象
:示例:
使用默认 ``command_start``, ``command_sep`` 配置更多示例参考 ``argparse`` 标准库文档
.. code-block:: python
from nonebot.rule import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-a", type=bool)
rule = shell_command("ls", parser=parser)
\:\:\:tip 提示
命令内容与后续消息间无需空格
\:\:\:
"""
if not isinstance(parser, ArgumentParser):
raise TypeError(
"`parser` must be an instance of nonebot.rule.ArgumentParser")
config = get_driver().config
command_start = config.command_start
command_sep = config.command_sep
commands = list(cmds)
for index, command in enumerate(commands):
if isinstance(command, str):
commands[index] = command = (command,)
if len(command) == 1:
for start in command_start:
TrieRule.add_prefix(f"{start}{command[0]}", command)
else:
for start, sep in product(command_start, command_sep):
TrieRule.add_prefix(f"{start}{sep.join(command)}", command)
async def _shell_command(bot: "Bot", event: "Event",
state: T_State) -> bool:
if state["_prefix"]["command"] in commands:
message = str(event.get_message())
strip_message = message[len(state["_prefix"]["raw_command"]
):].lstrip()
state["argv"] = shlex.split(strip_message)
if parser:
try:
args = parser.parse_args(state["argv"])
state["args"] = args
except ParserExit as e:
state["args"] = e
return True
else:
return False
return Rule(_shell_command)
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
"""
:说明:
@ -307,9 +404,6 @@ def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
state["_matched_dict"] = matched.groupdict()
return True
else:
state["_matched"] = None
state["_matched_groups"] = None
state["_matched_dict"] = None
return False
return Rule(_regex)

View File

@ -0,0 +1,15 @@
from nonebot.adapters import Bot
from nonebot.typing import T_State
from nonebot import on_shell_command
from nonebot.rule import to_me, ArgumentParser
parser = ArgumentParser()
parser.add_argument("-a", type=bool)
shell = on_shell_command("ls", to_me(), parser=parser)
@shell.handle()
async def _(bot: Bot, state: T_State):
print(state["argv"])
print(state["args"])