From e887c3999866cf84694653b8369c9383c1e884e2 Mon Sep 17 00:00:00 2001 From: yanyongyu <42488585+yanyongyu@users.noreply.github.com> Date: Sat, 29 Jan 2022 13:56:54 +0800 Subject: [PATCH] :label: update message typing --- nonebot/adapters/_message.py | 123 ++++++++++++++++++++++------------- 1 file changed, 76 insertions(+), 47 deletions(-) diff --git a/nonebot/adapters/_message.py b/nonebot/adapters/_message.py index 57eaebaf..8232fac4 100644 --- a/nonebot/adapters/_message.py +++ b/nonebot/adapters/_message.py @@ -24,12 +24,12 @@ from typing import ( from ._template import MessageTemplate T = TypeVar("T") -TMS = TypeVar("TMS", covariant=True) +TMS = TypeVar("TMS", bound="MessageSegment") TM = TypeVar("TM", bound="Message") @dataclass -class MessageSegment(Mapping, abc.ABC, Generic[TM]): +class MessageSegment(abc.ABC, Generic[TM]): """消息段基类""" type: str @@ -54,26 +54,14 @@ class MessageSegment(Mapping, abc.ABC, Generic[TM]): def __ne__(self: T, other: T) -> bool: return not self == other - def __add__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: - return self.get_message_class()(self) + other # type: ignore + def __add__(self: TMS, other: Union[str, TMS, Iterable[TMS]]) -> TM: + return self.get_message_class()(self) + other - def __radd__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: - return self.get_message_class()(other) + self # type: ignore - - def __getitem__(self, key: str): - return getattr(self, key) - - def __setitem__(self, key: str, value: Any): - return setattr(self, key, value) - - def __iter__(self): - yield from asdict(self).keys() - - def __contains__(self, key: Any) -> bool: - return key in asdict(self).keys() + def __radd__(self: TMS, other: Union[str, TMS, Iterable[TMS]]) -> TM: + return self.get_message_class()(other) + self def get(self, key: str, default: Any = None): - return getattr(self, key, default) + return asdict(self).get(key, default) def keys(self): return asdict(self).keys() @@ -101,18 +89,18 @@ class Message(List[TMS], abc.ABC): """ def __init__( - self: TM, - message: Union[str, None, Mapping, Iterable[Mapping], TMS, TM, Any] = None, - *args, - **kwargs, + self, + message: Union[str, None, Iterable[TMS], TMS, Any] = None, ): - super().__init__(*args, **kwargs) + super().__init__() if message is None: return - elif isinstance(message, Message): - self.extend(message) + elif isinstance(message, str): + self.extend(self._construct(message)) elif isinstance(message, MessageSegment): self.append(message) + elif isinstance(message, Iterable): + self.extend(message) else: self.extend(self._construct(message)) @@ -154,7 +142,7 @@ class Message(List[TMS], abc.ABC): """获取消息段类型""" raise NotImplementedError - def __str__(self): + def __str__(self) -> str: return "".join(str(seg) for seg in self) @classmethod @@ -167,47 +155,79 @@ class Message(List[TMS], abc.ABC): @staticmethod @abc.abstractmethod - def _construct(msg: Union[str, Mapping, Iterable[Mapping], Any]) -> Iterable[TMS]: + def _construct(msg: Union[str, Any]) -> Iterable[TMS]: """构造消息数组""" raise NotImplementedError - def __add__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: + def __add__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM: result = self.copy() result += other return result - def __radd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: - result = self.__class__(other) # type: ignore + def __radd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM: + result = self.__class__(other) return result + self - def __iadd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: - if isinstance(other, MessageSegment): + def __iadd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM: + if isinstance(other, str): + self.extend(self._construct(other)) + elif isinstance(other, MessageSegment): self.append(other) - elif isinstance(other, Message): + elif isinstance(other, Iterable): self.extend(other) else: - self.extend(self._construct(other)) + raise ValueError(f"Unsupported type: {type(other)}") return self @overload def __getitem__(self: TM, __args: str) -> TM: - ... + """ + 参数: + __args: 消息段类型 + + 返回: + 所有类型为 `__args` 的消息段 + """ @overload def __getitem__(self, __args: Tuple[str, int]) -> TMS: - ... + """ + 参数: + __args: 消息段类型和索引 + + 返回: + 类型为 `__args[0]` 的消息段第 `__args[1]` 个 + """ @overload def __getitem__(self: TM, __args: Tuple[str, slice]) -> TM: - ... + """ + 参数: + __args: 消息段类型和切片 + + 返回: + 类型为 `__args[0]` 的消息段切片 `__args[1]` + """ @overload def __getitem__(self, __args: int) -> TMS: - ... + """ + 参数: + __args: 索引 + + 返回: + 第 `__args` 个消息段 + """ @overload def __getitem__(self: TM, __args: slice) -> TM: - ... + """ + 参数: + __args: 切片 + + 返回: + 消息切片 `__args` + """ def __getitem__( self: TM, @@ -235,21 +255,25 @@ class Message(List[TMS], abc.ABC): def index(self, value: Union[TMS, str], *args) -> int: if isinstance(value, str): - first_segment = next((seg for seg in self if seg.type == value), None) # type: ignore - return super().index(first_segment, *args) # type: ignore + first_segment = next((seg for seg in self if seg.type == value), None) + if first_segment is None: + raise ValueError(f"Segment with type {value} is not in message") + return super().index(first_segment, *args) return super().index(value, *args) def get(self: TM, type_: str, count: Optional[int] = None) -> TM: if count is None: return self[type_] - iterator, filtered = (seg for seg in self if seg.type == type_), [] + iterator, filtered = ( + seg for seg in self if seg.type == type_ + ), self.__class__() for _ in range(count): seg = next(iterator, None) if seg is None: break filtered.append(seg) - return self.__class__(filtered) + return filtered def count(self, value: Union[TMS, str]) -> int: return len(self[value]) if isinstance(value, str) else super().count(value) @@ -261,7 +285,7 @@ class Message(List[TMS], abc.ABC): obj: 要添加的消息段 """ if isinstance(obj, MessageSegment): - super(Message, self).append(obj) + super().append(obj) elif isinstance(obj, str): self.extend(self._construct(obj)) else: @@ -281,10 +305,15 @@ class Message(List[TMS], abc.ABC): def copy(self: TM) -> TM: return deepcopy(self) - def extract_plain_text(self: "Message[MessageSegment]") -> str: + def extract_plain_text(self) -> str: """提取消息内纯文本消息""" return "".join(str(seg) for seg in self if seg.is_text()) -__autodoc__ = {"MessageSegment.__str__": True} +__autodoc__ = { + "MessageSegment.__str__": True, + "MessageSegment.__add__": True, + "Message.__getitem__": True, + "Message._construct": True, +}