diff --git a/liteyuki/liteyuki_main/runtime.py b/liteyuki/liteyuki_main/runtime.py deleted file mode 100644 index 0eb64cf..0000000 --- a/liteyuki/liteyuki_main/runtime.py +++ /dev/null @@ -1,280 +0,0 @@ -import platform - -import nonebot -import psutil -from cpuinfo import get_cpu_info -from nonebot import on_command, require -from nonebot.adapters.onebot.v11 import MessageSegment -from nonebot.permission import SUPERUSER -from liteyuki.utils import __NAME__, __VERSION__, load_from_yaml -from liteyuki.utils.message.html_tool import template2image -from liteyuki.utils.base.language import Language, get_all_lang, get_default_lang_code, get_user_lang -from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent -from liteyuki.utils.base.resource import get_path -from liteyuki.utils.message.tools import convert_size -from PIL import Image -from io import BytesIO - -require("nonebot_plugin_apscheduler") -from nonebot_plugin_apscheduler import scheduler - -stats = on_command("status", aliases={"状态"}, priority=5, permission=SUPERUSER) - -config = load_from_yaml("config.yml") - -protocol_names = { - 0: "iPad", - 1: "Android Phone", - 2: "Android Watch", - 3: "Mac", - 5: "iPad", - 6: "Android Pad", -} - -# 预存数据区 -stats_data = {} # lang -> dict -image_data = {} # lang -> bytes -localization = {} -for lang in get_all_lang(): - ulang = Language(lang) - localization[lang] = { - "cpu" : ulang.get("main.monitor.cpu"), - "mem" : ulang.get("main.monitor.memory"), - "swap" : ulang.get("main.monitor.swap"), - "disk" : ulang.get("main.monitor.disk"), - "used" : ulang.get("main.monitor.used"), - "free" : ulang.get("main.monitor.free"), - "total": ulang.get("main.monitor.total"), - } - - -@scheduler.scheduled_job("cron", minute="*/20") -async def _(): - nonebot.logger.info("数据已刷新") - for lang_code in get_all_lang(): - stats_data[lang_code] = await get_stats_data(lang=lang_code) - image_data[lang_code] = await template2image( - get_path("templates/stats.html", abs_path=True), - { - "data": stats_data[lang_code] - }, - wait=1 - ) - - -@stats.handle() -async def _(bot: T_Bot, event: T_MessageEvent): - global stats_data - ulang = get_user_lang(str(event.user_id)) - - if ulang.lang_code not in image_data: - stats_data[ulang.lang_code] = await get_stats_data(lang=ulang.lang_code) - image_data[ulang.lang_code] = await template2image( - get_path("templates/stats.html", abs_path=True), - { - "data": stats_data[ulang.lang_code] - }, - wait=1 - ) - image = await png_to_jpg(image_data[ulang.lang_code]) - await stats.finish(MessageSegment.image(image)) - - -async def png_to_jpg(image): - image_stream = BytesIO(image) - img = Image.open(image_stream) - rgb_img = img.convert('RGB') - output_stream = BytesIO() - rgb_img.save(output_stream, format='JPEG') - jpg_bytes = output_stream.getvalue() - return jpg_bytes - - -async def get_bots_data(ulang: Language, self_id) -> list: - bots_data = [] - for bot_id, bot in nonebot.get_bots().items(): - groups = 0 - friends = 0 - status = {} - bot_name = bot_id - version_info = {} - try: - bot_name = (await bot.get_login_info())["nickname"] - groups = len(await bot.get_group_list()) - friends = len(await bot.get_friend_list()) - status = await bot.get_status() - version_info = await bot.get_version_info() - except Exception: - pass - - statistics = status.get("stat", {}) - app_name = version_info.get("app_name", "UnknownImplementation") - if app_name in ["Lagrange.OneBot", "LLOneBot", "Shamrock"]: - icon = f"https://q.qlogo.cn/g?b=qq&nk={bot_id}&s=640" - else: - icon = "./img/liteyuki.png" - bot_data = { - "name": bot_name, - "icon": icon, - "id" : bot_id, - "tags": [ - protocol_names.get(version_info.get("protocol_name"), "Online"), - f"{ulang.get('liteyuki.stats.groups')} {groups}", - f"{ulang.get('liteyuki.stats.friends')} {friends}", - f"{ulang.get('liteyuki.stats.sent')} {statistics.get('message_sent', 0)}", - f"{ulang.get('liteyuki.stats.received')} {statistics.get('message_received', 0)}", - app_name, - - ], - "self": bot_id == self_id, # 判断是否是自己 - } - bots_data.append(bot_data) - bots_data.append( - { - "name": "Liteyuki", - "icon": "./img/liteyuki.png", - "id" : "liteyuki", - "tags": [ - f"{__NAME__} {__VERSION__}", - f"{ulang.get('liteyuki.stats.plugins')} {len(nonebot.get_loaded_plugins())}", - f"Nonebot {nonebot.__version__}", - f"{platform.python_implementation()} {platform.python_version()}", - f"{platform.system()} {platform.release()}", - ] - } - ) - return bots_data - - -async def get_stats_data(self_id: str = None, lang: str = None) -> dict: - if self_id is None: - self_id = list(nonebot.get_bots().keys())[0] if len(nonebot.get_bots()) > 0 else "liteyuki" - if lang is None: - ulang = Language(get_default_lang_code()) - else: - ulang = Language(lang) - - fake_device_info: dict = config.get("fake_device_info", {}) - - mem_info = psutil.virtual_memory() - mem_total = fake_device_info.get('mem', {}).get('total', mem_info.total) - - convert_size(mem_total, 1, False) - mem_total_show = convert_size(mem_total, 1) # 格式化带单位 - - # 获取当前进程包括所有子进程占用 - all_processes = psutil.Process().children(recursive=True) - all_processes.append(psutil.Process()) - - mem_used_bot = 0 - process_mem = {} - for process in all_processes: - try: - ps_name = process.name().replace(".exe", "") - if ps_name not in process_mem: - process_mem[ps_name] = 0 - process_mem[ps_name] += process.memory_info().rss - mem_used_bot += process.memory_info().rss - except Exception: - pass - mem_used_bot_show = convert_size(mem_used_bot, 1) - mem_used_other = mem_info.used - mem_used_bot - mem_free = mem_total - mem_info.used - mem_used_show = convert_size(mem_used_bot + mem_used_other, 1) # 计算已用格式化带单位 - - swap_info = psutil.swap_memory() - - disk_data = [] - for disk in psutil.disk_partitions(all=True): - try: - disk_usage = psutil.disk_usage(disk.mountpoint) - disk_total_show = convert_size(disk_usage.total, 1) - disk_free_show = convert_size(disk_usage.free, 1) - if disk_usage.total > 0: - disk_data.append( - { - "name" : disk.device, - "total" : disk_total_show, - "free" : disk_free_show, - "percent" : disk_usage.percent, - "usedValue" : disk_usage.used, - "freeValue" : disk_usage.free, - "totalValue": disk_usage.total, - } - ) - except Exception: - pass - - cpu_info = get_cpu_info() - if "AMD" in cpu_info.get("brand_raw", ""): - brand = "AMD" - elif "Intel" in cpu_info.get("brand_raw", ""): - brand = "Intel" - else: - brand = "Unknown" - - if fake_device_info.get("cpu", {}).get("brand"): - brand = fake_device_info.get("cpu", {}).get("brand") - - cpu_info = get_cpu_info() - templ = { - "plugin" : len(nonebot.get_loaded_plugins()), - "version" : __VERSION__, - "system" : platform.platform(), - "cpu" : [ - { - "name" : "USED", - "value": psutil.cpu_percent() - }, - { - "name" : "FREE", - "value": 100 - psutil.cpu_percent() - } - ], - "mem" : [ - - { - "name" : "OTHER", - "value": mem_used_other - }, - { - "name" : "FREE", - "value": mem_free - }, - { - "name" : "BOT", - "value": mem_used_bot - }, - ], - "swap" : [ - { - "name" : "USED", - "value": psutil.swap_memory().used - }, - { - "name" : "FREE", - "value": psutil.swap_memory().free - } - ], - "disk" : disk_data, # list[{"name":"C", "total":100, "used":50, "free":50}] - "bot" : await get_bots_data(ulang, self_id), - "cpuTags" : [ - f"{brand} {cpu_info.get('arch', 'Unknown')}", - f"{fake_device_info.get('cpu', {}).get('cores', psutil.cpu_count(logical=False))}C " - f"{fake_device_info.get('cpu', {}).get('logical_cores', psutil.cpu_count(logical=True))}T", - f"{'%.2f' % (fake_device_info.get('cpu', {}).get('frequency', psutil.cpu_freq().current) / 1000)}GHz" - ], - "memTags" : [ - f"Bot {mem_used_bot_show}", - f"{ulang.get('main.monitor.used')} {mem_used_show}", - f"{ulang.get('main.monitor.free')} {convert_size(mem_free, 1)}", - f"{ulang.get('main.monitor.total')} {mem_total_show}", - ], - "swapTags" : [ - f"{ulang.get('main.monitor.used')} {convert_size(swap_info.used, 1)}", - f"{ulang.get('main.monitor.free')} {convert_size(swap_info.free, 1)}", - f"{ulang.get('main.monitor.total')} {convert_size(swap_info.total, 1)}", - ], - "localization": localization[ulang.lang_code] - } - return templ diff --git a/liteyuki/resources/templates/js/status.js b/liteyuki/resources/templates/js/status.js index 45825cc..4bf5a17 100644 --- a/liteyuki/resources/templates/js/status.js +++ b/liteyuki/resources/templates/js/status.js @@ -60,7 +60,7 @@ function createPieChartOption(title, data) { } -function convertSize(size, precision = 2, addUnit = true, suffix = " XiB"): string { +function convertSize(size, precision = 2, addUnit = true, suffix = " XiB") { let isNegative = size < 0; size = Math.abs(size); let units = ["", "K", "M", "G", "T", "P", "E", "Z"]; diff --git a/liteyuki/utils/message/message.py b/liteyuki/utils/message/message.py index b5a764c..32388cd 100644 --- a/liteyuki/utils/message/message.py +++ b/liteyuki/utils/message/message.py @@ -22,9 +22,12 @@ from nonebot_plugin_htmlrender import md_to_pic config = load_from_yaml("config.yml") can_send_markdown = {} # 用于存储机器人是否支持发送markdown消息,id->bool + + class TencentBannedMarkdownError(BaseException): pass + async def broadcast_to_superusers(message: str | T_Message, markdown: bool = False): """广播消息给超级用户""" for bot in nonebot.get_bots().values(): @@ -65,16 +68,38 @@ class MarkdownMessage: message_type = event.message_type session_id = event.user_id if event.message_type == "private" else event.group_id try: - # raise TencentBannedMarkdownError("Tencent banned markdown") + raise TencentBannedMarkdownError("Tencent banned markdown") + forward_id = await bot.call_api( + "send_private_forward_msg", + messages=[ + { + "type": "node", + "data": { + "content": [ + { + "data": { + "content": "{\"content\":\"%s\"}" % formatted_md, + }, + "type": "markdown" + } + ], + "name" : "[]", + "uin" : bot.self_id + } + } + ], + user_id=bot.self_id + + ) data = await bot.send_msg( user_id=session_id, group_id=session_id, message_type=message_type, message=[ { - "type": "markdown", + "type": "longmsg", "data": { - "content": "{\"content\":\"%s\"}" % formatted_md, + "id": forward_id } }, ],