优先从内存读取用户信息

This commit is contained in:
snowy 2024-04-17 17:45:32 +08:00
parent c4db4dc6a6
commit 0e02af59ca
8 changed files with 158 additions and 57 deletions

2
.gitignore vendored
View File

@ -20,6 +20,8 @@ liteyuki/resources/templates/latest-debug.html
.github
pyproject.toml
test.py
# nuitka
main.build/
main.dist/

View File

@ -1,13 +1,16 @@
from nonebot import on_command
from nonebot import on_command, require
from nonebot.adapters.onebot.v11 import MessageSegment
from nonebot.params import CommandArg
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent, v11
from liteyuki.utils.message.message import MarkdownMessage as md, broadcast_to_superusers
from liteyuki.utils.message.html_tool import *
md_test = on_command("mdts", permission=SUPERUSER)
btn_test = on_command("btnts", permission=SUPERUSER)
latex_test = on_command("latex", permission=SUPERUSER)
placeholder = {
"[": "[",
@ -28,6 +31,7 @@ async def _(bot: T_Bot, event: T_MessageEvent, arg: v11.Message = CommandArg()):
session_id=event.user_id if event.message_type == "private" else event.group_id
)
@btn_test.handle()
async def _(bot: T_Bot, event: T_MessageEvent, arg: v11.Message = CommandArg()):
await md.send_btn(
@ -37,6 +41,14 @@ async def _(bot: T_Bot, event: T_MessageEvent, arg: v11.Message = CommandArg()):
session_id=event.user_id if event.message_type == "private" else event.group_id
)
@latex_test.handle()
async def _(bot: T_Bot, event: T_MessageEvent, arg: v11.Message = CommandArg()):
latex_text = f"$${str(arg)}$$"
img = await md_to_pic(latex_text)
await bot.send(event=event, message=MessageSegment.image(img))
__author__ = "snowykami"
__plugin_meta__ = PluginMetadata(
name="轻雪Markdown测试",

View File

@ -11,11 +11,14 @@ from nonebot.internal.adapter import Event
from nonebot.internal.matcher import Matcher
from nonebot.message import run_preprocessor
from nonebot.permission import SUPERUSER
from nonebot.plugin import Plugin
from nonebot.plugin import Plugin, PluginMetadata
from nonebot.utils import run_sync
from liteyuki.utils.base.data_manager import InstalledPlugin
from liteyuki.utils.base.language import get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot
from liteyuki.utils.message.message import MarkdownMessage as md
from liteyuki.utils.message.markdown import MarkdownComponent as mdc, compile_md, escape_md
from liteyuki.utils.base.permission import GROUP_ADMIN, GROUP_OWNER
from liteyuki.utils.message.tools import clamp
from .common import *
@ -229,7 +232,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
plugin_name: str = result.subcommands["install"].args.get("plugin_name")
store_plugin = await get_store_plugin(plugin_name)
await npm.send(ulang.get("npm.installing", NAME=plugin_name))
r, log = npm_install(plugin_name)
r, log = await npm_install(plugin_name)
log = log.replace("\\", "/")
if not store_plugin:
@ -398,6 +401,56 @@ async def _(bot: T_Bot, event: T_MessageEvent, gm: Matcher, result: Arparma):
)
@on_alconna(
aliases={"帮助"},
command=Alconna(
"help",
Args["plugin_name", str, None],
)
).handle()
async def _(result: Arparma, matcher: Matcher, event: T_MessageEvent, bot: T_Bot):
ulang = get_user_lang(str(event.user_id))
plugin_name = result.main_args.get("plugin_name")
if plugin_name:
loaded_plugin = nonebot.get_plugin(plugin_name)
if loaded_plugin:
if loaded_plugin.metadata is None:
loaded_plugin.metadata = PluginMetadata(name=plugin_name, description="", usage="")
# 从商店获取详细信息
store_plugin = await get_store_plugin(plugin_name)
if loaded_plugin.metadata.extra.get("liteyuki"):
store_plugin = StorePlugin(
name=loaded_plugin.metadata.name,
desc=loaded_plugin.metadata.description,
author="SnowyKami",
module_name=plugin_name,
homepage="https://github.com/snowykami/LiteyukiBot"
)
elif store_plugin is None:
store_plugin = StorePlugin(
name=loaded_plugin.metadata.name,
desc=loaded_plugin.metadata.description,
author="",
module_name=plugin_name,
homepage=""
)
reply = [
mdc.heading(escape_md(loaded_plugin.metadata.name)),
mdc.quote(mdc.bold(ulang.get("npm.author")) + " " +
mdc.link(store_plugin.author, f"https://github/{store_plugin.author}") if store_plugin.author else "Unknown"),
mdc.quote(mdc.bold(ulang.get("npm.description")) + " " + mdc.paragraph(max(loaded_plugin.metadata.description, store_plugin.desc))),
mdc.heading(ulang.get("npm.usage"), 2),
mdc.paragraph(loaded_plugin.metadata.usage),
]
await md.send_md(compile_md(reply), bot, event=event)
else:
await matcher.finish(ulang.get("npm.plugin_not_found", NAME=plugin_name))
else:
pass
# 传入事件阻断hook
@run_preprocessor
async def pre_handle(event: Event, matcher: Matcher):
plugin: Plugin = matcher.plugin
@ -410,6 +463,7 @@ async def pre_handle(event: Event, matcher: Matcher):
raise IgnoredException("Plugin disabled in session")
# 群聊开关阻断hook
@Bot.on_calling_api
async def block_disable_session(bot: Bot, api: str, args: dict):
if "group_id" in args and not args.get("liteyuki_pass", False):
@ -442,7 +496,7 @@ async def npm_update() -> bool:
async def npm_search(keywords: list[str]) -> list[StorePlugin]:
"""
搜索插件
在本地缓存商店数据中搜索插件
Args:
keywords (list[str]): 关键词列表
@ -468,8 +522,10 @@ async def npm_search(keywords: list[str]) -> list[StorePlugin]:
return results
@run_sync
def npm_install(plugin_package_name) -> tuple[bool, str]:
"""
异步安装插件使用pip安装
Args:
plugin_package_name:

