- markdown发送失败后可以转为图片发送
- 轻雪图床支持
fix:
- 数据库删除时不提交
This commit is contained in:
snowyServer 2024-04-02 20:32:28 +08:00
parent e7765a4513
commit 14fb96fec2
11 changed files with 151 additions and 485 deletions

View File

@ -15,10 +15,10 @@ from liteyuki.utils.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message import Markdown as md
from .reloader import Reloader
from liteyuki.utils import htmlrender
from ..utils.liteyuki_api import liteyuki_api
require("nonebot_plugin_alconna")
require("nonebot_plugin_alconna"), require("nonebot_plugin_htmlrender")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma
from nonebot_plugin_htmlrender import html_to_pic
driver = get_driver()
@ -183,11 +183,9 @@ async def test_for_md_image(bot: T_Bot, api: str, data: dict):
@driver.on_startup
async def on_startup():
htmlrender.browser = await htmlrender.get_browser()
nonebot.logger.info("Browser Started.")
pass
@driver.on_shutdown
async def on_shutdown():
await htmlrender.shutdown_browser()
nonebot.logger.info("Browser Stopped.")
pass

View File

@ -8,7 +8,7 @@ from nonebot.adapters.onebot.v11 import MessageSegment
from nonebot.permission import SUPERUSER
from liteyuki.utils import __NAME__, __VERSION__
from liteyuki.utils.htmlrender import template_to_pic
from liteyuki.utils.htmlrender import template2image
from liteyuki.utils.language import get_user_lang
from liteyuki.utils.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.resource import get_path
@ -124,10 +124,10 @@ async def _(bot: T_Bot, event: T_MessageEvent):
"MEM" : ulang.get("main.monitor.memory"),
"SWAP" : ulang.get("main.monitor.swap"),
}
image_bytes = await template_to_pic(
template_path=get_path("templates/stats.html", abs_path=True),
image_bytes = await template2image(
template=get_path("templates/stats.html", abs_path=True),
templates=templ,
device_scale_factor=4,
scale_factor=4,
)
# await md.send_image(image_bytes, bot, event=event)
await stats.finish(MessageSegment.image(image_bytes))

View File

@ -79,10 +79,10 @@ async def _(event: T_MessageEvent, bot: T_Bot):
if plugin.metadata:
reply += (f"\n**{md.escape(show_name)}**\n"
f"\n > {md.escape(show_desc)}")
f"\n > {md.escape(show_desc)}\n")
else:
reply += (f"**{md.escape(show_name)}**\n"
f"\n > {md.escape(show_desc)}")
f"\n > {md.escape(show_desc)}\n")
reply += f"\n > {btn_usage} {btn_homepage}"

View File

@ -204,9 +204,12 @@ class Database:
table_name = model.TABLE_NAME
if not table_name:
raise ValueError(f"数据模型{model.__class__.__name__}未提供表名")
if model.id is not None:
condition = f"id = {model.id}"
if not condition and not allow_empty:
raise ValueError("删除操作必须提供条件")
self.cursor.execute(f"DELETE FROM {table_name} WHERE {condition}", args)
self.conn.commit()
def auto_migrate(self, *args: LiteModel):

View File

@ -0,0 +1,60 @@
import os.path
from nonebot import require
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import *
# async def html2image(
# html: str,
# wait: int = 0,
# template_path: str = None,
# scale_factor: float = 2,
# **kwargs
# ) -> bytes:
# """
# Args:
# html: str: HTML 正文
# wait: 等待时间
# template_path: 模板路径
# scale_factor: 缩放因子,越高越清晰
# **kwargs: page 参数
#
# Returns:
#
# """
# return await html_to_pic(html, wait=wait, template_path=template_path, scale_factor=scale_factor)
async def template2image(
template: str,
templates: dict,
pages: dict | None = None,
wait: int = 0,
scale_factor: float = 2,
**kwargs
) -> bytes:
"""
template -> html -> image
Args:
wait: 等待时间单位秒
pages: 页面参数
template: str: 模板文件
templates: dict: 模板参数
scale_factor: 缩放因子越高越清晰
**kwargs: page 参数
Returns:
图片二进制数据
"""
template_path = os.path.dirname(template)
template_name = os.path.basename(template)
return await template_to_pic(
template_name=template_name,
template_path=template_path,
templates=templates,
pages=pages,
wait=wait,
device_scale_factor=scale_factor,
)

