使用webp背景图压缩资源包大小

This commit is contained in:
远野千束 2024-05-12 00:18:53 +08:00
parent afe501a06d
commit f9fe1922d4
54 changed files with 436 additions and 142 deletions

View File

@ -1,33 +1,44 @@
--- # 问题反馈
name: BUG 反馈
about: 使用轻雪时遇到了问题?
---
## 问题反馈
### **描述** ## **请确保**
请详细描述一下你所遇到的bug
### **确保** - 已认真阅读[文档]("https://bot.liteyuki.icu"),该问题不是文档提及的或你自己操作不当造成的
- 你的问题是在最新版本的代码上测试的
- 请勿重复提交相同或类似的issue
- [ ] 我已查阅所有issues没有相似或已被证实的内容
- [ ] 我已按照文档指引进行正确的操作,仍会复现该问题
### **预期效果** ## **描述问题**
你想要什么样的预期效果
### **实际效果** 请在此简单描述问题
实际上是怎么样的
### **运行环境**
- 系统及版本:
- Python环境
- commit哈希或版本
- 硬件信息(可选)
### **运行日志**
## **如何复现**
请阐述一下如何重现这个问题
### 预期
描述你期望发生的事情
### 实际
描述实际发生的事情
## **日志或截图**
``` ```
将相关日志粘贴到此处 日志内容
``` ```
### **严重等级**
致命|严重|警告|轻微 ## **设备信息**
- **系统**: [例如 Ubuntu 22.04]
- **CPU**: [例如 Intel i7-7700K]
- **内存**: [例如 16GB]
- **Python**: [例如CPython 3.10.7]
**补充内容**
可选,推荐提供`pip freeze`的输出,以及其他相关信息,以及你的建议

15
db_test.py Normal file
View File

@ -0,0 +1,15 @@
from liteyuki.internal.base.data import *
class People(LiteModel):
TABLE_NAME: str = "people"
name: str = ""
age: int = 0
sex: str = ""
identity: str = ""
db = Database("data/test/test.ldb")
db.where()

View File

@ -11,9 +11,9 @@ __VERSION__ = "6.3.2" # 60201
import requests import requests
from liteyuki.utils.base.config import load_from_yaml, config from liteyuki.internal.base.config import load_from_yaml, config
from liteyuki.utils.base.log import init_log from liteyuki.internal.base.log import init_log
from liteyuki.utils.base.data_manager import TempConfig, auto_migrate, common_db from liteyuki.internal.base.data_manager import TempConfig, auto_migrate, common_db
major, minor, patch = map(int, __VERSION__.split(".")) major, minor, patch = map(int, __VERSION__.split("."))
__VERSION_I__ = major * 10000 + minor * 100 + patch __VERSION_I__ = major * 10000 + minor * 100 + patch
@ -56,7 +56,7 @@ def init():
if sys.version_info < (3, 10): if sys.version_info < (3, 10):
nonebot.logger.error("Requires Python3.10+ to run, please upgrade your Python Environment.") nonebot.logger.error("Requires Python3.10+ to run, please upgrade your Python Environment.")
exit(1) exit(1)
temp_data: TempConfig = common_db.first(TempConfig(), default=TempConfig()) temp_data: TempConfig = common_db.where_one(TempConfig(), default=TempConfig())
temp_data.data["start_time"] = time.time() temp_data.data["start_time"] = time.time()
common_db.save(temp_data) common_db.save(temp_data)

View File

@ -54,8 +54,8 @@ def get_config(key: str, default=None):
elif key in config: elif key in config:
return config[key] return config[key]
elif key in common_db.first(StoredConfig(), default=StoredConfig()).config: elif key in common_db.where_one(StoredConfig(), default=StoredConfig()).config:
return common_db.first(StoredConfig(), default=StoredConfig()).config[key] return common_db.where_one(StoredConfig(), default=StoredConfig()).config[key]
elif key in load_from_yaml("config.yml"): elif key in load_from_yaml("config.yml"):
return load_from_yaml("config.yml")[key] return load_from_yaml("config.yml")[key]
@ -65,7 +65,7 @@ def get_config(key: str, default=None):
def set_stored_config(key: str, value): def set_stored_config(key: str, value):
temp_config: TempConfig = common_db.first(TempConfig(), default=TempConfig()) temp_config: TempConfig = common_db.where_one(TempConfig(), default=TempConfig())
temp_config.data[key] = value temp_config.data[key] = value
common_db.save(temp_config) common_db.save(temp_config)

View File

