feat: 配置项目的热修改

This commit is contained in:
远野千束 2024-03-30 06:04:17 +08:00
parent 392376248d
commit c15c604752
11 changed files with 536 additions and 112 deletions

View File

@ -10,17 +10,7 @@
- 全新可视化`npm`包管理,支持一键安装插件
- 支持OneBotv11/12标准通信且使用`Alconna`命令解析不再局限于OneBot
### [文档](https://bot.liteyuki.icu)
### [使用文档](https://bot.liteyuki.icu)
## 4.用户协议
1. 本项目遵循`MIT`协议,你可以自由使用,修改,分发,但是请保留原作者信息
2. 你可以选择开启`auto_report`(默认开启)
轻雪会收集运行环境的设备信息通过安全的方式传输到轻雪服务器用于统计运行时的设备信息帮助我们改进轻雪收集的数据包括但不限于CPU内存插件信息异常信息会话负载(不含隐私部分)
3. 本项目不会收集用户的任何隐私信息,但请注意甄别第三方插件的安全性
## 5.鸣谢
#### 鸣谢
- 此项目使用了[nonebot-plugin-htmlrender](https://github.com/kexue-z/nonebot-plugin-htmlrender/tree/master)作为内置html渲染插件

View File

@ -1,24 +1,23 @@
import json
from typing import Any
import aiofiles
import yaml
from nonebot import require
from nonebot.permission import SUPERUSER
import nonebot
from git import Repo
from nonebot import require, get_driver
from nonebot.permission import SUPERUSER
from liteyuki.utils.config import config, load_from_yaml
from liteyuki.utils.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.language import get_user_lang
from liteyuki.utils.message import Markdown as md, send_markdown
from .reloader import Reloader
from liteyuki.utils.data_manager import StoredConfig, common_db
from liteyuki.utils.language import get_user_lang
from liteyuki.utils.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message import Markdown as md, send_markdown
from .reloader import Reloader
from liteyuki.utils.htmlrender import launch_browser, stop_browser
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma
driver = get_driver()
cmd_liteyuki = on_alconna(
Alconna(
"liteyuki"
@ -121,3 +120,15 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
reply += f"\n{k}={v}"
reply += "\n```"
await send_markdown(reply, bot, event=event)
@driver.on_startup
async def on_startup():
await launch_browser()
nonebot.logger.info("Browser Started.")
@driver.on_shutdown
async def on_shutdown():
await stop_browser()
nonebot.logger.info("Browser Stopped.")

View File

@ -1,91 +1,15 @@
import nonebot
import psutil
from dash import Dash, Input, Output, dcc, html
from starlette.middleware.wsgi import WSGIMiddleware
from nonebot.adapters.onebot.v11 import MessageSegment
from nonebot.permission import SUPERUSER
from liteyuki.utils.htmlrender import render_html
from liteyuki.utils.language import Language
from liteyuki.utils.tools import convert_size
from liteyuki.utils.resource import get
from nonebot import on_command
app = nonebot.get_app()
stats = on_command("stats", priority=5, permission=SUPERUSER)
def get_system_info():
cpu_percent = psutil.cpu_percent(interval=0.1)
memory_info = psutil.virtual_memory()
memory_percent = memory_info.percent
return {
"cpu_percent" : cpu_percent,
"memory_percent": memory_percent
}
@app.get("/system_info")
async def system_info():
return get_system_info()
lang = Language()
dash_app = Dash(__name__)
dash_app.layout = dash_app.layout = html.Div(children=[
html.H1(children=lang.get("main.monitor.title"), style={
"textAlign": "center"
}),
dcc.Graph(id="live-update-graph"),
dcc.Interval(
id="interval-component",
interval=1 * 1000, # in milliseconds
n_intervals=0
)
])
@dash_app.callback(Output("live-update-graph", "figure"),
[Input("interval-component", "n_intervals")])
def update_graph_live(n):
lang = Language()
system_inf = get_system_info()
dash_app.layout = html.Div(children=[
html.H1(children=lang.get("main.monitor.title"), style={
"textAlign": "center"
}),
dcc.Graph(id="live-update-graph"),
dcc.Interval(
id="interval-component",
interval=2 * 1000, # in milliseconds
n_intervals=0
)
])
mem = psutil.virtual_memory()
cpu_f = psutil.cpu_freq()
figure = {
"data" : [
{
"x" : [f"{cpu_f.current / 1000:.2f}GHz {psutil.cpu_count(logical=False)}c{psutil.cpu_count()}t"],
"y" : [system_inf["cpu_percent"]],
"type": "bar",
"name": f"{lang.get('main.monitor.cpu')} {lang.get('main.monitor.usage')}"
},
{
"x" : [f"{convert_size(mem.used, add_unit=False)}/{convert_size(mem.total)}({mem.used / mem.total * 100:.2f}%)"],
"y" : [system_inf["memory_percent"]],
"type": "bar",
"name": f"{lang.get('main.monitor.memory')} {lang.get('main.monitor.usage')}"
},
],
"layout": {
"title": lang.get("main.monitor.description"),
# "xaxis": {
# "range": [0, 10]
# }, # 设置x轴的范围
"yaxis": {
"range": [0, 100]
}, # 设置y轴的范围
}
}
return figure
app.mount("/", WSGIMiddleware(dash_app.server))
@stats.handle()
async def _():
html = get("templates/stats.html")
html_bytes = await render_html(open(html, "r", encoding="utf-8").read())
await stats.finish(MessageSegment.image(html_bytes))

View File

@ -0,0 +1,12 @@
<!DOCTYPE html>
<head>
<meta charset="UTF-8">
<title></title>
</head>
<body>
<div>
<!-- 横向放置三个饼图分别表示CPU/内存/SWAP占用-->
</div>
</body>

View File

@ -0,0 +1,29 @@
{
"type": "canvas",
"children": [
{
"type": "rect",
"x": 0,
"y": 0,
"width": 100,
"height": 100,
"fill": "red"
},
{
"type": "rect",
"x": 100,
"y": 100,
"width": 100,
"height": 100,
"fill": "green"
},
{
"type": "rect",
"x": 200,
"y": 200,
"width": 100,
"height": 100,
"fill": "blue"
}
]
}

View File

@ -0,0 +1 @@
from PIL import Image, ImageDraw, ImageFont

View File

@ -0,0 +1,60 @@
import nonebot
from nonebot.log import logger
from nonebot.plugin import PluginMetadata
from .browser import (
get_browser as get_browser,
get_new_page as get_new_page,
shutdown_browser as shutdown_browser,
)
from .data_source import (
capture_element as capture_element,
html_to_pic as html_to_pic,
md_to_pic as md_to_pic,
template_to_html as template_to_html,
template_to_pic as template_to_pic,
text_to_pic as text_to_pic,
)
__plugin_meta__ = PluginMetadata(
name="nonebot-plugin-htmlrender",
description="通过浏览器渲染图片",
usage="提供多个易用API md_to_pic html_to_pic text_to_pic template_to_pic capture_element 等",
type="library",
homepage="https://github.com/kexue-z/nonebot-plugin-htmlrender",
extra={},
)
driver = nonebot.get_driver()
@driver.on_startup
async def init(**kwargs):
"""Start Browser
Returns:
Browser: Browser
"""
browser = await get_browser(**kwargs)
logger.info("Browser Started.")
return browser
@driver.on_shutdown
async def shutdown():
await shutdown_browser()
logger.info("Browser Stopped.")
browser_init = init
__all__ = [
"browser_init",
"capture_element",
"get_new_page",
"html_to_pic",
"md_to_pic",
"template_to_html",
"template_to_pic",
"text_to_pic",
]

View File

@ -0,0 +1,117 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Author : yanyongyu
@Date : 2021-03-12 13:42:43
@LastEditors : yanyongyu
@LastEditTime : 2021-11-01 14:05:41
@Description : None
@GitHub : https://github.com/yanyongyu
"""
__author__ = "yanyongyu"
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
from nonebot import get_plugin_config
from nonebot.log import logger
from playwright.async_api import Browser, Error, Page, Playwright, async_playwright
from .config import Config
import asyncio
config = get_plugin_config(Config)
_browser: Optional[Browser] = None
_playwright: Optional[Playwright] = None
async def init(**kwargs) -> Browser:
global _browser
global _playwright
_playwright = await async_playwright().start()
try:
_browser = await launch_browser(**kwargs)
except Error:
await install_browser()
_browser = await launch_browser(**kwargs)
return _browser
async def launch_browser(**kwargs) -> Browser:
assert _playwright is not None, "Playwright 没有安装"
if config.htmlrender_browser_channel:
kwargs["channel"] = config.htmlrender_browser_channel
if config.htmlrender_proxy_host:
kwargs["proxy"] = {
"server": config.htmlrender_proxy_host,
}
if config.htmlrender_browser == "firefox":
logger.info("使用 firefox 启动")
return await _playwright.firefox.launch(**kwargs)
# 默认使用 chromium
logger.info("使用 chromium 启动")
return await _playwright.chromium.launch(**kwargs)
async def get_browser(**kwargs) -> Browser:
return _browser if _browser and _browser.is_connected() else await init(**kwargs)
@asynccontextmanager
async def get_new_page(device_scale_factor: float = 2, **kwargs) -> AsyncIterator[Page]:
browser = await get_browser()
page = await browser.new_page(device_scale_factor=device_scale_factor, **kwargs)
try:
yield page
finally:
await page.close()
async def shutdown_browser():
global _browser
global _playwright
if _browser:
if _browser.is_connected():
await _browser.close()
_browser = None
if _playwright:
# await _playwright.stop()
_playwright = None
async def install_browser():
import os
import sys
from playwright.__main__ import main
if host := config.htmlrender_download_host:
logger.info("使用配置源进行下载")
os.environ["PLAYWRIGHT_DOWNLOAD_HOST"] = host
else:
logger.info("使用镜像源进行下载")
os.environ["PLAYWRIGHT_DOWNLOAD_HOST"] = (
"https://npmmirror.com/mirrors/playwright/"
)
success = False
if config.htmlrender_browser == "firefox":
logger.info("正在安装 firefox")
sys.argv = ["", "install", "firefox"]
else:
# 默认使用 chromium
logger.info("正在安装 chromium")
sys.argv = ["", "install", "chromium"]
try:
logger.info("正在安装依赖")
os.system("playwright install-deps") # noqa: ASYNC102, S605, S607
main()
except SystemExit as e:
if e.code == 0:
success = True
if not success:
logger.error("浏览器更新失败, 请检查网络连通性")

View File

@ -0,0 +1,10 @@
from typing import Optional
from pydantic import BaseModel, Field
class Config(BaseModel):
htmlrender_browser: Optional[str] = Field(default="chromium")
htmlrender_download_host: Optional[str] = Field(default=None)
htmlrender_proxy_host: Optional[str] = Field(default=None)
htmlrender_browser_channel: Optional[str] = Field(default=None)

View File

@ -0,0 +1,266 @@
from os import getcwd
from pathlib import Path
from typing import Literal, Optional, Union
import aiofiles
import jinja2
import markdown
from nonebot.log import logger
from .browser import get_new_page
TEMPLATES_PATH = str(Path(__file__).parent / "templates")
env = jinja2.Environment( # noqa: S701
extensions=["jinja2.ext.loopcontrols"],
loader=jinja2.FileSystemLoader(TEMPLATES_PATH),
enable_async=True,
)
async def text_to_pic(
text: str,
css_path: str = "",
width: int = 500,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
) -> bytes:
"""多行文本转图片
Args:
text (str): 纯文本, 可多行
css_path (str, optional): css文件
width (int, optional): 图片宽度默认为 500
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
Returns:
bytes: 图片, 可直接发送
"""
template = env.get_template("text.html")
return await html_to_pic(
template_path=f"file://{css_path if css_path else TEMPLATES_PATH}",
html=await template.render_async(
text=text,
css=await read_file(css_path) if css_path else await read_tpl("text.css"),
),
viewport={"width": width, "height": 10},
type=type,
quality=quality,
device_scale_factor=device_scale_factor,
)
async def md_to_pic(
md: str = "",
md_path: str = "",
css_path: str = "",
width: int = 500,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
) -> bytes:
"""markdown 转 图片
Args:
md (str, optional): markdown 格式文本
md_path (str, optional): markdown 文件路径
css_path (str, optional): css文件路径. Defaults to None.
width (int, optional): 图片宽度默认为 500
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
Returns:
bytes: 图片, 可直接发送
"""
template = env.get_template("markdown.html")
if not md:
if md_path:
md = await read_file(md_path)
else:
raise Exception("必须输入 md 或 md_path")
logger.debug(md)
md = markdown.markdown(
md,
extensions=[
"pymdownx.tasklist",
"tables",
"fenced_code",
"codehilite",
"mdx_math",
"pymdownx.tilde",
],
extension_configs={"mdx_math": {"enable_dollar_delimiter": True}},
)
logger.debug(md)
extra = ""
if "math/tex" in md:
katex_css = await read_tpl("katex/katex.min.b64_fonts.css")
katex_js = await read_tpl("katex/katex.min.js")
mathtex_js = await read_tpl("katex/mathtex-script-type.min.js")
extra = (
f'<style type="text/css">{katex_css}</style>'
f"<script defer>{katex_js}</script>"
f"<script defer>{mathtex_js}</script>"
)
if css_path:
css = await read_file(css_path)
else:
css = await read_tpl("github-markdown-light.css") + await read_tpl(
"pygments-default.css",
)
return await html_to_pic(
template_path=f"file://{css_path if css_path else TEMPLATES_PATH}",
html=await template.render_async(md=md, css=css, extra=extra),
viewport={"width": width, "height": 10},
type=type,
quality=quality,
device_scale_factor=device_scale_factor,
)
# async def read_md(md_path: str) -> str:
# async with aiofiles.open(str(Path(md_path).resolve()), mode="r") as f:
# md = await f.read()
# return markdown.markdown(md)
async def read_file(path: str) -> str:
async with aiofiles.open(path, mode="r") as f:
return await f.read()
async def read_tpl(path: str) -> str:
return await read_file(f"{TEMPLATES_PATH}/{path}")
async def template_to_html(
template_path: str,
template_name: str,
**kwargs,
) -> str:
"""使用jinja2模板引擎通过html生成图片
Args:
template_path (str): 模板路径
template_name (str): 模板名
**kwargs: 模板内容
Returns:
str: html
"""
template_env = jinja2.Environment( # noqa: S701
loader=jinja2.FileSystemLoader(template_path),
enable_async=True,
)
template = template_env.get_template(template_name)
return await template.render_async(**kwargs)
async def html_to_pic(
html: str,
wait: int = 0,
template_path: str = f"file://{getcwd()}", # noqa: PTH109
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
**kwargs,
) -> bytes:
"""html转图片
Args:
html (str): html文本
wait (int, optional): 等待时间. Defaults to 0.
template_path (str, optional): 模板路径 "file:///path/to/template/"
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
**kwargs: 传入 page 的参数
Returns:
bytes: 图片, 可直接发送
"""
# logger.debug(f"html:\n{html}")
if "file:" not in template_path:
raise Exception("template_path 应该为 file:///path/to/template")
async with get_new_page(device_scale_factor, **kwargs) as page:
await page.goto(template_path)
await page.set_content(html, wait_until="networkidle")
await page.wait_for_timeout(wait)
return await page.screenshot(
full_page=True,
type=type,
quality=quality,
)
async def template_to_pic(
template_path: str,
template_name: str,
templates: dict,
pages: Optional[dict] = None,
wait: int = 0,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
device_scale_factor: float = 2,
) -> bytes:
"""使用jinja2模板引擎通过html生成图片
Args:
template_path (str): 模板路径
template_name (str): 模板名
templates (dict): 模板内参数 : {"name": "abc"}
pages (dict): 网页参数 Defaults to
{"base_url": f"file://{getcwd()}", "viewport": {"width": 500, "height": 10}}
wait (int, optional): 网页载入等待时间. Defaults to 0.
type (Literal["jpeg", "png"]): 图片类型, 默认 png
quality (int, optional): 图片质量 0-100 当为`png`时无效
device_scale_factor: 缩放比例,类型为float,值越大越清晰(真正想让图片清晰更优先请调整此选项)
Returns:
bytes: 图片 可直接发送
"""
if pages is None:
pages = {
"viewport": {"width": 500, "height": 10},
"base_url": f"file://{getcwd()}", # noqa: PTH109
}
template_env = jinja2.Environment( # noqa: S701
loader=jinja2.FileSystemLoader(template_path),
enable_async=True,
)
template = template_env.get_template(template_name)
return await html_to_pic(
template_path=f"file://{template_path}",
html=await template.render_async(**templates),
wait=wait,
type=type,
quality=quality,
device_scale_factor=device_scale_factor,
**pages,
)
async def capture_element(
url: str,
element: str,
timeout: float = 0,
type: Literal["jpeg", "png"] = "png", # noqa: A002
quality: Union[int, None] = None,
**kwargs,
) -> bytes:
async with get_new_page(**kwargs) as page:
await page.goto(url, timeout=timeout)
return await page.locator(element).screenshot(
type=type,
quality=quality,
)

View File

@ -15,4 +15,8 @@ PyYAML~=6.0.1
starlette~=0.36.3
loguru==0.7.2
importlib_metadata==7.0.2
requests==2.31.0
requests==2.31.0
pillow==10.2.0
pyppeteer==2.0.0
pip==24.0
weasyprint==61.2