⬆️ Update to cqhttp v11

This commit is contained in:
yanyongyu 2020-08-26 14:43:27 +08:00
parent cef3a8236e
commit 24e03ed0e7
3 changed files with 98 additions and 83 deletions

View File

@ -179,7 +179,7 @@ class BaseEvent(abc.ABC):
@dataclass @dataclass
class BaseMessageSegment(abc.ABC): class BaseMessageSegment(abc.ABC):
type: str type: str
data: Dict[str, str] = field(default_factory=lambda: {}) data: Dict[str, Union[str, list]] = field(default_factory=lambda: {})
@abc.abstractmethod @abc.abstractmethod
def __str__(self): def __str__(self):

View File

@ -1,5 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
CQHTTP (OneBot) v11 协议适配
============================
协议详情请看: `CQHTTP`_ | `OneBot`_
.. _CQHTTP:
http://cqhttp.cc/
.. _OneBot:
https://github.com/howmanybots/onebot
"""
import re import re
import sys import sys
@ -38,8 +49,8 @@ def unescape(s: str) -> str:
.replace("&", "&") .replace("&", "&")
def _b2s(b: bool) -> str: def _b2s(b: Optional[bool]) -> Optional[str]:
return str(b).lower() return b if b is None else str(b).lower()
def _check_at_me(bot: "Bot", event: "Event"): def _check_at_me(bot: "Bot", event: "Event"):
@ -389,14 +400,8 @@ class Event(BaseEvent):
class MessageSegment(BaseMessageSegment): class MessageSegment(BaseMessageSegment):
@overrides(BaseMessageSegment) @overrides(BaseMessageSegment)
def __init__(self, type: str, data: Dict[str, str]) -> None: def __init__(self, type: str, data: Dict[str, Union[str, list]]) -> None:
if type == "at" and data.get("qq") == "all": if type == "text":
type = "at_all"
data.clear()
elif type == "shake":
type = "poke"
data = {"type": "Poke"}
elif type == "text":
data["text"] = unescape(data["text"]) data["text"] = unescape(data["text"])
super().__init__(type=type, data=data) super().__init__(type=type, data=data)
@ -406,16 +411,11 @@ class MessageSegment(BaseMessageSegment):
data = self.data.copy() data = self.data.copy()
# process special types # process special types
if type_ == "at_all": if type_ == "text":
type_ = "at"
data = {"qq": "all"}
elif type_ == "poke":
type_ = "shake"
data.clear()
elif type_ == "text":
return escape(data.get("text", ""), escape_comma=False) return escape(data.get("text", ""), escape_comma=False)
params = ",".join([f"{k}={escape(str(v))}" for k, v in data.items()]) params = ",".join(
[f"{k}={escape(str(v))}" for k, v in data.items() if v is not None])
return f"[CQ:{type_}{',' if params else ''}{params}]" return f"[CQ:{type_}{',' if params else ''}{params}]"
@overrides(BaseMessageSegment) @overrides(BaseMessageSegment)
@ -423,17 +423,13 @@ class MessageSegment(BaseMessageSegment):
return Message(self) + other return Message(self) + other
@staticmethod @staticmethod
def anonymous(ignore_failure: bool = False) -> "MessageSegment": def anonymous(ignore_failure: Optional[bool] = None) -> "MessageSegment":
return MessageSegment("anonymous", {"ignore": _b2s(ignore_failure)}) return MessageSegment("anonymous", {"ignore": _b2s(ignore_failure)})
@staticmethod @staticmethod
def at(user_id: Union[int, str]) -> "MessageSegment": def at(user_id: Union[int, str]) -> "MessageSegment":
return MessageSegment("at", {"qq": str(user_id)}) return MessageSegment("at", {"qq": str(user_id)})
@staticmethod
def at_all() -> "MessageSegment":
return MessageSegment("at_all")
@staticmethod @staticmethod
def contact_group(group_id: int) -> "MessageSegment": def contact_group(group_id: int) -> "MessageSegment":
return MessageSegment("contact", {"type": "group", "id": str(group_id)}) return MessageSegment("contact", {"type": "group", "id": str(group_id)})
@ -442,23 +438,43 @@ class MessageSegment(BaseMessageSegment):
def contact_user(user_id: int) -> "MessageSegment": def contact_user(user_id: int) -> "MessageSegment":
return MessageSegment("contact", {"type": "qq", "id": str(user_id)}) return MessageSegment("contact", {"type": "qq", "id": str(user_id)})
@staticmethod
def dice() -> "MessageSegment":
return MessageSegment("dice", {})
@staticmethod @staticmethod
def face(id_: int) -> "MessageSegment": def face(id_: int) -> "MessageSegment":
return MessageSegment("face", {"id": str(id_)}) return MessageSegment("face", {"id": str(id_)})
@staticmethod @staticmethod
def forward(id_: str) -> "MessageSegment": def forward(id_: str) -> "MessageSegment":
logger.warning("Forward Message only can be received!")
return MessageSegment("forward", {"id": id_}) return MessageSegment("forward", {"id": id_})
@staticmethod @staticmethod
def image(file: str) -> "MessageSegment": def image(file: str,
return MessageSegment("image", {"file": file}) type_: Optional[str] = None,
cache: bool = True,
proxy: bool = True,
timeout: Optional[int] = None) -> "MessageSegment":
return MessageSegment(
"image", {
"file": file,
"type": type_,
"cache": cache,
"proxy": proxy,
"timeout": timeout
})
@staticmethod
def json(data: str) -> "MessageSegment":
return MessageSegment("json", {"data": data})
@staticmethod @staticmethod
def location(latitude: float, def location(latitude: float,
longitude: float, longitude: float,
title: str = "", title: Optional[str] = None,
content: str = "") -> "MessageSegment": content: Optional[str] = None) -> "MessageSegment":
return MessageSegment( return MessageSegment(
"location", { "location", {
"lat": str(latitude), "lat": str(latitude),
@ -468,36 +484,18 @@ class MessageSegment(BaseMessageSegment):
}) })
@staticmethod @staticmethod
def magic_face(type_: str) -> "MessageSegment": def music(type_: str, id_: int) -> "MessageSegment":
if type_ not in ["dice", "rpc"]:
raise ValueError(
f"Coolq doesn't support magic face type {type_}. Supported types: dice, rpc."
)
return MessageSegment("magic_face", {"type": type_})
@staticmethod
def music(type_: str,
id_: int,
style: Optional[int] = None) -> "MessageSegment":
if style is None:
return MessageSegment("music", {"type": type_, "id": id_}) return MessageSegment("music", {"type": type_, "id": id_})
else:
return MessageSegment("music", {
"type": type_,
"id": id_,
"style": style
})
@staticmethod @staticmethod
def music_custom(type_: str, def music_custom(url: str,
url: str,
audio: str, audio: str,
title: str, title: str,
content: str = "", content: Optional[str] = None,
img_url: str = "") -> "MessageSegment": img_url: Optional[str] = None) -> "MessageSegment":
return MessageSegment( return MessageSegment(
"music", { "music", {
"type": type_, "type": "custom",
"url": url, "url": url,
"audio": audio, "audio": audio,
"title": title, "title": title,
@ -510,35 +508,43 @@ class MessageSegment(BaseMessageSegment):
return MessageSegment("node", {"id": str(id_)}) return MessageSegment("node", {"id": str(id_)})
@staticmethod @staticmethod
def node_custom(name: str, uin: int, def node_custom(user_id: int, nickname: str,
content: "Message") -> "MessageSegment": content: Union[str, "Message"]) -> "MessageSegment":
return MessageSegment("node", { return MessageSegment("node", {
"name": name, "user_id": str(user_id),
"uin": str(uin), "nickname": nickname,
"content": str(content) "content": content
}) })
@staticmethod @staticmethod
def poke(type_: str = "Poke") -> "MessageSegment": def poke(type_: str, id_: str) -> "MessageSegment":
if type_ not in ["Poke"]: return MessageSegment("poke", {"type": type_, "id": id_})
raise ValueError(
f"Coolq doesn't support poke type {type_}. Supported types: Poke."
)
return MessageSegment("poke", {"type": type_})
@staticmethod @staticmethod
def record(file: str, magic: bool = False) -> "MessageSegment": def record(file: str,
magic: Optional[bool] = None,
cache: Optional[bool] = None,
proxy: Optional[bool] = None,
timeout: Optional[int] = None) -> "MessageSegment":
return MessageSegment("record", {"file": file, "magic": _b2s(magic)}) return MessageSegment("record", {"file": file, "magic": _b2s(magic)})
@staticmethod @staticmethod
def replay(id_: int) -> "MessageSegment": def reply(id_: int) -> "MessageSegment":
return MessageSegment("replay", {"id": str(id_)}) return MessageSegment("reply", {"id": str(id_)})
@staticmethod
def rps() -> "MessageSegment":
return MessageSegment("rps", {})
@staticmethod
def shake() -> "MessageSegment":
return MessageSegment("shake", {})
@staticmethod @staticmethod
def share(url: str = "", def share(url: str = "",
title: str = "", title: str = "",
content: str = "", content: Optional[str] = None,
img_url: str = "") -> "MessageSegment": img_url: Optional[str] = None) -> "MessageSegment":
return MessageSegment("share", { return MessageSegment("share", {
"url": url, "url": url,
"title": title, "title": title,
@ -550,6 +556,22 @@ class MessageSegment(BaseMessageSegment):
def text(text: str) -> "MessageSegment": def text(text: str) -> "MessageSegment":
return MessageSegment("text", {"text": text}) return MessageSegment("text", {"text": text})
@staticmethod
def video(file: str,
cache: Optional[bool] = None,
proxy: Optional[bool] = None,
timeout: Optional[int] = None) -> "MessageSegment":
return MessageSegment("video", {
"file": file,
"cache": cache,
"proxy": proxy,
"timeout": timeout
})
@staticmethod
def xml(data: str) -> "MessageSegment":
return MessageSegment("xml", {"data": data})
class Message(BaseMessage): class Message(BaseMessage):
@ -564,7 +586,7 @@ class Message(BaseMessage):
yield MessageSegment(seg["type"], seg.get("data") or {}) yield MessageSegment(seg["type"], seg.get("data") or {})
return return
def _iter_message() -> Iterable[Tuple[str, str]]: def _iter_message(msg: str) -> Iterable[Tuple[str, str]]:
text_begin = 0 text_begin = 0
for cqcode in re.finditer( for cqcode in re.finditer(
r"\[CQ:(?P<type>[a-zA-Z0-9-_.]+)" r"\[CQ:(?P<type>[a-zA-Z0-9-_.]+)"
@ -577,7 +599,7 @@ class Message(BaseMessage):
yield cqcode.group("type"), cqcode.group("params").lstrip(",") yield cqcode.group("type"), cqcode.group("params").lstrip(",")
yield "text", unescape(msg[text_begin:]) yield "text", unescape(msg[text_begin:])
for type_, data in _iter_message(): for type_, data in _iter_message(msg):
if type_ == "text": if type_ == "text":
if data: if data:
# only yield non-empty text segment # only yield non-empty text segment
@ -589,13 +611,4 @@ class Message(BaseMessage):
filter(lambda x: x, ( filter(lambda x: x, (
x.lstrip() for x in data.split(",")))) x.lstrip() for x in data.split(","))))
} }
if type_ == "at" and data["qq"] == "all":
type_ = "at_all"
data.clear()
elif type_ in ["dice", "rpc"]:
type_ = "magic_face"
data["type"] = type_
elif type_ == "shake":
type_ = "poke"
data["type"] = "Poke"
yield MessageSegment(type_, data) yield MessageSegment(type_, data)

View File

@ -4,10 +4,12 @@
"description": "An asynchronous python bot framework.", "description": "An asynchronous python bot framework.",
"homepage": "https://docs.nonebot.dev/", "homepage": "https://docs.nonebot.dev/",
"main": "index.js", "main": "index.js",
"contributors": [{ "contributors": [
{
"name": "yanyongyu", "name": "yanyongyu",
"email": "yanyongyu_1@126.com" "email": "yanyongyu_1@126.com"
}], }
],
"repository": "https://github.com/nonebot/nonebot/", "repository": "https://github.com/nonebot/nonebot/",
"bugs": { "bugs": {
"url": "https://github.com/nonebot/nonebot/issues" "url": "https://github.com/nonebot/nonebot/issues"