View File

@ -65,6 +65,7 @@ npm.loaded_plugins=已加载插件
npm.total=总计 {TOTAL}
npm.help=帮助
npm.usage=用法
npm.description=描述
npm.disable=停用
npm.disable_global=全局停用
npm.enable=启用

View File

@ -36,7 +36,7 @@ def load_from_yaml(file: str) -> dict:
return conf
def get_config(key: str, bot: T_Bot = None, default=None):
def get_config(key: str, *, bot: T_Bot = None, default=None):
"""获取配置项优先级bot > config > db > yaml"""
if bot is None:
bot_config = {}
@ -59,6 +59,15 @@ def get_config(key: str, bot: T_Bot = None, default=None):
def init_conf(conf: dict) -> dict:
"""
初始化配置文件确保配置文件中的必要字段存在且不会冲突
Args:
conf:
Returns:
"""
# 若command_start中无""则添加必要命令头开启alconna_use_command_start防止冲突
if "" not in conf.get("command_start", []):
conf["alconna_use_command_start"] = True
return conf

View File

@ -9,7 +9,7 @@ from typing import Any
import nonebot
from .config import config
from .config import config, get_config
from .data_manager import User, user_db
_language_data = {
@ -18,6 +18,10 @@ _language_data = {
}
}
_user_lang = {
"user_id": "zh-CN"
}
def load_from_lang(file_path: str, lang_code: str = None):
"""
@ -101,8 +105,11 @@ def load_from_dict(data: dict, lang_code: str):
class Language:
def __init__(self, lang_code: str = None, fallback_lang_code: str = "zh-CN"):
# 三重fallback
# 用户语言 > 默认语言/系统语言 > zh-CN
def __init__(self, lang_code: str = None, fallback_lang_code: str = None):
self.lang_code = lang_code
if self.lang_code is None:
self.lang_code = get_default_lang_code()
@ -112,7 +119,7 @@ class Language:
def get(self, item: str, *args, **kwargs) -> str | Any:
"""
获取当前语言文本
获取当前语言文本kwargs中的default参数为默认文本
Args:
item: 文本键
*args: 格式化参数
@ -123,44 +130,44 @@ class Language:
"""
default = kwargs.pop("default", None)
fallback = (self.lang_code, self.fallback_lang_code, "zh-CN")
for lang_code in fallback:
if lang_code in _language_data and item in _language_data[lang_code]:
trans: str = _language_data[lang_code][item]
try:
if self.lang_code in _language_data and item in _language_data[self.lang_code]:
return _language_data[self.lang_code][item].format(*args, **kwargs)
nonebot.logger.debug(f"Language text not found: {self.lang_code}.{item}")
if self.fallback_lang_code in _language_data and item in _language_data[self.fallback_lang_code]:
return _language_data[self.fallback_lang_code][item].format(*args, **kwargs)
nonebot.logger.debug(f"Language text not found in fallback language: {self.fallback_lang_code}.{item}")
return default or item
return trans.format(*args, **kwargs)
except Exception as e:
nonebot.logger.error(f"Failed to get language text or format: {e}")
nonebot.logger.warning(f"Failed to format language data: {e}")
return trans
return default or item
def get_many(self, *args) -> dict[str, str]:
"""
获取多个文本
Args:
*args: 文本键
Returns:
dict: 文本字典
def change_user_lang(user_id: str, lang_code: str):
"""
d = {}
for item in args:
d[item] = self.get(item)
return d
修改用户的语言
"""
user = user_db.first(User(), "user_id = ?", user_id, default=User(user_id=user_id))
user.profile["lang"] = lang_code
user_db.update(user, "user_id = ?", user_id)
def get_user_lang(user_id: str) -> Language:
"""
获取用户的语言代码
获取用户的语言实例优先从内存中获取
"""
user = user_db.first(User(), "user_id = ?", user_id, default=User(
if user_id in _user_lang:
return Language(_user_lang[user_id])
else:
user = user_db.first(
User(), "user_id = ?", user_id, default=User(
user_id=user_id,
username="Unknown"
))
return Language(user.profile.get("lang", get_default_lang_code()))
)
)
lang_code = user.profile.get("lang", get_default_lang_code())
_user_lang[user_id] = lang_code
return Language(lang_code)
def get_system_lang_code() -> str:
@ -172,11 +179,11 @@ def get_system_lang_code() -> str:
def get_default_lang_code() -> str:
"""
获取默认语言代码
获取默认语言代码若没有设置则使用系统语言
Returns:
"""
return config.get("default_language", get_system_lang_code())
return get_config("default_language", default=get_system_lang_code())
def get_all_lang() -> dict[str, str]:

