添加对zip格式的资源包的支持,对function的支持

This commit is contained in:
snowy 2024-05-31 19:17:25 +08:00
parent c66d470166
commit e15aafd781
10 changed files with 285 additions and 90 deletions

View File

@ -34,7 +34,7 @@ log_level: "INFO" # 日志等级
log_icon: true # 是否显示日志等级图标(某些控制台字体不可用)
auto_report: true # 是否自动上报问题给轻雪服务器
auto_update: true # 是否自动更新轻雪每天4点检查更新
debug: false # 轻雪调试,开启后在调试时修改代码或资源会自动重载相应内容
debug: false # 轻雪调试,开启会自动重载Bot或者资源其他插件自带的调试功能也将开启
safe_mode: false # 安全模式,开启后将不会加载任何第三方插件
# 其他Nonebot插件的配置项
custom_config_1: "custom_value1"

View File

@ -2,7 +2,7 @@ from nonebot.plugin import PluginMetadata
from .core import *
from .loader import *
from .dev_tools import *
from .dev import *
__author__ = "snowykami"
__plugin_meta__ = PluginMetadata(

View File

@ -80,12 +80,13 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
temp_data.data.update(
{
"reload": True,
"reload_time": time.time(),
"reload_bot_id": bot.self_id,
"reload_session_type": event_utils.get_message_type(event),
"reload_session_id": (event.group_id if event.message_type == "group" else event.user_id) if not isinstance(event,satori.event.Event) else event.channel.id,
"delta_time": 0
"reload" : True,
"reload_time" : time.time(),
"reload_bot_id" : bot.self_id,
"reload_session_type": event_utils.get_message_type(event),
"reload_session_id" : (event.group_id if event.message_type == "group" else event.user_id) if not isinstance(event,
satori.event.Event) else event.channel.id,
"delta_time" : 0
}
)
@ -188,6 +189,39 @@ async def _(matcher: Matcher):
await matcher.finish("https://bot.liteyuki.icu/usage")
@on_alconna(
command=Alconna(
"/function",
Args["function", str]["args", MultiVar(str), ()],
)
).handle()
async def _(result: Arparma, bot: T_Bot, event: T_MessageEvent, matcher: Matcher):
"""
调用轻雪函数
Args:
result:
bot:
event:
Returns:
"""
function_name = result.main_args.get("function")
args: tuple[str] = result.main_args.get("args", ())
_args = []
_kwargs = {}
for arg in args:
arg = arg.replace("\\=", "EQUAL_SIGN")
if "=" in arg:
key, value = arg.split("=", 1)
_kwargs[key] = value.replace("EQUAL_SIGN", "=")
else:
_args.append(arg.replace("EQUAL_SIGN", "="))
@on_alconna(
command=Alconna(
"/api",
@ -294,7 +328,7 @@ async def _(bot: T_Bot):
reload_session_id = temp_data.data.get("reload_session_id", 0)
delta_time = temp_data.data.get("delta_time", 0)
common_db.save(temp_data) # 更新数据
if isinstance(bot,satori.Bot):
if isinstance(bot, satori.Bot):
await bot.send_message(
channel_id=reload_session_id,
message="Liteyuki reloaded in %.2f s" % delta_time
@ -323,23 +357,23 @@ async def every_day_update():
nonebot.logger.info(logs)
# 安全的需要用户id的api
# 需要用户id的api
need_user_id = (
"send_private_msg",
"send_msg",
"set_group_card",
"set_group_special_title",
"get_stranger_info",
"get_group_member_info"
"send_private_msg",
"send_msg",
"set_group_card",
"set_group_special_title",
"get_stranger_info",
"get_group_member_info"
)
need_group_id = (
"send_group_msg",
"send_msg",
"set_group_card",
"set_group_name",
"set_group_special_title",
"get_group_member_info",
"get_group_member_list",
"get_group_honor_info"
"send_group_msg",
"send_msg",
"set_group_card",
"set_group_name",
"set_group_special_title",
"get_group_member_info",
"get_group_member_list",
"get_group_honor_info"
)

View File

@ -0,0 +1,55 @@
import nonebot
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from liteyuki.utils.base.config import get_config
from liteyuki.utils.base.reloader import Reloader
from liteyuki.utils.base.resource import load_resources
if get_config("debug", False):
src_directories = (
"liteyuki/liteyuki_main",
"liteyuki/plugins",
"liteyuki/utils",
)
src_excludes_extensions = (
"pyc",
)
res_directories = (
"liteyuki/resources",
"resources",
)
nonebot.logger.info("Liteyuki Reload is enable, watching for file changes...")
class CodeModifiedHandler(FileSystemEventHandler):
"""
Handler for code file changes
"""
def on_modified(self, event):
if event.src_path.endswith(src_excludes_extensions) or event.is_directory:
return
nonebot.logger.debug(f"{event.src_path} modified, reloading bot...")
Reloader.reload()
class ResourceModifiedHandler(FileSystemEventHandler):
"""
Handler for resource file changes
"""
def on_modified(self, event):
nonebot.logger.debug(f"{event.src_path} modified, reloading resource...")
load_resources()
code_modified_handler = CodeModifiedHandler()
resource_modified_handle = ResourceModifiedHandler()
observer = Observer()
for directory in src_directories:
observer.schedule(code_modified_handler, directory, recursive=True)
for directory in res_directories:
observer.schedule(resource_modified_handle, directory, recursive=True)
observer.start()

View File

@ -1,43 +0,0 @@
import time
import nonebot
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from liteyuki.utils.base.config import get_config
from liteyuki.utils.base.reloader import Reloader
from liteyuki.utils.base.resource import load_resources
if get_config("debug", False):
nonebot.logger.info("Liteyuki Reload is enable, watching for file changes...")
class CodeModifiedHandler(FileSystemEventHandler):
def on_modified(self, event):
if "liteyuki/resources" not in event.src_path.replace("\\", "/"):
nonebot.logger.debug(f"{event.src_path} has been modified, reloading bot...")
Reloader.reload()
class ResourceModifiedHandler(FileSystemEventHandler):
def on_modified(self, event):
if not event.is_directory:
nonebot.logger.debug(f"{event.src_path} has been modified, reloading resource...")
load_resources()
code_modified_handler = CodeModifiedHandler()
resource_modified_handle = ResourceModifiedHandler()
observer = Observer()
observer.schedule(resource_modified_handle, path="liteyuki/resources", recursive=True)
observer.schedule(resource_modified_handle, path="resources", recursive=True)
observer.schedule(code_modified_handler, path="liteyuki", recursive=True)
observer.start()
# try:
# while True:
# time.sleep(1)
# except KeyboardInterrupt:
# observer.stop()
# observer.join()

View File

@ -1,6 +1,6 @@
# 轻雪资源包管理器
import os
import zipfile
import yaml
from nonebot import require
from nonebot.permission import SUPERUSER
@ -83,26 +83,43 @@ async def _(bot: T_Bot, event: T_MessageEvent, result: Arparma):
f"> {btn_move_up} {btn_move_down} {btn_move_top} {btn_unload}\n\n***")
reply += f"\n\n{ulang.get('liteyuki.unloaded_resources')}\n"
loaded_folders = [rp.folder for rp in get_loaded_resource_packs()]
# 遍历resources文件夹获取未加载的资源包
for folder in os.listdir("resources"):
if folder not in loaded_folders and os.path.exists(os.path.join("resources", folder, "metadata.yml")):
metadata = ResourceMetadata(
**yaml.load(
open(
os.path.join("resources", folder, "metadata.yml"),
encoding="utf-8"
),
Loader=yaml.FullLoader
if folder not in loaded_folders:
if os.path.exists(os.path.join("resources", folder, "metadata.yml")):
metadata = ResourceMetadata(
**yaml.load(
open(
os.path.join("resources", folder, "metadata.yml"),
encoding="utf-8"
),
Loader=yaml.FullLoader
)
)
)
metadata.folder = folder
metadata.path = os.path.join("resources", folder)
btn_load = md.btn_cmd(
ulang.get("npm.install"),
f"rpm load {metadata.folder}"
)
# 添加新行
reply += (f"\n**{md.escape(metadata.name)}**({md.escape(metadata.folder)})\n\n"
f"> {btn_load}\n\n***")
metadata.folder = folder
metadata.path = os.path.join("resources", folder)
btn_load = md.btn_cmd(
ulang.get("npm.install"),
f"rpm load {metadata.folder}"
)
# 添加新行
reply += (f"\n**{md.escape(metadata.name)}**({md.escape(metadata.folder)})\n\n"
f"> {btn_load}\n\n***")
elif os.path.isfile(os.path.join("resources", folder)) and folder.endswith(".zip"):
# zip文件
# 临时解压并读取metadata.yml
with zipfile.ZipFile(os.path.join("resources", folder), "r") as zip_ref:
with zip_ref.open("metadata.yml") as f:
metadata = ResourceMetadata(
**yaml.load(f, Loader=yaml.FullLoader)
)
btn_load = md.btn_cmd(
ulang.get("npm.install"),
f"rpm load {folder}"
)
# 添加新行
reply += (f"\n**{md.escape(metadata.name)}**({md.escape(folder)})\n\n"
f"> {btn_load}\n\n***")
elif result.subcommands.get("load") or result.subcommands.get("unload"):
load = result.subcommands.get("load") is not None
rp_name = result.args.get("name")

View File

@ -23,7 +23,6 @@
const data = JSON.parse(document.getElementById("data").innerText);
const results = data["result"];
const route_template = document.importNode(document.getElementById("route-template").content, true)
results.forEach(

View File

@ -0,0 +1,2 @@
var qq=2751454815
api send_private_msg user_id=qq message="Hello"

View File

@ -0,0 +1,111 @@
"""
liteyuki function是一种类似于mcfunction的函数用于在liteyuki中实现一些功能例如自定义指令等也可与Python函数绑定
使用 /function function_name *args **kwargs来调用
例如 /function test/hello user_id=123456
可以用于一些轻量级插件的编写无需Python代码
SnowyKami
"""
import functools
# cmd *args **kwargs
# api api_name **kwargs
import os
from typing import Any, Awaitable, Callable, Coroutine
from nonebot import Bot
from nonebot.adapters.satori import bot
ly_function_extensions = (
"lyf",
"lyfunction",
"mcfunction"
)
loaded_functions = dict()
class LiteyukiFunction:
def __init__(self, name: str, path: str):
self.name = name
self.path = path
self.functions = list()
self.bot: Bot = None
self.var_data = dict()
self.macro_data = dict()
def __call__(self, *args, **kwargs):
for _callable in self.functions:
if _callable is not None:
_callable(*args, **kwargs)
def __str__(self):
return f"LiteyukiFunction({self.name}, {self.path})"
def __repr__(self):
return self.__str__()
async def execute_line(self, line: str) -> Callable[[tuple, dict], Coroutine[Any, Any, Any] | Any] | None:
"""
解析一行轻雪函数
Args:
line:
Returns:
"""
args: list[str] = line.split(" ")
head = args.pop(0)
if head.startswith("#"):
# 注释
return None
elif head == "var":
# 变量定义
for arg in args:
self.var_data[arg.split("=", 1)[0]] = eval(arg.split("=", 1)[1])
elif head == "cmd":
# 在当前计算机上执行命令
os.system(line.split(" ", 1)[1])
elif head == "api":
# 调用Bot API 需要Bot实例
await self.bot.call_api(line.split(" ", 1)[1])
elif head == "function":
# 调用轻雪函数
return functools.partial(get_function, line.split(" ", 1)[1])
def get_function(name: str) -> LiteyukiFunction | None:
"""
获取一个轻雪函数
Args:
name: 函数名
Returns:
"""
return loaded_functions.get(name)
def load_from_dir(path: str):
"""
从目录中加载轻雪函数类似mcfunction
Args:
path: 目录路径
"""
for f in os.listdir(path):
f = os.path.join(path, f)
if os.path.isfile(f):
if f.endswith(ly_function_extensions):
load_from_file(f)
def load_from_file(path: str):
"""
从文件中加载轻雪函数
Args:
path:
Returns:
"""
pass

View File

@ -1,6 +1,7 @@
import json
import os
import shutil
import zipfile
from typing import Any
import nonebot
@ -11,6 +12,7 @@ from .language import Language, get_default_lang_code
_loaded_resource_packs: list["ResourceMetadata"] = [] # 按照加载顺序排序
temp_resource_root = "data/liteyuki/resources"
temp_extract_root = "data/liteyuki/temp"
lang = Language(get_default_lang_code())
@ -32,6 +34,15 @@ def load_resource_from_dir(path: str):
if os.path.exists(os.path.join(path, "metadata.yml")):
with open(os.path.join(path, "metadata.yml"), "r", encoding="utf-8") as f:
metadata = yaml.safe_load(f)
elif os.path.isfile(path) and path.endswith(".zip"):
# zip文件
# 临时解压并读取metadata.yml
with zipfile.ZipFile(path, "r") as zip_ref:
# 解压至临时目录 data/liteyuki/temp/{pack_name}.zip
zip_ref.extractall(os.path.join(temp_extract_root, os.path.basename(path)))
with zip_ref.open("metadata.yml") as f:
metadata = yaml.safe_load(f)
path = os.path.join(temp_extract_root, os.path.basename(path))
else:
# 没有metadata.yml文件不是一个资源包
return
@ -41,13 +52,21 @@ def load_resource_from_dir(path: str):
copy_file(os.path.join(root, file), os.path.join(temp_resource_root, relative_path))
metadata["path"] = path
metadata["folder"] = os.path.basename(path)
if os.path.exists(os.path.join(path, "lang")):
# 加载语言
from liteyuki.utils.base.language import load_from_dir
load_from_dir(os.path.join(path, "lang"))
if os.path.exists(os.path.join(path, "functions")):
# 加载功能
from liteyuki.utils.base.ly_function import load_from_dir
load_from_dir(os.path.join(path, "functions"))
_loaded_resource_packs.insert(0, ResourceMetadata(**metadata))
def get_path(path: str, abs_path: bool = True, default: Any = None, debug: bool=False) -> str | Any:
def get_path(path: str, abs_path: bool = True, default: Any = None, debug: bool = False) -> str | Any:
"""
获取资源包中的文件
Args:
@ -125,7 +144,7 @@ def load_resources():
json.dump([], open("resources/index.json", "w", encoding="utf-8"))
resource_index: list[str] = json.load(open("resources/index.json", "r", encoding="utf-8"))
resource_index.reverse() # 优先级高的后加载,但是排在前面
resource_index.reverse() # 优先级高的后加载,但是排在前面
for resource in resource_index:
load_resource_from_dir(os.path.join("resources", resource))
@ -147,7 +166,8 @@ def check_exist(name: str) -> bool:
name: 资源包名称文件夹名
Returns: 是否存在
"""
return os.path.exists(os.path.join("resources", name, "metadata.yml"))
path = os.path.join("resources", name)
return os.path.exists(os.path.join(path, "metadata.yml")) or (os.path.isfile(path) and name.endswith(".zip"))
def add_resource_pack(name: str) -> bool: