app/liteyuki/plugins/liteyuki_npm/installer.py
2024-04-07 00:35:53 +08:00

231 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os.path
import sys
from io import StringIO
import aiohttp
import nonebot
import pip
from arclet.alconna import Arparma, MultiVar
from nonebot import require
from nonebot.permission import SUPERUSER
from liteyuki.utils.language import get_user_lang
from liteyuki.utils.ly_typing import T_Bot
from liteyuki.utils.message import Markdown as md
from .common import *
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import Alconna, Args, Subcommand, on_alconna
# Matcher for the plugin-store command ``nps`` (alias "插件商店").
# Only superusers may trigger it.
nps = on_alconna(
    Alconna(
        "nps",
        # nps update: re-download the store index.
        Subcommand("update", alias=["u"]),
        # nps search <keywords...> [page]: one or more keywords, page defaults to 1.
        Subcommand(
            "search",
            Args["keywords", MultiVar(str)]["page", int, 1],
            alias=["s", "搜索"],
        ),
        # nps install <plugin_name>
        Subcommand("install", Args["plugin_name", str], alias=["i", "安装"]),
        # nps uninstall <plugin_name>
        Subcommand("uninstall", Args["plugin_name", str], alias=["r", "rm", "卸载"]),
    ),
    aliases={"插件商店"},
    permission=SUPERUSER,
)
@nps.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
    """Handle the ``nps`` command and dispatch to its sub-commands.

    Sub-commands:
        update:    refresh the local store index via ``npm_update``.
        search:    look up plugins by keyword and reply with a markdown list
                   (at most 10 entries are shown).
        install:   install a store plugin with pip, load it, and record it
                   in the plugin database.
        uninstall: remove the plugin's record from the plugin database.

    Args:
        result: parsed Alconna command result.
        event: the triggering message event; its ``user_id`` selects the language.
        bot: the bot instance used to send markdown replies.
    """
    ulang = get_user_lang(str(event.user_id))

    # Make sure a local copy of the store index exists before handling anything.
    if not os.path.exists("data/liteyuki/plugins.json"):
        await npm_update()

    if result.subcommands.get("update"):
        r = await npm_update()
        if r:
            await nps.finish(ulang.get("npm.store_update_success"))
        else:
            await nps.finish(ulang.get("npm.store_update_failed"))

    elif result.subcommands.get("search"):
        # NOTE(review): the "page" argument is parsed by the command but not used here.
        keywords: list[str] = result.subcommands["search"].args.get("keywords")
        rs = await npm_search(keywords)
        max_show = 10
        if rs:
            parts = [f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***"]
            for plugin in rs[:max_show]:
                # NOTE(review): the button sends "npm install ...", while this matcher
                # is registered as "nps" - confirm an "npm" command handles it.
                btn_install = md.btn_cmd(ulang.get("npm.install"), "npm install %s" % plugin.module_name)
                link_page = md.btn_link(ulang.get("npm.homepage"), plugin.homepage)
                # NOTE(review): the PyPI button reuses plugin.homepage; confirm whether
                # StorePlugin exposes a dedicated PyPI/project link that should be used.
                link_pypi = md.btn_link(ulang.get("npm.pypi"), plugin.homepage)
                parts.append(
                    f"\n# **{plugin.name}**\n"
                    f"\n> **{plugin.desc}**\n"
                    f"\n> {ulang.get('npm.author')}: {plugin.author}"
                    f"\n> *{md.escape(plugin.module_name)}*"
                    f"\n> {btn_install} {link_page} {link_pypi}\n\n***\n"
                )
            if len(rs) > max_show:
                parts.append(f"\n{ulang.get('npm.too_many_results', HIDE_NUM=len(rs) - max_show)}")
            reply = "".join(parts)
        else:
            reply = ulang.get("npm.search_no_result")
        await md.send_md(reply, bot, event=event)

    elif result.subcommands.get("install"):
        plugin_module_name: str = result.subcommands["install"].args.get("plugin_name")
        store_plugin = await get_store_plugin(plugin_module_name)
        # Reject names that are not in the store *before* running pip, so that an
        # unknown name is never installed and then reported as "not found".
        if not store_plugin:
            await nps.finish(ulang.get("npm.plugin_not_found", NAME=plugin_module_name))

        await nps.send(ulang.get("npm.installing", NAME=plugin_module_name))
        # NOTE(review): npm_install is a synchronous call inside an async handler;
        # the handler does not yield to the event loop until it returns.
        r, log = npm_install(plugin_module_name)
        log = log.replace("\\", "/")

        # The homepage is a URL, so it is rendered as a link button, as in the search results.
        homepage_btn = md.btn_link(ulang.get("npm.homepage"), store_plugin.homepage)
        if r:
            r_load = nonebot.load_plugin(plugin_module_name)  # load the freshly installed plugin
            if r_load:
                # Check whether the plugin is already recorded in the database.
                found_in_db_plugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_module_name)
                if found_in_db_plugin is None:
                    plugin_db.upsert(InstalledPlugin(module_name=plugin_module_name))
                    info = md.escape(ulang.get("npm.install_success", NAME=store_plugin.name))  # markdown-escape
                    await md.send_md(
                        f"{info}\n\n"
                        f"```\n{log}\n```",
                        bot,
                        event=event
                    )
                else:
                    await nps.finish(ulang.get("npm.plugin_already_installed", NAME=store_plugin.name))
            else:
                # pip succeeded but the plugin could not be loaded.
                info = ulang.get("npm.load_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
                await md.send_md(
                    f"{info}\n\n"
                    f"```\n{log}\n```\n",
                    bot,
                    event=event
                )
        else:
            # pip failed on every mirror.
            info = ulang.get("npm.install_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
            await md.send_md(
                f"{info}\n\n"
                f"```\n{log}\n```",
                bot,
                event=event
            )

    elif result.subcommands.get("uninstall"):
        plugin_module_name: str = result.subcommands["uninstall"].args.get("plugin_name")
        found_installed_plugin: InstalledPlugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_module_name)
        if found_installed_plugin:
            # Only the database record is removed here.
            plugin_db.delete(InstalledPlugin(), "module_name = ?", plugin_module_name)
            reply = f"{ulang.get('npm.uninstall_success', NAME=found_installed_plugin.module_name)}"
            await nps.finish(reply)
        else:
            await nps.finish(ulang.get("npm.plugin_not_installed", NAME=plugin_module_name))
async def npm_update() -> bool:
    """Refresh the local JSON cache of the plugin store.

    Every URL in the registry list is tried in order. The first one that
    answers with HTTP status 200 has its body written to
    ``data/liteyuki/plugins.json``. A URL that fails with a network error or
    a non-200 status is skipped.

    Returns:
        bool: True if the cache file was updated, False if no URL succeeded.
    """
    import asyncio  # local import: only needed for the timeout exception type

    url_list = [
        "https://registry.nonebot.dev/plugins.json",
    ]
    cache_path = "data/liteyuki/plugins.json"

    async with aiohttp.ClientSession() as session:
        for url in url_list:
            try:
                async with session.get(url) as resp:
                    if resp.status != 200:
                        continue
                    # Read the whole body before touching the cache file, so a
                    # failed download never leaves a truncated cache behind.
                    data = await resp.read()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                nonebot.logger.warning(f"npm_update failed to fetch {url}: {e}")
                continue

            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            async with aiofiles.open(cache_path, "wb") as f:
                await f.write(data)
            return True
    return False
async def npm_search(keywords: list[str]) -> list[StorePlugin]:
    """Search the cached plugin store for plugins matching every keyword.

    A plugin matches when each keyword is a substring of the text built from
    its name, description, author, module name and tag labels.

    Args:
        keywords (list[str]): keywords that must all be present.

    Returns:
        list[StorePlugin]: matching plugins, in the order they appear in the cache.
    """
    async with aiofiles.open("data/liteyuki/plugins.json", "r", encoding="utf-8") as f:
        raw = await f.read()
    catalogue: list[StorePlugin] = [StorePlugin(**entry) for entry in json.loads(raw)]

    def searchable_text(item: StorePlugin) -> str:
        # Concatenate every field the search looks at, separated by spaces.
        tag_labels = ' '.join(tag.label for tag in item.tags)
        return ' '.join((item.name, item.desc, item.author, item.module_name, tag_labels))

    matches = []
    for candidate in catalogue:
        text = searchable_text(candidate)
        if all(word in text for word in keywords):
            matches.append(candidate)
    return matches
def npm_install(plugin_module_name) -> tuple[bool, str]:
    """Install a package with pip, trying several index mirrors in turn.

    pip is run as ``<python> -m pip`` in a child process, so its output is
    captured without replacing this process's ``sys.stdout``/``sys.stderr``,
    and a ``sys.exit`` inside pip cannot affect the caller.

    Args:
        plugin_module_name: name of the package to install.

    Returns:
        tuple[bool, str]: whether any mirror succeeded, and the combined
        stdout/stderr output of every attempt.
    """
    import importlib
    import subprocess

    mirrors = [
        "https://pypi.tuna.tsinghua.edu.cn/simple",  # Tsinghua University
        "https://pypi.mirrors.cqupt.edu.cn/simple",  # Chongqing University of Posts and Telecommunications
        "https://pypi.liteyuki.icu/simple",  # Liteyuki mirror
        "https://pypi.org/simple",  # official index
    ]

    captured: list[str] = []
    success = False
    # Try each mirror once and stop at the first successful install.
    for mirror in mirrors:
        nonebot.logger.info(f"npm_install try mirror: {mirror}")
        try:
            completed = subprocess.run(
                [sys.executable, "-m", "pip", "install", plugin_module_name, "-i", mirror],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # merge stderr into the same log
                text=True,
                errors="replace",  # never fail on undecodable pip output
                check=False,
            )
        except OSError as e:
            # The interpreter could not be started; report it and try the next mirror.
            nonebot.logger.warning(f"npm_install could not run pip: {e}")
            continue

        captured.append(completed.stdout or "")
        success = completed.returncode == 0
        if success:
            break
        nonebot.logger.warning("npm_install failed, try next mirror.")

    if success:
        # Let the import system notice the newly installed package.
        importlib.invalidate_caches()
    return success, "".join(captured)