1
0
forked from bot/app

feat: 更新了插件商店部分功能

This commit is contained in:
远野千束 2024-03-21 01:20:18 +08:00
parent 14d9f041ce
commit 79f6d50e82
10 changed files with 223 additions and 10 deletions

View File

@ -1,3 +1,5 @@
aiohttp==3.9.3
aiofiles==23.2.1
arclet-alconna==1.8.5
arclet-alconna-tools==0.7.0
dash==2.16.1

View File

@ -1,6 +1,7 @@
from nonebot.plugin import PluginMetadata
from .manager import *
from .installer import *
from .helper import *
__author__ = "snowykami"

View File

@ -1,5 +1,154 @@
import json
import os.path
import shutil
from typing import Optional
import nonebot
from arclet.alconna import Arparma, MultiVar
from nonebot.permission import SUPERUSER
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand
import pip
import aiohttp, aiofiles
from typing_extensions import Any
from src.utils.data import LiteModel
from src.utils.language import get_user_lang
from src.utils.message import button, send_markdown
from src.utils.resource import get_res
from src.utils.typing import T_Bot, T_MessageEvent
npm_alc = on_alconna(
Alconna(
"lnpm",
Subcommand(
"update",
alias=["u"],
),
Subcommand(
"search",
Args["keywords", MultiVar(str)]["page", int, 1],
alias=["s"],
),
Subcommand(
"install",
Args["plugin_name", str],
alias=["i"],
),
Subcommand(
"remove",
Args["plugin_name", str],
alias=["rm"],
),
),
permission=SUPERUSER
)
class PluginTag(LiteModel):
label: str
color: str = '#000000'
class StorePlugin(LiteModel):
name: str
desc: str
module_name: str
project_link: str = ''
homepage: str = ''
author: str = ''
type: str | None = None
version: str | None = ''
time: str = ''
tags: list[PluginTag] = []
is_official: bool = False
@npm_alc.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
ulang = get_user_lang(str(event.user_id))
if not os.path.exists("data/liteyuki/plugins.json"):
shutil.move(get_res('unsorted/plugins.json'), "data/liteyuki/plugins.json")
nonebot.logger.info("Please update plugin store data file.")
if result.subcommands.get("update"):
r = await npm_update()
if r:
await npm_alc.finish(ulang.get("npm.store_update_success"))
else:
await npm_alc.finish(ulang.get("npm.store_update_failed"))
elif result.subcommands.get("search"):
keywords: list[str] = result.subcommands["search"].args.get("keywords")
rs = await npm_search(keywords)
if len(rs):
reply = f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***"
for plugin in rs[:min(10, len(rs))]:
reply += (f"\n{button(ulang.get('npm.install'), 'lnpm install %s' % plugin.module_name)} | **{plugin.name}**\n"
f"\n > **{plugin.desc}**\n"
f"\n > {ulang.get('npm.author')}: {plugin.author} | [🔗{ulang.get('npm.homepage')}]({plugin.homepage})\n\n***\n")
if len(rs) > 10:
reply += (f"\n{ulang.get('npm.too_many_results')}"
f"\n{button(ulang.get('npm.prev_page'), 'lnpm search %s %s' % (' '.join(keywords), 2))} | "
f"{button(ulang.get('npm.next_page'), 'lnpm search %s %s' % (' '.join(keywords), 2))}")
else:
reply = ulang.get("npm.search_no_result")
await send_markdown(reply, bot, event=event)
async def npm_update() -> bool:
"""
更新本地插件json缓存
Returns:
bool: 是否成功更新
"""
url_list = [
"https://registry.nonebot.dev/plugins.json",
]
# 用aiohttp请求json文件成功就覆盖本地文件否则尝试下一个url
for url in url_list:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
async with aiofiles.open("data/liteyuki/plugins.json", "wb") as f:
data = await resp.read()
await f.write(data)
nonebot.logger.info()
return True
return False
async def npm_search(keywords: list[str]) -> list[StorePlugin]:
"""
搜索插件
Args:
keywords (list[str]): 关键词列表
Returns:
list[StorePlugin]: 插件列表
"""
results = []
async with aiofiles.open("data/liteyuki/plugins.json", "r", encoding="utf-8") as f:
plugins: list[StorePlugin] = [StorePlugin(**pobj) for pobj in json.loads(await f.read())]
for plugin in plugins:
plugin_text = ' '.join(
[
plugin.name,
plugin.desc,
plugin.author,
plugin.module_name,
plugin.project_link,
plugin.homepage,
' '.join([tag.label for tag in plugin.tags])
]
)
if all([keyword in plugin_text for keyword in keywords]):
results.append(plugin)
return results
def install(plugin_name) -> bool:
try:

View File