@ -31,7 +31,7 @@ class Database:
self.conn = sqlite3.connect(db_name) self.conn = sqlite3.connect(db_name)
self.cursor = self.conn.cursor() self.cursor = self.conn.cursor()
def first(self, model: LiteModel, condition: str = "", *args: Any, default: Any = None) -> LiteModel | Any | None: def where_one(self, model: LiteModel, condition: str = "", *args: Any, default: Any = None) -> LiteModel | Any | None:
"""查询第一个 """查询第一个
Args: Args:
model: 数据模型实例 model: 数据模型实例
@ -42,10 +42,10 @@ class Database:
Returns: Returns:
""" """
all_results = self.all(model, condition, *args) all_results = self.where_all(model, condition, *args)
return all_results[0] if all_results else default return all_results[0] if all_results else default
def all(self, model: LiteModel, condition: str = "", *args: Any, default: Any = None) -> list[LiteModel | Any] | None: def where_all(self, model: LiteModel, condition: str = "", *args: Any, default: Any = None) -> list[LiteModel | Any] | None:
"""查询所有 """查询所有
Args: Args:
model: 数据模型实例 model: 数据模型实例
@ -334,6 +334,19 @@ class Database:
# 转换为的字节前缀 # 转换为的字节前缀
BYTES_PREFIX = "PICKLE_BYTES_" BYTES_PREFIX = "PICKLE_BYTES_"
# transaction tx 事务操作
def first(self, model: LiteModel) -> "Database":
pass
def where(self, condition: str, *args) -> "Database":
pass
def limit(self, limit: int) -> "Database":
pass
def order(self, order: str) -> "Database":
pass
def check_sqlite_keyword(name): def check_sqlite_keyword(name):
sqlite_keywords = [ sqlite_keywords = [

View File

@ -147,7 +147,7 @@ def change_user_lang(user_id: str, lang_code: str):
""" """
修改用户的语言同时储存到数据库和内存中 修改用户的语言同时储存到数据库和内存中
""" """
user = user_db.first(User(), "user_id = ?", user_id, default=User(user_id=user_id)) user = user_db.where_one(User(), "user_id = ?", user_id, default=User(user_id=user_id))
user.profile["lang"] = lang_code user.profile["lang"] = lang_code
user_db.save(user) user_db.save(user)
_user_lang[user_id] = lang_code _user_lang[user_id] = lang_code
@ -161,7 +161,7 @@ def get_user_lang(user_id: str) -> Language:
if user_id not in _user_lang: if user_id not in _user_lang:
nonebot.logger.debug(f"Loading user language for {user_id}") nonebot.logger.debug(f"Loading user language for {user_id}")
user = user_db.first( user = user_db.where_one(
User(), "user_id = ?", user_id, default=User( User(), "user_id = ?", user_id, default=User(
user_id=user_id, user_id=user_id,
username="Unknown" username="Unknown"

View File

@ -42,7 +42,7 @@ def load_resource_from_dir(path: str):
metadata["path"] = path metadata["path"] = path
metadata["folder"] = os.path.basename(path) metadata["folder"] = os.path.basename(path)
if os.path.exists(os.path.join(path, "lang")): if os.path.exists(os.path.join(path, "lang")):
from liteyuki.utils.base.language import load_from_dir from liteyuki.internal.base.language import load_from_dir
load_from_dir(os.path.join(path, "lang")) load_from_dir(os.path.join(path, "lang"))
_loaded_resource_packs.insert(0, ResourceMetadata(**metadata)) _loaded_resource_packs.insert(0, ResourceMetadata(**metadata))

View File

@ -6,7 +6,7 @@ import aiofiles
import nonebot import nonebot
from nonebot import require from nonebot import require
from liteyuki.utils.base.resource import load_resources from liteyuki.internal.base.resource import load_resources
require("nonebot_plugin_htmlrender") require("nonebot_plugin_htmlrender")

View File

@ -0,0 +1,41 @@
def convert_duration(text: str, default) -> float:
"""
转换自然语言时间为秒数
Args:
text: 1d2h3m
default: 出错时返回
Returns:
float: 总秒数
"""
units = {
"d" : 86400,
"h" : 3600,
"m" : 60,
"s" : 1,
"ms": 0.001
}
duration = 0
current_number = ''
current_unit = ''
try:
for char in text:
if char.isdigit():
current_number += char
else:
if current_number:
duration += int(current_number) * units[current_unit]
current_number = ''
if char in units:
current_unit = char
else:
current_unit = ''
if current_number:
duration += int(current_number) * units[current_unit]
return duration
except:
return default

View File

View File

@ -16,7 +16,7 @@ __plugin_meta__ = PluginMetadata(
} }
) )
from ..utils.base.language import Language, get_default_lang_code from ..internal.base.language import Language, get_default_lang_code
print("\033[34m" + r""" print("\033[34m" + r"""
__ ______ ________ ________ __ __ __ __ __ __ ______ __ ______ ________ ________ __ __ __ __ __ __ ______

View File

@ -10,12 +10,12 @@ from nonebot.exception import MockApiException
from nonebot.internal.matcher import Matcher from nonebot.internal.matcher import Matcher
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from liteyuki.utils.base.config import get_config, load_from_yaml from liteyuki.internal.base.config import get_config, load_from_yaml
from liteyuki.utils.base.data_manager import StoredConfig, TempConfig, common_db from liteyuki.internal.base.data_manager import StoredConfig, TempConfig, common_db
from liteyuki.utils.base.language import get_user_lang from liteyuki.internal.base.language import get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.internal.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md, broadcast_to_superusers from liteyuki.internal.message.message import MarkdownMessage as md, broadcast_to_superusers
from liteyuki.utils.base.reloader import Reloader from liteyuki.internal.base.reloader import Reloader
from .api import update_liteyuki from .api import update_liteyuki
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
@ -25,7 +25,7 @@ from nonebot_plugin_apscheduler import scheduler
driver = get_driver() driver = get_driver()
markdown_image = common_db.first(StoredConfig(), default=StoredConfig()).config.get("markdown_image", False) markdown_image = common_db.where_one(StoredConfig(), default=StoredConfig()).config.get("markdown_image", False)
@on_alconna( @on_alconna(
@ -70,7 +70,7 @@ async def _(bot: T_Bot, event: T_MessageEvent):
).handle() ).handle()
async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent): async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
await matcher.send("Liteyuki reloading") await matcher.send("Liteyuki reloading")
temp_data = common_db.first(TempConfig(), default=TempConfig()) temp_data = common_db.where_one(TempConfig(), default=TempConfig())
temp_data.data.update( temp_data.data.update(
{ {
@ -112,7 +112,7 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
).handle() ).handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher): async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher):
ulang = get_user_lang(str(event.user_id)) ulang = get_user_lang(str(event.user_id))
stored_config: StoredConfig = common_db.first(StoredConfig(), default=StoredConfig()) stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig())
if result.subcommands.get("set"): if result.subcommands.get("set"):
key, value = result.subcommands.get("set").args.get("key"), result.subcommands.get("set").args.get("value") key, value = result.subcommands.get("set").args.get("key"), result.subcommands.get("set").args.get("value")
try: try:
@ -161,7 +161,7 @@ async def _(event: T_MessageEvent, matcher: Matcher):
global markdown_image global markdown_image
# 切换图片模式False以图片形式发送True以markdown形式发送 # 切换图片模式False以图片形式发送True以markdown形式发送
ulang = get_user_lang(str(event.user_id)) ulang = get_user_lang(str(event.user_id))
stored_config: StoredConfig = common_db.first(StoredConfig(), default=StoredConfig()) stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig())
stored_config.config["markdown_image"] = not stored_config.config.get("markdown_image", False) stored_config.config["markdown_image"] = not stored_config.config.get("markdown_image", False)
markdown_image = stored_config.config["markdown_image"] markdown_image = stored_config.config["markdown_image"]
common_db.save(stored_config) common_db.save(stored_config)
@ -253,7 +253,7 @@ async def test_for_md_image(bot: T_Bot, api: str, data: dict):
@driver.on_startup @driver.on_startup
async def on_startup(): async def on_startup():
temp_data = common_db.first(TempConfig(), default=TempConfig()) temp_data = common_db.where_one(TempConfig(), default=TempConfig())
# 储存重启信息 # 储存重启信息
if temp_data.data.get("reload", False): if temp_data.data.get("reload", False):
delta_time = time.time() - temp_data.data.get("reload_time", 0) delta_time = time.time() - temp_data.data.get("reload_time", 0)
@ -268,7 +268,7 @@ async def on_shutdown():
@driver.on_bot_connect @driver.on_bot_connect
async def _(bot: T_Bot): async def _(bot: T_Bot):
temp_data = common_db.first(TempConfig(), default=TempConfig()) temp_data = common_db.where_one(TempConfig(), default=TempConfig())
# 用于重启计时 # 用于重启计时
if temp_data.data.get("reload", False): if temp_data.data.get("reload", False):
temp_data.data["reload"] = False temp_data.data["reload"] = False

View File

@ -4,9 +4,9 @@ import nonebot
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from liteyuki.utils.base.config import get_config from liteyuki.internal.base.config import get_config
from liteyuki.utils.base.reloader import Reloader from liteyuki.internal.base.reloader import Reloader
from liteyuki.utils.base.resource import load_resources from liteyuki.internal.base.resource import load_resources
if get_config("debug", False): if get_config("debug", False):
nonebot.logger.info("Liteyuki Reload is enable, watching for file changes...") nonebot.logger.info("Liteyuki Reload is enable, watching for file changes...")

View File

@ -1,10 +1,10 @@
import nonebot.plugin import nonebot.plugin
from liteyuki.utils import init_log from liteyuki.internal import init_log
from liteyuki.utils.base.config import get_config from liteyuki.internal.base.config import get_config
from liteyuki.utils.base.data_manager import InstalledPlugin, plugin_db from liteyuki.internal.base.data_manager import InstalledPlugin, plugin_db
from liteyuki.utils.base.resource import load_resources from liteyuki.internal.base.resource import load_resources
from liteyuki.utils.message.tools import check_for_package from liteyuki.internal.message.tools import check_for_package
load_resources() load_resources()
init_log() init_log()
@ -14,7 +14,7 @@ nonebot.plugin.load_plugins("liteyuki/plugins")
if not get_config("safe_mode", False): if not get_config("safe_mode", False):
# 安全模式下,不加载插件 # 安全模式下,不加载插件
installed_plugins: list[InstalledPlugin] = plugin_db.all(InstalledPlugin()) installed_plugins: list[InstalledPlugin] = plugin_db.where_all(InstalledPlugin())
if installed_plugins: if installed_plugins:
for installed_plugin in installed_plugins: for installed_plugin in installed_plugins:
if not check_for_package(installed_plugin.module_name): if not check_for_package(installed_plugin.module_name):

View File

@ -9,7 +9,7 @@ from nonebot_plugin_alconna import on_alconna, Alconna, Subcommand, Args, MultiV
from pydantic import BaseModel from pydantic import BaseModel
from .canvas import * from .canvas import *
from ...utils.base.resource import get_path from ...internal.base.resource import get_path
resolution = 256 resolution = 256

View File

@ -2,9 +2,9 @@ import nonebot
from nonebot import on_message, require from nonebot import on_message, require
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from liteyuki.utils.base.data import Database, LiteModel from liteyuki.internal.base.data import Database, LiteModel
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.internal.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md from liteyuki.internal.message.message import MarkdownMessage as md
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna from nonebot_plugin_alconna import on_alconna
@ -64,7 +64,7 @@ async def _(result: Arparma):
push1 = Push( push1 = Push(
source=Node(bot_id=source[0], session_type=source[1], session_id=source[2]), source=Node(bot_id=source[0], session_type=source[1], session_id=source[2]),
target=Node(bot_id=target[0], session_type=target[1], session_id=target[2]), target=Node(bot_id=target[0], session_type=target[1], session_id=target[2]),
inde=len(pushes_db.all(Push(), default=[])) inde=len(pushes_db.where_all(Push(), default=[]))
) )
pushes_db.save(push1) pushes_db.save(push1)
@ -72,7 +72,7 @@ async def _(result: Arparma):
push2 = Push( push2 = Push(
source=Node(bot_id=target[0], session_type=target[1], session_id=target[2]), source=Node(bot_id=target[0], session_type=target[1], session_id=target[2]),
target=Node(bot_id=source[0], session_type=source[1], session_id=source[2]), target=Node(bot_id=source[0], session_type=source[1], session_id=source[2]),
inde=len(pushes_db.all(Push(), default=[])) inde=len(pushes_db.where_all(Push(), default=[]))
) )
pushes_db.save(push2) pushes_db.save(push2)
await add_push.finish("添加成功") await add_push.finish("添加成功")
@ -92,14 +92,14 @@ async def _(result: Arparma):
await add_push.finish( await add_push.finish(
"\n".join([f"{push.inde} {push.source.bot_id}.{push.source.session_type}.{push.source.session_id} -> " "\n".join([f"{push.inde} {push.source.bot_id}.{push.source.session_type}.{push.source.session_id} -> "
f"{push.target.bot_id}.{push.target.session_type}.{push.target.session_id}" for i, push in f"{push.target.bot_id}.{push.target.session_type}.{push.target.session_id}" for i, push in
enumerate(pushes_db.all(Push(), default=[]))])) enumerate(pushes_db.where_all(Push(), default=[]))]))
else: else:
await add_push.finish("参数错误") await add_push.finish("参数错误")
@on_message(block=False).handle() @on_message(block=False).handle()
async def _(event: T_MessageEvent, bot: T_Bot): async def _(event: T_MessageEvent, bot: T_Bot):
for push in pushes_db.all(Push(), default=[]): for push in pushes_db.where_all(Push(), default=[]):
if str(push.source) == f"{bot.self_id}.{event.message_type}.{event.user_id if event.message_type == 'private' else event.group_id}": if str(push.source) == f"{bot.self_id}.{event.message_type}.{event.user_id if event.message_type == 'private' else event.group_id}":
bot2 = nonebot.get_bot(push.target.bot_id) bot2 = nonebot.get_bot(push.target.bot_id)
msg_formatted = "" msg_formatted = ""

View File

@ -4,9 +4,9 @@ from nonebot.params import CommandArg
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent, v11 from liteyuki.internal.base.ly_typing import T_Bot, T_MessageEvent, v11
from liteyuki.utils.message.message import MarkdownMessage as md, broadcast_to_superusers from liteyuki.internal.message.message import MarkdownMessage as md, broadcast_to_superusers
from liteyuki.utils.message.html_tool import * from liteyuki.internal.message.html_tool import *
md_test = on_command("mdts", permission=SUPERUSER) md_test = on_command("mdts", permission=SUPERUSER)
btn_test = on_command("btnts", permission=SUPERUSER) btn_test = on_command("btnts", permission=SUPERUSER)

View File

@ -1,6 +1,6 @@
import random import random
from pydantic import BaseModel from pydantic import BaseModel
from liteyuki.utils.message.message import MarkdownMessage as md from liteyuki.internal.message.message import MarkdownMessage as md
class Dot(BaseModel): class Dot(BaseModel):
row: int row: int

View File

@ -1,7 +1,7 @@
from nonebot import require from nonebot import require
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.internal.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md from liteyuki.internal.message.message import MarkdownMessage as md
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from .game import Minesweeper from .game import Minesweeper

View File

@ -4,9 +4,9 @@ from typing import Optional
import aiofiles import aiofiles
import nonebot.plugin import nonebot.plugin
from liteyuki.utils.base.data import LiteModel from liteyuki.internal.base.data import LiteModel
from liteyuki.utils.base.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db from liteyuki.internal.base.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db
from liteyuki.utils.base.ly_typing import T_MessageEvent from liteyuki.internal.base.ly_typing import T_MessageEvent
__group_data = {} # 群数据缓存, {group_id: Group} __group_data = {} # 群数据缓存, {group_id: Group}
__user_data = {} # 用户数据缓存, {user_id: User} __user_data = {} # 用户数据缓存, {user_id: User}
@ -98,7 +98,7 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
if event.message_type == "group": if event.message_type == "group":
group_id = str(event.group_id) group_id = str(event.group_id)
if group_id not in __group_data: if group_id not in __group_data:
group: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id)) group: Group = group_db.where_one(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
__group_data[str(event.group_id)] = group __group_data[str(event.group_id)] = group
session = __group_data[group_id] session = __group_data[group_id]
@ -106,7 +106,7 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
# session: User = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id))) # session: User = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
user_id = str(event.user_id) user_id = str(event.user_id)
if user_id not in __user_data: if user_id not in __user_data:
user: User = user_db.first(User(), "user_id = ?", user_id, default=User(user_id=user_id)) user: User = user_db.where_one(User(), "user_id = ?", user_id, default=User(user_id=user_id))
__user_data[user_id] = user __user_data[user_id] = user
session = __user_data[user_id] session = __user_data[user_id]
# 默认停用插件在启用列表内表示启用 # 默认停用插件在启用列表内表示启用
@ -132,9 +132,9 @@ def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: b
""" """
if event.message_type == "group": if event.message_type == "group":
session = group_db.first(Group(), "group_id = ?", str(event.group_id), default=Group(group_id=str(event.group_id))) session = group_db.where_one(Group(), "group_id = ?", str(event.group_id), default=Group(group_id=str(event.group_id)))
else: else:
session = user_db.first(User(), "user_id = ?", str(event.user_id), default=User(user_id=str(event.user_id))) session = user_db.where_one(User(), "user_id = ?", str(event.user_id), default=User(user_id=str(event.user_id)))
default_enable = get_plugin_default_enable(plugin_name) default_enable = get_plugin_default_enable(plugin_name)
if default_enable: if default_enable:
if enable: if enable:
@ -166,7 +166,7 @@ def get_plugin_global_enable(plugin_name: str) -> bool:
""" """
if plugin_name not in __global_enable: if plugin_name not in __global_enable:
plugin = plugin_db.first( plugin = plugin_db.where_one(
GlobalPlugin(), GlobalPlugin(),
"module_name = ?", "module_name = ?",
plugin_name, plugin_name,
@ -186,7 +186,7 @@ def set_plugin_global_enable(plugin_name: str, enable: bool):
Returns: Returns:
""" """
plugin = plugin_db.first( plugin = plugin_db.where_one(
GlobalPlugin(), GlobalPlugin(),
"module_name = ?", "module_name = ?",
plugin_name, plugin_name,
@ -223,7 +223,7 @@ def get_group_enable(group_id: str) -> bool:
""" """
group_id = str(group_id) group_id = str(group_id)
if group_id not in __group_data: if group_id not in __group_data:
group: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id)) group: Group = group_db.where_one(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
__group_data[group_id] = group __group_data[group_id] = group
return __group_data[group_id].enable return __group_data[group_id].enable
@ -238,7 +238,7 @@ def set_group_enable(group_id: str, enable: bool):
enable (bool): 是否启用 enable (bool): 是否启用
""" """
group_id = str(group_id) group_id = str(group_id)
group: Group = group_db.first(Group(), "group_id = ?", group_id, default=Group(group_id=group_id)) group: Group = group_db.where_one(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
group.enable = enable group.enable = enable
__group_data[group_id] = group __group_data[group_id] = group

View File

@ -14,13 +14,13 @@ from nonebot.permission import SUPERUSER
from nonebot.plugin import Plugin, PluginMetadata from nonebot.plugin import Plugin, PluginMetadata
from nonebot.utils import run_sync from nonebot.utils import run_sync
from liteyuki.utils.base.data_manager import InstalledPlugin from liteyuki.internal.base.data_manager import InstalledPlugin
from liteyuki.utils.base.language import get_user_lang from liteyuki.internal.base.language import get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot from liteyuki.internal.base.ly_typing import T_Bot
from liteyuki.utils.message.message import MarkdownMessage as md from liteyuki.internal.message.message import MarkdownMessage as md
from liteyuki.utils.message.markdown import MarkdownComponent as mdc, compile_md, escape_md from liteyuki.internal.message.markdown import MarkdownComponent as mdc, compile_md, escape_md
from liteyuki.utils.base.permission import GROUP_ADMIN, GROUP_OWNER from liteyuki.internal.base.permission import GROUP_ADMIN, GROUP_OWNER
from liteyuki.utils.message.tools import clamp from liteyuki.internal.message.tools import clamp
from .common import * from .common import *
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
@ -114,10 +114,10 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
ulang.get("npm.plugin_already", NAME=plugin_name, STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable"))) ulang.get("npm.plugin_already", NAME=plugin_name, STATUS=ulang.get("npm.enable") if toggle else ulang.get("npm.disable")))
if event.message_type == "private": if event.message_type == "private":
session = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=event.user_id)) session = user_db.where_one(User(), "user_id = ?", event.user_id, default=User(user_id=event.user_id))
else: else:
if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event): if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event):
session = group_db.first(Group(), "group_id = ?", event.group_id, default=Group(group_id=str(event.group_id))) session = group_db.where_one(Group(), "group_id = ?", event.group_id, default=Group(group_id=str(event.group_id)))
else: else:
raise FinishedException(ulang.get("Permission Denied")) raise FinishedException(ulang.get("Permission Denied"))
try: try:
@ -222,7 +222,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
if r: if r:
r_load = nonebot.load_plugin(plugin_name) # 加载插件 r_load = nonebot.load_plugin(plugin_name) # 加载插件
installed_plugin = InstalledPlugin(module_name=plugin_name) # 构造插件信息模型 installed_plugin = InstalledPlugin(module_name=plugin_name) # 构造插件信息模型
found_in_db_plugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_name) # 查询数据库中是否已经安装 found_in_db_plugin = plugin_db.where_one(InstalledPlugin(), "module_name = ?", plugin_name) # 查询数据库中是否已经安装
if r_load: if r_load:
if found_in_db_plugin is None: if found_in_db_plugin is None:
plugin_db.save(installed_plugin) plugin_db.save(installed_plugin)
@ -254,7 +254,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
elif sc.get("uninstall") and perm_s: elif sc.get("uninstall") and perm_s:
plugin_name: str = result.subcommands["uninstall"].args.get("plugin_name") # type: ignore plugin_name: str = result.subcommands["uninstall"].args.get("plugin_name") # type: ignore
found_installed_plugin: InstalledPlugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_name) found_installed_plugin: InstalledPlugin = plugin_db.where_one(InstalledPlugin(), "module_name = ?", plugin_name)
if found_installed_plugin: if found_installed_plugin:
plugin_db.delete(InstalledPlugin(), "module_name = ?", plugin_name) plugin_db.delete(InstalledPlugin(), "module_name = ?", plugin_name)
reply = f"{ulang.get('npm.uninstall_success', NAME=found_installed_plugin.module_name)}" reply = f"{ulang.get('npm.uninstall_success', NAME=found_installed_plugin.module_name)}"
@ -314,7 +314,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
reply += f" {btn_toggle}" reply += f" {btn_toggle}"
if permission_s: if permission_s:
plugin_in_database = plugin_db.first(InstalledPlugin(), "module_name = ?", storePlugin.name) plugin_in_database = plugin_db.where_one(InstalledPlugin(), "module_name = ?", storePlugin.name)
# 添加移除插件和全局切换按钮 # 添加移除插件和全局切换按钮
global_enable = get_plugin_global_enable(storePlugin.name) global_enable = get_plugin_global_enable(storePlugin.name)
btn_uninstall = ( btn_uninstall = (

View File

@ -5,10 +5,10 @@ import yaml
from nonebot import require from nonebot import require
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from liteyuki.utils.base.language import get_user_lang from liteyuki.internal.base.language import get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.internal.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md from liteyuki.internal.message.message import MarkdownMessage as md
from liteyuki.utils.base.resource import (ResourceMetadata, add_resource_pack, change_priority, check_exist, check_status, get_loaded_resource_packs, get_resource_metadata, load_resources, remove_resource_pack) from liteyuki.internal.base.resource import (ResourceMetadata, add_resource_pack, change_priority, check_exist, check_status, get_loaded_resource_packs, get_resource_metadata, load_resources, remove_resource_pack)
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import Alconna, Args, on_alconna, Arparma, Subcommand from nonebot_plugin_alconna import Alconna, Args, on_alconna, Arparma, Subcommand

View File

@ -5,10 +5,10 @@ import aiohttp
from nonebot import require from nonebot import require
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from liteyuki.utils.base.config import get_config from liteyuki.internal.base.config import get_config
from liteyuki.utils.base.data import Database, LiteModel from liteyuki.internal.base.data import Database, LiteModel
from liteyuki.utils.base.resource import get_path from liteyuki.internal.base.resource import get_path
from liteyuki.utils.message.html_tool import template2image from liteyuki.internal.message.html_tool import template2image
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
require("nonebot_plugin_apscheduler") require("nonebot_plugin_apscheduler")
@ -80,7 +80,7 @@ async def _():
for name, url in SIGN_COUNT_URLS.items(): for name, url in SIGN_COUNT_URLS.items():
count_data = [] count_data = []
for stamp in query_stamp: for stamp in query_stamp:
count_rows = sign_db.all(SignCount(), "sid = ? and time > ?", url, time.time() - 60 * stamp) count_rows = sign_db.where_all(SignCount(), "sid = ? and time > ?", url, time.time() - 60 * stamp)
if len(count_rows) < 2: if len(count_rows) < 2:
count_data.append(-1) count_data.append(-1)
else: else:
@ -142,7 +142,7 @@ async def save_sign_count(timestamp: float, count: int, sid: str):
async def generate_chart(limit): async def generate_chart(limit):
data = [] data = []
for name, url in SIGN_COUNT_URLS.items(): for name, url in SIGN_COUNT_URLS.items():
count_rows = sign_db.all(SignCount(), "sid = ? ORDER BY id DESC LIMIT ?", url, limit) count_rows = sign_db.where_all(SignCount(), "sid = ? ORDER BY id DESC LIMIT ?", url, limit)
count_rows.reverse() count_rows.reverse()
data.append( data.append(
{ {
@ -152,7 +152,6 @@ async def generate_chart(limit):
"counts": [row.count for row in count_rows] "counts": [row.count for row in count_rows]
} }
) )
print(len(count_rows))
img = await template2image( img = await template2image(
template=get_path("templates/sign_status.html", debug=True), template=get_path("templates/sign_status.html", debug=True),

View File

@ -0,0 +1,21 @@
from nonebot.plugin import PluginMetadata
from .stat_matchers import *
from .stat_monitors import *
from .stat_restful_api import *
__author__ = "snowykami"
__plugin_meta__ = PluginMetadata(
name="统计信息",
description="统计机器人的信息",
usage=(
"stat msg [limit] 查看统计次数内的消息\n"
"stat msg -g|--group [group_id] 查看群的统计信息,不带参数为全群\n"
),
type="application",
homepage="https://github.com/snowykami/LiteyukiBot",
extra={
"liteyuki" : True,
"toggleable" : False,
"default_enable": True,
}
)

View File

@ -0,0 +1,21 @@
from liteyuki.internal.base.data import Database, LiteModel
class MessageEventModel(LiteModel):
TABLE_NAME: str = "message_event"
time: int = 0
bot_id: str = ""
adapter: str = ""
user_id: str = ""
group_id: str = ""
message_id: str = ""
message: list = []
message_text: str = ""
message_type: str = ""
msg_db = Database("data/liteyuki/msg.ldb")
msg_db.auto_migrate(MessageEventModel())

View File

@ -0,0 +1,33 @@
import time
from typing import Any
from .common import MessageEventModel, msg_db
def get_stat_msg_data(duration, period) -> tuple[list[int,], list[int,]]:
"""
获取统计消息
Args:
duration: 统计时间单位秒
period: 统计周期单位秒
Returns:
tuple: [int,], [int,] 两个列表分别为周期中心时间戳和消息数量
"""
now = int(time.time())
msg_rows = msg_db.where_all(
MessageEventModel(),
"time > ?",
now - duration
)
timestamps = []
msg_count = []
msg_rows.sort(key=lambda x: x.time)
for msg_row in msg_rows:
period_center_time = msg_row.time - msg_row.time % period + period // 2
# if not timestamps or period_start_time != timestamps[-1]:
# timestamps.append(period_start_time)
# msg_count.append(1)
# else:
# msg_count[-1] += 1
#

View File

@ -0,0 +1,46 @@
from nonebot import require
from liteyuki.internal.message.npl import convert_duration
from .stat_api import *
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma, Option
stat_msg = on_alconna(
Alconna(
"stat",
Subcommand(
"message",
Args["duration", str, "1d"], # 默认为1天
Option(
"-b|--bot", # 生成图表
Args["bot_id", str, ""],
help_text="是否指定机器人",
),
Option(
"-g|--group",
Args["group_id", str, ""],
help_text="指定群组"
),
Option(
"-c|--chart", # 生成图表
help_text="是否生成图表",
),
alias={"msg", "m"},
help_text="查看统计次数内的消息"
)
)
)
@stat_msg.assign("message")
async def _(result: Arparma):
args = result.subcommands.get("message").args
options = result.subcommands.get("message").options
duration = convert_duration(args.get("duration"), 86400) # 秒数
enable_chart = options.get("chart")
if options.get("group"):
group_id = options["group"].args.get("group_id")
else:
msg_rows = get_stat_msg_data(duration)

View File

@ -0,0 +1,38 @@
import time
from nonebot import require
from nonebot.message import event_postprocessor
from liteyuki.internal.base.data import Database, LiteModel
from liteyuki.internal.base.ly_typing import v11
from .common import MessageEventModel, msg_db
require("nonebot_plugin_alconna")
@event_postprocessor
async def onebot_v11_event_monitor(bot: v11.Bot, event: v11.MessageEvent):
if event.message_type == "group":
event: v11.GroupMessageEvent
group_id = str(event.group_id)
else:
group_id = ""
mem = MessageEventModel(
time=int(time.time()),
bot_id=bot.self_id,
adapter="onebot.v11",
group_id=group_id,
user_id=str(event.user_id),
message_id=str(event.message_id),
message=event.message,
message_text=event.raw_message,
message_type=event.message_type,
)
msg_db.save(mem)

View File

@ -5,12 +5,12 @@ import nonebot
import psutil import psutil
from cpuinfo import cpuinfo from cpuinfo import cpuinfo
from nonebot import require from nonebot import require
from liteyuki.utils import __NAME__, __VERSION__ from liteyuki.internal import __NAME__, __VERSION__
from liteyuki.utils.base.config import get_config from liteyuki.internal.base.config import get_config
from liteyuki.utils.base.data_manager import TempConfig, common_db from liteyuki.internal.base.data_manager import TempConfig, common_db
from liteyuki.utils.base.language import Language from liteyuki.internal.base.language import Language
from liteyuki.utils.base.resource import get_loaded_resource_packs, get_path from liteyuki.internal.base.resource import get_loaded_resource_packs, get_path
from liteyuki.utils.message.html_tool import template2image from liteyuki.internal.message.html_tool import template2image
require("nonebot_plugin_apscheduler") require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
@ -242,7 +242,7 @@ async def get_hardware_data() -> dict:
async def get_liteyuki_data() -> dict: async def get_liteyuki_data() -> dict:
temp_data: TempConfig = common_db.first(TempConfig(), default=TempConfig()) temp_data: TempConfig = common_db.where_one(TempConfig(), default=TempConfig())
result = { result = {
"name" : list(get_config("nickname", [__NAME__]))[0], "name" : list(get_config("nickname", [__NAME__]))[0],
"version" : __VERSION__, "version" : __VERSION__,

View File

@ -1,10 +1,10 @@
from nonebot import require from nonebot import require
from liteyuki.utils.base.resource import get_path from liteyuki.internal.base.resource import get_path
from liteyuki.utils.message.html_tool import template2image from liteyuki.internal.message.html_tool import template2image
from liteyuki.utils.base.language import get_user_lang from liteyuki.internal.base.language import get_user_lang
from .api import * from .api import *
from ...utils.base.ly_typing import T_Bot, T_MessageEvent from ...internal.base.ly_typing import T_Bot, T_MessageEvent
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma, UniMessage from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma, UniMessage

View File

@ -3,11 +3,11 @@ from typing import Optional
import pytz import pytz
from nonebot import require from nonebot import require
from liteyuki.utils.base.data import LiteModel, Database from liteyuki.internal.base.data import LiteModel, Database
from liteyuki.utils.base.data_manager import User, user_db, group_db from liteyuki.internal.base.data_manager import User, user_db, group_db
from liteyuki.utils.base.language import Language, change_user_lang, get_all_lang, get_user_lang from liteyuki.internal.base.language import Language, change_user_lang, get_all_lang, get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.internal.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md from liteyuki.internal.message.message import MarkdownMessage as md
from .const import representative_timezones_list from .const import representative_timezones_list
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
@ -41,7 +41,7 @@ class Profile(LiteModel):
@profile_alc.handle() @profile_alc.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
user: User = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id))) user: User = user_db.where_one(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
ulang = get_user_lang(str(event.user_id)) ulang = get_user_lang(str(event.user_id))
if result.subcommands.get("set"): if result.subcommands.get("set"):
if result.subcommands["set"].args.get("value"): if result.subcommands["set"].args.get("value"):

View File

@ -15,7 +15,7 @@ __plugin_meta__ = PluginMetadata(
} }
) )
from ...utils.base.data_manager import set_memory_data from ...internal.base.data_manager import set_memory_data
driver = get_driver() driver = get_driver()

View File

@ -3,8 +3,8 @@ import aiohttp
from .qw_models import * from .qw_models import *
import httpx import httpx
from ...utils.base.data_manager import get_memory_data from ...internal.base.data_manager import get_memory_data
from ...utils.base.language import Language from ...internal.base.language import Language
dev_url = "https://devapi.qweather.com/" # 开发HBa dev_url = "https://devapi.qweather.com/" # 开发HBa
com_url = "https://api.qweather.com/" # 正式环境 com_url = "https://api.qweather.com/" # 正式环境

View File

@ -1,4 +1,4 @@
from liteyuki.utils.base.data import LiteModel from liteyuki.internal.base.data import LiteModel
class Location(LiteModel): class Location(LiteModel):

View File

@ -2,14 +2,14 @@ from nonebot import require, on_endswith
from nonebot.adapters.onebot.v11 import MessageSegment from nonebot.adapters.onebot.v11 import MessageSegment
from nonebot.internal.matcher import Matcher from nonebot.internal.matcher import Matcher
from liteyuki.utils.base.config import get_config from liteyuki.internal.base.config import get_config
from liteyuki.utils.base.ly_typing import T_MessageEvent from liteyuki.internal.base.ly_typing import T_MessageEvent
from .qw_api import * from .qw_api import *
from liteyuki.utils.base.data_manager import User, user_db from liteyuki.internal.base.data_manager import User, user_db
from liteyuki.utils.base.language import Language, get_user_lang from liteyuki.internal.base.language import Language, get_user_lang
from liteyuki.utils.base.resource import get_path from liteyuki.internal.base.resource import get_path
from liteyuki.utils.message.html_tool import template2image from liteyuki.internal.message.html_tool import template2image
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, MultiVar, Arparma from nonebot_plugin_alconna import on_alconna, Alconna, Args, MultiVar, Arparma
@ -42,7 +42,7 @@ async def get_weather_now_card(matcher: Matcher, event: T_MessageEvent, keyword:
qw_lang = get_qw_lang(ulang.lang_code) qw_lang = get_qw_lang(ulang.lang_code)
key = get_config("weather_key") key = get_config("weather_key")
is_dev = get_memory_data("weather.is_dev", True) is_dev = get_memory_data("weather.is_dev", True)
user: User = user_db.first(User(), "user_id = ?", event.user_id, default=User()) user: User = user_db.where_one(User(), "user_id = ?", event.user_id, default=User())
# params # params
unit = user.profile.get("unit", "m") unit = user.profile.get("unit", "m")
stored_location = user.profile.get("location", None) stored_location = user.profile.get("location", None)

View File

@ -0,0 +1,21 @@
from nonebot.plugin import PluginMetadata
from .main import *
__author__ = "snowykami"
__plugin_meta__ = PluginMetadata(
name="网页监控面板",
description="网页监控面板,用于查看机器人的状态和信息",
usage=(
"访问 127.0.0.1:port 查看机器人的状态信息\n"
"stat msg -g|--group [group_id] 查看群的统计信息,不带参数为全群\n"
"配置项custom_domain自定义域名通常对外用内网无需"
),
type="application",
homepage="https://github.com/snowykami/LiteyukiBot",
extra={
"liteyuki" : True,
"toggleable" : False,
"default_enable": True,
}
)

View File

@ -0,0 +1,4 @@
from fastapi import FastAPI
from nonebot import get_app
app: FastAPI = get_app()

View File

@ -0,0 +1,7 @@
from fastapi import FastAPI
from nonebot import get_app
from .restful_api import *
@app.get("/")
async def root():
return {"message": "Hello World"}

View File

@ -0,0 +1,24 @@
from fastapi import FastAPI, APIRouter
from .common import *
device_info_router = APIRouter(prefix="/api/device-info")
bot_info_router = APIRouter(prefix="/api/bot-info")
@device_info_router.get("/")
async def device_info():
print("Hello Device Info")
return {
"message": "Hello Device Info"
}
@bot_info_router.get("/")
async def bot_info():
return {
"message": "Hello Bot Info"
}
app.include_router(device_info_router)
app.include_router(bot_info_router)

View File

@ -17,7 +17,7 @@ data.forEach((item) => {
item["counts"].forEach((count, index) => { item["counts"].forEach((count, index) => {
// 计算平均值index - 1的count + index的count + index + 1的count /3 // 计算平均值index - 1的count + index的count + index + 1的count /3
if (index > 0) { if (index > 0) {
timeCount.push((item["counts"][index] - item["counts"][index - 1])) timeCount.push((item["counts"][index] - item["counts"][index - 1]) / (60 * (item["times"][index] - item["times"][index - 1])))
} }
}) })

12
main.py
View File

@ -1,13 +1,13 @@
import nonebot import nonebot
from nonebot.adapters.onebot import v11, v12 from nonebot.adapters.onebot import v11, v12
from liteyuki.utils import init from liteyuki.internal import init
from liteyuki.utils.base.config import load_from_yaml from liteyuki.internal.base.config import load_from_yaml
from liteyuki.utils.base.data_manager import StoredConfig, common_db from liteyuki.internal.base.data_manager import StoredConfig, common_db
from liteyuki.utils.base.ly_api import liteyuki_api from liteyuki.internal.base.ly_api import liteyuki_api
if __name__ == "__mp_main__": if __name__ == "__mp_main__":
init() init()
store_config: dict = common_db.first(StoredConfig(), default=StoredConfig()).config store_config: dict = common_db.where_one(StoredConfig(), default=StoredConfig()).config
static_config = load_from_yaml("config.yml") static_config = load_from_yaml("config.yml")
store_config.update(static_config) store_config.update(static_config)
nonebot.init(**store_config) nonebot.init(**store_config)
@ -25,5 +25,5 @@ if __name__ == "__mp_main__":
liteyuki_api.bug_report(str(e.__repr__())) liteyuki_api.bug_report(str(e.__repr__()))
if __name__ == "__main__": if __name__ == "__main__":
from liteyuki.utils.base.reloader import Reloader from liteyuki.internal.base.reloader import Reloader
nonebot.run() nonebot.run()