nonebot2/packages/nonebot-adapter-ding/nonebot/adapters/ding/message.py
2021-07-28 16:32:50 +08:00

186 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from copy import copy
from typing import Any, Dict, Type, Union, Mapping, Iterable
from nonebot.typing import overrides
from nonebot.adapters import Message as BaseMessage, MessageSegment as BaseMessageSegment
class MessageSegment(BaseMessageSegment["Message"]):
"""
钉钉 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
"""
@classmethod
@overrides(BaseMessageSegment)
def get_message_class(cls) -> Type["Message"]:
return Message
@overrides(BaseMessageSegment)
def __str__(self) -> str:
if self.type == "text":
return str(self.data["content"])
elif self.type == "markdown":
return str(self.data["text"])
return ""
@overrides(BaseMessageSegment)
def is_text(self) -> bool:
return self.type == "text"
@staticmethod
def atAll() -> "MessageSegment":
"""@全体"""
return MessageSegment("at", {"isAtAll": True})
@staticmethod
def atMobiles(*mobileNumber: str) -> "MessageSegment":
"""@指定手机号人员"""
return MessageSegment("at", {"atMobiles": list(mobileNumber)})
@staticmethod
def atDingtalkIds(*dingtalkIds: str) -> "MessageSegment":
"""@指定 id@ 默认会在消息段末尾。
所以你可以在消息中使用 @{senderId} 占位,发送出去之后 @ 就会出现在占位的位置:
```python
message = MessageSegment.text(f"@{event.senderId},你好")
message += MessageSegment.atDingtalkIds(event.senderId)
```
"""
return MessageSegment("at", {"atDingtalkIds": list(dingtalkIds)})
@staticmethod
def text(text: str) -> "MessageSegment":
"""发送 ``text`` 类型消息"""
return MessageSegment("text", {"content": text})
@staticmethod
def image(picURL: str) -> "MessageSegment":
"""发送 ``image`` 类型消息"""
return MessageSegment("image", {"picURL": picURL})
@staticmethod
def extension(dict_: dict) -> "MessageSegment":
"""标记 text 文本的 extension 属性,需要与 text 消息段相加。"""
return MessageSegment("extension", dict_)
@staticmethod
def code(code_language: str, code: str) -> "Message":
"""发送 code 消息段"""
message = MessageSegment.text(code)
message += MessageSegment.extension({
"text_type": "code_snippet",
"code_language": code_language
})
return message
@staticmethod
def markdown(title: str, text: str) -> "MessageSegment":
"""发送 ``markdown`` 类型消息"""
return MessageSegment(
"markdown",
{
"title": title,
"text": text,
},
)
@staticmethod
def actionCardSingleBtn(title: str, text: str, singleTitle: str,
singleURL) -> "MessageSegment":
"""发送 ``actionCardSingleBtn`` 类型消息"""
return MessageSegment(
"actionCard", {
"title": title,
"text": text,
"singleTitle": singleTitle,
"singleURL": singleURL
})
@staticmethod
def actionCardMultiBtns(
title: str,
text: str,
btns: list,
hideAvatar: bool = False,
btnOrientation: str = '1',
) -> "MessageSegment":
"""
发送 ``actionCardMultiBtn`` 类型消息
:参数:
* ``btnOrientation``: 0按钮竖直排列 1按钮横向排列
* ``btns``: ``[{ "title": title, "actionURL": actionURL }, ...]``
"""
return MessageSegment(
"actionCard", {
"title": title,
"text": text,
"hideAvatar": "1" if hideAvatar else "0",
"btnOrientation": btnOrientation,
"btns": btns
})
@staticmethod
def feedCard(links: list) -> "MessageSegment":
"""
发送 ``feedCard`` 类型消息
:参数:
* ``links``: ``[{ "title": xxx, "messageURL": xxx, "picURL": xxx }, ...]``
"""
return MessageSegment("feedCard", {"links": links})
@staticmethod
def raw(data) -> "MessageSegment":
return MessageSegment('raw', data)
def to_dict(self) -> Dict[str, Any]:
# 让用户可以直接发送原始的消息格式
if self.type == "raw":
return copy(self.data)
# 不属于消息内容,只是作为消息段的辅助
if self.type in ["at", "extension"]:
return {self.type: copy(self.data)}
return {"msgtype": self.type, self.type: copy(self.data)}
class Message(BaseMessage[MessageSegment]):
"""
钉钉 协议 Message 适配。
"""
@classmethod
@overrides(BaseMessage)
def get_segment_class(cls) -> Type[MessageSegment]:
return MessageSegment
@staticmethod
@overrides(BaseMessage)
def _construct(
msg: Union[str, Mapping,
Iterable[Mapping]]) -> Iterable[MessageSegment]:
if isinstance(msg, Mapping):
yield MessageSegment(msg["type"], msg.get("data") or {})
elif isinstance(msg, str):
yield MessageSegment.text(msg)
elif isinstance(msg, Iterable):
for seg in msg:
yield MessageSegment(seg["type"], seg.get("data") or {})
def _produce(self) -> dict:
data = {}
segment: MessageSegment
for segment in self:
# text 可以和 text 合并
if segment.type == "text" and data.get("msgtype") == 'text':
data.setdefault("text", {})
data["text"]["content"] = data["text"].setdefault(
"content", "") + segment.data["content"]
else:
data.update(segment.to_dict())
return data