View File

@ -110,3 +110,4 @@ async def url2image(
type=type,
quality=quality
)

View File

@ -9,7 +9,7 @@ from ..base.config import get_config
from ..base.ly_typing import T_Bot
def markdown_escape(text: str) -> str:
def escape_md(text: str) -> str:
"""
转义Markdown特殊字符
Args:
@ -27,57 +27,62 @@ def markdown_escape(text: str) -> str:
def escape_decorator(func):
def wrapper(text: str):
return func(markdown_escape(text))
return func(escape_md(text))
return wrapper
def compile_md(comps: list[str]) -> str:
"""
编译Markdown文本
Args:
comps: list[str]: 组件列表
Returns:
str: 编译后文本
"""
print("".join(comps))
return "".join(comps)
class MarkdownComponent:
@staticmethod
@escape_decorator
def heading(text: str, level: int = 1) -> str:
"""标题"""
assert 1 <= level <= 6, "标题级别应在 1-6 之间"
return f"{'#' * level} {text}"
return f"{'#' * level} {text}\n"
@staticmethod
@escape_decorator
def bold(text: str) -> str:
"""粗体"""
return f"**{text}**"
@staticmethod
@escape_decorator
def italic(text: str) -> str:
"""斜体"""
return f"*{text}*"
@staticmethod
@escape_decorator
def strike(text: str) -> str:
"""删除线"""
return f"~~{text}~~"
@staticmethod
@escape_decorator
def code(text: str) -> str:
"""行内代码"""
return f"`{text}`"
@staticmethod
@escape_decorator
def code_block(text: str, language: str = "") -> str:
"""代码块"""
return f"```{language}\n{text}\n```"
return f"```{language}\n{text}\n```\n"
@staticmethod
@escape_decorator
def quote(text: str) -> str:
"""引用"""
return f"> {text}"
return f"> {text}\n"
@staticmethod
@escape_decorator
def link(text: str, url: str, symbol: bool = True) -> str:
"""
链接
@ -87,10 +92,9 @@ class MarkdownComponent:
url: 链接地址
symbol: 是否显示链接图标, mqqapi请使用False
"""
return f"[{'🔗' if symbol else ''}{text}]({quote(url)})"
return f"[{'🔗' if symbol else ''}{text}]({url})"
@staticmethod
@escape_decorator
def image(url: str, *, size: tuple[int, int]) -> str:
"""
图片本地图片不建议直接使用
@ -104,7 +108,6 @@ class MarkdownComponent:
return f"![image #{size[0]}px #{size[1]}px]({url})"
@staticmethod
@escape_decorator
async def auto_image(image: str | bytes, bot: T_Bot) -> str:
"""
自动获取图片大小
@ -143,7 +146,6 @@ class MarkdownComponent:
return MarkdownComponent.image(url, size=size)
@staticmethod
@escape_decorator
def table(data: list[list[any]]) -> str:
"""
表格
@ -160,6 +162,17 @@ class MarkdownComponent:
table += "|".join(map(str, row)) + "\n"
return table
@staticmethod
def paragraph(text: str) -> str:
"""
段落
Args:
text: 段落内容
Returns:
markdown格式的段落
"""
return f"{text}\n"
class Mqqapi:
@staticmethod