@ -6,7 +6,7 @@ from src.utils.message import button, send_markdown
from src.utils.typing import T_Bot, T_MessageEvent
from src.utils.language import get_user_lang
list_plugins = on_command("list-plugin", aliases={"列出插件"}, priority=0, permission=SUPERUSER)
list_plugins = on_command("list-plugin", aliases={"列出插件", "插件列表"}, priority=0, permission=SUPERUSER)
toggle_plugin = on_command("enable-plugin", aliases={"启用插件", "禁用插件", "disable-plugin"}, priority=0, permission=SUPERUSER)
@ -17,9 +17,11 @@ async def _(event: T_MessageEvent, bot: T_Bot):
for plugin in nonebot.get_loaded_plugins():
# 检查是否有 metadata 属性
if plugin.metadata:
reply += (f"\n{button(lang.get('npm.disable'), 'disable-plugin %s' % plugin.name, False, False)} **{plugin.metadata.name}**\n"
reply += (f"\n{button(lang.get('npm.help'), 'help %s' % plugin.name, False, False)} "
f"**{plugin.metadata.name}**\n"
f"\n > {plugin.metadata.description}\n\n***\n")
else:
reply += (f"\n{button(lang.get('npm.disable'), 'disable-plugin %s' % plugin.name, False, False)} **{plugin.name}**\n"
reply += (f"\n{button(lang.get('npm.help'), 'help %s' % plugin.name, False, False)} "
f"**{plugin.name}**\n"
f"\n > {lang.get('npm.no_description')}\n\n***\n")
await send_markdown(reply, bot, event=event)

View File

@ -12,9 +12,21 @@ main.monitor.usage=Usage
npm.loaded_plugins=Loaded plugins
npm.total=Total {TOTAL}
npm.help=Help
npm.disable=Disable
npm.enable=Enable
npm.install=Install
npm.uninstall=Uninstall
npm.no_description=No description
npm.store_update_success=Plugin store data updated successfully
npm.store_update_failed=Plugin store data update failed
npm.search_result=Search results
npm.search_no_result=No result found
npm.too_many_results=Too many results found
npm.author=Author
npm.homepage=Homepage
npm.next_page=Next
npm.prev_page=Prev
user.profile_manager.query=Your {ATTR} is {VALUE}
user.profile_manager.set=Your {ATTR} has been set to {VALUE}

View File

@ -12,9 +12,21 @@ main.monitor.usage=使用率
npm.loaded_plugins=已加载插件
npm.total=总计 {TOTAL}
npm.help=帮助
npm.disable=停用
npm.enable=启用
npm.install=安装
npm.uninstall=卸载
npm.no_description=无描述
npm.store_update_success=插件商店数据更新成功
npm.store_update_failed=插件商店数据更新失败
npm.search_result=搜索结果
npm.search_no_result=无搜索结果
npm.too_many_results=搜索结果过多,请翻页查看
npm.author=作者
npm.homepage=主页
npm.next_page=下一页
npm.prev_page=上一页
user.profile_manager.query=你的个人信息 {ATTR} 为 {VALUE}
user.profile_manager.set=你的个人信息 {ATTR} 已设置为 {VALUE}

View File

@ -5,7 +5,7 @@
import json
import locale
import os
from typing import Any
from typing_extensions import Any
import nonebot

View File

@ -74,15 +74,15 @@ async def send_markdown(markdown: str, bot: T_Bot, *, message_type: str = None,
def button(name: str, cmd: str, reply: bool = False, enter: bool = True) -> str:
"""生成点击按钮的链接
"""生成点击按钮
Args:
name:
cmd:
name: 按钮显示内容
cmd: 发送的命令已在函数内url编码不需要再次编码
reply: 是否以回复的方式发送消息
enter: 自动发送消息则为True
enter: 自动发送消息则为True否则填充到输入框
Returns:
markdown格式的链接
markdown格式的可点击回调按钮
"""
return f"[{name}](mqqapi://aio/inlinecmd?command={encode_url(cmd)}&reply={str(reply).lower()}&enter={str(enter).lower()})"

View File

@ -2,11 +2,12 @@ import os
import nonebot
import yaml
from typing_extensions import Any
from src.utils.data import LiteModel
_resource_data = {}
_loaded_resource_packs = [] # 按照加载顺序排序
_loaded_resource_packs = [] # 按照加载顺序排序
class ResourceMetadata(LiteModel):
@ -41,3 +42,14 @@ def load_resource_from_dir(path: str):
from src.utils.language import load_from_dir
load_from_dir(os.path.join(path, "lang"))
_loaded_resource_packs.append(ResourceMetadata(**metadata))
def get_res(path: str, default: Any = None) -> str | Any:
"""
获取资源包中的文件
Args:
default: 默认
path: 文件相对路径
Returns: 文件绝对路径
"""
return _resource_data.get(path, default)

View File

@ -49,3 +49,26 @@ def de_escape(text: str) -> str:
def encode_url(text: str) -> str:
return quote(text, safe="")
def keywords_in_text(keywords: list[str], text: str, all_matched: bool) -> bool:
"""
检查关键词是否在文本中
Args:
keywords: 关键词列表
text: 文本
all_matched: 是否需要全部匹配
Returns:
"""
if all_matched:
for keyword in keywords:
if keyword not in text:
return False
return True
else:
for keyword in keywords:
if keyword in text:
return True
return False