LiteyukiBot-TriM/liteyuki/plugins/liteyuki_status/api.py

278 lines
8.7 KiB
Python

import platform
import time
import nonebot
import psutil
from cpuinfo import cpuinfo
from nonebot import require
from nonebot.adapters import satori
from liteyuki.utils import __NAME__, __VERSION__
from liteyuki.utils.base.config import get_config
from liteyuki.utils.base.data_manager import TempConfig, common_db
from liteyuki.utils.base.language import Language
from liteyuki.utils.base.resource import get_loaded_resource_packs, get_path
from liteyuki.utils.message.html_tool import template2image
from liteyuki.utils import satori_utils
from .counter_for_satori import satori_counter
require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler
commit_hash = None
protocol_names = {
0: "iPad",
1: "Android Phone",
2: "Android Watch",
3: "Mac",
5: "iPad",
6: "Android Pad",
}
"""
Universal Interface
data
- bot
- name: str
icon: str
id: int
protocol_name: str
groups: int
friends: int
message_sent: int
message_received: int
app_name: str
- hardware
- cpu
- percent: float
- name: str
- mem
- percent: float
- total: int
- used: int
- free: int
- swap
- percent: float
- total: int
- used: int
- free: int
- disk: list
- name: str
- percent: float
- total: int
"""
status_card_cache = {} # lang -> bytes
# 60s刷新一次
@scheduler.scheduled_job("cron", second="*/40")
async def refresh_status_card():
nonebot.logger.debug("Refreshing status card cache...")
global status_card_cache
bot_data = await get_bots_data()
hardware_data = await get_hardware_data()
liteyuki_data = await get_liteyuki_data()
for lang in status_card_cache.keys():
status_card_cache[lang] = await generate_status_card(
bot_data,
hardware_data,
liteyuki_data,
lang=lang,
use_cache=False
)
async def generate_status_card(bot: dict, hardware: dict, liteyuki: dict, lang="zh-CN", bot_id="0",
use_cache=False) -> bytes:
if not use_cache:
return await template2image(
get_path("templates/status.html", abs_path=True),
{
"data": {
"bot": bot,
"hardware": hardware,
"liteyuki": liteyuki,
"localization": get_local_data(lang)
}
},
debug=True
)
else:
if lang not in status_card_cache:
status_card_cache[lang] = await generate_status_card(bot, hardware, liteyuki, lang=lang, bot_id=bot_id)
return status_card_cache[lang]
def get_local_data(lang_code) -> dict:
lang = Language(lang_code)
return {
"friends": lang.get("status.friends"),
"groups": lang.get("status.groups"),
"plugins": lang.get("status.plugins"),
"bots": lang.get("status.bots"),
"message_sent": lang.get("status.message_sent"),
"message_received": lang.get("status.message_received"),
"cpu": lang.get("status.cpu"),
"memory": lang.get("status.memory"),
"swap": lang.get("status.swap"),
"disk": lang.get("status.disk"),
"usage": lang.get("status.usage"),
"total": lang.get("status.total"),
"used": lang.get("status.used"),
"free": lang.get("status.free"),
"days": lang.get("status.days"),
"hours": lang.get("status.hours"),
"minutes": lang.get("status.minutes"),
"seconds": lang.get("status.seconds"),
"runtime": lang.get("status.runtime"),
"threads": lang.get("status.threads"),
"cores": lang.get("status.cores"),
"process": lang.get("status.process"),
"resources": lang.get("status.resources"),
"description": lang.get("status.description"),
}
async def get_bots_data(self_id: str = "0") -> dict:
"""获取当前所有机器人数据
Returns:
"""
result = {
"self_id": self_id,
"bots": [],
}
for bot_id, bot in nonebot.get_bots().items():
groups = 0
friends = 0
status = {}
bot_name = bot_id
version_info = {}
if isinstance(bot, satori.Bot):
try:
bot_name = (await satori_utils.user_infos.get(bot.self_id)).name
groups = str(await satori_utils.count_groups(bot))
friends = str(await satori_utils.count_friends(bot))
status = {}
version_info = await bot.get_version_info()
except Exception:
pass
else:
try:
# API fetch
bot_name = (await bot.get_login_info())["nickname"]
groups = len(await bot.get_group_list())
friends = len(await bot.get_friend_list())
status = await bot.get_status()
version_info = await bot.get_version_info()
except Exception:
pass
statistics = status.get("stat", {})
app_name = version_info.get("app_name", "UnknownImplementation")
if app_name in ["Lagrange.OneBot", "LLOneBot", "Shamrock", "NapCat.Onebot"]:
icon = f"https://q.qlogo.cn/g?b=qq&nk={bot_id}&s=640"
elif isinstance(bot, satori.Bot):
app_name = "Satori"
icon = (await bot.login_get()).user.avatar
else:
icon = None
bot_data = {
"name": bot_name,
"icon": icon,
"id": bot_id,
"protocol_name": protocol_names.get(version_info.get("protocol_name"), "Online"),
"groups": groups,
"friends": friends,
"message_sent": satori_counter.msg_sent if isinstance(bot, satori.Bot) else statistics.get("message_sent", 0),
"message_received": satori_counter.msg_received if isinstance(bot, satori.Bot) else statistics.get("message_received", 0),
"app_name": app_name
}
result["bots"].append(bot_data)
return result
async def get_hardware_data() -> dict:
mem = psutil.virtual_memory()
all_processes = psutil.Process().children(recursive=True)
all_processes.append(psutil.Process())
mem_used_process = 0
process_mem = {}
for process in all_processes:
try:
ps_name = process.name().replace(".exe", "")
if ps_name not in process_mem:
process_mem[ps_name] = 0
process_mem[ps_name] += process.memory_info().rss
mem_used_process += process.memory_info().rss
except Exception:
pass
swap = psutil.swap_memory()
cpu_brand_raw = cpuinfo.get_cpu_info().get("brand_raw", "Unknown")
if "AMD" in cpu_brand_raw:
brand = "AMD"
elif "Intel" in cpu_brand_raw:
brand = "Intel"
else:
brand = "Unknown"
result = {
"cpu": {
"percent": psutil.cpu_percent(),
"name": f"{brand} {cpuinfo.get_cpu_info().get('arch', 'Unknown')}",
"cores": psutil.cpu_count(logical=False),
"threads": psutil.cpu_count(logical=True),
"freq": psutil.cpu_freq().current # MHz
},
"memory": {
"percent": mem.percent,
"total": mem.total,
"used": mem.used,
"free": mem.free,
"usedProcess": mem_used_process,
},
"swap": {
"percent": swap.percent,
"total": swap.total,
"used": swap.used,
"free": swap.free
},
"disk": [],
}
for disk in psutil.disk_partitions(all=True):
try:
disk_usage = psutil.disk_usage(disk.mountpoint)
if disk_usage.total == 0:
continue # 虚拟磁盘
result["disk"].append({
"name": disk.mountpoint,
"percent": disk_usage.percent,
"total": disk_usage.total,
"used": disk_usage.used,
"free": disk_usage.free
})
except:
pass
return result
async def get_liteyuki_data() -> dict:
temp_data: TempConfig = common_db.where_one(TempConfig(), default=TempConfig())
result = {
"name": list(get_config("nickname", [__NAME__]))[0],
"version": f"{__VERSION__}({commit_hash})",
"plugins": len(nonebot.get_loaded_plugins()),
"resources": len(get_loaded_resource_packs()),
"nonebot": f"{nonebot.__version__}",
"python": f"{platform.python_implementation()} {platform.python_version()}",
"system": f"{platform.system()} {platform.release()}",
"runtime": time.time() - temp_data.data.get("start_time", time.time()), # 运行时间秒数
"bots": len(nonebot.get_bots())
}
return result