🔀 Merge pull request #752

Fix: rewrite message typing and construct
This commit is contained in:
Ju4tCode 2022-01-30 13:06:54 +08:00 committed by GitHub
commit bdf2f4771b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 236 additions and 61 deletions

View File

@ -14,27 +14,28 @@ from typing import (
Tuple, Tuple,
Union, Union,
Generic, Generic,
Mapping,
TypeVar, TypeVar,
Iterable, Iterable,
Optional, Optional,
overload, overload,
) )
from pydantic import parse_obj_as
from ._template import MessageTemplate from ._template import MessageTemplate
T = TypeVar("T") T = TypeVar("T")
TMS = TypeVar("TMS", covariant=True) TMS = TypeVar("TMS", bound="MessageSegment")
TM = TypeVar("TM", bound="Message") TM = TypeVar("TM", bound="Message")
@dataclass @dataclass
class MessageSegment(Mapping, abc.ABC, Generic[TM]): class MessageSegment(abc.ABC, Generic[TM]):
"""消息段基类""" """消息段基类"""
type: str type: str
"""消息段类型""" """消息段类型"""
data: Dict[str, Any] = field(default_factory=lambda: {}) data: Dict[str, Any] = field(default_factory=dict)
"""消息段数据""" """消息段数据"""
@classmethod @classmethod
@ -54,26 +55,26 @@ class MessageSegment(Mapping, abc.ABC, Generic[TM]):
def __ne__(self: T, other: T) -> bool: def __ne__(self: T, other: T) -> bool:
return not self == other return not self == other
def __add__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: def __add__(self: TMS, other: Union[str, TMS, Iterable[TMS]]) -> TM:
return self.get_message_class()(self) + other # type: ignore return self.get_message_class()(self) + other
def __radd__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: def __radd__(self: TMS, other: Union[str, TMS, Iterable[TMS]]) -> TM:
return self.get_message_class()(other) + self # type: ignore return self.get_message_class()(other) + self
def __getitem__(self, key: str): @classmethod
return getattr(self, key) def __get_validators__(cls):
yield cls._validate
def __setitem__(self, key: str, value: Any): @classmethod
return setattr(self, key, value) def _validate(cls, value):
if isinstance(value, cls):
def __iter__(self): return value
yield from asdict(self).keys() if not isinstance(value, dict):
raise ValueError(f"Expected dict for MessageSegment, got {type(value)}")
def __contains__(self, key: Any) -> bool: return cls(**value)
return key in asdict(self).keys()
def get(self, key: str, default: Any = None): def get(self, key: str, default: Any = None):
return getattr(self, key, default) return asdict(self).get(key, default)
def keys(self): def keys(self):
return asdict(self).keys() return asdict(self).keys()
@ -101,20 +102,20 @@ class Message(List[TMS], abc.ABC):
""" """
def __init__( def __init__(
self: TM, self,
message: Union[str, None, Mapping, Iterable[Mapping], TMS, TM, Any] = None, message: Union[str, None, Iterable[TMS], TMS] = None,
*args,
**kwargs,
): ):
super().__init__(*args, **kwargs) super().__init__()
if message is None: if message is None:
return return
elif isinstance(message, Message): elif isinstance(message, str):
self.extend(message) self.extend(self._construct(message))
elif isinstance(message, MessageSegment): elif isinstance(message, MessageSegment):
self.append(message) self.append(message)
elif isinstance(message, Iterable):
self.extend(message)
else: else:
self.extend(self._construct(message)) self.extend(self._construct(message)) # pragma: no cover
@classmethod @classmethod
def template(cls: Type[TM], format_string: Union[str, TM]) -> MessageTemplate[TM]: def template(cls: Type[TM], format_string: Union[str, TM]) -> MessageTemplate[TM]:
@ -154,7 +155,7 @@ class Message(List[TMS], abc.ABC):
"""获取消息段类型""" """获取消息段类型"""
raise NotImplementedError raise NotImplementedError
def __str__(self): def __str__(self) -> str:
return "".join(str(seg) for seg in self) return "".join(str(seg) for seg in self)
@classmethod @classmethod
@ -163,51 +164,97 @@ class Message(List[TMS], abc.ABC):
@classmethod @classmethod
def _validate(cls, value): def _validate(cls, value):
if isinstance(value, cls):
return value
elif isinstance(value, Message):
raise ValueError(f"Type {type(value)} can not be converted to {cls}")
elif isinstance(value, str):
pass
elif isinstance(value, dict):
value = parse_obj_as(cls.get_segment_class(), value)
elif isinstance(value, Iterable):
value = [parse_obj_as(cls.get_segment_class(), v) for v in value]
else:
raise ValueError(
f"Expected str, dict or iterable for Message, got {type(value)}"
)
return cls(value) return cls(value)
@staticmethod @staticmethod
@abc.abstractmethod @abc.abstractmethod
def _construct(msg: Union[str, Mapping, Iterable[Mapping], Any]) -> Iterable[TMS]: def _construct(msg: str) -> Iterable[TMS]:
"""构造消息数组""" """构造消息数组"""
raise NotImplementedError raise NotImplementedError
def __add__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: def __add__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
result = self.copy() result = self.copy()
result += other result += other
return result return result
def __radd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: def __radd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
result = self.__class__(other) # type: ignore result = self.__class__(other)
return result + self return result + self
def __iadd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM: def __iadd__(self: TM, other: Union[str, TMS, Iterable[TMS]]) -> TM:
if isinstance(other, MessageSegment): if isinstance(other, str):
self.extend(self._construct(other))
elif isinstance(other, MessageSegment):
self.append(other) self.append(other)
elif isinstance(other, Message): elif isinstance(other, Iterable):
self.extend(other) self.extend(other)
else: else:
self.extend(self._construct(other)) raise ValueError(f"Unsupported type: {type(other)}") # pragma: no cover
return self return self
@overload @overload
def __getitem__(self: TM, __args: str) -> TM: def __getitem__(self: TM, __args: str) -> TM:
... """
参数:
__args: 消息段类型
返回:
所有类型为 `__args` 的消息段
"""
@overload @overload
def __getitem__(self, __args: Tuple[str, int]) -> TMS: def __getitem__(self, __args: Tuple[str, int]) -> TMS:
... """
参数:
__args: 消息段类型和索引
返回:
类型为 `__args[0]` 的消息段第 `__args[1]`
"""
@overload @overload
def __getitem__(self: TM, __args: Tuple[str, slice]) -> TM: def __getitem__(self: TM, __args: Tuple[str, slice]) -> TM:
... """
参数:
__args: 消息段类型和切片
返回:
类型为 `__args[0]` 的消息段切片 `__args[1]`
"""
@overload @overload
def __getitem__(self, __args: int) -> TMS: def __getitem__(self, __args: int) -> TMS:
... """
参数:
__args: 索引
返回:
`__args` 个消息段
"""
@overload @overload
def __getitem__(self: TM, __args: slice) -> TM: def __getitem__(self: TM, __args: slice) -> TM:
... """
参数:
__args: 切片
返回:
消息切片 `__args`
"""
def __getitem__( def __getitem__(
self: TM, self: TM,
@ -231,25 +278,29 @@ class Message(List[TMS], abc.ABC):
elif isinstance(arg1, str) and isinstance(arg2, slice): elif isinstance(arg1, str) and isinstance(arg2, slice):
return self.__class__([seg for seg in self if seg.type == arg1][arg2]) return self.__class__([seg for seg in self if seg.type == arg1][arg2])
else: else:
raise ValueError("Incorrect arguments to slice") raise ValueError("Incorrect arguments to slice") # pragma: no cover
def index(self, value: Union[TMS, str], *args) -> int: def index(self, value: Union[TMS, str], *args) -> int:
if isinstance(value, str): if isinstance(value, str):
first_segment = next((seg for seg in self if seg.type == value), None) # type: ignore first_segment = next((seg for seg in self if seg.type == value), None)
return super().index(first_segment, *args) # type: ignore if first_segment is None:
raise ValueError(f"Segment with type {value} is not in message")
return super().index(first_segment, *args)
return super().index(value, *args) return super().index(value, *args)
def get(self: TM, type_: str, count: Optional[int] = None) -> TM: def get(self: TM, type_: str, count: Optional[int] = None) -> TM:
if count is None: if count is None:
return self[type_] return self[type_]
iterator, filtered = (seg for seg in self if seg.type == type_), [] iterator, filtered = (
seg for seg in self if seg.type == type_
), self.__class__()
for _ in range(count): for _ in range(count):
seg = next(iterator, None) seg = next(iterator, None)
if seg is None: if seg is None:
break break
filtered.append(seg) filtered.append(seg)
return self.__class__(filtered) return filtered
def count(self, value: Union[TMS, str]) -> int: def count(self, value: Union[TMS, str]) -> int:
return len(self[value]) if isinstance(value, str) else super().count(value) return len(self[value]) if isinstance(value, str) else super().count(value)
@ -261,11 +312,11 @@ class Message(List[TMS], abc.ABC):
obj: 要添加的消息段 obj: 要添加的消息段
""" """
if isinstance(obj, MessageSegment): if isinstance(obj, MessageSegment):
super(Message, self).append(obj) super().append(obj)
elif isinstance(obj, str): elif isinstance(obj, str):
self.extend(self._construct(obj)) self.extend(self._construct(obj))
else: else:
raise ValueError(f"Unexpected type: {type(obj)} {obj}") raise ValueError(f"Unexpected type: {type(obj)} {obj}") # pragma: no cover
return self return self
def extend(self: TM, obj: Union[TM, Iterable[TMS]]) -> TM: def extend(self: TM, obj: Union[TM, Iterable[TMS]]) -> TM:
@ -281,10 +332,15 @@ class Message(List[TMS], abc.ABC):
def copy(self: TM) -> TM: def copy(self: TM) -> TM:
return deepcopy(self) return deepcopy(self)
def extract_plain_text(self: "Message[MessageSegment]") -> str: def extract_plain_text(self) -> str:
"""提取消息内纯文本消息""" """提取消息内纯文本消息"""
return "".join(str(seg) for seg in self if seg.is_text()) return "".join(str(seg) for seg in self if seg.is_text())
__autodoc__ = {"MessageSegment.__str__": True} __autodoc__ = {
"MessageSegment.__str__": True,
"MessageSegment.__add__": True,
"Message.__getitem__": True,
"Message._construct": True,
}

View File

@ -1,23 +1,90 @@
from pydantic import ValidationError, parse_obj_as
from utils import make_fake_message from utils import make_fake_message
def test_message_template(): def test_segment_add():
from nonebot.adapters import MessageTemplate
Message = make_fake_message() Message = make_fake_message()
MessageSegment = Message.get_segment_class()
template = MessageTemplate("{a:custom}{b:text}{c:image}", Message) assert MessageSegment.text("text") + MessageSegment.text("text") == Message(
[MessageSegment.text("text"), MessageSegment.text("text")]
)
@template.add_format_spec assert MessageSegment.text("text") + "text" == Message(
def custom(input: str) -> str: [MessageSegment.text("text"), MessageSegment.text("text")]
return input + "-custom!" )
formatted = template.format(a="test", b="test", c="https://example.com/test") assert MessageSegment.text("text") + Message(
assert formatted.extract_plain_text() == "test-custom!test" [MessageSegment.text("text")]
assert str(formatted) == "test-custom!test[fake:image]" ) == Message([MessageSegment.text("text"), MessageSegment.text("text")])
assert "text" + MessageSegment.text("text") == Message(
[MessageSegment.text("text"), MessageSegment.text("text")]
)
def test_message_slice(): def test_segment_validate():
Message = make_fake_message()
MessageSegment = Message.get_segment_class()
assert parse_obj_as(
MessageSegment, {"type": "text", "data": {"text": "text"}}
) == MessageSegment.text("text")
try:
parse_obj_as(MessageSegment, "some str")
assert False
except ValidationError:
assert True
def test_segment():
Message = make_fake_message()
MessageSegment = Message.get_segment_class()
assert len(MessageSegment.text("text")) == 4
assert MessageSegment.text("text") != MessageSegment.text("other")
assert MessageSegment.text("text").get("data") == {"text": "text"}
assert list(MessageSegment.text("text").keys()) == ["type", "data"]
assert list(MessageSegment.text("text").values()) == ["text", {"text": "text"}]
assert list(MessageSegment.text("text").items()) == [
("type", "text"),
("data", {"text": "text"}),
]
origin = MessageSegment.text("text")
copy = origin.copy()
assert origin is not copy
assert origin == copy
def test_message_add():
Message = make_fake_message()
MessageSegment = Message.get_segment_class()
assert Message([MessageSegment.text("text")]) + MessageSegment.text(
"text"
) == Message([MessageSegment.text("text"), MessageSegment.text("text")])
assert Message([MessageSegment.text("text")]) + "text" == Message(
[MessageSegment.text("text"), MessageSegment.text("text")]
)
assert Message([MessageSegment.text("text")]) + Message(
[MessageSegment.text("text")]
) == Message([MessageSegment.text("text"), MessageSegment.text("text")])
assert "text" + Message([MessageSegment.text("text")]) == Message(
[MessageSegment.text("text"), MessageSegment.text("text")]
)
msg = Message([MessageSegment.text("text")])
msg += MessageSegment.text("text")
assert msg == Message([MessageSegment.text("text"), MessageSegment.text("text")])
def test_message_getitem():
Message = make_fake_message() Message = make_fake_message()
MessageSegment = Message.get_segment_class() MessageSegment = Message.get_segment_class()
@ -52,3 +119,38 @@ def test_message_slice():
assert message.get("image", 1) == Message([message["image", 0]]) assert message.get("image", 1) == Message([message["image", 0]])
assert message.count("image") == 2 assert message.count("image") == 2
def test_message_validate():
Message = make_fake_message()
MessageSegment = Message.get_segment_class()
Message_ = make_fake_message()
assert parse_obj_as(Message, Message([])) == Message([])
try:
parse_obj_as(Message, Message_([]))
assert False
except ValidationError:
assert True
assert parse_obj_as(Message, "text") == Message([MessageSegment.text("text")])
assert parse_obj_as(Message, {"type": "text", "data": {"text": "text"}}) == Message(
[MessageSegment.text("text")]
)
assert (
parse_obj_as(
Message,
[MessageSegment.text("text"), {"type": "text", "data": {"text": "text"}}],
)
== Message([MessageSegment.text("text"), MessageSegment.text("text")])
)
try:
parse_obj_as(Message, object())
assert False
except ValidationError:
assert True

View File

@ -0,0 +1,17 @@
from utils import make_fake_message
def test_message_template():
from nonebot.adapters import MessageTemplate
Message = make_fake_message()
template = MessageTemplate("{a:custom}{b:text}{c:image}", Message)
@template.add_format_spec
def custom(input: str) -> str:
return input + "-custom!"
formatted = template.format(a="test", b="test", c="https://example.com/test")
assert formatted.extract_plain_text() == "test-custom!test"
assert str(formatted) == "test-custom!test[fake:image]"