From ecbe06a9e8c9d9e91833a031b29e73fc524672a7 Mon Sep 17 00:00:00 2001 From: snowy Date: Tue, 26 Mar 2024 17:14:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=BB=9F=E4=B8=80=E5=8F=8C=E5=BC=95?= =?UTF-8?q?=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- liteyuki/liteyuki_main/loader.py | 3 +- liteyuki/liteyuki_main/webdash.py | 46 ++-- liteyuki/plugins/liteyuki_npm/common.py | 18 +- liteyuki/plugins/liteyuki_npm/installer.py | 18 +- liteyuki/plugins/liteyuki_npm/manager.py | 37 ++-- liteyuki/plugins/liteyuki_user/__init__.py | 6 +- .../plugins/liteyuki_user/input_handle.py | 2 - .../plugins/liteyuki_user/profile_manager.py | 6 +- liteyuki/utils/config.py | 8 +- liteyuki/utils/data.py | 28 +-- liteyuki/utils/data_manager.py | 34 +-- liteyuki/utils/data_new.py | 20 +- liteyuki/utils/language.py | 18 +- liteyuki/utils/log.py | 10 +- liteyuki/utils/ly_typing.py | 2 +- liteyuki/utils/message.py | 10 +- liteyuki/utils/orm.py | 207 ------------------ liteyuki/utils/tools.py | 17 -- 18 files changed, 131 insertions(+), 359 deletions(-) delete mode 100644 liteyuki/utils/orm.py diff --git a/liteyuki/liteyuki_main/loader.py b/liteyuki/liteyuki_main/loader.py index c21b419f..db477664 100644 --- a/liteyuki/liteyuki_main/loader.py +++ b/liteyuki/liteyuki_main/loader.py @@ -22,4 +22,5 @@ if installed_plugins: if not check_for_package(installed_plugin.module_name): nonebot.logger.error(f"{installed_plugin.module_name} not installed, but in loading database. please run `npm fixup` in chat to reinstall it.") else: - nonebot.load_plugin(installed_plugin.module_name) \ No newline at end of file + print(installed_plugin.module_name) + nonebot.load_plugin(installed_plugin.module_name) diff --git a/liteyuki/liteyuki_main/webdash.py b/liteyuki/liteyuki_main/webdash.py index ad0e1963..38d49ff9 100644 --- a/liteyuki/liteyuki_main/webdash.py +++ b/liteyuki/liteyuki_main/webdash.py @@ -28,31 +28,31 @@ lang = Language() dash_app = Dash(__name__) dash_app.layout = dash_app.layout = html.Div(children=[ html.H1(children=lang.get("main.monitor.title"), style={ - 'textAlign': 'center' + "textAlign": "center" }), - dcc.Graph(id='live-update-graph'), + dcc.Graph(id="live-update-graph"), dcc.Interval( - id='interval-component', + id="interval-component", interval=1 * 1000, # in milliseconds n_intervals=0 ) ]) -@dash_app.callback(Output('live-update-graph', 'figure'), - [Input('interval-component', 'n_intervals')]) +@dash_app.callback(Output("live-update-graph", "figure"), + [Input("interval-component", "n_intervals")]) def update_graph_live(n): lang = Language() system_inf = get_system_info() dash_app.layout = html.Div(children=[ html.H1(children=lang.get("main.monitor.title"), style={ - 'textAlign': 'center' + "textAlign": "center" }), - dcc.Graph(id='live-update-graph'), + dcc.Graph(id="live-update-graph"), dcc.Interval( - id='interval-component', + id="interval-component", interval=2 * 1000, # in milliseconds n_intervals=0 ) @@ -60,28 +60,28 @@ def update_graph_live(n): mem = psutil.virtual_memory() cpu_f = psutil.cpu_freq() figure = { - 'data' : [ + "data" : [ { - 'x' : [f"{cpu_f.current / 1000:.2f}GHz {psutil.cpu_count(logical=False)}c{psutil.cpu_count()}t"], - 'y' : [system_inf['cpu_percent']], - 'type': 'bar', - 'name': f"{lang.get('main.monitor.cpu')} {lang.get('main.monitor.usage')}" + "x" : [f"{cpu_f.current / 1000:.2f}GHz {psutil.cpu_count(logical=False)}c{psutil.cpu_count()}t"], + "y" : [system_inf["cpu_percent"]], + "type": "bar", + "name": f"{lang.get('main.monitor.cpu')} {lang.get('main.monitor.usage')}" }, { - 'x' : [f"{convert_size(mem.used, add_unit=False)}/{convert_size(mem.total)}({mem.used / mem.total * 100:.2f}%)"], - 'y' : [system_inf['memory_percent']], - 'type': 'bar', - 'name': f"{lang.get('main.monitor.memory')} {lang.get('main.monitor.usage')}" + "x" : [f"{convert_size(mem.used, add_unit=False)}/{convert_size(mem.total)}({mem.used / mem.total * 100:.2f}%)"], + "y" : [system_inf["memory_percent"]], + "type": "bar", + "name": f"{lang.get('main.monitor.memory')} {lang.get('main.monitor.usage')}" }, ], - 'layout': { - 'title': lang.get('main.monitor.description'), - # 'xaxis': { - # 'range': [0, 10] + "layout": { + "title": lang.get("main.monitor.description"), + # "xaxis": { + # "range": [0, 10] # }, # 设置x轴的范围 - 'yaxis': { - 'range': [0, 100] + "yaxis": { + "range": [0, 100] }, # 设置y轴的范围 } } diff --git a/liteyuki/plugins/liteyuki_npm/common.py b/liteyuki/plugins/liteyuki_npm/common.py index 4f58ff51..5f3254b3 100644 --- a/liteyuki/plugins/liteyuki_npm/common.py +++ b/liteyuki/plugins/liteyuki_npm/common.py @@ -20,12 +20,12 @@ class StorePlugin(LiteModel): name: str desc: str module_name: str - project_link: str = '' - homepage: str = '' - author: str = '' + project_link: str = "" + homepage: str ="" + author: str = "" type: str | None = None - version: str | None = '' - time: str = '' + version: str | None = "" + time: str = "" tags: list[PluginTag] = [] is_official: bool = False @@ -59,7 +59,7 @@ def get_plugin_default_enable(plugin_module_name: str) -> bool: bool: 插件默认状态 """ plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name) - return (plug.metadata.extra.get('default_enable', True) + return (plug.metadata.extra.get("default_enable", True) if plug.metadata else True) if plug else True @@ -75,9 +75,9 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_module_name: str) -> bool: 插件当前状态 """ if event.message_type == "group": - session: GroupChat = group_db.first(GroupChat, 'group_id = ?', event.group_id, default=GroupChat(group_id=str(event.group_id))) + session: GroupChat = group_db.first(GroupChat, "group_id = ?", event.group_id, default=GroupChat(group_id=str(event.group_id))) else: - session: User = user_db.first(User, 'user_id = ?', event.user_id, default=User(user_id=str(event.user_id))) + session: User = user_db.first(User, "user_id = ?", event.user_id, default=User(user_id=str(event.user_id))) # 默认停用插件在启用列表内表示启用 # 默认停用插件不在启用列表内表示停用 # 默认启用插件在停用列表内表示停用 @@ -104,4 +104,4 @@ def get_plugin_can_be_toggle(plugin_module_name: str) -> bool: bool: 插件是否可以被启用/停用 """ plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name) - return plug.metadata.extra.get('toggleable', True) if plug and plug.metadata else True + return plug.metadata.extra.get("toggleable", True) if plug and plug.metadata else True diff --git a/liteyuki/plugins/liteyuki_npm/installer.py b/liteyuki/plugins/liteyuki_npm/installer.py index aa4f34f0..6fbd2b64 100644 --- a/liteyuki/plugins/liteyuki_npm/installer.py +++ b/liteyuki/plugins/liteyuki_npm/installer.py @@ -68,9 +68,9 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): if len(rs): reply = f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***" for plugin in rs[:min(max_show, len(rs))]: - btn_install = md.button(ulang.get('npm.install'), 'npm install %s' % plugin.module_name) - link_page = md.link(ulang.get('npm.homepage'), plugin.homepage) - link_pypi = md.link(ulang.get('npm.pypi'), plugin.homepage) + btn_install = md.button(ulang.get("npm.install"), "npm install %s" % plugin.module_name) + link_page = md.link(ulang.get("npm.homepage"), plugin.homepage) + link_pypi = md.link(ulang.get("npm.pypi"), plugin.homepage) reply += (f"\n# **{plugin.name}**\n" f"\n> **{plugin.desc}**\n" @@ -93,7 +93,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): if not store_plugin: await npm_alc.finish(ulang.get("npm.plugin_not_found", NAME=plugin_module_name)) - homepage_btn = md.button(ulang.get('npm.homepage'), store_plugin.homepage) + homepage_btn = md.button(ulang.get("npm.homepage"), store_plugin.homepage) if r: r_load = nonebot.load_plugin(plugin_module_name) # 加载插件 @@ -103,7 +103,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): if r_load: if found_in_db_plugin is None: plugin_db.upsert(installed_plugin) - info = ulang.get('npm.install_success', NAME=store_plugin.name).replace('_', r'\\_') # markdown转义 + info = md.escape(ulang.get("npm.install_success", NAME=store_plugin.name)) # markdown转义 await send_markdown( f"{info}\n\n" f"```\n{log}\n```", @@ -111,9 +111,9 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): event=event ) else: - await npm_alc.finish(ulang.get('npm.plugin_already_installed', NAME=store_plugin.name)) + await npm_alc.finish(ulang.get("npm.plugin_already_installed", NAME=store_plugin.name)) else: - info = ulang.get('npm.load_failed', NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace('_', r'\\_') + info = ulang.get("npm.load_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_") await send_markdown( f"{info}\n\n" f"```\n{log}\n```\n", @@ -121,7 +121,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): event=event ) else: - info = ulang.get('npm.install_failed', NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace('_', r'\\_') + info = ulang.get("npm.install_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_") await send_markdown( f"{info}\n\n" f"```\n{log}\n```", @@ -216,7 +216,7 @@ def npm_install(plugin_module_name) -> tuple[bool, str]: for mirror in mirrors: try: nonebot.logger.info(f"npm_install try mirror: {mirror}") - result = pip.main(['install', plugin_module_name, "-i", mirror]) + result = pip.main(["install", plugin_module_name, "-i", mirror]) success = result == 0 if success: break diff --git a/liteyuki/plugins/liteyuki_npm/manager.py b/liteyuki/plugins/liteyuki_npm/manager.py index ae245e10..54f2d9c1 100644 --- a/liteyuki/plugins/liteyuki_npm/manager.py +++ b/liteyuki/plugins/liteyuki_npm/manager.py @@ -20,21 +20,21 @@ from nonebot_plugin_alconna import on_alconna, Alconna, Args, Arparma list_plugins = on_alconna( Alconna( - ['list-plugins', "插件列表", "列出插件"], + ["list-plugins", "插件列表", "列出插件"], ) ) toggle_plugin = on_alconna( Alconna( - ['enable-plugin', 'disable-plugin'], - Args['plugin_name', str], + ["enable-plugin", "disable-plugin"], + Args["plugin_name", str], ) ) global_toggle = on_alconna( Alconna( - ['toggle-global'], - Args['plugin_name', str], + ["toggle-global"], + Args["plugin_name", str], ), permission=SUPERUSER ) @@ -49,27 +49,27 @@ async def _(event: T_MessageEvent, bot: T_Bot): for plugin in nonebot.get_loaded_plugins(): # 检查是否有 metadata 属性 # 添加帮助按钮 - btn_usage = md.button(lang.get('npm.usage'), f'help {plugin.module_name}', False) + btn_usage = md.button(lang.get("npm.usage"), f"help {plugin.module_name}", False) store_plugin = await get_store_plugin(plugin.module_name) session_enable = get_plugin_session_enable(event, plugin.module_name) default_enable = get_plugin_default_enable(plugin.module_name) if store_plugin: - btn_homepage = md.link(lang.get('npm.homepage'), store_plugin.homepage) + btn_homepage = md.link(lang.get("npm.homepage"), store_plugin.homepage) show_name = store_plugin.name show_desc = store_plugin.desc elif plugin.metadata: - if plugin.metadata.extra.get('liteyuki'): - btn_homepage = md.link(lang.get('npm.homepage'), "https://github.com/snowykami/LiteyukiBot") + if plugin.metadata.extra.get("liteyuki"): + btn_homepage = md.link(lang.get("npm.homepage"), "https://github.com/snowykami/LiteyukiBot") else: - btn_homepage = lang.get('npm.homepage') + btn_homepage = lang.get("npm.homepage") show_name = plugin.metadata.name show_desc = plugin.metadata.description else: - btn_homepage = lang.get('npm.homepage') + btn_homepage = lang.get("npm.homepage") show_name = plugin.name - show_desc = lang.get('npm.no_description') + show_desc = lang.get("npm.no_description") if plugin.metadata: reply += (f"\n**{md.escape(show_name)}**\n" @@ -83,22 +83,22 @@ async def _(event: T_MessageEvent, bot: T_Bot): if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event): # 添加启用/停用插件按钮 cmd_toggle = f"{'disable' if session_enable else 'enable'}-plugin {plugin.module_name}" - text_toggle = lang.get('npm.disable' if session_enable else 'npm.enable') + text_toggle = lang.get("npm.disable" if session_enable else "npm.enable") can_be_toggle = get_plugin_can_be_toggle(plugin.module_name) btn_toggle = text_toggle if not can_be_toggle else md.button(text_toggle, cmd_toggle) reply += f" {btn_toggle}" if await SUPERUSER(bot, event): - plugin_in_database = plugin_db.first(InstalledPlugin, 'module_name = ?', plugin.module_name) + plugin_in_database = plugin_db.first(InstalledPlugin, "module_name = ?", plugin.module_name) # 添加移除插件和全局切换按钮 global_enable = get_plugin_global_enable(plugin.module_name) btn_uninstall = ( - md.button(lang.get('npm.uninstall'), f'npm uninstall {plugin.module_name}')) if plugin_in_database else lang.get( + md.button(lang.get("npm.uninstall"), f'npm uninstall {plugin.module_name}')) if plugin_in_database else lang.get( 'npm.uninstall') - btn_toggle_global_text = lang.get('npm.disable_global' if global_enable else 'npm.enable_global') - cmd_toggle_global = f'npm toggle-global {plugin.module_name}' + btn_toggle_global_text = lang.get("npm.disable_global" if global_enable else "npm.enable_global") + cmd_toggle_global = f"npm toggle-global {plugin.module_name}" btn_toggle_global = btn_toggle_global_text if not can_be_toggle else md.button(btn_toggle_global_text, cmd_toggle_global) reply += f" {btn_uninstall} {btn_toggle_global}" @@ -115,7 +115,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): ulang = get_user_lang(str(event.user_id)) plugin_module_name = result.args.get("plugin_name") - toggle = result.header_result == 'enable-plugin' # 判断是启用还是停用 + toggle = result.header_result == "enable-plugin" # 判断是启用还是停用 session_enable = get_plugin_session_enable(event, plugin_module_name) # 获取插件当前状态 @@ -149,7 +149,6 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): else: session.enabled_plugins.remove(plugin_module_name) if event.message_type == "private": - print("已保存") user_db.upsert(session) else: group_db.upsert(session) diff --git a/liteyuki/plugins/liteyuki_user/__init__.py b/liteyuki/plugins/liteyuki_user/__init__.py index ff3e4409..13abb265 100644 --- a/liteyuki/plugins/liteyuki_user/__init__.py +++ b/liteyuki/plugins/liteyuki_user/__init__.py @@ -9,8 +9,8 @@ __plugin_meta__ = PluginMetadata( usage="", homepage="https://github.com/snowykami/LiteyukiBot", extra={ - "liteyuki": True, - "toggleable" : False, - "default_enable" : True, + "liteyuki" : True, + "toggleable" : False, + "default_enable": True, } ) diff --git a/liteyuki/plugins/liteyuki_user/input_handle.py b/liteyuki/plugins/liteyuki_user/input_handle.py index ded0aebb..e69de29b 100644 --- a/liteyuki/plugins/liteyuki_user/input_handle.py +++ b/liteyuki/plugins/liteyuki_user/input_handle.py @@ -1,2 +0,0 @@ -def detect_lang(input_str: str) -> str: - return "zh-CN" if input_str == "zh" else "en" \ No newline at end of file diff --git a/liteyuki/plugins/liteyuki_user/profile_manager.py b/liteyuki/plugins/liteyuki_user/profile_manager.py index abb55610..61c8db68 100644 --- a/liteyuki/plugins/liteyuki_user/profile_manager.py +++ b/liteyuki/plugins/liteyuki_user/profile_manager.py @@ -116,7 +116,7 @@ def get_profile_menu(key: str, ulang: Language) -> Optional[str]: reply = f"**{setting_name} {ulang.get('user.profile.settings')}**\n***\n" if key == "lang": for lang_code, lang_name in get_all_lang().items(): - btn_set = md.button(ulang.get('user.profile.set'), f"profile set {key} {lang_code}") + btn_set = md.button(ulang.get("user.profile.set"), f"profile set {key} {lang_code}") reply += f"\n{btn_set} | **{lang_name}** - {lang_code}\n***\n" elif key == "timezone": for tz in representative_timezones_list: @@ -135,9 +135,9 @@ def set_profile(key: str, value: str) -> bool: 是否成功设置,输入合法性不通过返回False """ - if key == 'lang': + if key == "lang": if value in get_all_lang(): return True - elif key == 'timezone': + elif key == "timezone": if value in pytz.all_timezones: return True diff --git a/liteyuki/utils/config.py b/liteyuki/utils/config.py index c74ade33..04483ba5 100644 --- a/liteyuki/utils/config.py +++ b/liteyuki/utils/config.py @@ -19,14 +19,14 @@ def load_from_yaml(file: str) -> dict: global config nonebot.logger.debug("Loading config from %s" % file) if not os.path.exists(file): - nonebot.logger.warning(f'Config file {file} not found, created default config, please modify it and restart') - with open(file, 'w', encoding='utf-8') as f: + nonebot.logger.warning(f"Config file {file} not found, created default config, please modify it and restart") + with open(file, "w", encoding="utf-8") as f: yaml.dump(BasicConfig().dict(), f, default_flow_style=False) - with open(file, 'r', encoding='utf-8') as f: + with open(file, "r", encoding="utf-8") as f: conf = yaml.load(f, Loader=yaml.FullLoader) config = conf if conf is None: - nonebot.logger.warning(f'Config file {file} is empty, use default config. please modify it and restart') + nonebot.logger.warning(f"Config file {file} is empty, use default config. please modify it and restart") conf = BasicConfig().dict() return conf diff --git a/liteyuki/utils/data.py b/liteyuki/utils/data.py index cb825f75..9944fb81 100644 --- a/liteyuki/utils/data.py +++ b/liteyuki/utils/data.py @@ -226,13 +226,13 @@ class Database(BaseORMAdapter): return_data = {} for k, v in data.items(): if isinstance(v, LiteModel): - return_data[f'{self.FOREIGNID}{k}'] = f'{self.ID}:{v.__class__.__name__}:{self.upsert(v)}' + return_data[f"{self.FOREIGNID}{k}"] = f"{self.ID}:{v.__class__.__name__}:{self.upsert(v)}" elif isinstance(v, list): - return_data[f'{self.LIST}{k}'] = self._flat(v) + return_data[f"{self.LIST}{k}"] = self._flat(v) elif isinstance(v, dict): - return_data[f'{self.DICT}{k}'] = self._flat(v) + return_data[f"{self.DICT}{k}"] = self._flat(v) elif isinstance(v, BaseIterable): - return_data[f'{self.JSON}{k}'] = self._flat(v) + return_data[f"{self.JSON}{k}"] = self._flat(v) else: return_data[k] = v @@ -240,7 +240,7 @@ class Database(BaseORMAdapter): return_data = [] for v in data: if isinstance(v, LiteModel): - return_data.append(f'{self.ID}:{v.__class__.__name__}:{self.upsert(v)}') + return_data.append(f"{self.ID}:{v.__class__.__name__}:{self.upsert(v)}") elif isinstance(v, list): return_data.append(self._flat(v)) elif isinstance(v, dict): @@ -250,7 +250,7 @@ class Database(BaseORMAdapter): else: return_data.append(v) else: - raise ValueError('数据类型错误') + raise ValueError("数据类型错误") return json.dumps(return_data) @@ -263,7 +263,7 @@ class Database(BaseORMAdapter): Returns: """ - return self.cursor.execute(f'SELECT * FROM sqlite_master WHERE type = "table" AND name = ?', (table_name,)).fetchone() + return self.cursor.execute(f"SELECT * FROM sqlite_master WHERE type = 'table' AND name = ?", (table_name,)).fetchone() def first(self, model: type(LiteModel), conditions, *args, default: Any = None) -> LiteModel | None: """查询第一条数据 @@ -281,7 +281,7 @@ class Database(BaseORMAdapter): if not self._detect_for_table(table_name): return default - self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args) + self.cursor.execute(f"SELECT * FROM {table_name} WHERE {conditions}", args) if row_data := self.cursor.fetchone(): data = dict(row_data) return model(**self.convert_to_dict(data)) @@ -304,9 +304,9 @@ class Database(BaseORMAdapter): return default if conditions: - self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args) + self.cursor.execute(f"SELECT * FROM {table_name} WHERE {conditions}", args) else: - self.cursor.execute(f'SELECT * FROM {table_name}') + self.cursor.execute(f"SELECT * FROM {table_name}") if row_datas := self.cursor.fetchall(): datas = [dict(row_data) for row_data in row_datas] return [model(**self.convert_to_dict(d)) for d in datas] if datas else default @@ -327,8 +327,8 @@ class Database(BaseORMAdapter): if not self._detect_for_table(table_name): return - nonebot.logger.debug(f'DELETE FROM {table_name} WHERE {conditions}') - self.cursor.execute(f'DELETE FROM {table_name} WHERE {conditions}', args) + nonebot.logger.debug(f"DELETE FROM {table_name} WHERE {conditions}") + self.cursor.execute(f"DELETE FROM {table_name} WHERE {conditions}", args) self.conn.commit() def convert_to_dict(self, data: dict) -> dict: @@ -346,8 +346,8 @@ class Database(BaseORMAdapter): new_d = {} for k, v in d.items(): if k.startswith(self.FOREIGNID): - new_d[k.replace(self.FOREIGNID, '')] = load( - dict(self.cursor.execute(f'SELECT * FROM {v.split(":", 2)[1]} WHERE id = ?', (v.split(":", 2)[2],)).fetchone())) + new_d[k.replace(self.FOREIGNID, "")] = load( + dict(self.cursor.execute(f"SELECT * FROM {v.split(':', 2)[1]} WHERE id = ?", (v.split(":", 2)[2],)).fetchone())) elif k.startswith(self.LIST): if v == '': v = '[]' diff --git a/liteyuki/utils/data_manager.py b/liteyuki/utils/data_manager.py index 06ad3ca4..464d6ccb 100644 --- a/liteyuki/utils/data_manager.py +++ b/liteyuki/utils/data_manager.py @@ -6,36 +6,36 @@ from liteyuki.utils.data import LiteModel, Database as DB DATA_PATH = "data/liteyuki" -user_db = DB(os.path.join(DATA_PATH, 'users.ldb')) -group_db = DB(os.path.join(DATA_PATH, 'groups.ldb')) -plugin_db = DB(os.path.join(DATA_PATH, 'plugins.ldb')) -common_db = DB(os.path.join(DATA_PATH, 'common.ldb')) +user_db = DB(os.path.join(DATA_PATH, "users.ldb")) +group_db = DB(os.path.join(DATA_PATH, "groups.ldb")) +plugin_db = DB(os.path.join(DATA_PATH, "plugins.ldb")) +common_db = DB(os.path.join(DATA_PATH, "common.ldb")) class User(LiteModel): - user_id: str = Field(str(), alias='user_id') - username: str = Field(str(), alias='username') - profile: dict[str, str] = Field(dict(), alias='profile') - enabled_plugins: list[str] = Field(list(), alias='enabled_plugins') - disabled_plugins: list[str] = Field(list(), alias='disabled_plugins') + user_id: str = Field(str(), alias="user_id") + username: str = Field(str(), alias="username") + profile: dict[str, str] = Field(dict(), alias="profile") + enabled_plugins: list[str] = Field(list(), alias="enabled_plugins") + disabled_plugins: list[str] = Field(list(), alias="disabled_plugins") class GroupChat(LiteModel): # Group是一个关键字,所以这里用GroupChat - group_id: str = Field(str(), alias='group_id') - group_name: str = Field(str(), alias='group_name') - enabled_plugins: list[str] = Field([], alias='enabled_plugins') - disabled_plugins: list[str] = Field([], alias='disabled_plugins') + group_id: str = Field(str(), alias="group_id") + group_name: str = Field(str(), alias="group_name") + enabled_plugins: list[str] = Field([], alias="enabled_plugins") + disabled_plugins: list[str] = Field([], alias="disabled_plugins") class InstalledPlugin(LiteModel): - module_name: str = Field(str(), alias='module_name') - version: str = Field(str(), alias='version') + module_name: str = Field(str(), alias="module_name") + version: str = Field(str(), alias="version") class GlobalPlugin(LiteModel): - module_name: str = Field(str(), alias='module_name') - enabled: bool = Field(True, alias='enabled') + module_name: str = Field(str(), alias="module_name") + enabled: bool = Field(True, alias="enabled") def auto_migrate(): diff --git a/liteyuki/utils/data_new.py b/liteyuki/utils/data_new.py index 9f60de0d..502a91c0 100644 --- a/liteyuki/utils/data_new.py +++ b/liteyuki/utils/data_new.py @@ -1,20 +1,11 @@ -import copy -import json import os import pickle import sqlite3 -import types from types import NoneType -from collections.abc import Iterable -from pydantic import BaseModel, Field from typing import Any -LOG_OUT = True - - -def log(*args, **kwargs): - if LOG_OUT: - print(*args, **kwargs) +import pydantic +from pydantic import BaseModel class LiteModel(BaseModel): @@ -85,7 +76,12 @@ class Database: elif model.TABLE_NAME not in table_list: raise ValueError(f"数据模型 {model.__class__.__name__} 的表 {model.TABLE_NAME} 不存在,请先迁移") else: - self._save(model.model_dump(by_alias=True)) + if pydantic.__version__ < "1.8.2": + # 兼容pydantic 1.8.2以下版本 + model_dict = model.dict(by_alias=True) + else: + model_dict = model.model_dump(by_alias=True) + self._save(model_dict) def _save(self, obj: Any) -> Any: # obj = copy.deepcopy(obj) diff --git a/liteyuki/utils/language.py b/liteyuki/utils/language.py index 54221a14..4f7c5ca5 100644 --- a/liteyuki/utils/language.py +++ b/liteyuki/utils/language.py @@ -30,14 +30,14 @@ def load_from_lang(file_path: str, lang_code: str = None): """ try: if lang_code is None: - lang_code = os.path.basename(file_path).split('.')[0] - with open(file_path, 'r', encoding='utf-8') as file: + lang_code = os.path.basename(file_path).split(".")[0] + with open(file_path, "r", encoding="utf-8") as file: data = {} for line in file: line = line.strip() - if not line or line.startswith('#'): # 空行或注释 + if not line or line.startswith("#"): # 空行或注释 continue - key, value = line.split('=', 1) + key, value = line.split("=", 1) data[key.strip()] = value.strip() if lang_code not in _language_data: _language_data[lang_code] = {} @@ -56,8 +56,8 @@ def load_from_json(file_path: str, lang_code: str = None): """ try: if lang_code is None: - lang_code = os.path.basename(file_path).split('.')[0] - with open(file_path, 'r', encoding='utf-8') as file: + lang_code = os.path.basename(file_path).split(".")[0] + with open(file_path, "r", encoding="utf-8") as file: data = json.load(file) if lang_code not in _language_data: _language_data[lang_code] = {} @@ -77,9 +77,9 @@ def load_from_dir(dir_path: str): try: file_path = os.path.join(dir_path, file) if os.path.isfile(file_path): - if file.endswith('.lang'): + if file.endswith(".lang"): load_from_lang(file_path) - elif file.endswith('.json'): + elif file.endswith(".json"): load_from_json(file_path) except Exception as e: nonebot.logger.error(f"Failed to load language data from {file}: {e}") @@ -140,7 +140,7 @@ def get_user_lang(user_id: str) -> Language: username="Unknown" )) - return Language(user.profile.get('lang', config.get("default_language", get_system_lang_code()))) + return Language(user.profile.get("lang", config.get("default_language", get_system_lang_code()))) def get_system_lang_code() -> str: diff --git a/liteyuki/utils/log.py b/liteyuki/utils/log.py index d098999a..a2d43c19 100644 --- a/liteyuki/utils/log.py +++ b/liteyuki/utils/log.py @@ -61,11 +61,11 @@ def init_log(): show_icon = config.get("log_icon", True) lang = Language(config.get("default_language", get_system_lang_code())) - debug = lang.get('log.debug', default="==DEBUG") - info = lang.get('log.info', default="===INFO") - success = lang.get('log.success', default="SUCCESS") - warning = lang.get('log.warning', default="WARNING") - error = lang.get('log.error', default="==ERROR") + debug = lang.get("log.debug", default="==DEBUG") + info = lang.get("log.info", default="===INFO") + success = lang.get("log.success", default="SUCCESS") + warning = lang.get("log.warning", default="WARNING") + error = lang.get("log.error", default="==ERROR") logger.level("DEBUG", color="", icon=f"{'*️⃣' if show_icon else ''}{debug}") logger.level("INFO", color="", icon=f"{'ℹ️' if show_icon else ''}{info}") diff --git a/liteyuki/utils/ly_typing.py b/liteyuki/utils/ly_typing.py index 4a164337..102e8c9f 100644 --- a/liteyuki/utils/ly_typing.py +++ b/liteyuki/utils/ly_typing.py @@ -4,4 +4,4 @@ T_Bot = v11.Bot | v12.Bot T_GroupMessageEvent = v11.GroupMessageEvent | v12.GroupMessageEvent T_PrivateMessageEvent = v11.PrivateMessageEvent | v12.PrivateMessageEvent T_MessageEvent = v11.MessageEvent | v12.MessageEvent -T_Message = v11.Message | v12.Message +T_Message = v11.Message | v12.Message \ No newline at end of file diff --git a/liteyuki/utils/message.py b/liteyuki/utils/message.py index 864875e7..7afdcab7 100644 --- a/liteyuki/utils/message.py +++ b/liteyuki/utils/message.py @@ -1,14 +1,16 @@ +from urllib.parse import quote + import nonebot from nonebot.adapters.onebot import v11, v12 from typing import Any -from .tools import de_escape, encode_url +from .tools import encode_url from .ly_typing import T_Bot, T_MessageEvent async def send_markdown(markdown: str, bot: T_Bot, *, message_type: str = None, session_id: str | int = None, event: T_MessageEvent = None, **kwargs) -> dict[ - str, Any]: - formatted_md = de_escape(markdown).replace("\n", r"\n").replace("\"", r'\\\"') + str, Any]: + formatted_md = v11.unescape(markdown).replace("\n", r"\n").replace("\"", r'\\\"') if event is not None and message_type is None: message_type = event.message_type session_id = event.user_id if event.message_type == "private" else event.group_id @@ -89,7 +91,7 @@ class Markdown: markdown格式的可点击回调按钮 """ - return f"[{name}](mqqapi://aio/inlinecmd?command={encode_url(cmd)}&reply={str(reply).lower()}&enter={str(enter).lower()})" + return f"[{name}](mqqapi://aio/inlinecmd?command={quote(cmd)}&reply={str(reply).lower()}&enter={str(enter).lower()})" @staticmethod def link(name: str, url: str) -> str: diff --git a/liteyuki/utils/orm.py b/liteyuki/utils/orm.py deleted file mode 100644 index 2fb79297..00000000 --- a/liteyuki/utils/orm.py +++ /dev/null @@ -1,207 +0,0 @@ -import os -import pickle -import sqlite3 -from types import NoneType -from typing import Any - -import nonebot -from pydantic import BaseModel, Field - - -class LiteModel(BaseModel): - """轻量级模型基类 - 类型注解统一使用Python3.9的PEP585标准,如需使用泛型请使用typing模块的泛型类型 - 不允许使用id, table_name以及其他SQLite关键字作为字段名,不允许使用JSON和ID,必须指定默认值,且默认值类型必须与字段类型一致 - """ - __ID__: int = Field(None, alias='id') - __TABLE_NAME__: str = Field(None, alias='table_name') - - -class Database: - TYPE_MAPPING = { - int : "INTEGER", - float : "REAL", - str : "TEXT", - bool : "INTEGER", - bytes : "BLOB", - NoneType: "NULL", - - dict : "BLOB", # LITEYUKIDICT{key_name} - list : "BLOB", # LITEYUKILIST{key_name} - tuple : "BLOB", # LITEYUKITUPLE{key_name} - set : "BLOB", # LITEYUKISET{key_name} - } - - # 基础类型 - BASIC_TYPE = [int, float, str, bool, bytes, NoneType] - # 可序列化类型 - ITERABLE_TYPE = [dict, list, tuple, set] - - LITEYUKI = "LITEYUKI" - - # 字段前缀映射,默认基础类型为"" - FIELD_PREFIX_MAPPING = { - dict : f"{LITEYUKI}DICT", - list : f"{LITEYUKI}LIST", - tuple : f"{LITEYUKI}TUPLE", - set : f"{LITEYUKI}SET", - type(LiteModel): f"{LITEYUKI}MODEL" - } - - def __init__(self, db_name: str): - if not os.path.exists(os.path.dirname(db_name)): - os.makedirs(os.path.dirname(db_name)) - self.conn = sqlite3.connect(db_name) # 连接对象 - self.conn.row_factory = sqlite3.Row # 以字典形式返回查询结果 - self.cursor = self.conn.cursor() # 游标对象 - - def auto_migrate(self, *args: LiteModel): - """ - 自动迁移模型 - Args: - *args: 模型类实例化对象,支持空默认值,不支持嵌套迁移 - - Returns: - - """ - for model in args: - if not model.__TABLE_NAME__: - raise ValueError(f"数据模型{model.__class__.__name__}未提供表名") - - # 若无则创建表 - self.cursor.execute( - f'CREATE TABLE IF NOT EXISTS {model.__TABLE_NAME__} (id INTEGER PRIMARY KEY AUTOINCREMENT)' - ) - - # 获取表结构 - new_fields, new_stored_types = ( - zip( - *[(self._get_stored_field_prefix(model.__getattribute__(field)) + field, self._get_stored_type(model.__getattribute__(field))) - for field in model.__annotations__] - ) - ) - - # 原有的字段列表 - existing_fields = self.cursor.execute(f'PRAGMA table_info({model.__TABLE_NAME__})').fetchall() - existing_types = [field['name'] for field in existing_fields] - - # 检测缺失字段,由于SQLite是动态类型,所以不需要检测类型 - for n_field, n_type in zip(new_fields, new_stored_types): - if n_field not in existing_types: - nonebot.logger.debug(f'ALTER TABLE {model.__TABLE_NAME__} ADD COLUMN {n_field} {n_type}') - self.cursor.execute( - f'ALTER TABLE {model.__TABLE_NAME__} ADD COLUMN {n_field} {n_type}' - ) - - # 检测多余字段进行删除 - for e_field in existing_types: - if e_field not in new_fields and e_field not in ['id']: - nonebot.logger.debug(f'ALTER TABLE {model.__TABLE_NAME__} DROP COLUMN {e_field}') - self.cursor.execute( - f'ALTER TABLE {model.__TABLE_NAME__} DROP COLUMN {e_field}' - ) - - self.conn.commit() - - def save(self, *args: LiteModel) -> [int | tuple[int, ...]]: - """ - 保存或更新模型 - Args: - *args: 模型类实例化对象,支持空默认值,不支持嵌套迁移 - Returns: - - """ - ids = [] - for model in args: - if not model.__TABLE_NAME__: - raise ValueError(f"数据模型{model.__class__.__name__}未提供表名") - if not self.cursor.execute(f'PRAGMA table_info({model.__TABLE_NAME__})').fetchall(): - raise ValueError(f"数据表{model.__TABLE_NAME__}不存在,请先迁移{model.__class__.__name__}模型") - - stored_fields, stored_values = [], [] - for r_field in model.__annotations__: - r_value = model.__getattribute__(r_field) - stored_fields.append(self._get_stored_field_prefix(r_value) + r_field) - - if type(r_value) in Database.BASIC_TYPE: - # int str float bool bytes NoneType - stored_values.append(r_value) - - elif type(r_value) in Database.ITERABLE_TYPE: - # dict list tuple set - stored_values.append(pickle.dumps(self._flat_save(r_value))) - - elif isinstance(r_value, LiteModel): - # LiteModel TABLE_NAME:ID - stored_values.append(f"{r_value.__TABLE_NAME__}:{self.save(r_value)}") - - else: - raise ValueError(f"不支持的数据类型{type(r_value)}") - nonebot.logger.debug(f"INSERT OR REPLACE INTO {model.__TABLE_NAME__} ({','.join(stored_fields)}) VALUES ({','.join([_ for _ in stored_values])})") - self.cursor.execute( - f"INSERT OR REPLACE INTO {model.__TABLE_NAME__} ({','.join(stored_fields)}) VALUES ({','.join(['?' for _ in stored_values])})", - stored_values - ) - ids.append(self.cursor.lastrowid) - self.conn.commit() - return tuple(ids) if len(ids) > 1 else ids[0] - - # 检测id字段是否有1,有则更新,无则插入 - - def _flat_save(self, obj) -> Any: - """扁平化存储 - - Args: - obj: 需要存储的对象 - - Returns: - 存储的字节流 - """ - # TODO 递归扁平化存储 - if type(obj) in Database.ITERABLE_TYPE: - for i, item in enumerate(obj) if type(obj) in [list, tuple, set] else obj.items(): - if type(item) in Database.BASIC_TYPE: - continue - elif type(item) in Database.ITERABLE_TYPE: - obj[i] = pickle.dumps(self._flat_save(item)) - elif isinstance(item, LiteModel): - obj[i] = f"{item.__TABLE_NAME__}:{self.save(item)}" - else: - raise ValueError(f"不支持的数据类型{type(item)}") - else: - raise ValueError(f"不支持的数据类型{type(obj)}") - - @staticmethod - def _get_stored_field_prefix(value) -> str: - """获取存储字段前缀,一定在后加上字段名 - - LiteModel -> LITEYUKIID - - dict -> LITEYUKIDICT - - list -> LITEYUKILIST - - tuple -> LITEYUKITUPLE - - set -> LITEYUKISET - - * -> "" - Args: - value: 储存的值 - - Returns: - Sqlite3存储字段 - """ - return Database.FIELD_PREFIX_MAPPING.get(type(value), "") - - @staticmethod - def _get_stored_type(value) -> str: - """获取存储类型 - - Args: - value: 储存的值 - - Returns: - Sqlite3存储类型 - """ - return Database.TYPE_MAPPING.get(type(value), "TEXT") diff --git a/liteyuki/utils/tools.py b/liteyuki/utils/tools.py index c5d9ccbd..b088ee23 100644 --- a/liteyuki/utils/tools.py +++ b/liteyuki/utils/tools.py @@ -35,23 +35,6 @@ def convert_size(size: int, precision: int = 2, add_unit: bool = True, suffix: s return f"{size:.{precision}f}" -def de_escape(text: str) -> str: - str_map = { - "[": "[", - "]": "]", - "&": "&", - ",": ",", - } - for k, v in str_map.items(): - text = text.replace(k, v) - - return text - - -def encode_url(text: str) -> str: - return quote(text, safe="") - - def keywords_in_text(keywords: list[str], text: str, all_matched: bool) -> bool: """ 检查关键词是否在文本中