From d4f344d2475041ed403d2f9ab512e1f77ce1d49f Mon Sep 17 00:00:00 2001 From: yanyongyu Date: Wed, 30 Dec 2020 12:42:10 +0800 Subject: [PATCH] :wheelchair: improve command --- nonebot/adapters/__init__.py | 260 ++++++++++++++++++----------------- nonebot/plugin.py | 18 ++- 2 files changed, 144 insertions(+), 134 deletions(-) diff --git a/nonebot/adapters/__init__.py b/nonebot/adapters/__init__.py index 22c6f587..84ca6237 100644 --- a/nonebot/adapters/__init__.py +++ b/nonebot/adapters/__init__.py @@ -14,6 +14,7 @@ from typing import Any, Dict, Union, TypeVar, Optional, Callable, Iterable, Awai from pydantic import BaseModel from nonebot.config import Config +from nonebot.utils import DataclassEncoder if TYPE_CHECKING: from nonebot.drivers import Driver, WebSocket @@ -138,135 +139,6 @@ class Bot(abc.ABC): raise NotImplementedError -class Event(abc.ABC, BaseModel): - """Event 基类。提供获取关键信息的方法,其余信息可直接获取。""" - - class Config: - extra = "allow" - - @abc.abstractmethod - def get_type(self) -> Literal["message", "notice", "request", "meta_event"]: - """ - :说明: - - 获取事件类型的方法,类型通常为 NoneBot 内置的四种类型。 - - :返回: - - * ``Literal["message", "notice", "request", "meta_event"]`` - """ - raise NotImplementedError - - @abc.abstractmethod - def get_event_name(self) -> str: - """ - :说明: - - 获取事件名称的方法。 - - :返回: - - * ``str`` - """ - raise NotImplementedError - - @abc.abstractmethod - def get_event_description(self) -> str: - """ - :说明: - - 获取事件描述的方法,通常为事件具体内容。 - - :返回: - - * ``str`` - """ - raise NotImplementedError - - def __str__(self) -> str: - return f"[{self.get_event_name()}]: {self.get_event_description()}" - - def get_log_string(self) -> str: - """ - :说明: - - 获取事件日志信息的方法,通常你不需要修改这个方法,只有当希望 NoneBot 隐藏该事件日志时,可以抛出 ``NoLogException`` 异常。 - - :返回: - - * ``str`` - - :异常: - - - ``NoLogException`` - """ - return f"[{self.get_event_name()}]: {self.get_event_description()}" - - @abc.abstractmethod - def get_user_id(self) -> str: - """ - :说明: - - 获取事件主体 id 的方法,通常是用户 id 。 - - :返回: - - * ``str`` - """ - raise NotImplementedError - - @abc.abstractmethod - def get_session_id(self) -> str: - """ - :说明: - - 获取会话 id 的方法,用于判断当前事件属于哪一个会话,通常是用户 id、群组 id 组合。 - - :返回: - - * ``str`` - """ - raise NotImplementedError - - @abc.abstractmethod - def get_message(self) -> "Message": - """ - :说明: - - 获取事件消息内容的方法。 - - :返回: - - * ``Message`` - """ - raise NotImplementedError - - def get_plaintext(self) -> str: - """ - :说明: - - 获取消息纯文本的方法,通常不需要修改,默认通过 ``get_message().extract_plain_text`` 获取。 - - :返回: - - * ``str`` - """ - return self.get_message().extract_plain_text() - - @abc.abstractmethod - def is_tome(self) -> bool: - """ - :说明: - - 获取事件是否与机器人有关的方法。 - - :返回: - - * ``bool`` - """ - raise NotImplementedError - - T_Message = TypeVar("T_Message", bound="Message") T_MessageSegment = TypeVar("T_MessageSegment", bound="MessageSegment") @@ -446,3 +318,133 @@ class Message(list, abc.ABC): plain_text = reduce(_concat, self, "") return plain_text[1:] if plain_text else plain_text + + +class Event(abc.ABC, BaseModel): + """Event 基类。提供获取关键信息的方法,其余信息可直接获取。""" + + class Config: + extra = "allow" + json_encoders = {Message: DataclassEncoder} + + @abc.abstractmethod + def get_type(self) -> Literal["message", "notice", "request", "meta_event"]: + """ + :说明: + + 获取事件类型的方法,类型通常为 NoneBot 内置的四种类型。 + + :返回: + + * ``Literal["message", "notice", "request", "meta_event"]`` + """ + raise NotImplementedError + + @abc.abstractmethod + def get_event_name(self) -> str: + """ + :说明: + + 获取事件名称的方法。 + + :返回: + + * ``str`` + """ + raise NotImplementedError + + @abc.abstractmethod + def get_event_description(self) -> str: + """ + :说明: + + 获取事件描述的方法,通常为事件具体内容。 + + :返回: + + * ``str`` + """ + raise NotImplementedError + + def __str__(self) -> str: + return f"[{self.get_event_name()}]: {self.get_event_description()}" + + def get_log_string(self) -> str: + """ + :说明: + + 获取事件日志信息的方法,通常你不需要修改这个方法,只有当希望 NoneBot 隐藏该事件日志时,可以抛出 ``NoLogException`` 异常。 + + :返回: + + * ``str`` + + :异常: + + - ``NoLogException`` + """ + return f"[{self.get_event_name()}]: {self.get_event_description()}" + + @abc.abstractmethod + def get_user_id(self) -> str: + """ + :说明: + + 获取事件主体 id 的方法,通常是用户 id 。 + + :返回: + + * ``str`` + """ + raise NotImplementedError + + @abc.abstractmethod + def get_session_id(self) -> str: + """ + :说明: + + 获取会话 id 的方法,用于判断当前事件属于哪一个会话,通常是用户 id、群组 id 组合。 + + :返回: + + * ``str`` + """ + raise NotImplementedError + + @abc.abstractmethod + def get_message(self) -> "Message": + """ + :说明: + + 获取事件消息内容的方法。 + + :返回: + + * ``Message`` + """ + raise NotImplementedError + + def get_plaintext(self) -> str: + """ + :说明: + + 获取消息纯文本的方法,通常不需要修改,默认通过 ``get_message().extract_plain_text`` 获取。 + + :返回: + + * ``str`` + """ + return self.get_message().extract_plain_text() + + @abc.abstractmethod + def is_tome(self) -> bool: + """ + :说明: + + 获取事件是否与机器人有关的方法。 + + :返回: + + * ``bool`` + """ + raise NotImplementedError diff --git a/nonebot/plugin.py b/nonebot/plugin.py index f6bc57e4..7aa46941 100644 --- a/nonebot/plugin.py +++ b/nonebot/plugin.py @@ -13,7 +13,7 @@ from types import ModuleType from dataclasses import dataclass from importlib._bootstrap import _load from contextvars import Context, ContextVar, copy_context -from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional +from typing import Any, Set, List, Dict, Type, Tuple, Union, Optional, TYPE_CHECKING from nonebot.log import logger from nonebot.matcher import Matcher @@ -21,6 +21,9 @@ from nonebot.permission import Permission from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker from nonebot.rule import Rule, startswith, endswith, keyword, command, regex +if TYPE_CHECKING: + from nonebot.adapters import Bot, Event + plugins: Dict[str, "Plugin"] = {} """ :类型: ``Dict[str, Plugin]`` @@ -417,10 +420,15 @@ def on_command(cmd: Union[str, Tuple[str, ...]], - ``Type[Matcher]`` """ - async def _strip_cmd(bot, event, state: T_State): - message = event.message - event.message = message.__class__( - str(message)[len(state["_prefix"]["raw_command"]):].strip()) + async def _strip_cmd(bot: "Bot", event: "Event", state: T_State): + print(event.dict()) + message = event.get_message() + segment = message.pop(0) + new_message = message.__class__( + str(segment) + [len(state["_prefix"]["raw_command"]):].strip()) # type: ignore + for new_segment in reversed(new_message): + message.insert(0, new_segment) handlers = kwargs.pop("handlers", []) handlers.insert(0, _strip_cmd)