LiteyukiBot-TriM/liteyuki/utils/message.py

328 lines
11 KiB
Python
Raw Normal View History

2024-03-30 22:22:53 +00:00
import asyncio
import base64
2024-03-30 22:22:53 +00:00
import io
2024-03-26 09:14:41 +00:00
from urllib.parse import quote
2024-03-30 22:22:53 +00:00
import aiofiles
from PIL import Image
import aiohttp
import nonebot
from nonebot import require
from nonebot.adapters.onebot import v11, v12
2024-03-23 11:55:12 +00:00
from typing import Any
2024-03-30 22:22:53 +00:00
from . import load_from_yaml
2024-04-08 02:04:31 +00:00
from .ly_api import liteyuki_api
2024-04-11 17:15:05 +00:00
from .ly_typing import T_Bot, T_Message, T_MessageEvent
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import md_to_pic
config = load_from_yaml("config.yml")
can_send_markdown = {} # 用于存储机器人是否支持发送markdown消息id->bool
2024-04-11 17:15:05 +00:00
async def broadcast_to_superusers(message: str | T_Message, markdown: bool = False):
"""广播消息给超级用户"""
for bot in nonebot.get_bots().values():
for user_id in config.get("superusers", []):
if markdown:
await Markdown.send_md(message, bot, message_type="private", session_id=user_id)
else:
await bot.send_private_msg(user_id=user_id, message=message)
2024-03-30 22:22:53 +00:00
class Markdown:
@staticmethod
async def send_md(
markdown: str,
bot: T_Bot, *,
message_type: str = None,
session_id: str | int = None,
event: T_MessageEvent = None,
retry_as_image: bool = True,
2024-03-22 04:41:38 +00:00
**kwargs
) -> dict[str, Any] | None:
"""
发送Markdown消息支持自动转为图片发送
Args:
markdown:
bot:
message_type:
session_id:
event:
retry_as_image: 发送失败后是否尝试以图片形式发送否则失败返回None
**kwargs:
Returns:
"""
2024-03-30 22:22:53 +00:00
formatted_md = v11.unescape(markdown).replace("\n", r"\n").replace('"', r'\\\"')
if event is not None and message_type is None:
message_type = event.message_type
session_id = event.user_id if event.message_type == "private" else event.group_id
try:
# 构建Markdown消息并获取转发消息ID
2024-03-30 22:22:53 +00:00
forward_id = await bot.call_api(
api="send_forward_msg",
messages=[
v11.MessageSegment(
type="node",
data={
"name" : "Liteyuki.OneBot",
"uin" : bot.self_id,
"content": [
{
"type": "markdown",
"data": {
"content": '{"content":"%s"}' % formatted_md
}
},
]
},
)
2024-03-30 22:22:53 +00:00
]
)
# 发送Markdown longmsg并获取相应数据
2024-03-19 16:44:36 +00:00
data = await bot.send_msg(
2024-03-30 22:22:53 +00:00
user_id=session_id,
group_id=session_id,
2024-03-19 16:44:36 +00:00
message_type=message_type,
2024-03-30 22:22:53 +00:00
message=[
v11.MessageSegment(
type="longmsg",
data={
"id": forward_id
}
),
2024-03-30 22:22:53 +00:00
],
2024-03-22 04:41:38 +00:00
**kwargs
2024-03-19 16:44:36 +00:00
)
except BaseException as e:
nonebot.logger.error(f"send markdown error, retry as image: {e}")
# 发送失败,渲染为图片发送
if not retry_as_image:
return None
plain_markdown = markdown.replace("🔗", "")
md_image_bytes = await md_to_pic(
md=plain_markdown,
width=540,
device_scale_factor=4
)
data = await bot.send_msg(
message_type=message_type,
group_id=session_id,
user_id=session_id,
message=v11.MessageSegment.image(md_image_bytes),
)
2024-03-30 22:22:53 +00:00
return data
@staticmethod
async def send_image(
image: bytes | str,
bot: T_Bot, *,
message_type: str = None,
session_id: str | int = None,
event: T_MessageEvent = None,
**kwargs
) -> dict:
"""
发送单张装逼大图
Args:
image: 图片字节流或图片本地路径链接请使用Markdown.image_async方法获取后通过send_md发送
bot: bot instance
message_type: message type
session_id: session id
event: event
kwargs: other arguments
Returns:
dict: response data
2024-03-30 22:22:53 +00:00
"""
if isinstance(image, str):
async with aiofiles.open(image, "rb") as f:
image = await f.read()
method = 2
2024-04-01 15:56:03 +00:00
# 1.轻雪图床方案
# if method == 1:
# image_url = await liteyuki_api.upload_image(image)
# image_size = Image.open(io.BytesIO(image)).size
# image_md = Markdown.image(image_url, image_size)
# data = await Markdown.send_md(image_md, bot, message_type=message_type, session_id=session_id, event=event,
# retry_as_image=False,
# **kwargs)
# Lagrange.OneBot方案
if method == 2:
base64_string = base64.b64encode(image).decode("utf-8")
data = await bot.call_api("upload_image", file=f"base64://{base64_string}")
await Markdown.send_md(Markdown.image(data, Image.open(io.BytesIO(image)).size), bot, event=event, message_type=message_type,
session_id=session_id, **kwargs)
# 其他实现端方案
else:
image_message_id = (await bot.send_private_msg(
user_id=bot.self_id,
message=[
v11.MessageSegment.image(file=image)
]
))["message_id"]
image_url = (await bot.get_msg(message_id=image_message_id))["message"][0]["data"]["url"]
image_size = Image.open(io.BytesIO(image)).size
image_md = Markdown.image(image_url, image_size)
return await Markdown.send_md(image_md, bot, message_type=message_type, session_id=session_id, event=event, **kwargs)
if data is None:
data = await bot.send_msg(
message_type=message_type,
group_id=session_id,
user_id=session_id,
message=v11.MessageSegment.image(image),
**kwargs
)
return data
2024-04-01 15:56:03 +00:00
2024-03-30 22:22:53 +00:00
@staticmethod
async def get_image_url(image: bytes | str, bot: T_Bot) -> str:
"""把图片上传到图床,返回链接
Args:
bot: 发送的bot
image: 图片字节流或图片本地路径
Returns:
"""
# 等林文轩修好Lagrange.OneBot再说
@staticmethod
def btn_cmd(name: str, cmd: str, reply: bool = False, enter: bool = True) -> str:
"""生成点击回调按钮
Args:
name: 按钮显示内容
cmd: 发送的命令已在函数内url编码不需要再次编码
reply: 是否以回复的方式发送消息
enter: 自动发送消息则为True否则填充到输入框
Returns:
markdown格式的可点击回调按钮
"""
if "" not in config.get("command_start", ["/"]) and config.get("alconna_use_command_start", False):
cmd = f"{config['command_start'][0]}{cmd}"
2024-03-26 09:14:41 +00:00
return f"[{name}](mqqapi://aio/inlinecmd?command={quote(cmd)}&reply={str(reply).lower()}&enter={str(enter).lower()})"
@staticmethod
def btn_link(name: str, url: str) -> str:
"""生成点击链接按钮
Args:
name: 链接显示内容
url: 链接地址
Returns:
markdown格式的链接
"""
2024-03-21 05:02:08 +00:00
return f"[🔗{name}]({url})"
2024-03-30 22:22:53 +00:00
@staticmethod
def image(url: str, size: tuple[int, int]) -> str:
2024-04-01 04:29:04 +00:00
"""构建图片链接
2024-03-30 22:22:53 +00:00
Args:
size:
url: 图片链接
Returns:
markdown格式的图片
"""
return f"![image #{size[0]}px #{size[1]}px]({url})"
@staticmethod
async def image_async(url: str) -> str:
2024-04-01 04:29:04 +00:00
"""获取图片,自动请求获取大小,异步
2024-03-30 22:22:53 +00:00
Args:
url: 图片链接
Returns:
2024-04-01 04:29:04 +00:00
图片Markdown语法: ![image #{width}px #{height}px](link)
2024-03-30 22:22:53 +00:00
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
image = Image.open(io.BytesIO(await resp.read()))
return Markdown.image(url, image.size)
except Exception as e:
nonebot.logger.error(f"get image error: {e}")
return "[Image Error]"
@staticmethod
def escape(text: str) -> str:
"""转义特殊字符
Args:
text: 需要转义的文本请勿直接把整个markdown文本传入否则会转义掉所有字符
Returns:
转义后的文本
"""
2024-03-22 05:39:01 +00:00
chars = "*[]()~_`>#+=|{}.!"
for char in chars:
text = text.replace(char, f"\\\\{char}")
return text
2024-04-10 15:06:55 +00:00
@staticmethod
def H1(text: str, end="\n") -> str:
"""H1标题"""
return f"# {text}{end}"
@staticmethod
def H2(text: str, end="\n") -> str:
"""H2标题"""
return f"## {text}{end}"
@staticmethod
def H3(text: str, end="\n") -> str:
"""H3标题"""
return f"### {text}{end}"
@staticmethod
def H4(text: str, end="\n") -> str:
"""H4标题"""
return f"#### {text}{end}"
@staticmethod
def H5(text: str, end="\n") -> str:
"""H5标题"""
return f"##### {text}{end}"
@staticmethod
def H6(text: str, end="\n") -> str:
"""H6标题"""
return f"###### {text}{end}"
@staticmethod
def Bold(text: str) -> str:
"""加粗"""
return f"**{text}**"
@staticmethod
def Italic(text: str) -> str:
"""斜体"""
return f"*{text}*"
@staticmethod
def BoldItalic(text: str) -> str:
"""粗斜体"""
return f"***{text}***"
@staticmethod
def Underline(text: str) -> str:
"""下划线"""
return f"__{text}__"
@staticmethod
def Strike(text: str) -> str:
"""删除线"""
return f"~~{text}~~"