View File

@ -1,29 +0,0 @@
import nonebot
from nonebot.log import logger
from nonebot.plugin import PluginMetadata
from playwright.async_api import Browser
from .browser import (
get_browser as get_browser,
get_new_page as get_new_page,
shutdown_browser as shutdown_browser,
)
from .data_source import (
capture_element as capture_element,
html_to_pic as html_to_pic,
md_to_pic as md_to_pic,
template_to_html as template_to_html,
template_to_pic as template_to_pic,
text_to_pic as text_to_pic,
)
__plugin_meta__ = PluginMetadata(
name="nonebot-plugin-htmlrender",
description="通过浏览器渲染图片",
usage="提供多个易用API md_to_pic html_to_pic text_to_pic template_to_pic capture_element 等",
type="library",
homepage="https://github.com/kexue-z/nonebot-plugin-htmlrender",
extra={},
)
browser: Browser

View File

@ -1,117 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Author : yanyongyu
@Date : 2021-03-12 13:42:43
@LastEditors : yanyongyu
@LastEditTime : 2021-11-01 14:05:41
@Description : None
@GitHub : https://github.com/yanyongyu
"""
__author__ = "yanyongyu"
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
from nonebot import get_plugin_config
from nonebot.log import logger
from playwright.async_api import Browser, Error, Page, Playwright, async_playwright
from .config import Config
import asyncio
config = Config()
_browser: Optional[Browser] = None
_playwright: Optional[Playwright] = None
async def init(**kwargs) -> Browser:
global _browser
global _playwright
_playwright = await async_playwright().start()
try:
_browser = await launch_browser(**kwargs)
except Error:
await install_browser()
_browser = await launch_browser(**kwargs)
return _browser
async def launch_browser(**kwargs) -> Browser:
assert _playwright is not None, "Playwright 没有安装"
if config.htmlrender_browser_channel:
kwargs["channel"] = config.htmlrender_browser_channel
if config.htmlrender_proxy_host:
kwargs["proxy"] = {
"server": config.htmlrender_proxy_host,
}
if config.htmlrender_browser == "firefox":
logger.info("使用 firefox 启动")
return await _playwright.firefox.launch(**kwargs)
# 默认使用 chromium
logger.info("使用 chromium 启动")
return await _playwright.chromium.launch(**kwargs)
async def get_browser(**kwargs) -> Browser:
return _browser if _browser and _browser.is_connected() else await init(**kwargs)
@asynccontextmanager
async def get_new_page(device_scale_factor: float = 2, **kwargs) -> AsyncIterator[Page]:
browser = await get_browser()
page = await browser.new_page(device_scale_factor=device_scale_factor, **kwargs)
try:
yield page
finally:
await page.close()
async def shutdown_browser():
global _browser
global _playwright
if _browser:
if _browser.is_connected():
await _browser.close()
_browser = None
if _playwright:
# await _playwright.stop()
_playwright = None
async def install_browser():
import os
import sys
from playwright.__main__ import main
if host := config.htmlrender_download_host:
logger.info("使用配置源进行下载")
os.environ["PLAYWRIGHT_DOWNLOAD_HOST"] = host
else:
logger.info("使用镜像源进行下载")
os.environ["PLAYWRIGHT_DOWNLOAD_HOST"] = (
"https://npmmirror.com/mirrors/playwright/"
)
success = False
if config.htmlrender_browser == "firefox":
logger.info("正在安装 firefox")
sys.argv = ["", "install", "firefox"]
else:
# 默认使用 chromium
logger.info("正在安装 chromium")
sys.argv = ["", "install", "chromium"]
try:
logger.info("正在安装依赖")
os.system("playwright install-deps") # noqa: ASYNC102, S605, S607
main()
except SystemExit as e:
if e.code == 0:
success = True
if not success:
logger.error("浏览器更新失败, 请检查网络连通性")

View File

@ -1,10 +0,0 @@
from typing import Optional
from pydantic import BaseModel, Field
class Config(BaseModel):
htmlrender_browser: Optional[str] = Field(default="chromium")
htmlrender_download_host: Optional[str] = Field(default=None)
htmlrender_proxy_host: Optional[str] = Field(default=None)
htmlrender_browser_channel: Optional[str] = Field(default=None)

View File

@ -1,265 +0,0 @@
import os.path
from os import getcwd
from pathlib import Path
from typing import Literal, Optional, Union
import aiofiles
import jinja2
import markdown
from nonebot.log import logger
from .browser import get_new_page
TEMPLATES_PATH = str(Path(__file__).parent / "templates")
env = jinja2.Environment( # noqa: S701
extensions=["jinja2.ext.loopcontrols"],
loader=jinja2.FileSystemLoader(TEMPLATES_PATH),
enable_async=True,
)
async def text_to_pic(
text: str,
css_path: str = "",
width: int = 500,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
) -> bytes:
"""多行文本转图片
Args:
text (str): 纯文本, 可多行
css_path (str, optional): css文件
width (int, optional): 图片宽度默认为 500
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
Returns:
bytes: 图片, 可直接发送
"""
template = env.get_template("text.html")
return await html_to_pic(
template_path=f"file://{css_path if css_path else TEMPLATES_PATH}",
html=await template.render_async(
text=text,
css=await read_file(css_path) if css_path else await read_tpl("text.css"),
),
viewport={"width": width, "height": 10},
type=type,
quality=quality,
device_scale_factor=device_scale_factor,
)
async def md_to_pic(
md: str = "",
md_path: str = "",
css_path: str = "",
width: int = 500,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
) -> bytes:
"""markdown 转 图片
Args:
md (str, optional): markdown 格式文本
md_path (str, optional): markdown 文件路径
css_path (str, optional): css文件路径. Defaults to None.
width (int, optional): 图片宽度默认为 500
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
Returns:
bytes: 图片, 可直接发送
"""
template = env.get_template("markdown.html")
if not md:
if md_path:
md = await read_file(md_path)
else:
raise Exception("必须输入 md 或 md_path")
logger.debug(md)
md = markdown.markdown(
md,
extensions=[
"pymdownx.tasklist",
"tables",
"fenced_code",
"codehilite",
"mdx_math",
"pymdownx.tilde",
],
extension_configs={"mdx_math": {"enable_dollar_delimiter": True}},
)
logger.debug(md)
extra = ""
if "math/tex" in md:
katex_css = await read_tpl("katex/katex.min.b64_fonts.css")
katex_js = await read_tpl("katex/katex.min.js")
mathtex_js = await read_tpl("katex/mathtex-script-type.min.js")
extra = (
f'<style type="text/css">{katex_css}</style>'
f"<script defer>{katex_js}</script>"
f"<script defer>{mathtex_js}</script>"
)
if css_path:
css = await read_file(css_path)
else:
css = await read_tpl("github-markdown-light.css") + await read_tpl(
"pygments-default.css",
)
return await html_to_pic(
template_path=f"file://{css_path if css_path else TEMPLATES_PATH}",
html=await template.render_async(md=md, css=css, extra=extra),
viewport={"width": width, "height": 10},
type=type,
quality=quality,
device_scale_factor=device_scale_factor,
)
# async def read_md(md_path: str) -> str:
# async with aiofiles.open(str(Path(md_path).resolve()), mode="r") as f:
# md = await f.read()
# return markdown.markdown(md)
async def read_file(path: str) -> str:
async with aiofiles.open(path, mode="r") as f:
return await f.read()
async def read_tpl(path: str) -> str:
return await read_file(f"{TEMPLATES_PATH}/{path}")
async def template_to_html(
template_path: str,
template_name: str,
**kwargs,
) -> str:
"""使用jinja2模板引擎通过html生成图片
Args:
template_path (str): 模板路径
template_name (str): 模板名
**kwargs: 模板内容
Returns:
str: html
"""
template_env = jinja2.Environment( # noqa: S701
loader=jinja2.FileSystemLoader(template_path),
enable_async=True,
)
template = template_env.get_template(template_name)
return await template.render_async(**kwargs)
async def html_to_pic(
html: str,
wait: int = 0,
template_path: str = f"file://{getcwd()}", # noqa: PTH109
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
**kwargs,
) -> bytes:
"""html转图片
Args:
html (str): html文本
wait (int, optional): 等待时间. Defaults to 0.
template_path (str, optional): 模板路径 "file:///path/to/template/"
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
**kwargs: 传入 page 的参数
Returns:
bytes: 图片, 可直接发送
"""
# logger.debug(f"html:\n{html}")
if "file:" not in template_path:
raise Exception("template_path 应该为 file:///path/to/template")
async with get_new_page(device_scale_factor, **kwargs) as page:
await page.goto(template_path)
await page.set_content(html, wait_until="networkidle")
await page.wait_for_timeout(wait)
return await page.screenshot(
full_page=True,
type=type,
quality=quality,
)
async def template_to_pic(
template_path: str,
templates: dict,
pages: Optional[dict] = None,
wait: int = 0,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
) -> bytes:
"""使用jinja2模板引擎通过html生成图片
Args:
template_path (str): 模板路径
templates (dict): 模板内参数 : {"name": "abc"}
pages (dict): 网页参数 Defaults to
{"base_url": f"file://{getcwd()}", "viewport": {"width": 500, "height": 10}}
wait (int, optional): 网页载入等待时间. Defaults to 0.
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
Returns:
bytes: 图片 可直接发送
"""
if pages is None:
pages = {
"viewport": {"width": 500, "height": 10},
"base_url": f"file://{getcwd()}", # noqa: PTH109
}
template_env = jinja2.Environment( # noqa: S701
loader=jinja2.FileSystemLoader(os.path.dirname(template_path)),
enable_async=True,
)
template = template_env.get_template(os.path.basename(template_path))
return await html_to_pic(
template_path=f"file://{template_path}",
html=await template.render_async(**templates),
wait=wait,
type=type,
quality=quality,
device_scale_factor=device_scale_factor,
**pages,
)
async def capture_element(
url: str,
element: str,
timeout: float = 0,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
**kwargs,
) -> bytes:
async with get_new_page(**kwargs) as page:
await page.goto(url, timeout=timeout)
return await page.locator(element).screenshot(
type=type,
quality=quality,
)

View File

@ -6,6 +6,7 @@ import aiofiles
from PIL import Image
import aiohttp
import nonebot
from nonebot import require
from nonebot.adapters.onebot import v11, v12
from typing import Any
@ -13,8 +14,12 @@ from . import load_from_yaml
from .liteyuki_api import liteyuki_api
from .ly_typing import T_Bot, T_MessageEvent
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import md_to_pic
config = load_from_yaml("config.yml")
can_send_markdown={} # 用于存储机器人是否支持发送markdown消息id->bool
class Markdown:
@staticmethod
@ -24,21 +29,37 @@ class Markdown:
message_type: str = None,
session_id: str | int = None,
event: T_MessageEvent = None,
retry_as_image: bool = True,
**kwargs
) -> dict[str, Any]:
) -> dict[str, Any] | None:
"""
发送Markdown消息支持自动转为图片发送
Args:
markdown:
bot:
message_type:
session_id:
event:
retry_as_image: 发送失败后是否尝试以图片形式发送否则失败返回None
**kwargs:
Returns:
"""
formatted_md = v11.unescape(markdown).replace("\n", r"\n").replace('"', r'\\\"')
if event is not None and message_type is None:
message_type = event.message_type
session_id = event.user_id if event.message_type == "private" else event.group_id
try:
# 构建Markdown消息并获取转发消息ID
forward_id = await bot.call_api(
api="send_forward_msg",
messages=[
v11.MessageSegment(
type="node",
data={
"name" : "Liteyuki.OneBot",
"uin" : bot.self_id,
"name": "Liteyuki.OneBot",
"uin": bot.self_id,
"content": [
{
"type": "markdown",
@ -51,6 +72,7 @@ class Markdown:
)
]
)
# 发送Markdown longmsg并获取相应数据
data = await bot.send_msg(
user_id=session_id,
group_id=session_id,
@ -65,31 +87,24 @@ class Markdown:
],
**kwargs
)
except Exception as e:
nonebot.logger.warning("send_markdown error, send as plain text: %s" % e.__repr__())
if isinstance(bot, v11.Bot):
except BaseException as e:
nonebot.logger.error(f"send markdown error, retry as image: {e}")
# 发送失败,渲染为图片发送
if not retry_as_image:
return None
plain_markdown = markdown.replace("🔗", "")
md_image_bytes = await md_to_pic(
md=plain_markdown,
width=540,
device_scale_factor=4
)
data = await bot.send_msg(
message_type=message_type,
message=markdown,
user_id=int(session_id),
group_id=int(session_id),
**kwargs
group_id=session_id,
user_id=session_id,
message=v11.MessageSegment.image(md_image_bytes),
)
elif isinstance(bot, v12.Bot):
data = await bot.send_message(
detail_type=message_type,
message=v12.Message(
v12.MessageSegment.text(
text=markdown
)
),
user_id=str(session_id),
group_id=str(session_id),
**kwargs
)
else:
nonebot.logger.error("send_markdown: bot type not supported")
data = {}
return data
@staticmethod
@ -114,7 +129,6 @@ class Markdown:
dict: response data
"""
print("\n\n\n发送图片\n\n\n")
if isinstance(image, str):
async with aiofiles.open(image, "rb") as f:
image = await f.read()
@ -122,7 +136,10 @@ class Markdown:
image_url = await liteyuki_api.upload_image(image)
image_size = Image.open(io.BytesIO(image)).size
image_md = Markdown.image(image_url, image_size)
return await Markdown.send_md(image_md, bot, message_type=message_type, session_id=session_id, event=event, **kwargs)
data = await Markdown.send_md(image_md, bot, message_type=message_type, session_id=session_id, event=event,
retry_as_image=False,
**kwargs)
# 2.此方案等林文轩修好后再用QQ图床再嵌入markdown发送
# image_message_id = (await bot.send_private_msg(
@ -138,6 +155,15 @@ class Markdown:
# image_size = Image.open(io.BytesIO(image)).size
# image_md = Markdown.image(image_url, image_size)
# return await Markdown.send_md(image_md, bot, message_type=message_type, session_id=session_id, event=event, **kwargs)
if data is None:
data = await bot.send_msg(
message_type=message_type,
group_id=session_id,
user_id=session_id,
message=v11.MessageSegment.image(image),
**kwargs
)
return data
@staticmethod
async def get_image_url(image: bytes | str, bot: T_Bot) -> str:

View File

@ -5,7 +5,7 @@ arclet-alconna-tools==0.7.0
colored==2.2.4
dash==2.16.1
GitPython==3.1.42
jinja2==3.0.3
jinja2==3.1.3
markdown==3.3.6
nonebot2[fastapi]==2.2.1
nonebot-adapter-onebot==2.4.3
@ -14,10 +14,10 @@ playwright==1.17.2
psutil==5.9.8
py-cpuinfo==9.0.0
pydantic==1.10.14
Pygments==2.10.0
Pygments==2.17.2
pytz==2024.1
python-markdown-math==0.8
pymdown-extensions==9.1
pymdown-extensions==10.7.1
PyYAML~=6.0.1
starlette~=0.36.3
loguru==0.7.2