完成了我们心心念念的转换结构的功能

This commit is contained in:
EillesWan 2024-10-21 18:31:20 +08:00
parent 4ff1822a4e
commit 07655094bd
7 changed files with 753 additions and 22 deletions

1
.gitignore vendored
View File

@ -27,6 +27,7 @@ src/nonebot_plugins/trimo_plugin_msctconverter/config
src/nonebot_plugins/trimo_plugin_msctconverter/temp
src/nonebot_plugins/trimo_plugin_msctconverter/MusicPreview/assets/wav
src/nonebot_plugins/dislink_plugin_ccnd
# src/nonebot_plugins/dislink_plugin_ccnd
# vuepress
.github

View File

@ -1,5 +1,3 @@
import nonebot
import nonebot.adapters
import nonebot.drivers
@ -28,7 +26,7 @@ from nonebot_plugin_alconna import (
from src.utils.base.ly_typing import T_Bot, T_MessageEvent
on_convert_help = on_alconna(
on_submit_help = on_alconna(
command=Alconna("投稿帮助"),
aliases={
"查看投稿帮助",
@ -41,9 +39,22 @@ on_convert_help = on_alconna(
)
@on_convert_help.handle()
@on_submit_help.handle()
async def _(
event: GroupMessageEvent,
bot: T_Bot,
):
pass
on_submit_keyword = nonebot.on_keyword(
keywords={"皮肤投稿", "皮肤上传", "皮肤代投", "皮肤代发"},
)
@on_submit_keyword.handle()
async def _(bot: T_Bot, event: T_MessageEvent):
return
await on_submit_keyword.finish(
"请使用命令提交皮肤:\n" + "```\n" + "皮肤提交 [皮肤名] [图片]\n" + "```"
)

View File

@ -0,0 +1,649 @@
import os
import sys
import random
import brotli
import shutil
from io import StringIO
from pathlib import Path
import nonebot
import zhDateTime
from nonebot.rule import to_me
from nonebot.permission import SUPERUSER
from nonebot.adapters.onebot.v11.event import (
GroupUploadNoticeEvent,
GroupMessageEvent,
PrivateMessageEvent,
)
from nonebot_plugin_alconna import (
Alconna,
# AlconnaQuery,
Args,
# Image,
Option,
# Query,
# Text,
UniMessage,
on_alconna,
# Voice,
Arparma,
Args,
store_true,
)
from Musicreater.plugin.bdx import (
BDX_MOVE_KEY,
bdx_move,
form_command_block_in_BDX_bytes,
x,
y,
z,
)
from Musicreater.plugin.archive import compress_zipfile
from src.utils.io import read_file
from src.utils.base.language import get_user_lang
from src.utils.base.ly_typing import T_Bot, T_MessageEvent
from src.utils.message.html_tool import md_to_pic
from src.utils.message.message import MarkdownMessage
from .utils import hanzi_timeid
from .msctexec import (
# something_temporary,
query_convert_points,
filesaves,
save_filesaves,
configdict,
temporary_dir,
add_file_to_delete,
# add_memory_to_temporary,
# read_memory_from_temporary,
get_stored_path,
)
from ..liteyuki_status.status import random_yanlun_text
FACE_VALUE = {
x: {
True: 5,
False: 4,
},
y: {
True: 1,
False: 0,
},
z: {
True: 3,
False: 2,
},
}
def switch_xz(axis: str):
return z if axis == x else x
def go_forward(forward: bool):
return 1 if forward else -1
def command_to_a_single_chain_in_BDX_bytes(
commands: list[tuple[str, bool, str]],
axis: str,
forward: bool,
limit: int = 128,
):
_bytes = b""
nowf = True
nowgo = 0
for cmd, condition, note in commands:
is_point = ((nowgo != 0) and (not nowf)) or (nowf and (nowgo != (limit - 1)))
_bytes += form_command_block_in_BDX_bytes(
cmd,
(
FACE_VALUE[axis][not forward ^ nowf]
if is_point
else FACE_VALUE[switch_xz(axis)][True]
),
impluse=2,
condition=condition,
needRedstone=False,
tickDelay=0,
customName=note,
executeOnFirstTick=False,
trackOutput=True,
)
nowgo += go_forward(nowf)
if ((nowgo >= limit) and nowf) or ((nowgo < 0) and (not nowf)):
nowgo -= go_forward(nowf)
nowf = not nowf
_bytes += bdx_move(switch_xz(axis), 1)
else:
_bytes += bdx_move(axis, go_forward(not forward ^ nowf))
# _bytes += move(axis, goahead(forward ^ nowf)*nowgo)
_bytes += bdx_move(axis, -1 * nowgo)
return _bytes
def command_to_multiline_BDX_structure_file(
funcList: list[list[tuple[str, bool, str]]],
axis_: str,
forward_: bool,
limit_: int = 128,
author: str = "Eilles",
outfile: str = "./test.bdx",
) -> tuple[bool, int, tuple[int, int, int]]:
"""
Parameters
----------
funcList: list
指令集列表 指令系统[ 指令集[ 单个指令( str指令, bool条件性 ), ], ]
axis_: str
坐标增值方向只能是小写的 `x`,`y`,`z`
forward_: bool
是否沿着坐标轴的正方向
limit_: int
在延展方向上的长度限制
author: str
作者名称
outfile: str
输出文件
Returns
-------
成功与否指令总长度指令结构总大小
"""
with open(os.path.abspath(outfile), "w+", encoding="utf-8") as f:
f.write("BD@")
_bytes = (
b"BDX\x00"
+ author.encode("utf-8")
+ b" with TrimOrg: BDX Generator\x00\x01command_block\x00"
)
totalSize = {x: 0, y: 1, z: 0}
totalLen = 0
# 非链延展方向,即系统延展方向
antiaxis = switch_xz(axis_)
while funcList:
func = funcList.pop(0)
nowlen = len(func)
totalLen += nowlen
# 走一条链的指令方块,会自动复位
_bytes += command_to_a_single_chain_in_BDX_bytes(func, axis_, forward_, limit_)
# 不是最后一组
if funcList:
# 计算系统延展方向的长度
totalSize[antiaxis] += 2 + nowlen // limit_
if totalSize[antiaxis] + 2 <= limit_:
# 没到头,那就 向前走两步?
_bytes += bdx_move(antiaxis, 2)
else:
# 到头了,那就退回去?
_bytes += bdx_move(y, 2)
_bytes += bdx_move(antiaxis, -totalSize[antiaxis])
# _bytes += move(axis_, -len(func))
else:
totalSize[antiaxis] += 1 + nowlen // limit_
# 计算链延展方向的长度
totalSize[axis_] = min(max(totalSize[axis_], nowlen), limit_)
with open(
os.path.abspath(outfile),
"ab+",
) as f:
f.write(brotli.compress(_bytes + b"XE"))
return (True, totalLen, (totalSize[x], totalSize[y], totalSize[z]))
async def read_file_into_command_lines(
file_: Path,
) -> list[list[tuple[str, bool, str]]]:
# 用双换行分割每段
# 单换行分每行
cdt = False
note = ""
functionList = []
for lines in (await read_file(file_path=file_, mode="r")).split("\n\n"): # type: ignore
funcGroup = []
for line in lines.split("\n"):
if line.strip().startswith("#"):
if "cdt" in line.lower():
cdt = True
note = line[1:].replace("cdt", "").strip()
else:
if "#" not in line:
funcGroup.append((line, cdt, note))
else:
funcGroup.append(
(
line[: line.find("#")].strip(),
cdt,
line[line.find("#") + 1 :].strip() + note,
)
)
cdt = False
note = ""
functionList.append(funcGroup)
return functionList
write_2_file = on_alconna(
Alconna(
"写入文本文件",
Option(
"-n|-f|--file-name",
default="新建文本文档",
args=Args["file-name", str, "新建文本文档"],
),
Option("-a|--append", default=False, action=store_true),
),
aliases=(
"write file",
"write down",
"写入",
"write_file",
"write_down",
"write2file",
"写入文件",
"写入文本",
),
rule=to_me(),
# ignorecase=True,
)
@write_2_file.handle()
async def _(
result: Arparma,
event: T_MessageEvent,
bot: T_Bot,
):
nonebot.logger.info(result.options)
usr_id = event.get_user_id()
ulang = get_user_lang(usr_id)
whole_texts = event.get_plaintext().split("\n", 1)
if len(whole_texts) < 2:
await write_2_file.finish(ulang.get("writefile.no_text"))
file_2_write = (
result.options["file-name"].args["file-name"]
if result.options["file-name"].args
else "新建文本文档"
) + ".txt"
file_path = get_stored_path(usr_id, file_2_write, superuser=False)
if result.options["append"].value:
if file_2_write in filesaves[usr_id].keys():
with file_path.open(mode="a", encoding="utf-8") as f:
f.write(whole_texts[1])
file_size = os.path.getsize(file_path)
filesaves[usr_id]["totalSize"] += (
file_size - filesaves[usr_id][file_2_write]["size"]
)
filesaves[usr_id][file_2_write]["size"] = file_size
await write_2_file.finish(
ulang.get(
"writefile.append_success",
NAME=file_2_write,
COUNT=len(whole_texts[1]),
SIZE=file_size,
)
)
else:
await write_2_file.finish(ulang.get("writefile.file_not_exist"))
else:
with open(file_path, "w", encoding="utf-8") as f:
f.write(whole_texts[1])
now = zhDateTime.DateTime.now()
file_size = os.path.getsize(file_path)
try:
filesaves[usr_id][file_2_write] = {
"date": [
now.year,
now.month,
now.day,
now.hour,
now.minute,
],
"size": file_size,
}
filesaves[usr_id]["totalSize"] += file_size
except KeyError:
filesaves[usr_id] = {
file_2_write: {
"date": [
now.year,
now.month,
now.day,
now.hour,
now.minute,
],
"size": file_size,
}
}
filesaves[usr_id]["totalSize"] = file_size
save_filesaves()
await write_2_file.finish(
ulang.get(
"writefile.write_success",
NAME=file_2_write,
SIZE=file_size,
)
)
cmd2struct = on_alconna(
Alconna(
"指令转结构",
Option("-n|-f|--file-name", default="all", args=Args["file-name", str, "all"]),
Option("-t|-type", default="all", args=Args["type", str, "all"]),
Option(
"-e|-x|--expand-axis", default="z+", args=Args["expand-axis", str, "z+"]
),
Option("-l|--length-limit", default=32, args=Args["length-limit", int, 32]),
Option(
"-a|--author",
default="DefaultUser",
args=Args["author", str, "DefaultUser"],
),
),
aliases={
"函数转结构",
"cmd2struct",
"command2structure",
"mcfunction2struct",
"mcfunction2structure",
"mcfunction转结构",
},
)
@cmd2struct.handle()
async def _(
result: Arparma,
event: GroupMessageEvent | PrivateMessageEvent,
bot: T_Bot,
):
nonebot.logger.info(result.options)
usr_id = event.get_user_id()
ulang = get_user_lang(usr_id)
superuser_permission = await SUPERUSER(bot, event)
if ((qres := query_convert_points(usr_id, "structure"))[0] is False) and (
not superuser_permission
):
await cmd2struct.finish(
UniMessage.text(
ulang.get(
"convet.not_enough_point",
NOW=qres[1],
TOTAL=configdict["maxPersonConvert"]["structure"],
)
),
at_sender=True,
)
dest_axis = (
result.options["expand-axis"].args["expand-axis"]
if result.options["expand-axis"].args
else "z+"
).lower()
if len(dest_axis) == 2 and dest_axis[0] in (x, y, z) and dest_axis[1] in ("-", "+"):
dest_axis = dest_axis[0]
axis_forward = True if dest_axis[1] == "+" else False
else:
await cmd2struct.finish(
UniMessage.text(
ulang.get(
"cmd2struct.axis_wrong",
),
),
at_sender=True,
)
dest_type = (
result.options["type"].args["type"] if result.options["type"].args else "all"
).lower()
if dest_type == "all":
dest_type = ("bdx",)
elif dest_type in [
"bdx",
]:
dest_type = ("bdx",)
else:
await cmd2struct.finish(
UniMessage.text(
ulang.get(
"convert.something_not_exist",
WHAT="转换格式",
NAME=dest_type,
)
),
at_sender=True,
)
return
file_2_cvt = (
result.options["file-name"].args["file-name"]
if result.options["file-name"].args
else "all"
)
if ((not superuser_permission) and (usr_id not in filesaves.keys())) or (
superuser_permission
and (
(not len(filesaves))
or (file_2_cvt.lower() == "all" and usr_id not in filesaves.keys())
)
):
await cmd2struct.finish(
UniMessage.text(ulang.get("convert.no_file", TYPE="文本指令")),
at_sender=True,
)
return
else:
if file_2_cvt == "all":
file_2_cvt = [
i for i in filesaves[usr_id].keys() if i.endswith(".mcfunction")
]
else:
file_2_cvt = file_2_cvt.split("&")
length_limit = (
result.options["length-limit"].args["length-limit"]
if result.options["length-limit"].args
else 32
)
author_name = (
result.options["author"].args["author"]
if result.options["author"].args
else "DefaultUser"
)
# usr_data_path = database_dir / usr_id
(usr_temp_path := temporary_dir / usr_id).mkdir(exist_ok=True)
# 重定向标准输出
buffer = StringIO()
sys.stdout = buffer
sys.stderr = buffer
def go_chk_point() -> bool:
res, pnt = query_convert_points(
usr_id,
"music",
random.random() % 0.4 + 0.1,
)
if not res:
buffer.write(ulang.get("convert.break.not_enough_point", NOW=pnt))
return res
await cmd2struct.send(UniMessage.text(ulang.get("convert.start")))
try:
all_files: dict[str, dict[str, dict[str, int | tuple | str | list]]] = {}
for file_to_convert in file_2_cvt:
nonebot.logger.info("载入转换文件:{}".format(file_to_convert))
to_convert_path = get_stored_path(
usr_id, file_to_convert, superuser_permission
)
if to_convert_path.is_file():
all_files[to_convert_path.name] = {}
else:
buffer.write("文件 {} 不存在\n".format(file_to_convert))
continue
cmd_lines = await read_file_into_command_lines(to_convert_path)
if go_chk_point() and "bdx" in dest_type:
if (
res := command_to_multiline_BDX_structure_file(
cmd_lines,
axis_=dest_axis,
forward_=axis_forward,
limit_=length_limit,
author=author_name,
outfile=usr_temp_path / file_to_convert / ".bdx",
)
)[0]:
all_files[file_to_convert]["bdx"] = {
"指令总长度": res[1],
"结构总大小": res[2],
}
else:
buffer.write("转换BDX文件出现错误。")
if not all_files:
nonebot.logger.warning(
"无可供转换的文件",
)
await cmd2struct.finish(
UniMessage(
"我相信质量守恒定律的存在,可是你却要把这份坚信给摧毁。\n*所指向之文件皆不存在"
)
)
except Exception as e:
nonebot.logger.error("转换存在错误:{}".format(e))
buffer.write(
"[ERROR] {}\n".format(e).replace(str(Path(__file__).parent.resolve()), "[]")
)
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
compress_zipfile(
usr_temp_path,
fp := str(
(
temporary_dir
/ (fn := "msctr[{}]-{}.zip".format(hanzi_timeid(), usr_id))
).resolve()
),
)
shutil.rmtree(usr_temp_path)
if isinstance(event, GroupMessageEvent) or isinstance(
event, GroupUploadNoticeEvent
):
await bot.call_api(
"upload_group_file", group_id=event.group_id, name=fn, file=fp
)
# await linglun_convert.send(
# UniMessage.text("文件已上传群文件,请在群文件查看。")
# )
# await linglun_convert.send(UniMessage.file(res_id,path=fp,name=fn))
else:
await bot.call_api(
"upload_private_file", user_id=event.user_id, name=fn, file=fp
)
img_bytes = await md_to_pic(
"##{}\n\n```\n{}\n```".format(
MarkdownMessage.escape("日志信息:"),
buffer.getvalue().replace("\\", "/"),
),
)
await UniMessage.send(UniMessage.image(raw=img_bytes))
# nonebot.logger.info(buffer.getvalue())
img_bytes = await md_to_pic(
"## 转换结果\n\n"
+ ("\n\n\n").join(
[
"###{}\n\n{}".format(
fn,
"\n\n".join(
[
"- {}\n\t{}".format(
tn,
"\n\t".join(
["- {} : {}".format(i, j) for i, j in rps.items()]
),
)
for tn, rps in res.items()
]
),
)
for fn, res in all_files.items()
]
)
+ "\n\n### 言·论 \n\n **{}**".format(random_yanlun_text()),
)
await UniMessage.send(UniMessage.image(raw=img_bytes))
add_file_to_delete(fp, 1)
await cmd2struct.finish(
UniMessage.text(
"转换结束当前剩余转换点数⌊p⌋≈{:.2f}|{}".format(
query_convert_points(
usr_id,
"music",
0,
)[1],
configdict["maxPersonConvert"]["music"],
)
),
at_sender=True,
)

View File

@ -207,7 +207,7 @@ def add_file_to_delete(file_: Path | os.PathLike[str] | str, wait_p30s: int = 0)
文件路径
wait_p30s: int
等待时间单位为 30 秒内不大于 30 默认为 `0`
返回
str
文件路径的字符串
@ -222,7 +222,7 @@ def add_memory_to_temporary(
) -> None:
"""
向临时内存存储中填入内存信息
参数
index: str
索引
@ -236,14 +236,15 @@ def add_memory_to_temporary(
global something_temporary
something_temporary[index] = {"stuff": (memory_, description), "time": wait_p30s}
def read_memory_from_temporary(index: str) -> Any:
"""
从临时内存存储中读取内容
参数
index: str
索引
返回
Any
内容当无此内容时返回 `None`
@ -255,12 +256,13 @@ def read_memory_from_temporary(index: str) -> Any:
else:
return memory_cmp
def get_stored_path(
user_id: str, item: Union[Path, os.PathLike[str], str], superuser: bool = False
) -> Path:
"""
获取用户文件存储路径
参数
user_id: str
用户id
@ -268,7 +270,7 @@ def get_stored_path(
文件名对于用户目录的相对路径
superuser: bool
是否为超级用户默认为 `False` 若为 `True` 则在用户文件中寻找
返回
Path
文件路径
@ -718,6 +720,7 @@ async def _(
nonebot.logger.info(result.options)
usr_id = event.get_user_id()
ulang = get_user_lang(usr_id)
superuser_permission = await SUPERUSER(bot, event)
@ -726,9 +729,10 @@ async def _(
):
await linglun_convert.finish(
UniMessage.text(
"转换点数不足当前剩余⌊p⌋≈{:.2f}|{}".format(
qres[1],
configdict["maxPersonConvert"]["music"],
ulang.get(
"convet.not_enough_point",
NOW=qres[1],
TOTAL=configdict["maxPersonConvert"]["music"],
)
),
at_sender=True,
@ -784,7 +788,7 @@ async def _(
)
):
await linglun_convert.finish(
UniMessage.text("服务器内未存入你的任何文件请先上传midi文件吧")
UniMessage.text(ulang.get("convert.no_file", TYPE="midi"))
)
return
@ -814,7 +818,13 @@ async def _(
pitched_notechart.update(json.load(_ppnt.open("r")))
else:
await linglun_convert.finish(
UniMessage.text("乐器对照表 {} 不存在".format(_args["pitched-note-table"]))
UniMessage.text(
ulang.get(
"convert.something_not_exist",
WHAT="乐音乐器对照表",
NAME=_args["pitched-note-table"],
)
)
)
return
@ -842,7 +852,11 @@ async def _(
else:
await linglun_convert.finish(
UniMessage.text(
"乐器对照表 {} 不存在".format(_args["percussion-note-table"])
ulang.get(
"convert.something_not_exist",
WHAT="打击乐器对照表",
NAME=_args["percussion-note-table"],
)
)
)
return
@ -859,7 +873,11 @@ async def _(
else:
await linglun_convert.finish(
UniMessage.text(
"音量处理曲线 {} 不存在".format(_args["volume-processing-function"])
ulang.get(
"convert.something_not_exist",
WHAT="音量处理曲线",
NAME=_args["volume-processing-function"],
)
)
)
return
@ -889,10 +907,12 @@ async def _(
random.random() % 0.5 + 0.3,
)
if not res:
buffer.write("中途退出,转换点不足:{}\n".format(pnt))
buffer.write(ulang.get("convert.break.not_enough_point", NOW=pnt))
return res
await linglun_convert.send(UniMessage.text("转换开始……"))
await linglun_convert.send(
UniMessage.text(ulang.get("convert.start"))
)
try:
@ -935,7 +955,9 @@ async def _(
if identify_cmp in something_temporary.keys():
nonebot.logger.info("载入已有缓存。")
msct_obj: Musicreater.MidiConvert = read_memory_from_temporary(identify_cmp)
msct_obj: Musicreater.MidiConvert = read_memory_from_temporary(
identify_cmp
)
msct_obj.redefine_execute_format(_args["old-execute-format"])
msct_obj.set_min_volume(_args["minimal-volume"])
# msct_obj.set_deviation()

View File

@ -0,0 +1,20 @@
upexecute.same=It's same of the command after convert
upexecute.enable="execute" command auto update is enabled
upexecute.disable=It is a disable of the "execute" command auto update
writefile.no_text=No text provided to write, please give the text after the first line
writefile.no_file=No file assigned to write, provide it with arg "-f|--file-name"
writefile.
writefile.file_not_exist=The file is not existed
writefile.append_success=File which is called {NAME} have appended {COUNT} characters in the tail of the original text and now have got {SIZE} bytes in totally the full file.
writefile.write_success={NAME}, {SIZE} bytes written
convet.not_enough_point=Not enough convert provide, now ⌊p⌋≈{NOW:.2f} of {TOTAL}
convert.break.not_enough_point=In the reason of lack of convert point and you cannot continue to convert the thing you want. Now you have only {NOW} points and maybe you need to get more in using some unspecified methods.
convert.no_file=No any file in the server, upload a {TYPE} file first
convert.table_not_exist={WHAT} called {NAME} is not exist
convert.start=The convertion is started and soon will come to a result
cmd2struct.axis_wrong=It is a truth that you have entered a totally uncorrect argument that states into the wrong format of tie direction of the structure the program is going to form and the actually input should under the format of Axis±, like x+, z-, etc.

View File

@ -1,4 +1,18 @@
upexecute.same=指令转换前后一致
upexecute.enable=execute指令自动更新已启用
upexecute.disable=已禁用execute指令自动更新
upexecute.disable=已禁用execute指令自动更新
writefile.no_text=无可供写入的内容,请在指令开头后换行提供写入内容
writefile.no_file=未指定写入文件名,请使用"-f|--file-name"参数指定文件名
writefile.file_not_exist=该文件不存在
writefile.append_success=文件{NAME}已增添{COUNT}个字符,现共{SIZE}字节
writefile.write_success=文件{NAME}已写入{SIZE}字节
convet.not_enough_point=转换点数不足当前剩余⌊p⌋≈{NOW:.2f}|{TOTAL}
convert.break.not_enough_point=中途退出,转换点不足:{NOW}
convert.no_file=服务器内未存入你的任何文件,请先上传{TYPE}文件吧
convert.something_not_exist={WHAT} {NAME} 不存在
convert.start=转换开始……
cmd2struct.axis_wrong=生成结构的生成方向格式错误输入数据应符合“轴正负”如x+、z-)的格式。

View File

@ -1,4 +1,18 @@
upexecute.same=令未变
upexecute.enable=更令之法方用
upexecute.disable=更令之法方去
upexecute.disable=更令之法方去
writefile.no_text=无文以写,换行以供
writefile.no_file=无典案名,曰其以参所谓"-f|--file-name"者
writefile.file_not_exist=案虚
writefile.append_success=案曰 {NAME} 者,填{COUNT}字,现{SIZE}字节
writefile.write_success=案曰{NAME}者,{SIZE}字节攥
convet.not_enough_point=点阙,余{NOW:.2f},满{TOTAL}
convert.break.not_enough_point=点阙而不可继,现{NOW}
convert.no_file=无案机内,先传之{TYPE}
convert.table_not_exist={WHAT}曰 {NAME} 者虚
convert.start=始……
cmd2struct.axis_wrong=产型误指应合此法曰“轴正负”类x+、z-。