nonebot2/packages/nonebot-adapter-ding/nonebot/adapters/ding/message.py
2021-11-12 11:53:42 +08:00

197 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from copy import copy
from typing import Any, Dict, Type, Union, Mapping, Iterable, cast
from nonebot.typing import overrides
from nonebot.adapters import Message as BaseMessage
from nonebot.adapters import MessageSegment as BaseMessageSegment
class MessageSegment(BaseMessageSegment["Message"]):
"""
钉钉 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
"""
@classmethod
@overrides(BaseMessageSegment)
def get_message_class(cls) -> Type["Message"]:
return Message
@overrides(BaseMessageSegment)
def __str__(self) -> str:
"""
该消息段所代表的 str在命令匹配部分使用
钉钉目前只支持匹配 text 命令
"""
if self.type == "text":
return str(self.data["content"])
return ""
def __bool__(self) -> bool:
"""
因为暂时还不支持 text 和 markdown 之外的其他复杂消息段的 `__str__`(也不太需要),
会导致判断非这两种类型的消息段的布尔值为 true 的时候出错
"""
return self.data is not None
@overrides(BaseMessageSegment)
def is_text(self) -> bool:
return self.type == "text"
@staticmethod
def atAll() -> "MessageSegment":
"""@全体"""
return MessageSegment("at", {"isAtAll": True})
@staticmethod
def atMobiles(*mobileNumber: str) -> "MessageSegment":
"""@指定手机号人员"""
return MessageSegment("at", {"atMobiles": list(mobileNumber)})
@staticmethod
def atDingtalkIds(*dingtalkIds: str) -> "MessageSegment":
"""@指定 id@ 默认会在消息段末尾。
所以你可以在消息中使用 @{senderId} 占位,发送出去之后 @ 就会出现在占位的位置:
```python
message = MessageSegment.text(f"@{event.senderId},你好")
message += MessageSegment.atDingtalkIds(event.senderId)
```
"""
return MessageSegment("at", {"atDingtalkIds": list(dingtalkIds)})
@staticmethod
def text(text: str) -> "MessageSegment":
"""发送 ``text`` 类型消息"""
return MessageSegment("text", {"content": text})
@staticmethod
def image(picURL: str) -> "MessageSegment":
"""发送 ``image`` 类型消息"""
return MessageSegment("image", {"picURL": picURL})
@staticmethod
def extension(dict_: dict) -> "MessageSegment":
"""标记 text 文本的 extension 属性,需要与 text 消息段相加。"""
return MessageSegment("extension", dict_)
@staticmethod
def code(code_language: str, code: str) -> "Message":
"""发送 code 消息段"""
message = MessageSegment.text(code)
message += MessageSegment.extension({
"text_type": "code_snippet",
"code_language": code_language
})
return message
@staticmethod
def markdown(title: str, text: str) -> "MessageSegment":
"""发送 ``markdown`` 类型消息"""
return MessageSegment(
"markdown",
{
"title": title,
"text": text,
},
)
@staticmethod
def actionCardSingleBtn(title: str, text: str, singleTitle: str,
singleURL) -> "MessageSegment":
"""发送 ``actionCardSingleBtn`` 类型消息"""
return MessageSegment(
"actionCard", {
"title": title,
"text": text,
"singleTitle": singleTitle,
"singleURL": singleURL
})
@staticmethod
def actionCardMultiBtns(
title: str,
text: str,
btns: list,
hideAvatar: bool = False,
btnOrientation: str = '1',
) -> "MessageSegment":
"""
发送 ``actionCardMultiBtn`` 类型消息
:参数:
* ``btnOrientation``: 0按钮竖直排列 1按钮横向排列
* ``btns``: ``[{ "title": title, "actionURL": actionURL }, ...]``
"""
return MessageSegment(
"actionCard", {
"title": title,
"text": text,
"hideAvatar": "1" if hideAvatar else "0",
"btnOrientation": btnOrientation,
"btns": btns
})
@staticmethod
def feedCard(links: list) -> "MessageSegment":
"""
发送 ``feedCard`` 类型消息
:参数:
* ``links``: ``[{ "title": xxx, "messageURL": xxx, "picURL": xxx }, ...]``
"""
return MessageSegment("feedCard", {"links": links})
@staticmethod
def raw(data) -> "MessageSegment":
return MessageSegment('raw', data)
def to_dict(self) -> Dict[str, Any]:
# 让用户可以直接发送原始的消息格式
if self.type == "raw":
return copy(self.data)
# 不属于消息内容,只是作为消息段的辅助
if self.type in ["at", "extension"]:
return {self.type: copy(self.data)}
return {"msgtype": self.type, self.type: copy(self.data)}
class Message(BaseMessage[MessageSegment]):
"""
钉钉 协议 Message 适配。
"""
@classmethod
@overrides(BaseMessage)
def get_segment_class(cls) -> Type[MessageSegment]:
return MessageSegment
@staticmethod
@overrides(BaseMessage)
def _construct(
msg: Union[str, Mapping,
Iterable[Mapping]]) -> Iterable[MessageSegment]:
if isinstance(msg, Mapping):
msg = cast(Mapping[str, Any], msg)
yield MessageSegment(msg["type"], msg.get("data") or {})
elif isinstance(msg, str):
yield MessageSegment.text(msg)
elif isinstance(msg, Iterable):
for seg in msg:
yield MessageSegment(seg["type"], seg.get("data") or {})
def _produce(self) -> dict:
data = {}
segment: MessageSegment
for segment in self:
# text 可以和 text 合并
if segment.type == "text" and data.get("msgtype") == 'text':
data.setdefault("text", {})
data["text"]["content"] = data["text"].setdefault(
"content", "") + segment.data["content"]
else:
data.update(segment.to_dict())
return data