mirror of
https://github.com/LiteyukiStudio/LiteyukiBot.git
synced 2024-11-26 12:55:06 +08:00
: bug: 哈哈哈哈哈哈哈,markdown又寄了
This commit is contained in:
parent
0a1d96c434
commit
b240d75552
@ -1,280 +0,0 @@
|
|||||||
import platform
|
|
||||||
|
|
||||||
import nonebot
|
|
||||||
import psutil
|
|
||||||
from cpuinfo import get_cpu_info
|
|
||||||
from nonebot import on_command, require
|
|
||||||
from nonebot.adapters.onebot.v11 import MessageSegment
|
|
||||||
from nonebot.permission import SUPERUSER
|
|
||||||
from liteyuki.utils import __NAME__, __VERSION__, load_from_yaml
|
|
||||||
from liteyuki.utils.message.html_tool import template2image
|
|
||||||
from liteyuki.utils.base.language import Language, get_all_lang, get_default_lang_code, get_user_lang
|
|
||||||
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
|
|
||||||
from liteyuki.utils.base.resource import get_path
|
|
||||||
from liteyuki.utils.message.tools import convert_size
|
|
||||||
from PIL import Image
|
|
||||||
from io import BytesIO
|
|
||||||
|
|
||||||
require("nonebot_plugin_apscheduler")
|
|
||||||
from nonebot_plugin_apscheduler import scheduler
|
|
||||||
|
|
||||||
stats = on_command("status", aliases={"状态"}, priority=5, permission=SUPERUSER)
|
|
||||||
|
|
||||||
config = load_from_yaml("config.yml")
|
|
||||||
|
|
||||||
protocol_names = {
|
|
||||||
0: "iPad",
|
|
||||||
1: "Android Phone",
|
|
||||||
2: "Android Watch",
|
|
||||||
3: "Mac",
|
|
||||||
5: "iPad",
|
|
||||||
6: "Android Pad",
|
|
||||||
}
|
|
||||||
|
|
||||||
# 预存数据区
|
|
||||||
stats_data = {} # lang -> dict
|
|
||||||
image_data = {} # lang -> bytes
|
|
||||||
localization = {}
|
|
||||||
for lang in get_all_lang():
|
|
||||||
ulang = Language(lang)
|
|
||||||
localization[lang] = {
|
|
||||||
"cpu" : ulang.get("main.monitor.cpu"),
|
|
||||||
"mem" : ulang.get("main.monitor.memory"),
|
|
||||||
"swap" : ulang.get("main.monitor.swap"),
|
|
||||||
"disk" : ulang.get("main.monitor.disk"),
|
|
||||||
"used" : ulang.get("main.monitor.used"),
|
|
||||||
"free" : ulang.get("main.monitor.free"),
|
|
||||||
"total": ulang.get("main.monitor.total"),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@scheduler.scheduled_job("cron", minute="*/20")
|
|
||||||
async def _():
|
|
||||||
nonebot.logger.info("数据已刷新")
|
|
||||||
for lang_code in get_all_lang():
|
|
||||||
stats_data[lang_code] = await get_stats_data(lang=lang_code)
|
|
||||||
image_data[lang_code] = await template2image(
|
|
||||||
get_path("templates/stats.html", abs_path=True),
|
|
||||||
{
|
|
||||||
"data": stats_data[lang_code]
|
|
||||||
},
|
|
||||||
wait=1
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@stats.handle()
|
|
||||||
async def _(bot: T_Bot, event: T_MessageEvent):
|
|
||||||
global stats_data
|
|
||||||
ulang = get_user_lang(str(event.user_id))
|
|
||||||
|
|
||||||
if ulang.lang_code not in image_data:
|
|
||||||
stats_data[ulang.lang_code] = await get_stats_data(lang=ulang.lang_code)
|
|
||||||
image_data[ulang.lang_code] = await template2image(
|
|
||||||
get_path("templates/stats.html", abs_path=True),
|
|
||||||
{
|
|
||||||
"data": stats_data[ulang.lang_code]
|
|
||||||
},
|
|
||||||
wait=1
|
|
||||||
)
|
|
||||||
image = await png_to_jpg(image_data[ulang.lang_code])
|
|
||||||
await stats.finish(MessageSegment.image(image))
|
|
||||||
|
|
||||||
|
|
||||||
async def png_to_jpg(image):
|
|
||||||
image_stream = BytesIO(image)
|
|
||||||
img = Image.open(image_stream)
|
|
||||||
rgb_img = img.convert('RGB')
|
|
||||||
output_stream = BytesIO()
|
|
||||||
rgb_img.save(output_stream, format='JPEG')
|
|
||||||
jpg_bytes = output_stream.getvalue()
|
|
||||||
return jpg_bytes
|
|
||||||
|
|
||||||
|
|
||||||
async def get_bots_data(ulang: Language, self_id) -> list:
|
|
||||||
bots_data = []
|
|
||||||
for bot_id, bot in nonebot.get_bots().items():
|
|
||||||
groups = 0
|
|
||||||
friends = 0
|
|
||||||
status = {}
|
|
||||||
bot_name = bot_id
|
|
||||||
version_info = {}
|
|
||||||
try:
|
|
||||||
bot_name = (await bot.get_login_info())["nickname"]
|
|
||||||
groups = len(await bot.get_group_list())
|
|
||||||
friends = len(await bot.get_friend_list())
|
|
||||||
status = await bot.get_status()
|
|
||||||
version_info = await bot.get_version_info()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
statistics = status.get("stat", {})
|
|
||||||
app_name = version_info.get("app_name", "UnknownImplementation")
|
|
||||||
if app_name in ["Lagrange.OneBot", "LLOneBot", "Shamrock"]:
|
|
||||||
icon = f"https://q.qlogo.cn/g?b=qq&nk={bot_id}&s=640"
|
|
||||||
else:
|
|
||||||
icon = "./img/liteyuki.png"
|
|
||||||
bot_data = {
|
|
||||||
"name": bot_name,
|
|
||||||
"icon": icon,
|
|
||||||
"id" : bot_id,
|
|
||||||
"tags": [
|
|
||||||
protocol_names.get(version_info.get("protocol_name"), "Online"),
|
|
||||||
f"{ulang.get('liteyuki.stats.groups')} {groups}",
|
|
||||||
f"{ulang.get('liteyuki.stats.friends')} {friends}",
|
|
||||||
f"{ulang.get('liteyuki.stats.sent')} {statistics.get('message_sent', 0)}",
|
|
||||||
f"{ulang.get('liteyuki.stats.received')} {statistics.get('message_received', 0)}",
|
|
||||||
app_name,
|
|
||||||
|
|
||||||
],
|
|
||||||
"self": bot_id == self_id, # 判断是否是自己
|
|
||||||
}
|
|
||||||
bots_data.append(bot_data)
|
|
||||||
bots_data.append(
|
|
||||||
{
|
|
||||||
"name": "Liteyuki",
|
|
||||||
"icon": "./img/liteyuki.png",
|
|
||||||
"id" : "liteyuki",
|
|
||||||
"tags": [
|
|
||||||
f"{__NAME__} {__VERSION__}",
|
|
||||||
f"{ulang.get('liteyuki.stats.plugins')} {len(nonebot.get_loaded_plugins())}",
|
|
||||||
f"Nonebot {nonebot.__version__}",
|
|
||||||
f"{platform.python_implementation()} {platform.python_version()}",
|
|
||||||
f"{platform.system()} {platform.release()}",
|
|
||||||
]
|
|
||||||
}
|
|
||||||
)
|
|
||||||
return bots_data
|
|
||||||
|
|
||||||
|
|
||||||
async def get_stats_data(self_id: str = None, lang: str = None) -> dict:
|
|
||||||
if self_id is None:
|
|
||||||
self_id = list(nonebot.get_bots().keys())[0] if len(nonebot.get_bots()) > 0 else "liteyuki"
|
|
||||||
if lang is None:
|
|
||||||
ulang = Language(get_default_lang_code())
|
|
||||||
else:
|
|
||||||
ulang = Language(lang)
|
|
||||||
|
|
||||||
fake_device_info: dict = config.get("fake_device_info", {})
|
|
||||||
|
|
||||||
mem_info = psutil.virtual_memory()
|
|
||||||
mem_total = fake_device_info.get('mem', {}).get('total', mem_info.total)
|
|
||||||
|
|
||||||
convert_size(mem_total, 1, False)
|
|
||||||
mem_total_show = convert_size(mem_total, 1) # 格式化带单位
|
|
||||||
|
|
||||||
# 获取当前进程包括所有子进程占用
|
|
||||||
all_processes = psutil.Process().children(recursive=True)
|
|
||||||
all_processes.append(psutil.Process())
|
|
||||||
|
|
||||||
mem_used_bot = 0
|
|
||||||
process_mem = {}
|
|
||||||
for process in all_processes:
|
|
||||||
try:
|
|
||||||
ps_name = process.name().replace(".exe", "")
|
|
||||||
if ps_name not in process_mem:
|
|
||||||
process_mem[ps_name] = 0
|
|
||||||
process_mem[ps_name] += process.memory_info().rss
|
|
||||||
mem_used_bot += process.memory_info().rss
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
mem_used_bot_show = convert_size(mem_used_bot, 1)
|
|
||||||
mem_used_other = mem_info.used - mem_used_bot
|
|
||||||
mem_free = mem_total - mem_info.used
|
|
||||||
mem_used_show = convert_size(mem_used_bot + mem_used_other, 1) # 计算已用格式化带单位
|
|
||||||
|
|
||||||
swap_info = psutil.swap_memory()
|
|
||||||
|
|
||||||
disk_data = []
|
|
||||||
for disk in psutil.disk_partitions(all=True):
|
|
||||||
try:
|
|
||||||
disk_usage = psutil.disk_usage(disk.mountpoint)
|
|
||||||
disk_total_show = convert_size(disk_usage.total, 1)
|
|
||||||
disk_free_show = convert_size(disk_usage.free, 1)
|
|
||||||
if disk_usage.total > 0:
|
|
||||||
disk_data.append(
|
|
||||||
{
|
|
||||||
"name" : disk.device,
|
|
||||||
"total" : disk_total_show,
|
|
||||||
"free" : disk_free_show,
|
|
||||||
"percent" : disk_usage.percent,
|
|
||||||
"usedValue" : disk_usage.used,
|
|
||||||
"freeValue" : disk_usage.free,
|
|
||||||
"totalValue": disk_usage.total,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
cpu_info = get_cpu_info()
|
|
||||||
if "AMD" in cpu_info.get("brand_raw", ""):
|
|
||||||
brand = "AMD"
|
|
||||||
elif "Intel" in cpu_info.get("brand_raw", ""):
|
|
||||||
brand = "Intel"
|
|
||||||
else:
|
|
||||||
brand = "Unknown"
|
|
||||||
|
|
||||||
if fake_device_info.get("cpu", {}).get("brand"):
|
|
||||||
brand = fake_device_info.get("cpu", {}).get("brand")
|
|
||||||
|
|
||||||
cpu_info = get_cpu_info()
|
|
||||||
templ = {
|
|
||||||
"plugin" : len(nonebot.get_loaded_plugins()),
|
|
||||||
"version" : __VERSION__,
|
|
||||||
"system" : platform.platform(),
|
|
||||||
"cpu" : [
|
|
||||||
{
|
|
||||||
"name" : "USED",
|
|
||||||
"value": psutil.cpu_percent()
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name" : "FREE",
|
|
||||||
"value": 100 - psutil.cpu_percent()
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"mem" : [
|
|
||||||
|
|
||||||
{
|
|
||||||
"name" : "OTHER",
|
|
||||||
"value": mem_used_other
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name" : "FREE",
|
|
||||||
"value": mem_free
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name" : "BOT",
|
|
||||||
"value": mem_used_bot
|
|
||||||
},
|
|
||||||
],
|
|
||||||
"swap" : [
|
|
||||||
{
|
|
||||||
"name" : "USED",
|
|
||||||
"value": psutil.swap_memory().used
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"name" : "FREE",
|
|
||||||
"value": psutil.swap_memory().free
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"disk" : disk_data, # list[{"name":"C", "total":100, "used":50, "free":50}]
|
|
||||||
"bot" : await get_bots_data(ulang, self_id),
|
|
||||||
"cpuTags" : [
|
|
||||||
f"{brand} {cpu_info.get('arch', 'Unknown')}",
|
|
||||||
f"{fake_device_info.get('cpu', {}).get('cores', psutil.cpu_count(logical=False))}C "
|
|
||||||
f"{fake_device_info.get('cpu', {}).get('logical_cores', psutil.cpu_count(logical=True))}T",
|
|
||||||
f"{'%.2f' % (fake_device_info.get('cpu', {}).get('frequency', psutil.cpu_freq().current) / 1000)}GHz"
|
|
||||||
],
|
|
||||||
"memTags" : [
|
|
||||||
f"Bot {mem_used_bot_show}",
|
|
||||||
f"{ulang.get('main.monitor.used')} {mem_used_show}",
|
|
||||||
f"{ulang.get('main.monitor.free')} {convert_size(mem_free, 1)}",
|
|
||||||
f"{ulang.get('main.monitor.total')} {mem_total_show}",
|
|
||||||
],
|
|
||||||
"swapTags" : [
|
|
||||||
f"{ulang.get('main.monitor.used')} {convert_size(swap_info.used, 1)}",
|
|
||||||
f"{ulang.get('main.monitor.free')} {convert_size(swap_info.free, 1)}",
|
|
||||||
f"{ulang.get('main.monitor.total')} {convert_size(swap_info.total, 1)}",
|
|
||||||
],
|
|
||||||
"localization": localization[ulang.lang_code]
|
|
||||||
}
|
|
||||||
return templ
|
|
@ -60,7 +60,7 @@ function createPieChartOption(title, data) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
function convertSize(size, precision = 2, addUnit = true, suffix = " XiB"): string {
|
function convertSize(size, precision = 2, addUnit = true, suffix = " XiB") {
|
||||||
let isNegative = size < 0;
|
let isNegative = size < 0;
|
||||||
size = Math.abs(size);
|
size = Math.abs(size);
|
||||||
let units = ["", "K", "M", "G", "T", "P", "E", "Z"];
|
let units = ["", "K", "M", "G", "T", "P", "E", "Z"];
|
||||||
|
@ -22,9 +22,12 @@ from nonebot_plugin_htmlrender import md_to_pic
|
|||||||
config = load_from_yaml("config.yml")
|
config = load_from_yaml("config.yml")
|
||||||
|
|
||||||
can_send_markdown = {} # 用于存储机器人是否支持发送markdown消息,id->bool
|
can_send_markdown = {} # 用于存储机器人是否支持发送markdown消息,id->bool
|
||||||
|
|
||||||
|
|
||||||
class TencentBannedMarkdownError(BaseException):
|
class TencentBannedMarkdownError(BaseException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
async def broadcast_to_superusers(message: str | T_Message, markdown: bool = False):
|
async def broadcast_to_superusers(message: str | T_Message, markdown: bool = False):
|
||||||
"""广播消息给超级用户"""
|
"""广播消息给超级用户"""
|
||||||
for bot in nonebot.get_bots().values():
|
for bot in nonebot.get_bots().values():
|
||||||
@ -65,16 +68,38 @@ class MarkdownMessage:
|
|||||||
message_type = event.message_type
|
message_type = event.message_type
|
||||||
session_id = event.user_id if event.message_type == "private" else event.group_id
|
session_id = event.user_id if event.message_type == "private" else event.group_id
|
||||||
try:
|
try:
|
||||||
# raise TencentBannedMarkdownError("Tencent banned markdown")
|
raise TencentBannedMarkdownError("Tencent banned markdown")
|
||||||
|
forward_id = await bot.call_api(
|
||||||
|
"send_private_forward_msg",
|
||||||
|
messages=[
|
||||||
|
{
|
||||||
|
"type": "node",
|
||||||
|
"data": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"content": "{\"content\":\"%s\"}" % formatted_md,
|
||||||
|
},
|
||||||
|
"type": "markdown"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"name" : "[]",
|
||||||
|
"uin" : bot.self_id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
user_id=bot.self_id
|
||||||
|
|
||||||
|
)
|
||||||
data = await bot.send_msg(
|
data = await bot.send_msg(
|
||||||
user_id=session_id,
|
user_id=session_id,
|
||||||
group_id=session_id,
|
group_id=session_id,
|
||||||
message_type=message_type,
|
message_type=message_type,
|
||||||
message=[
|
message=[
|
||||||
{
|
{
|
||||||
"type": "markdown",
|
"type": "longmsg",
|
||||||
"data": {
|
"data": {
|
||||||
"content": "{\"content\":\"%s\"}" % formatted_md,
|
"id": forward_id
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
|
Loading…
Reference in New Issue
Block a user