mirror of
https://github.com/nonebot/nonebot2.git
synced 2024-11-27 18:45:05 +08:00
🏷️ update message typing
This commit is contained in:
parent
4c2269f789
commit
e887c39998
@ -24,12 +24,12 @@ from typing import (
|
||||
from ._template import MessageTemplate
|
||||
|
||||
T = TypeVar("T")
|
||||
TMS = TypeVar("TMS", covariant=True)
|
||||
TMS = TypeVar("TMS", bound="MessageSegment")
|
||||
TM = TypeVar("TM", bound="Message")
|
||||
|
||||
|
||||
@dataclass
|
||||
class MessageSegment(Mapping, abc.ABC, Generic[TM]):
|
||||
class MessageSegment(abc.ABC, Generic[TM]):
|
||||
"""消息段基类"""
|
||||
|
||||
type: str
|
||||
@ -54,26 +54,14 @@ class MessageSegment(Mapping, abc.ABC, Generic[TM]):
|
||||
def __ne__(self: T, other: T) -> bool:
|
||||
return not self == other
|
||||
|
||||
def __add__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
|
||||
return self.get_message_class()(self) + other # type: ignore
|
||||
def __add__(self: TMS, other: Union[str, TMS, Iterable[TMS]]) -> TM:
|
||||
return self.get_message_class()(self) + other
|
||||
|
||||
def __radd__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
|
||||
return self.get_message_class()(other) + self # type: ignore
|
||||
|
||||
def __getitem__(self, key: str):
|
||||
return getattr(self, key)
|
||||
|
||||
def __setitem__(self, key: str, value: Any):
|
||||
return setattr(self, key, value)
|
||||
|
||||
def __iter__(self):
|
||||
yield from asdict(self).keys()
|
||||
|
||||
def __contains__(self, key: Any) -> bool:
|
||||
return key in asdict(self).keys()
|
||||
def __radd__(self: TMS, other: Union[str, TMS, Iterable[TMS]]) -> TM:
|
||||
return self.get_message_class()(other) + self
|
||||
|
||||
def get(self, key: str, default: Any = None):
|
||||
return getattr(self, key, default)
|
||||
return asdict(self).get(key, default)
|
||||
|
||||
def keys(self):
|
||||
return asdict(self).keys()
|
||||
@ -101,18 +89,18 @@ class Message(List[TMS], abc.ABC):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self: TM,
|
||||
message: Union[str, None, Mapping, Iterable[Mapping], TMS, TM, Any] = None,
|
||||
*args,
|
||||
**kwargs,
|
||||
self,
|
||||
message: Union[str, None, Iterable[TMS], TMS, Any] = None,
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
super().__init__()
|
||||
if message is None:
|
||||
return
|
||||
elif isinstance(message, Message):
|
||||
self.extend(message)
|
||||
elif isinstance(message, str):
|
||||
self.extend(self._construct(message))
|
||||
elif isinstance(message, MessageSegment):
|
||||
self.append(message)
|
||||
elif isinstance(message, Iterable):
|
||||
self.extend(message)
|
||||
else:
|
||||
self.extend(self._construct(message))
|
||||
|
||||
@ -154,7 +142,7 @@ class Message(List[TMS], abc.ABC):
|
||||
"""获取消息段类型"""
|
||||
raise NotImplementedError
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return "".join(str(seg) for seg in self)
|
||||
|
||||
@classmethod
|
||||
@ -167,47 +155,79 @@ class Message(List[TMS], abc.ABC):
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def _construct(msg: Union[str, Mapping, Iterable[Mapping], Any]) -> Iterable[TMS]:
|
||||
def _construct(msg: Union[str, Any]) -> Iterable[TMS]:
|
||||
"""构造消息数组"""
|
||||
raise NotImplementedError
|
||||
|
||||
def __add__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
|
||||
def __add__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
|
||||
result = self.copy()
|
||||
result += other
|
||||
return result
|
||||
|
||||
def __radd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
|
||||
result = self.__class__(other) # type: ignore
|
||||
def __radd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
|
||||
result = self.__class__(other)
|
||||
return result + self
|
||||
|
||||
def __iadd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
|
||||
if isinstance(other, MessageSegment):
|
||||
def __iadd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
|
||||
if isinstance(other, str):
|
||||
self.extend(self._construct(other))
|
||||
elif isinstance(other, MessageSegment):
|
||||
self.append(other)
|
||||
elif isinstance(other, Message):
|
||||
elif isinstance(other, Iterable):
|
||||
self.extend(other)
|
||||
else:
|
||||
self.extend(self._construct(other))
|
||||
raise ValueError(f"Unsupported type: {type(other)}")
|
||||
return self
|
||||
|
||||
@overload
|
||||
def __getitem__(self: TM, __args: str) -> TM:
|
||||
...
|
||||
"""
|
||||
参数:
|
||||
__args: 消息段类型
|
||||
|
||||
返回:
|
||||
所有类型为 `__args` 的消息段
|
||||
"""
|
||||
|
||||
@overload
|
||||
def __getitem__(self, __args: Tuple[str, int]) -> TMS:
|
||||
...
|
||||
"""
|
||||
参数:
|
||||
__args: 消息段类型和索引
|
||||
|
||||
返回:
|
||||
类型为 `__args[0]` 的消息段第 `__args[1]` 个
|
||||
"""
|
||||
|
||||
@overload
|
||||
def __getitem__(self: TM, __args: Tuple[str, slice]) -> TM:
|
||||
...
|
||||
"""
|
||||
参数:
|
||||
__args: 消息段类型和切片
|
||||
|
||||
返回:
|
||||
类型为 `__args[0]` 的消息段切片 `__args[1]`
|
||||
"""
|
||||
|
||||
@overload
|
||||
def __getitem__(self, __args: int) -> TMS:
|
||||
...
|
||||
"""
|
||||
参数:
|
||||
__args: 索引
|
||||
|
||||
返回:
|
||||
第 `__args` 个消息段
|
||||
"""
|
||||
|
||||
@overload
|
||||
def __getitem__(self: TM, __args: slice) -> TM:
|
||||
...
|
||||
"""
|
||||
参数:
|
||||
__args: 切片
|
||||
|
||||
返回:
|
||||
消息切片 `__args`
|
||||
"""
|
||||
|
||||
def __getitem__(
|
||||
self: TM,
|
||||
@ -235,21 +255,25 @@ class Message(List[TMS], abc.ABC):
|
||||
|
||||
def index(self, value: Union[TMS, str], *args) -> int:
|
||||
if isinstance(value, str):
|
||||
first_segment = next((seg for seg in self if seg.type == value), None) # type: ignore
|
||||
return super().index(first_segment, *args) # type: ignore
|
||||
first_segment = next((seg for seg in self if seg.type == value), None)
|
||||
if first_segment is None:
|
||||
raise ValueError(f"Segment with type {value} is not in message")
|
||||
return super().index(first_segment, *args)
|
||||
return super().index(value, *args)
|
||||
|
||||
def get(self: TM, type_: str, count: Optional[int] = None) -> TM:
|
||||
if count is None:
|
||||
return self[type_]
|
||||
|
||||
iterator, filtered = (seg for seg in self if seg.type == type_), []
|
||||
iterator, filtered = (
|
||||
seg for seg in self if seg.type == type_
|
||||
), self.__class__()
|
||||
for _ in range(count):
|
||||
seg = next(iterator, None)
|
||||
if seg is None:
|
||||
break
|
||||
filtered.append(seg)
|
||||
return self.__class__(filtered)
|
||||
return filtered
|
||||
|
||||
def count(self, value: Union[TMS, str]) -> int:
|
||||
return len(self[value]) if isinstance(value, str) else super().count(value)
|
||||
@ -261,7 +285,7 @@ class Message(List[TMS], abc.ABC):
|
||||
obj: 要添加的消息段
|
||||
"""
|
||||
if isinstance(obj, MessageSegment):
|
||||
super(Message, self).append(obj)
|
||||
super().append(obj)
|
||||
elif isinstance(obj, str):
|
||||
self.extend(self._construct(obj))
|
||||
else:
|
||||
@ -281,10 +305,15 @@ class Message(List[TMS], abc.ABC):
|
||||
def copy(self: TM) -> TM:
|
||||
return deepcopy(self)
|
||||
|
||||
def extract_plain_text(self: "Message[MessageSegment]") -> str:
|
||||
def extract_plain_text(self) -> str:
|
||||
"""提取消息内纯文本消息"""
|
||||
|
||||
return "".join(str(seg) for seg in self if seg.is_text())
|
||||
|
||||
|
||||
__autodoc__ = {"MessageSegment.__str__": True}
|
||||
__autodoc__ = {
|
||||
"MessageSegment.__str__": True,
|
||||
"MessageSegment.__add__": True,
|
||||
"Message.__getitem__": True,
|
||||
"Message._construct": True,
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user