1
0
forked from bot/app

feat: 野兽先辈主题包

fix: 插件模块名问题
feat: `nps` -> `npm`
This commit is contained in:
远野千束 2024-04-08 17:01:55 +08:00
parent c45061a95a
commit 8e1ec22679
22 changed files with 814 additions and 773 deletions

View File

@ -10,10 +10,12 @@ export default defineUserConfig({
description: "LiteyukiBot | 轻雪机器人 | An OneBot Standard ChatBot | 一个OneBot标准的聊天机器人",
head: [
// 设置 favor.ico.vuepress/public 下
[
'link', {rel: 'icon', href: 'https://cdn.liteyuki.icu/favicon.ico'},
['link', {rel: 'icon', href: 'https://cdn.liteyuki.icu/favicon.ico'},],
['link', {rel: 'stylesheet', href: 'https://fonts.cdnfonts.com/css/colortube-2'}],
['link', {rel: 'stylesheet', href: 'https://cdn.bootcdn.net/ajax/libs/firacode/6.2.0/fira_code.min.css'}],
],
[
"meta",
{

View File

@ -13,7 +13,7 @@ export default navbar([
prefix: "usage/",
},
{
text: "主题商店",
text: "资源商店",
link: "/store/",
prefix: "store/",
}

Binary file not shown.

View File

@ -19,10 +19,10 @@
"link": "https://cdn.liteyuki.icu/static/lrp/MapleMonoFonts.zip"
},
{
"name": "示例包1",
"name": "野兽先辈主题HomoTheme",
"author": "SnowyKami",
"description": "A simple bot that shows the status of the bot and the server.",
"link": ""
"description": "野兽先辈主题包114514",
"link": "https://cdn.liteyuki.icu/static/lrp/HomoTheme.zip"
},
{
"name": "示例包2",

View File

@ -15,5 +15,12 @@ export default sidebar({
prefix: "usage/",
children: "structure",
},
{
text: "资源商店",
icon: "store",
prefix: "store/",
link: "/store/",
children: "structure",
}
],
});

View File

@ -1 +1,11 @@
// place your custom styles here
#main-title {
font-family: "ColorTube", serif;
color: #ff0000 !important; /* 你想要的颜色 */
line-height: 2;
}
code {
font-family: "Fira Code", monospace !important;
}

View File

@ -13,7 +13,7 @@ export default hopeTheme({
iconAssets: "fontawesome-with-brands",
logo: "https://cdn.liteyuki.icu/static/img/logo.png",
logo: "https://cdn.liteyuki.icu/static/img/liteyuki_icon_640.png",
repo: "https://github.com/snowykami/LiteyukiBot",
@ -48,8 +48,6 @@ export default hopeTheme({
plugins: {
searchPro: true,
// search: true,
blog: true,
comment: {
provider: "Giscus",
repo: "snowykami/LiteyukiBot",

View File

@ -7,7 +7,7 @@ bgImage:
bgImageDark:
bgImageStyle:
background-attachment: fixed
heroText: LiteyukiBot 6
heroText: LiteyukiBot
tagline: 轻雪机器人一个以轻量和简洁为设计理念基于Nonebot2的OneBot标准聊天机器人
actions:

View File

@ -1 +1,6 @@
---
title: 资源商店
icon: store
index: false
---
<storeComp />

View File

@ -5,50 +5,70 @@ order: 1
category: 使用手册
---
## 基础插件命令
## 基础插件
#### 命令前有[S]的表示仅超级用户可用,[O]和[A]分别为群主和群管可用,[P]为私聊可用
### 轻雪`liteyuki`
### **轻雪 `liteyuki`**
```shell
[S]reload-liteyuki # 重载轻雪
[S]update-liteyuki # 更新轻雪
[S]liteecho # 查看当前bot
[S]config set <key> value # 添加配置项,若存在则会覆盖,输入值会被执行,以便于转换为正确的值,"10"和10是不一样的
[S]config get [key] # 查询配置项不带key返回配置项列表推荐私聊使用
[S]reload-resources # 重载资源
[S]switch-image-mode # 切换图片模式该功能需要commit:505468b及以后的Lagrange.OneBot在普通图片和Markdown图片之间切换后者更大但有失败的可能
仅超级用户
reload-liteyuki # 重载轻雪
update-liteyuki # 更新轻雪
liteecho # 查看当前bot
config set <key> value # 添加配置项,若存在则会覆盖,输入值会被执行以转换为正确的类型,"10"和10是不一样的
config get [key] # 查询配置项不带key返回配置项列表推荐私聊使用
switch-image-mode # 在普通图片和Markdown大图之间切换该功能需要commit:505468b及以后的Lagrange.OneBot
所有人可用
liteyuki-docs # 查看轻雪文档
# 上述两个命令修改的配置项在数据库中保存,但是优先级低于配置文件,如果配置文件中存在相同的配置项,将会使用配置文件中的配置
------
别名: reload-liteyuki 重启轻雪, update-liteyuki 更新轻雪, reload-resources 重载资源, config 配置, set 设置, get 查询,
switch-image-mode 切换图片模式, liteyuki-docs 轻雪文档
```
### 轻雪包管理器 `liteyuki_npm`
命令别名
```shell
[S]nps update # 更新插件索引
[S]nps install <plugin_name> # 安装插件
[S]nps uninstall <plugin_name> # 卸载插件
[S]nps search <keywords...> # 通过关键词搜索插件
------
[AOSP]npm enable <plugin_name> # 当前会话启用插件
[AOSP]npm disable <plugin_name> # 当前会话禁用插件
[S]npm enable-global <plugin_name> # 全局启用插件
[S]npm disable-global <plugin_name> # 全局禁用插件
list-plugin [page] [num] # 列出所有插件 page为页数num为每页显示数量
------
[S]rpm list [page] [num] # 列出所有资源包 page为页数num为每页显示数量
[S]rpm load <resource_pack_name> # 加载资源包
[S]rpm unload <resource_pack_name> # 卸载资源包
[S]rpm change <resource_pack_name> # 修改优先级
[S]rpm reload # 重载所有资源包
------
别名: nps 插件商店, npm 插件管理, update 更新, install 安装, uninstall 卸载, search 搜索,
enable 启用, disable 停用, enable-global 全局启用, disable-global 全局停用, list-plugin 列出插件/插件列表,
rpm 资源包, load 加载, unload 卸载, change 更改, reload 重载, list 列表/列出
reload-liteyuki 重启轻雪,
update-liteyuki 更新轻雪,
reload-resources 重载资源,
config 配置 | set 设置 | get 查询,
switch-image-mode 切换图片模式,
liteyuki-docs 轻雪文档
```
### **插件/包管理器 `liteyuki_pacman`**
- 插件管理
```shell
# 仅超级用户
npm update # 更新插件商店索引
npm install <plugin_name> # 安装插件
npm uninstall <plugin_name> # 卸载插件
npm search <keywords...> # 通过关键词搜索插件
npm enable-global <plugin_name> # 全局启用插件
npm disable-global <plugin_name> # 全局禁用插件
# 群聊仅群主、管理员、超级用户可用,私聊所有人可用
npm enable <plugin_name> # 当前会话启用插件
npm disable <plugin_name> # 当前会话禁用插件
npm list [page] [num] # 列出所有插件 page为页数num为每页显示数量
```
- 资源包管理
```shell
# 仅超级用户
rpm list [page] [num] # 列出所有资源包 page为页数num为每页显示数量
rpm load <pack_name> # 加载资源包
rpm unload <pack_name> # 卸载资源包
rpm change <pack_name> # 修改优先级
rpm reload # 重载所有资源包
```
命令别名
```shell
npm 插件管理 | update 更新 | install 安装 | uninstall 卸载 | search 搜索
enable 启用 | disable 停用 | enable-global 全局启用 | disable-global 全局停用 | list-plugin 插件列表
rpm 资源包 | load 加载 | unload 卸载 | change 更改 | reload 重载 | list 列表
```
```shell
@ -58,14 +78,18 @@ rpm 资源包, load 加载, unload 卸载, change 更改, reload 重载, list
```
### 轻雪用户管理`liteyuki_user`
### **用户管理`liteyuki_user`**
```shell
profile # 查看用户信息菜单
profile set <key> [value] # 设置用户信息或打开属性设置菜单
profile get <key> # 获取用户信息
------
别名: profile 个人信息, set 设置, get 查询
```
命令别名
```shell
profile 个人信息 | set 设置 | get 查询
```
**参数**`<param>`为必填参数,`[option]`为可选参数。

View File

@ -24,6 +24,7 @@ driver = get_driver()
markdown_image = common_db.first(StoredConfig(), default=StoredConfig()).config.get("markdown_image", False)
@on_alconna(
command=Alconna(
"liteecho",
@ -165,6 +166,7 @@ async def _(event: T_MessageEvent, matcher: Matcher):
async def _(matcher: Matcher):
matcher.finish("https://bot.liteyuki.icu/usage")
# system hook
@Bot.on_calling_api # 图片模式检测
async def test_for_md_image(bot: T_Bot, api: str, data: dict):

View File

@ -139,6 +139,7 @@ async def get_stats_data(self_id: str = None, lang: str = None) -> dict:
disk_data = []
for disk in psutil.disk_partitions(all=True):
try:
disk_usage = psutil.disk_usage(disk.mountpoint)
disk_total_show = convert_size(disk_usage.total, 1)
disk_free_show = convert_size(disk_usage.free, 1)
@ -154,6 +155,8 @@ async def get_stats_data(self_id: str = None, lang: str = None) -> dict:
"totalValue": disk_usage.total,
}
)
except Exception:
pass
cpu_info = get_cpu_info()
if "AMD" in cpu_info.get("brand_raw", ""):

View File

@ -1,230 +0,0 @@
import os.path
import sys
from io import StringIO
import aiohttp
import nonebot
import pip
from arclet.alconna import Arparma, MultiVar
from nonebot import require
from nonebot.permission import SUPERUSER
from liteyuki.utils.language import get_user_lang
from liteyuki.utils.ly_typing import T_Bot
from liteyuki.utils.message import Markdown as md
from .common import *
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import Alconna, Args, Subcommand, on_alconna
nps = on_alconna(
Alconna(
"nps",
Subcommand(
"update",
alias=["u"],
),
Subcommand(
"search",
Args["keywords", MultiVar(str)]["page", int, 1],
alias=["s", "搜索"],
),
Subcommand(
"install",
Args["plugin_name", str],
alias=["i", "安装"],
),
Subcommand(
"uninstall",
Args["plugin_name", str],
alias=["r", "rm", "卸载"],
)
),
aliases={"插件商店"},
permission=SUPERUSER,
)
@nps.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
ulang = get_user_lang(str(event.user_id))
if not os.path.exists("data/liteyuki/plugins.json"):
await npm_update()
if result.subcommands.get("update"):
r = await npm_update()
if r:
await nps.finish(ulang.get("npm.store_update_success"))
else:
await nps.finish(ulang.get("npm.store_update_failed"))
elif result.subcommands.get("search"):
keywords: list[str] = result.subcommands["search"].args.get("keywords")
rs = await npm_search(keywords)
max_show = 10
for p in rs:
print(p.module_name, p.homepage)
if len(rs):
reply = f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***"
for plugin in rs[:min(max_show, len(rs))]:
btn_install = md.btn_cmd(ulang.get("npm.install"), "npm install %s" % plugin.module_name)
link_page = md.btn_link(ulang.get("npm.homepage"), plugin.homepage)
link_pypi = md.btn_link(ulang.get("npm.pypi"), plugin.homepage)
reply += (f"\n# **{plugin.name}**\n"
f"\n> **{plugin.desc}**\n"
f"\n> {ulang.get('npm.author')}: {plugin.author}"
f"\n> *{md.escape(plugin.module_name)}*"
f"\n> {btn_install} {link_page} {link_pypi}\n\n***\n")
if len(rs) > max_show:
reply += f"\n{ulang.get('npm.too_many_results', HIDE_NUM=len(rs) - max_show)}"
else:
reply = ulang.get("npm.search_no_result")
await md.send_md(reply, bot, event=event)
elif result.subcommands.get("install"):
plugin_module_name: str = result.subcommands["install"].args.get("plugin_name")
store_plugin = await get_store_plugin(plugin_module_name)
await nps.send(ulang.get("npm.installing", NAME=plugin_module_name))
r, log = npm_install(plugin_module_name)
log = log.replace("\\", "/")
if not store_plugin:
await nps.finish(ulang.get("npm.plugin_not_found", NAME=plugin_module_name))
homepage_btn = md.btn_cmd(ulang.get("npm.homepage"), store_plugin.homepage)
if r:
r_load = nonebot.load_plugin(plugin_module_name) # 加载插件
installed_plugin = InstalledPlugin(module_name=plugin_module_name) # 构造插件信息模型
found_in_db_plugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_module_name) # 查询数据库中是否已经安装
if r_load:
if found_in_db_plugin is None:
plugin_db.upsert(installed_plugin)
info = md.escape(ulang.get("npm.install_success", NAME=store_plugin.name)) # markdown转义
await md.send_md(
f"{info}\n\n"
f"```\n{log}\n```",
bot,
event=event
)
else:
await nps.finish(ulang.get("npm.plugin_already_installed", NAME=store_plugin.name))
else:
info = ulang.get("npm.load_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
await md.send_md(
f"{info}\n\n"
f"```\n{log}\n```\n",
bot,
event=event
)
else:
info = ulang.get("npm.install_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
await md.send_md(
f"{info}\n\n"
f"```\n{log}\n```",
bot,
event=event
)
elif result.subcommands.get("uninstall"):
plugin_module_name: str = result.subcommands["uninstall"].args.get("plugin_name")
found_installed_plugin: InstalledPlugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_module_name)
if found_installed_plugin:
plugin_db.delete(InstalledPlugin(), "module_name = ?", plugin_module_name)
reply = f"{ulang.get('npm.uninstall_success', NAME=found_installed_plugin.module_name)}"
await nps.finish(reply)
else:
await nps.finish(ulang.get("npm.plugin_not_installed", NAME=plugin_module_name))
async def npm_update() -> bool:
"""
更新本地插件json缓存
Returns:
bool: 是否成功更新
"""
url_list = [
"https://registry.nonebot.dev/plugins.json",
]
for url in url_list:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
async with aiofiles.open("data/liteyuki/plugins.json", "wb") as f:
data = await resp.read()
await f.write(data)
return True
return False
async def npm_search(keywords: list[str]) -> list[StorePlugin]:
"""
搜索插件
Args:
keywords (list[str]): 关键词列表
Returns:
list[StorePlugin]: 插件列表
"""
results = []
async with aiofiles.open("data/liteyuki/plugins.json", "r", encoding="utf-8") as f:
plugins: list[StorePlugin] = [StorePlugin(**pobj) for pobj in json.loads(await f.read())]
for plugin in plugins:
plugin_text = ' '.join(
[
plugin.name,
plugin.desc,
plugin.author,
plugin.module_name,
' '.join([tag.label for tag in plugin.tags])
]
)
if all([keyword in plugin_text for keyword in keywords]):
results.append(plugin)
return results
def npm_install(plugin_module_name) -> tuple[bool, str]:
"""
Args:
plugin_module_name:
Returns:
tuple[bool, str]:
"""
buffer = StringIO()
sys.stdout = buffer
sys.stderr = buffer
mirrors = [
"https://pypi.tuna.tsinghua.edu.cn/simple", # 清华大学
"https://pypi.mirrors.cqupt.edu.cn/simple", # 重庆邮电大学
"https://pypi.liteyuki.icu/simple", # 轻雪镜像
"https://pypi.org/simple", # 官方源
]
# 使用pip安装包对每个镜像尝试一次成功后返回值
success = False
for mirror in mirrors:
try:
nonebot.logger.info(f"npm_install try mirror: {mirror}")
result = pip.main(["install", plugin_module_name, "-i", mirror])
success = result == 0
if success:
break
else:
nonebot.logger.warning(f"npm_install failed, try next mirror.")
except Exception as e:
success = False
continue
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
return success, buffer.getvalue()

View File

@ -1,243 +0,0 @@
import os
import nonebot.plugin
from nonebot import require
from nonebot.exception import FinishedException, IgnoredException
from nonebot.internal.adapter import Event
from nonebot.internal.matcher import Matcher
from nonebot.message import run_preprocessor
from nonebot.permission import SUPERUSER
from nonebot.plugin import Plugin
from liteyuki.utils.data_manager import GlobalPlugin, Group, InstalledPlugin, User, group_db, plugin_db, user_db
from liteyuki.utils.language import get_user_lang
from liteyuki.utils.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message import Markdown as md
from liteyuki.utils.permission import GROUP_ADMIN, GROUP_OWNER
from .common import get_plugin_can_be_toggle, get_plugin_default_enable, get_plugin_global_enable, get_plugin_session_enable
from .installer import get_store_plugin, npm_update
from ...utils.tools import clamp
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Arparma, Subcommand
list_plugins = on_alconna(
Alconna(
"list-plugin",
Args["page", int, 1]["num", int, 10],
),
aliases={"列出插件", "插件列表"}
)
npm = on_alconna(
aliases={"插件管理"},
command=Alconna(
"npm",
# Args["plugin_name", str],
Subcommand(
"enable",
Args["plugin_name", str],
alias=["启用"],
),
Subcommand(
"disable",
Args["plugin_name", str],
alias=["停用"],
),
Subcommand(
"global-enable",
Args["plugin_name", str],
alias=["全局启用"],
),
Subcommand(
"global-disable",
Args["plugin_name", str],
alias=["全局停用"],
),
),
)
@list_plugins.handle()
async def _(event: T_MessageEvent, bot: T_Bot, result: Arparma):
ulang = get_user_lang(str(event.user_id))
if not os.path.exists("data/liteyuki/plugins.json"):
await npm_update()
loaded_plugin_list = sorted(nonebot.get_loaded_plugins(), key=lambda x: x.module_name)
num_per_page = result.args.get("num")
total = len(loaded_plugin_list) // num_per_page + (1 if len(loaded_plugin_list) % num_per_page else 0)
page = clamp(result.args.get("page"), 1, total)
# 已加载插件 | 总计10 | 第1/3页
reply = (f"# {ulang.get('npm.loaded_plugins')} | "
f"{ulang.get('npm.total', TOTAL=len(nonebot.get_loaded_plugins()))} | "
f"{ulang.get('npm.page', PAGE=page, TOTAL=total)} \n***\n")
for plugin in loaded_plugin_list[(page - 1) * num_per_page: min(page * num_per_page, len(loaded_plugin_list))]:
# 检查是否有 metadata 属性
# 添加帮助按钮
btn_usage = md.btn_cmd(ulang.get("npm.usage"), f"help {plugin.module_name}", False)
store_plugin = await get_store_plugin(plugin.module_name)
session_enable = get_plugin_session_enable(event, plugin.module_name)
if store_plugin:
btn_homepage = md.btn_link(ulang.get("npm.homepage"), store_plugin.homepage)
show_name = store_plugin.name
elif plugin.metadata:
if plugin.metadata.extra.get("liteyuki"):
btn_homepage = md.btn_link(ulang.get("npm.homepage"), "https://github.com/snowykami/LiteyukiBot")
else:
btn_homepage = ulang.get("npm.homepage")
show_name = plugin.metadata.name
else:
btn_homepage = ulang.get("npm.homepage")
show_name = plugin.name
ulang.get("npm.no_description")
if plugin.metadata:
reply += f"\n**{md.escape(show_name)}**\n"
else:
reply += f"**{md.escape(show_name)}**\n"
reply += f"\n > {btn_usage} {btn_homepage}"
if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event):
# 添加启用/停用插件按钮
cmd_toggle = f"npm {'disable' if session_enable else 'enable'} {plugin.module_name}"
text_toggle = ulang.get("npm.disable" if session_enable else "npm.enable")
can_be_toggle = get_plugin_can_be_toggle(plugin.module_name)
btn_toggle = text_toggle if not can_be_toggle else md.btn_cmd(text_toggle, cmd_toggle)
reply += f" {btn_toggle}"
if await SUPERUSER(bot, event):
plugin_in_database = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin.module_name)
# 添加移除插件和全局切换按钮
global_enable = get_plugin_global_enable(plugin.module_name)
btn_uninstall = (
md.btn_cmd(ulang.get("npm.uninstall"), f'npm uninstall {plugin.module_name}')) if plugin_in_database else ulang.get(
'npm.uninstall')
btn_toggle_global_text = ulang.get("npm.disable_global" if global_enable else "npm.enable_global")
cmd_toggle_global = f"npm {'global-disable' if global_enable else 'global-enable'} {plugin.module_name}"
btn_toggle_global = btn_toggle_global_text if not can_be_toggle else md.btn_cmd(btn_toggle_global_text, cmd_toggle_global)
reply += f" {btn_uninstall} {btn_toggle_global}"
reply += "\n\n***\n"
await md.send_md(reply, bot, event=event)
@npm.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
if not os.path.exists("data/liteyuki/plugins.json"):
await npm_update()
# 判断会话类型
ulang = get_user_lang(str(event.user_id))
plugin_module_name = result.args.get("plugin_name")
# 支持对自定义command_start的判断
if result.subcommands.get("enable") or result.subcommands.get("disable"):
toggle = result.subcommands.get("enable") is not None
session_enable = get_plugin_session_enable(event, plugin_module_name) # 获取插件当前状态
default_enable = get_plugin_default_enable(plugin_module_name) # 获取插件默认状态
can_be_toggled = get_plugin_can_be_toggle(plugin_module_name) # 获取插件是否可以被启用/停用
if not can_be_toggled:
await npm.finish(ulang.get("npm.plugin_cannot_be_toggled", NAME=plugin_module_name))
if session_enable == toggle:
await npm.finish(
ulang.get("npm.plugin_already", NAME=plugin_module_name, STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable")))
if event.message_type == "private":
session = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=event.user_id))
else:
if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event):
session = group_db.first(Group(), "group_id = ?", event.group_id, default=Group(group_id=str(event.group_id)))
else:
raise FinishedException(ulang.get("Permission Denied"))
try:
if toggle:
if default_enable:
session.disabled_plugins.remove(plugin_module_name)
else:
session.enabled_plugins.append(plugin_module_name)
else:
if default_enable:
session.disabled_plugins.append(plugin_module_name)
else:
session.enabled_plugins.remove(plugin_module_name)
if event.message_type == "private":
user_db.upsert(session)
else:
group_db.upsert(session)
except Exception as e:
print(e)
await npm.finish(
ulang.get(
"npm.toggle_failed",
NAME=plugin_module_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"),
ERROR=str(e))
)
await npm.finish(
ulang.get(
"npm.toggle_success",
NAME=plugin_module_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"))
)
elif result.subcommands.get("global-enable") or result.subcommands.get("global-disable") and await SUPERUSER(bot, event):
toggle = result.subcommands.get("global-enable") is not None
can_be_toggled = get_plugin_can_be_toggle(plugin_module_name)
if not can_be_toggled:
await npm.finish(ulang.get("npm.plugin_cannot_be_toggled", NAME=plugin_module_name))
global_enable = get_plugin_global_enable(plugin_module_name)
if global_enable == toggle:
await npm.finish(
ulang.get("npm.plugin_already", NAME=plugin_module_name, STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable")))
try:
plugin = plugin_db.first(GlobalPlugin(), "module_name = ?", plugin_module_name, default=GlobalPlugin(module_name=plugin_module_name))
if toggle:
plugin.enabled = True
else:
plugin.enabled = False
plugin_db.upsert(plugin)
except Exception as e:
print(e)
await npm.finish(
ulang.get(
"npm.toggle_failed",
NAME=plugin_module_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"),
ERROR=str(e))
)
await npm.finish(
ulang.get(
"npm.toggle_success",
NAME=plugin_module_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"))
)
@run_preprocessor
async def pre_handle(event: Event, matcher: Matcher):
plugin: Plugin = matcher.plugin
plugin_global_enable = get_plugin_global_enable(plugin.module_name)
if not plugin_global_enable:
raise IgnoredException("Plugin disabled globally")
if event.get_type() == "message":
plugin_session_enable = get_plugin_session_enable(event, plugin.module_name)
if not plugin_session_enable:
raise IgnoredException("Plugin disabled in session")
# @Bot.on_calling_api
# async def _(bot: Bot, api: str, data: dict[str, any]):
# nonebot.logger.info(f"Plugin Callapi: {api}: {data}")

View File

@ -1,8 +0,0 @@
# 插件权限管理器对api调用进行hook限制防止插件滥用api
from liteyuki.utils.data import LiteModel
class PermissionAllow(LiteModel):
plugin_name: str
api_name: str
allow: bool

View File

@ -1,7 +1,5 @@
from nonebot.plugin import PluginMetadata
from .manager import *
from .installer import *
from .helper import *
from .npm import *
from .rpm import *
__author__ = "snowykami"

View File

@ -4,8 +4,8 @@ from typing import Optional
import aiofiles
import nonebot.plugin
from liteyuki.utils.data import Database, LiteModel
from liteyuki.utils.data_manager import GlobalPlugin, Group, InstalledPlugin, User, group_db, plugin_db, user_db
from liteyuki.utils.data import LiteModel
from liteyuki.utils.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db
from liteyuki.utils.ly_typing import T_MessageEvent
@ -17,7 +17,7 @@ class PluginTag(LiteModel):
class StorePlugin(LiteModel):
name: str
desc: str
module_name: str
module_name: str # 插件商店中的模块名不等于本地的模块名,前者是文件夹名,后者是点分割模块名
project_link: str = ""
homepage: str = ""
author: str = ""
@ -28,12 +28,27 @@ class StorePlugin(LiteModel):
is_official: bool = False
async def get_store_plugin(plugin_module_name: str) -> Optional[StorePlugin]:
def get_plugin_exist(plugin_name: str) -> bool:
"""
获取插件是否存在
Args:
plugin_name:
Returns:
"""
for plugin in nonebot.plugin.get_loaded_plugins():
if plugin.name == plugin_name:
return True
return False
async def get_store_plugin(plugin_name: str) -> Optional[StorePlugin]:
"""
获取插件信息
Args:
plugin_module_name (str): 插件模块名
plugin_name (str): 插件模块名
Returns:
Optional[StorePlugin]: 插件信息
@ -41,33 +56,33 @@ async def get_store_plugin(plugin_module_name: str) -> Optional[StorePlugin]:
async with aiofiles.open("data/liteyuki/plugins.json", "r", encoding="utf-8") as f:
plugins: list[StorePlugin] = [StorePlugin(**pobj) for pobj in json.loads(await f.read())]
for plugin in plugins:
if plugin.module_name == plugin_module_name:
if plugin.name == plugin_name:
return plugin
return None
def get_plugin_default_enable(plugin_module_name: str) -> bool:
def get_plugin_default_enable(plugin_name: str) -> bool:
"""
获取插件默认启用状态由插件定义不存在则默认为启用
Args:
plugin_module_name (str): 插件模块名
plugin_name (str): 插件模块名
Returns:
bool: 插件默认状态
"""
plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name)
plug = nonebot.plugin.get_plugin(plugin_name)
return (plug.metadata.extra.get("default_enable", True)
if plug.metadata else True) if plug else True
def get_plugin_session_enable(event: T_MessageEvent, plugin_module_name: str) -> bool:
def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
"""
获取插件当前会话启用状态
Args:
event: 会话事件
plugin_module_name (str): 插件模块名
plugin_name (str): 插件模块名
Returns:
bool: 插件当前状态
@ -80,31 +95,31 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_module_name: str) ->
# 默认停用插件不在启用列表内表示停用
# 默认启用插件在停用列表内表示停用
# 默认启用插件不在停用列表内表示启用
default_enable = get_plugin_default_enable(plugin_module_name)
default_enable = get_plugin_default_enable(plugin_name)
if default_enable:
return plugin_module_name not in session.disabled_plugins
return plugin_name not in session.disabled_plugins
else:
return plugin_module_name in session.enabled_plugins
return plugin_name in session.enabled_plugins
def get_plugin_global_enable(plugin_module_name: str) -> bool:
loaded_plugin = nonebot.plugin.get_plugin_by_module_name(plugin_module_name)
def get_plugin_global_enable(plugin_name: str) -> bool:
nonebot.plugin.get_plugin(plugin_name)
return plugin_db.first(
GlobalPlugin(),
"module_name = ?",
plugin_module_name,
default=GlobalPlugin(module_name=plugin_module_name, enabled=True)).enabled
plugin_name,
default=GlobalPlugin(module_name=plugin_name, enabled=True)).enabled
def get_plugin_can_be_toggle(plugin_module_name: str) -> bool:
def get_plugin_can_be_toggle(plugin_name: str) -> bool:
"""
获取插件是否可以被启用/停用
Args:
plugin_module_name (str): 插件模块名
plugin_name (str): 插件模块名
Returns:
bool: 插件是否可以被启用/停用
"""
plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name)
plug = nonebot.plugin.get_plugin(plugin_name)
return plug.metadata.extra.get("toggleable", True) if plug and plug.metadata else True

View File

@ -0,0 +1,460 @@
import os
import sys
import aiohttp
import nonebot.plugin
import pip
from io import StringIO
from arclet.alconna import MultiVar
from nonebot import require
from nonebot.exception import FinishedException, IgnoredException
from nonebot.internal.adapter import Event
from nonebot.internal.matcher import Matcher
from nonebot.message import run_preprocessor
from nonebot.permission import SUPERUSER
from nonebot.plugin import Plugin
from liteyuki.utils.data_manager import InstalledPlugin
from liteyuki.utils.language import get_user_lang
from liteyuki.utils.ly_typing import T_Bot
from liteyuki.utils.message import Markdown as md
from liteyuki.utils.permission import GROUP_ADMIN, GROUP_OWNER
from liteyuki.utils.tools import clamp
from .common import *
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Arparma, Subcommand
# const
enable_global = "enable-global"
disable_global = "disable-global"
enable = "enable"
disable = "disable"
@on_alconna(
aliases={"插件"},
command=Alconna(
"npm",
Subcommand(
"enable",
Args["plugin_name", str],
alias=["启用"],
),
Subcommand(
"disable",
Args["plugin_name", str],
alias=["停用"],
),
Subcommand(
enable_global,
Args["plugin_name", str],
alias=["全局启用"],
),
Subcommand(
disable_global,
Args["plugin_name", str],
alias=["全局停用"],
),
# 安装部分
Subcommand(
"update",
alias=["u"],
),
Subcommand(
"search",
Args["keywords", MultiVar(str)]["show_num", int, 15],
alias=["s", "搜索"],
),
Subcommand(
"install",
Args["plugin_name", str],
alias=["i", "安装"],
),
Subcommand(
"uninstall",
Args["plugin_name", str],
alias=["r", "rm", "卸载"],
),
Subcommand(
"list",
Args["num", int, 10]["page", int, 1],
alias=["ls", "列表"],
),
Subcommand(
"usage",
Args["plugin_name", str],
alias=["详情"],
)
)
).handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
if not os.path.exists("data/liteyuki/plugins.json"):
await npm_update()
# 判断会话类型
ulang = get_user_lang(str(event.user_id))
plugin_name = result.args.get("plugin_name")
sc = result.subcommands # 获取子命令
perm_s = await SUPERUSER(bot, event) # 判断是否为超级用户
# 支持对自定义command_start的判断
if sc.get("enable") or result.subcommands.get("disable"):
toggle = result.subcommands.get("enable") is not None
plugin_exist = get_plugin_exist(plugin_name)
session_enable = get_plugin_session_enable(event, plugin_name) # 获取插件当前状态
default_enable = get_plugin_default_enable(plugin_name) # 获取插件默认状态
can_be_toggled = get_plugin_can_be_toggle(plugin_name) # 获取插件是否可以被启用/停用
if not plugin_exist:
await npm.finish(ulang.get("npm.plugin_not_found", NAME=plugin_name))
if not can_be_toggled:
await npm.finish(ulang.get("npm.plugin_cannot_be_toggled", NAME=plugin_name))
if session_enable == toggle:
await npm.finish(
ulang.get("npm.plugin_already", NAME=plugin_name, STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable")))
if event.message_type == "private":
session = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=event.user_id))
else:
if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event):
session = group_db.first(Group(), "group_id = ?", event.group_id, default=Group(group_id=str(event.group_id)))
else:
raise FinishedException(ulang.get("Permission Denied"))
try:
if toggle:
if default_enable:
session.disabled_plugins.remove(plugin_name)
else:
session.enabled_plugins.append(plugin_name)
else:
if default_enable:
session.disabled_plugins.append(plugin_name)
else:
session.enabled_plugins.remove(plugin_name)
if event.message_type == "private":
user_db.upsert(session)
else:
group_db.upsert(session)
except Exception as e:
print(e)
await npm.finish(
ulang.get(
"npm.toggle_failed",
NAME=plugin_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"),
ERROR=str(e))
)
await npm.finish(
ulang.get(
"npm.toggle_success",
NAME=plugin_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"))
)
elif sc.get(enable_global) or result.subcommands.get(disable_global) and await SUPERUSER(bot, event):
plugin_exist = get_plugin_exist(plugin_name)
toggle = result.subcommands.get(enable_global) is not None
can_be_toggled = get_plugin_can_be_toggle(plugin_name)
if not plugin_exist:
await npm.finish(ulang.get("npm.plugin_not_found", NAME=plugin_name))
if not can_be_toggled:
await npm.finish(ulang.get("npm.plugin_cannot_be_toggled", NAME=plugin_name))
global_enable = get_plugin_global_enable(plugin_name)
if global_enable == toggle:
await npm.finish(
ulang.get("npm.plugin_already", NAME=plugin_name, STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable")))
try:
storePlugin = plugin_db.first(GlobalPlugin(), "module_name = ?", plugin_name, default=GlobalPlugin(module_name=plugin_name))
if toggle:
storePlugin.enabled = True
else:
storePlugin.enabled = False
plugin_db.upsert(storePlugin)
except Exception as e:
print(e)
await npm.finish(
ulang.get(
"npm.toggle_failed",
NAME=plugin_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"),
ERROR=str(e))
)
await npm.finish(
ulang.get(
"npm.toggle_success",
NAME=plugin_name,
STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"))
)
elif sc.get("update") and perm_s:
r = await npm_update()
if r:
await npm.finish(ulang.get("npm.store_update_success"))
else:
await npm.finish(ulang.get("npm.store_update_failed"))
elif sc.get("search"):
keywords: list[str] = result.subcommands["search"].args.get("keywords")
rs = await npm_search(keywords)
max_show = result.subcommands.get("search").args.get("show_num")
if len(rs):
reply = f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***"
for storePlugin in rs[:min(max_show, len(rs))]:
btn_install = md.btn_cmd(ulang.get("npm.install"), "npm install %s" % storePlugin.module_name)
link_page = md.btn_link(ulang.get("npm.homepage"), storePlugin.homepage)
link_pypi = md.btn_link(ulang.get("npm.pypi"), storePlugin.homepage)
reply += (f"\n# **{storePlugin.name}**\n"
f"\n> **{storePlugin.desc}**\n"
f"\n> {ulang.get('npm.author')}: {storePlugin.author}"
f"\n> *{md.escape(storePlugin.module_name)}*"
f"\n> {btn_install} {link_page} {link_pypi}\n\n***\n")
if len(rs) > max_show:
reply += f"\n{ulang.get('npm.too_many_results', HIDE_NUM=len(rs) - max_show)}"
else:
reply = ulang.get("npm.search_no_result")
await md.send_md(reply, bot, event=event)
elif sc.get("install") and perm_s:
plugin_name: str = result.subcommands["install"].args.get("plugin_name")
store_plugin = await get_store_plugin(plugin_name)
await npm.send(ulang.get("npm.installing", NAME=plugin_name))
r, log = npm_install(plugin_name)
log = log.replace("\\", "/")
if not store_plugin:
await npm.finish(ulang.get("npm.plugin_not_found", NAME=plugin_name))
homepage_btn = md.btn_cmd(ulang.get("npm.homepage"), store_plugin.homepage)
if r:
r_load = nonebot.load_plugin(plugin_name) # 加载插件
installed_plugin = InstalledPlugin(module_name=plugin_name) # 构造插件信息模型
found_in_db_plugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_name) # 查询数据库中是否已经安装
if r_load:
if found_in_db_plugin is None:
plugin_db.upsert(installed_plugin)
info = md.escape(ulang.get("npm.install_success", NAME=store_plugin.name)) # markdown转义
await md.send_md(
f"{info}\n\n"
f"```\n{log}\n```",
bot,
event=event
)
else:
await npm.finish(ulang.get("npm.plugin_already_installed", NAME=store_plugin.name))
else:
info = ulang.get("npm.load_failed", NAME=plugin_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
await md.send_md(
f"{info}\n\n"
f"```\n{log}\n```\n",
bot,
event=event
)
else:
info = ulang.get("npm.install_failed", NAME=plugin_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
await md.send_md(
f"{info}\n\n"
f"```\n{log}\n```",
bot,
event=event
)
elif sc.get("uninstall") and perm_s:
plugin_name: str = result.subcommands["uninstall"].args.get("plugin_name")
found_installed_plugin: InstalledPlugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_name)
if found_installed_plugin:
plugin_db.delete(InstalledPlugin(), "module_name = ?", plugin_name)
reply = f"{ulang.get('npm.uninstall_success', NAME=found_installed_plugin.module_name)}"
await npm.finish(reply)
else:
await npm.finish(ulang.get("npm.plugin_not_installed", NAME=plugin_name))
elif sc.get("list"):
loaded_plugin_list = sorted(nonebot.get_loaded_plugins(), key=lambda x: x.name)
num_per_page = result.subcommands.get("list").args.get("num")
total = len(loaded_plugin_list) // num_per_page + (1 if len(loaded_plugin_list) % num_per_page else 0)
page = clamp(result.subcommands.get("list").args.get("page"), 1, total)
# 已加载插件 | 总计10 | 第1/3页
reply = (f"# {ulang.get('npm.loaded_plugins')} | "
f"{ulang.get('npm.total', TOTAL=len(nonebot.get_loaded_plugins()))} | "
f"{ulang.get('npm.page', PAGE=page, TOTAL=total)} \n***\n")
permission_oas = await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event)
permission_s = await SUPERUSER(bot, event)
for storePlugin in loaded_plugin_list[(page - 1) * num_per_page: min(page * num_per_page, len(loaded_plugin_list))]:
# 检查是否有 metadata 属性
# 添加帮助按钮
btn_usage = md.btn_cmd(ulang.get("npm.usage"), f"npm usage {storePlugin.name}", False)
store_plugin = await get_store_plugin(storePlugin.name)
session_enable = get_plugin_session_enable(event, storePlugin.name)
if store_plugin:
btn_homepage = md.btn_link(ulang.get("npm.homepage"), store_plugin.homepage)
show_name = store_plugin.name
elif storePlugin.metadata:
if storePlugin.metadata.extra.get("liteyuki"):
btn_homepage = md.btn_link(ulang.get("npm.homepage"), "https://github.com/snowykami/LiteyukiBot")
else:
btn_homepage = ulang.get("npm.homepage")
show_name = storePlugin.metadata.name
else:
btn_homepage = ulang.get("npm.homepage")
show_name = storePlugin.name
ulang.get("npm.no_description")
if storePlugin.metadata:
reply += f"\n**{md.escape(show_name)}**\n"
else:
reply += f"**{md.escape(show_name)}**\n"
reply += f"\n > {btn_usage} {btn_homepage}"
if permission_oas:
# 添加启用/停用插件按钮
cmd_toggle = f"npm {'disable' if session_enable else 'enable'} {storePlugin.name}"
text_toggle = ulang.get("npm.disable" if session_enable else "npm.enable")
can_be_toggle = get_plugin_can_be_toggle(storePlugin.name)
btn_toggle = text_toggle if not can_be_toggle else md.btn_cmd(text_toggle, cmd_toggle)
reply += f" {btn_toggle}"
if permission_s:
plugin_in_database = plugin_db.first(InstalledPlugin(), "module_name = ?", storePlugin.name)
# 添加移除插件和全局切换按钮
global_enable = get_plugin_global_enable(storePlugin.name)
btn_uninstall = (
md.btn_cmd(ulang.get("npm.uninstall"), f'npm uninstall {storePlugin.name}')) if plugin_in_database else ulang.get(
'npm.uninstall')
btn_toggle_global_text = ulang.get("npm.disable_global" if global_enable else "npm.enable_global")
cmd_toggle_global = f"npm {'disable' if global_enable else 'enable'}-global {storePlugin.name}"
btn_toggle_global = btn_toggle_global_text if not can_be_toggle else md.btn_cmd(btn_toggle_global_text, cmd_toggle_global)
reply += f" {btn_uninstall} {btn_toggle_global}"
reply += "\n\n***\n"
await md.send_md(reply, bot, event=event)
elif sc.get("usage"):
# TODO
pass
else:
pass
@run_preprocessor
async def pre_handle(event: Event, matcher: Matcher):
plugin: Plugin = matcher.plugin
plugin_global_enable = get_plugin_global_enable(plugin.name)
if not plugin_global_enable:
raise IgnoredException("Plugin disabled globally")
if event.get_type() == "message":
plugin_session_enable = get_plugin_session_enable(event, plugin.name)
if not plugin_session_enable:
raise IgnoredException("Plugin disabled in session")
async def npm_update() -> bool:
"""
更新本地插件json缓存
Returns:
bool: 是否成功更新
"""
url_list = [
"https://registry.nonebot.dev/plugins.json",
]
for url in url_list:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
async with aiofiles.open("data/liteyuki/plugins.json", "wb") as f:
data = await resp.read()
await f.write(data)
return True
return False
async def npm_search(keywords: list[str]) -> list[StorePlugin]:
"""
搜索插件
Args:
keywords (list[str]): 关键词列表
Returns:
list[StorePlugin]: 插件列表
"""
results = []
async with aiofiles.open("data/liteyuki/plugins.json", "r", encoding="utf-8") as f:
plugins: list[StorePlugin] = [StorePlugin(**pobj) for pobj in json.loads(await f.read())]
for plugin in plugins:
plugin_text = ' '.join(
[
plugin.name,
plugin.desc,
plugin.author,
plugin.module_name,
' '.join([tag.label for tag in plugin.tags])
]
)
if all([keyword in plugin_text for keyword in keywords]):
results.append(plugin)
return results
def npm_install(plugin_package_name) -> tuple[bool, str]:
"""
Args:
plugin_package_name:
Returns:
tuple[bool, str]:
"""
buffer = StringIO()
sys.stdout = buffer
sys.stderr = buffer
mirrors = [
"https://pypi.tuna.tsinghua.edu.cn/simple", # 清华大学
"https://pypi.mirrors.cqupt.edu.cn/simple", # 重庆邮电大学
"https://pypi.liteyuki.icu/simple", # 轻雪代理镜像
"https://pypi.org/simple", # 官方源
]
# 使用pip安装包对每个镜像尝试一次成功后返回值
success = False
for mirror in mirrors:
try:
nonebot.logger.info(f"npm_install try mirror: {mirror}")
result = pip.main(["install", plugin_package_name, "-i", mirror])
success = result == 0
if success:
break
else:
nonebot.logger.warning(f"npm_install failed, try next mirror.")
except Exception as e:
success = False
continue
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
return success, buffer.getvalue()

View File

@ -13,7 +13,8 @@ from liteyuki.utils.resource import (ResourceMetadata, add_resource_pack, change
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import Alconna, Args, on_alconna, Arparma, Subcommand
rpm = on_alconna(
@on_alconna(
aliases={"资源包"},
command=Alconna(
"rpm",
@ -53,10 +54,7 @@ rpm = on_alconna(
),
),
permission=SUPERUSER
)
@rpm.handle()
).handle()
async def _(bot: T_Bot, event: T_MessageEvent, result: Arparma):
ulang = get_user_lang(str(event.user_id))
reply = ""

View File

@ -6,7 +6,7 @@ import sys
import nonebot
__NAME__ = "LiteyukiBot"
__VERSION__ = "6.2.7" # 60201
__VERSION__ = "6.2.8" # 60201
import requests