feat: 统一双引号

This commit is contained in:
远野千束 2024-03-26 17:14:41 +08:00
parent 04fc9c3dd7
commit ecbe06a9e8
18 changed files with 131 additions and 359 deletions

View File

@ -22,4 +22,5 @@ if installed_plugins:
if not check_for_package(installed_plugin.module_name): if not check_for_package(installed_plugin.module_name):
nonebot.logger.error(f"{installed_plugin.module_name} not installed, but in loading database. please run `npm fixup` in chat to reinstall it.") nonebot.logger.error(f"{installed_plugin.module_name} not installed, but in loading database. please run `npm fixup` in chat to reinstall it.")
else: else:
print(installed_plugin.module_name)
nonebot.load_plugin(installed_plugin.module_name) nonebot.load_plugin(installed_plugin.module_name)

View File

@ -28,31 +28,31 @@ lang = Language()
dash_app = Dash(__name__) dash_app = Dash(__name__)
dash_app.layout = dash_app.layout = html.Div(children=[ dash_app.layout = dash_app.layout = html.Div(children=[
html.H1(children=lang.get("main.monitor.title"), style={ html.H1(children=lang.get("main.monitor.title"), style={
'textAlign': 'center' "textAlign": "center"
}), }),
dcc.Graph(id='live-update-graph'), dcc.Graph(id="live-update-graph"),
dcc.Interval( dcc.Interval(
id='interval-component', id="interval-component",
interval=1 * 1000, # in milliseconds interval=1 * 1000, # in milliseconds
n_intervals=0 n_intervals=0
) )
]) ])
@dash_app.callback(Output('live-update-graph', 'figure'), @dash_app.callback(Output("live-update-graph", "figure"),
[Input('interval-component', 'n_intervals')]) [Input("interval-component", "n_intervals")])
def update_graph_live(n): def update_graph_live(n):
lang = Language() lang = Language()
system_inf = get_system_info() system_inf = get_system_info()
dash_app.layout = html.Div(children=[ dash_app.layout = html.Div(children=[
html.H1(children=lang.get("main.monitor.title"), style={ html.H1(children=lang.get("main.monitor.title"), style={
'textAlign': 'center' "textAlign": "center"
}), }),
dcc.Graph(id='live-update-graph'), dcc.Graph(id="live-update-graph"),
dcc.Interval( dcc.Interval(
id='interval-component', id="interval-component",
interval=2 * 1000, # in milliseconds interval=2 * 1000, # in milliseconds
n_intervals=0 n_intervals=0
) )
@ -60,28 +60,28 @@ def update_graph_live(n):
mem = psutil.virtual_memory() mem = psutil.virtual_memory()
cpu_f = psutil.cpu_freq() cpu_f = psutil.cpu_freq()
figure = { figure = {
'data' : [ "data" : [
{ {
'x' : [f"{cpu_f.current / 1000:.2f}GHz {psutil.cpu_count(logical=False)}c{psutil.cpu_count()}t"], "x" : [f"{cpu_f.current / 1000:.2f}GHz {psutil.cpu_count(logical=False)}c{psutil.cpu_count()}t"],
'y' : [system_inf['cpu_percent']], "y" : [system_inf["cpu_percent"]],
'type': 'bar', "type": "bar",
'name': f"{lang.get('main.monitor.cpu')} {lang.get('main.monitor.usage')}" "name": f"{lang.get('main.monitor.cpu')} {lang.get('main.monitor.usage')}"
}, },
{ {
'x' : [f"{convert_size(mem.used, add_unit=False)}/{convert_size(mem.total)}({mem.used / mem.total * 100:.2f}%)"], "x" : [f"{convert_size(mem.used, add_unit=False)}/{convert_size(mem.total)}({mem.used / mem.total * 100:.2f}%)"],
'y' : [system_inf['memory_percent']], "y" : [system_inf["memory_percent"]],
'type': 'bar', "type": "bar",
'name': f"{lang.get('main.monitor.memory')} {lang.get('main.monitor.usage')}" "name": f"{lang.get('main.monitor.memory')} {lang.get('main.monitor.usage')}"
}, },
], ],
'layout': { "layout": {
'title': lang.get('main.monitor.description'), "title": lang.get("main.monitor.description"),
# 'xaxis': { # "xaxis": {
# 'range': [0, 10] # "range": [0, 10]
# }, # 设置x轴的范围 # }, # 设置x轴的范围
'yaxis': { "yaxis": {
'range': [0, 100] "range": [0, 100]
}, # 设置y轴的范围 }, # 设置y轴的范围
} }
} }

View File

@ -20,12 +20,12 @@ class StorePlugin(LiteModel):
name: str name: str
desc: str desc: str
module_name: str module_name: str
project_link: str = '' project_link: str = ""
homepage: str = '' homepage: str =""
author: str = '' author: str = ""
type: str | None = None type: str | None = None
version: str | None = '' version: str | None = ""
time: str = '' time: str = ""
tags: list[PluginTag] = [] tags: list[PluginTag] = []
is_official: bool = False is_official: bool = False
@ -59,7 +59,7 @@ def get_plugin_default_enable(plugin_module_name: str) -> bool:
bool: 插件默认状态 bool: 插件默认状态
""" """
plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name) plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name)
return (plug.metadata.extra.get('default_enable', True) return (plug.metadata.extra.get("default_enable", True)
if plug.metadata else True) if plug else True if plug.metadata else True) if plug else True
@ -75,9 +75,9 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_module_name: str) ->
bool: 插件当前状态 bool: 插件当前状态
""" """
if event.message_type == "group": if event.message_type == "group":
session: GroupChat = group_db.first(GroupChat, 'group_id = ?', event.group_id, default=GroupChat(group_id=str(event.group_id))) session: GroupChat = group_db.first(GroupChat, "group_id = ?", event.group_id, default=GroupChat(group_id=str(event.group_id)))
else: else:
session: User = user_db.first(User, 'user_id = ?', event.user_id, default=User(user_id=str(event.user_id))) session: User = user_db.first(User, "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
# 默认停用插件在启用列表内表示启用 # 默认停用插件在启用列表内表示启用
# 默认停用插件不在启用列表内表示停用 # 默认停用插件不在启用列表内表示停用
# 默认启用插件在停用列表内表示停用 # 默认启用插件在停用列表内表示停用
@ -104,4 +104,4 @@ def get_plugin_can_be_toggle(plugin_module_name: str) -> bool:
bool: 插件是否可以被启用/停用 bool: 插件是否可以被启用/停用
""" """
plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name) plug = nonebot.plugin.get_plugin_by_module_name(plugin_module_name)
return plug.metadata.extra.get('toggleable', True) if plug and plug.metadata else True return plug.metadata.extra.get("toggleable", True) if plug and plug.metadata else True

View File

@ -68,9 +68,9 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
if len(rs): if len(rs):
reply = f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***" reply = f"{ulang.get('npm.search_result')} | {ulang.get('npm.total', TOTAL=len(rs))}\n***"
for plugin in rs[:min(max_show, len(rs))]: for plugin in rs[:min(max_show, len(rs))]:
btn_install = md.button(ulang.get('npm.install'), 'npm install %s' % plugin.module_name) btn_install = md.button(ulang.get("npm.install"), "npm install %s" % plugin.module_name)
link_page = md.link(ulang.get('npm.homepage'), plugin.homepage) link_page = md.link(ulang.get("npm.homepage"), plugin.homepage)
link_pypi = md.link(ulang.get('npm.pypi'), plugin.homepage) link_pypi = md.link(ulang.get("npm.pypi"), plugin.homepage)
reply += (f"\n# **{plugin.name}**\n" reply += (f"\n# **{plugin.name}**\n"
f"\n> **{plugin.desc}**\n" f"\n> **{plugin.desc}**\n"
@ -93,7 +93,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
if not store_plugin: if not store_plugin:
await npm_alc.finish(ulang.get("npm.plugin_not_found", NAME=plugin_module_name)) await npm_alc.finish(ulang.get("npm.plugin_not_found", NAME=plugin_module_name))
homepage_btn = md.button(ulang.get('npm.homepage'), store_plugin.homepage) homepage_btn = md.button(ulang.get("npm.homepage"), store_plugin.homepage)
if r: if r:
r_load = nonebot.load_plugin(plugin_module_name) # 加载插件 r_load = nonebot.load_plugin(plugin_module_name) # 加载插件
@ -103,7 +103,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
if r_load: if r_load:
if found_in_db_plugin is None: if found_in_db_plugin is None:
plugin_db.upsert(installed_plugin) plugin_db.upsert(installed_plugin)
info = ulang.get('npm.install_success', NAME=store_plugin.name).replace('_', r'\\_') # markdown转义 info = md.escape(ulang.get("npm.install_success", NAME=store_plugin.name)) # markdown转义
await send_markdown( await send_markdown(
f"{info}\n\n" f"{info}\n\n"
f"```\n{log}\n```", f"```\n{log}\n```",
@ -111,9 +111,9 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
event=event event=event
) )
else: else:
await npm_alc.finish(ulang.get('npm.plugin_already_installed', NAME=store_plugin.name)) await npm_alc.finish(ulang.get("npm.plugin_already_installed", NAME=store_plugin.name))
else: else:
info = ulang.get('npm.load_failed', NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace('_', r'\\_') info = ulang.get("npm.load_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
await send_markdown( await send_markdown(
f"{info}\n\n" f"{info}\n\n"
f"```\n{log}\n```\n", f"```\n{log}\n```\n",
@ -121,7 +121,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
event=event event=event
) )
else: else:
info = ulang.get('npm.install_failed', NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace('_', r'\\_') info = ulang.get("npm.install_failed", NAME=plugin_module_name, HOMEPAGE=homepage_btn).replace("_", r"\\_")
await send_markdown( await send_markdown(
f"{info}\n\n" f"{info}\n\n"
f"```\n{log}\n```", f"```\n{log}\n```",
@ -216,7 +216,7 @@ def npm_install(plugin_module_name) -> tuple[bool, str]:
for mirror in mirrors: for mirror in mirrors:
try: try:
nonebot.logger.info(f"npm_install try mirror: {mirror}") nonebot.logger.info(f"npm_install try mirror: {mirror}")
result = pip.main(['install', plugin_module_name, "-i", mirror]) result = pip.main(["install", plugin_module_name, "-i", mirror])
success = result == 0 success = result == 0
if success: if success:
break break

View File

@ -20,21 +20,21 @@ from nonebot_plugin_alconna import on_alconna, Alconna, Args, Arparma
list_plugins = on_alconna( list_plugins = on_alconna(
Alconna( Alconna(
['list-plugins', "插件列表", "列出插件"], ["list-plugins", "插件列表", "列出插件"],
) )
) )
toggle_plugin = on_alconna( toggle_plugin = on_alconna(
Alconna( Alconna(
['enable-plugin', 'disable-plugin'], ["enable-plugin", "disable-plugin"],
Args['plugin_name', str], Args["plugin_name", str],
) )
) )
global_toggle = on_alconna( global_toggle = on_alconna(
Alconna( Alconna(
['toggle-global'], ["toggle-global"],
Args['plugin_name', str], Args["plugin_name", str],
), ),
permission=SUPERUSER permission=SUPERUSER
) )
@ -49,27 +49,27 @@ async def _(event: T_MessageEvent, bot: T_Bot):
for plugin in nonebot.get_loaded_plugins(): for plugin in nonebot.get_loaded_plugins():
# 检查是否有 metadata 属性 # 检查是否有 metadata 属性
# 添加帮助按钮 # 添加帮助按钮
btn_usage = md.button(lang.get('npm.usage'), f'help {plugin.module_name}', False) btn_usage = md.button(lang.get("npm.usage"), f"help {plugin.module_name}", False)
store_plugin = await get_store_plugin(plugin.module_name) store_plugin = await get_store_plugin(plugin.module_name)
session_enable = get_plugin_session_enable(event, plugin.module_name) session_enable = get_plugin_session_enable(event, plugin.module_name)
default_enable = get_plugin_default_enable(plugin.module_name) default_enable = get_plugin_default_enable(plugin.module_name)
if store_plugin: if store_plugin:
btn_homepage = md.link(lang.get('npm.homepage'), store_plugin.homepage) btn_homepage = md.link(lang.get("npm.homepage"), store_plugin.homepage)
show_name = store_plugin.name show_name = store_plugin.name
show_desc = store_plugin.desc show_desc = store_plugin.desc
elif plugin.metadata: elif plugin.metadata:
if plugin.metadata.extra.get('liteyuki'): if plugin.metadata.extra.get("liteyuki"):
btn_homepage = md.link(lang.get('npm.homepage'), "https://github.com/snowykami/LiteyukiBot") btn_homepage = md.link(lang.get("npm.homepage"), "https://github.com/snowykami/LiteyukiBot")
else: else:
btn_homepage = lang.get('npm.homepage') btn_homepage = lang.get("npm.homepage")
show_name = plugin.metadata.name show_name = plugin.metadata.name
show_desc = plugin.metadata.description show_desc = plugin.metadata.description
else: else:
btn_homepage = lang.get('npm.homepage') btn_homepage = lang.get("npm.homepage")
show_name = plugin.name show_name = plugin.name
show_desc = lang.get('npm.no_description') show_desc = lang.get("npm.no_description")
if plugin.metadata: if plugin.metadata:
reply += (f"\n**{md.escape(show_name)}**\n" reply += (f"\n**{md.escape(show_name)}**\n"
@ -83,22 +83,22 @@ async def _(event: T_MessageEvent, bot: T_Bot):
if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event): if await GROUP_ADMIN(bot, event) or await GROUP_OWNER(bot, event) or await SUPERUSER(bot, event):
# 添加启用/停用插件按钮 # 添加启用/停用插件按钮
cmd_toggle = f"{'disable' if session_enable else 'enable'}-plugin {plugin.module_name}" cmd_toggle = f"{'disable' if session_enable else 'enable'}-plugin {plugin.module_name}"
text_toggle = lang.get('npm.disable' if session_enable else 'npm.enable') text_toggle = lang.get("npm.disable" if session_enable else "npm.enable")
can_be_toggle = get_plugin_can_be_toggle(plugin.module_name) can_be_toggle = get_plugin_can_be_toggle(plugin.module_name)
btn_toggle = text_toggle if not can_be_toggle else md.button(text_toggle, cmd_toggle) btn_toggle = text_toggle if not can_be_toggle else md.button(text_toggle, cmd_toggle)
reply += f" {btn_toggle}" reply += f" {btn_toggle}"
if await SUPERUSER(bot, event): if await SUPERUSER(bot, event):
plugin_in_database = plugin_db.first(InstalledPlugin, 'module_name = ?', plugin.module_name) plugin_in_database = plugin_db.first(InstalledPlugin, "module_name = ?", plugin.module_name)
# 添加移除插件和全局切换按钮 # 添加移除插件和全局切换按钮
global_enable = get_plugin_global_enable(plugin.module_name) global_enable = get_plugin_global_enable(plugin.module_name)
btn_uninstall = ( btn_uninstall = (
md.button(lang.get('npm.uninstall'), f'npm uninstall {plugin.module_name}')) if plugin_in_database else lang.get( md.button(lang.get("npm.uninstall"), f'npm uninstall {plugin.module_name}')) if plugin_in_database else lang.get(
'npm.uninstall') 'npm.uninstall')
btn_toggle_global_text = lang.get('npm.disable_global' if global_enable else 'npm.enable_global') btn_toggle_global_text = lang.get("npm.disable_global" if global_enable else "npm.enable_global")
cmd_toggle_global = f'npm toggle-global {plugin.module_name}' cmd_toggle_global = f"npm toggle-global {plugin.module_name}"
btn_toggle_global = btn_toggle_global_text if not can_be_toggle else md.button(btn_toggle_global_text, cmd_toggle_global) btn_toggle_global = btn_toggle_global_text if not can_be_toggle else md.button(btn_toggle_global_text, cmd_toggle_global)
reply += f" {btn_uninstall} {btn_toggle_global}" reply += f" {btn_uninstall} {btn_toggle_global}"
@ -115,7 +115,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
ulang = get_user_lang(str(event.user_id)) ulang = get_user_lang(str(event.user_id))
plugin_module_name = result.args.get("plugin_name") plugin_module_name = result.args.get("plugin_name")
toggle = result.header_result == 'enable-plugin' # 判断是启用还是停用 toggle = result.header_result == "enable-plugin" # 判断是启用还是停用
session_enable = get_plugin_session_enable(event, plugin_module_name) # 获取插件当前状态 session_enable = get_plugin_session_enable(event, plugin_module_name) # 获取插件当前状态
@ -149,7 +149,6 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
else: else:
session.enabled_plugins.remove(plugin_module_name) session.enabled_plugins.remove(plugin_module_name)
if event.message_type == "private": if event.message_type == "private":
print("已保存")
user_db.upsert(session) user_db.upsert(session)
else: else:
group_db.upsert(session) group_db.upsert(session)

View File

@ -1,2 +0,0 @@
def detect_lang(input_str: str) -> str:
return "zh-CN" if input_str == "zh" else "en"

View File

@ -116,7 +116,7 @@ def get_profile_menu(key: str, ulang: Language) -> Optional[str]:
reply = f"**{setting_name} {ulang.get('user.profile.settings')}**\n***\n" reply = f"**{setting_name} {ulang.get('user.profile.settings')}**\n***\n"
if key == "lang": if key == "lang":
for lang_code, lang_name in get_all_lang().items(): for lang_code, lang_name in get_all_lang().items():
btn_set = md.button(ulang.get('user.profile.set'), f"profile set {key} {lang_code}") btn_set = md.button(ulang.get("user.profile.set"), f"profile set {key} {lang_code}")
reply += f"\n{btn_set} | **{lang_name}** - {lang_code}\n***\n" reply += f"\n{btn_set} | **{lang_name}** - {lang_code}\n***\n"
elif key == "timezone": elif key == "timezone":
for tz in representative_timezones_list: for tz in representative_timezones_list:
@ -135,9 +135,9 @@ def set_profile(key: str, value: str) -> bool:
是否成功设置输入合法性不通过返回False 是否成功设置输入合法性不通过返回False
""" """
if key == 'lang': if key == "lang":
if value in get_all_lang(): if value in get_all_lang():
return True return True
elif key == 'timezone': elif key == "timezone":
if value in pytz.all_timezones: if value in pytz.all_timezones:
return True return True

View File

@ -19,14 +19,14 @@ def load_from_yaml(file: str) -> dict:
global config global config
nonebot.logger.debug("Loading config from %s" % file) nonebot.logger.debug("Loading config from %s" % file)
if not os.path.exists(file): if not os.path.exists(file):
nonebot.logger.warning(f'Config file {file} not found, created default config, please modify it and restart') nonebot.logger.warning(f"Config file {file} not found, created default config, please modify it and restart")
with open(file, 'w', encoding='utf-8') as f: with open(file, "w", encoding="utf-8") as f:
yaml.dump(BasicConfig().dict(), f, default_flow_style=False) yaml.dump(BasicConfig().dict(), f, default_flow_style=False)
with open(file, 'r', encoding='utf-8') as f: with open(file, "r", encoding="utf-8") as f:
conf = yaml.load(f, Loader=yaml.FullLoader) conf = yaml.load(f, Loader=yaml.FullLoader)
config = conf config = conf
if conf is None: if conf is None:
nonebot.logger.warning(f'Config file {file} is empty, use default config. please modify it and restart') nonebot.logger.warning(f"Config file {file} is empty, use default config. please modify it and restart")
conf = BasicConfig().dict() conf = BasicConfig().dict()
return conf return conf

View File

@ -226,13 +226,13 @@ class Database(BaseORMAdapter):
return_data = {} return_data = {}
for k, v in data.items(): for k, v in data.items():
if isinstance(v, LiteModel): if isinstance(v, LiteModel):
return_data[f'{self.FOREIGNID}{k}'] = f'{self.ID}:{v.__class__.__name__}:{self.upsert(v)}' return_data[f"{self.FOREIGNID}{k}"] = f"{self.ID}:{v.__class__.__name__}:{self.upsert(v)}"
elif isinstance(v, list): elif isinstance(v, list):
return_data[f'{self.LIST}{k}'] = self._flat(v) return_data[f"{self.LIST}{k}"] = self._flat(v)
elif isinstance(v, dict): elif isinstance(v, dict):
return_data[f'{self.DICT}{k}'] = self._flat(v) return_data[f"{self.DICT}{k}"] = self._flat(v)
elif isinstance(v, BaseIterable): elif isinstance(v, BaseIterable):
return_data[f'{self.JSON}{k}'] = self._flat(v) return_data[f"{self.JSON}{k}"] = self._flat(v)
else: else:
return_data[k] = v return_data[k] = v
@ -240,7 +240,7 @@ class Database(BaseORMAdapter):
return_data = [] return_data = []
for v in data: for v in data:
if isinstance(v, LiteModel): if isinstance(v, LiteModel):
return_data.append(f'{self.ID}:{v.__class__.__name__}:{self.upsert(v)}') return_data.append(f"{self.ID}:{v.__class__.__name__}:{self.upsert(v)}")
elif isinstance(v, list): elif isinstance(v, list):
return_data.append(self._flat(v)) return_data.append(self._flat(v))
elif isinstance(v, dict): elif isinstance(v, dict):
@ -250,7 +250,7 @@ class Database(BaseORMAdapter):
else: else:
return_data.append(v) return_data.append(v)
else: else:
raise ValueError('数据类型错误') raise ValueError("数据类型错误")
return json.dumps(return_data) return json.dumps(return_data)
@ -263,7 +263,7 @@ class Database(BaseORMAdapter):
Returns: Returns:
""" """
return self.cursor.execute(f'SELECT * FROM sqlite_master WHERE type = "table" AND name = ?', (table_name,)).fetchone() return self.cursor.execute(f"SELECT * FROM sqlite_master WHERE type = 'table' AND name = ?", (table_name,)).fetchone()
def first(self, model: type(LiteModel), conditions, *args, default: Any = None) -> LiteModel | None: def first(self, model: type(LiteModel), conditions, *args, default: Any = None) -> LiteModel | None:
"""查询第一条数据 """查询第一条数据
@ -281,7 +281,7 @@ class Database(BaseORMAdapter):
if not self._detect_for_table(table_name): if not self._detect_for_table(table_name):
return default return default
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args) self.cursor.execute(f"SELECT * FROM {table_name} WHERE {conditions}", args)
if row_data := self.cursor.fetchone(): if row_data := self.cursor.fetchone():
data = dict(row_data) data = dict(row_data)
return model(**self.convert_to_dict(data)) return model(**self.convert_to_dict(data))
@ -304,9 +304,9 @@ class Database(BaseORMAdapter):
return default return default
if conditions: if conditions:
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args) self.cursor.execute(f"SELECT * FROM {table_name} WHERE {conditions}", args)
else: else:
self.cursor.execute(f'SELECT * FROM {table_name}') self.cursor.execute(f"SELECT * FROM {table_name}")
if row_datas := self.cursor.fetchall(): if row_datas := self.cursor.fetchall():
datas = [dict(row_data) for row_data in row_datas] datas = [dict(row_data) for row_data in row_datas]
return [model(**self.convert_to_dict(d)) for d in datas] if datas else default return [model(**self.convert_to_dict(d)) for d in datas] if datas else default
@ -327,8 +327,8 @@ class Database(BaseORMAdapter):
if not self._detect_for_table(table_name): if not self._detect_for_table(table_name):
return return
nonebot.logger.debug(f'DELETE FROM {table_name} WHERE {conditions}') nonebot.logger.debug(f"DELETE FROM {table_name} WHERE {conditions}")
self.cursor.execute(f'DELETE FROM {table_name} WHERE {conditions}', args) self.cursor.execute(f"DELETE FROM {table_name} WHERE {conditions}", args)
self.conn.commit() self.conn.commit()
def convert_to_dict(self, data: dict) -> dict: def convert_to_dict(self, data: dict) -> dict:
@ -346,8 +346,8 @@ class Database(BaseORMAdapter):
new_d = {} new_d = {}
for k, v in d.items(): for k, v in d.items():
if k.startswith(self.FOREIGNID): if k.startswith(self.FOREIGNID):
new_d[k.replace(self.FOREIGNID, '')] = load( new_d[k.replace(self.FOREIGNID, "")] = load(
dict(self.cursor.execute(f'SELECT * FROM {v.split(":", 2)[1]} WHERE id = ?', (v.split(":", 2)[2],)).fetchone())) dict(self.cursor.execute(f"SELECT * FROM {v.split(':', 2)[1]} WHERE id = ?", (v.split(":", 2)[2],)).fetchone()))
elif k.startswith(self.LIST): elif k.startswith(self.LIST):
if v == '': v = '[]' if v == '': v = '[]'

View File

@ -6,36 +6,36 @@ from liteyuki.utils.data import LiteModel, Database as DB
DATA_PATH = "data/liteyuki" DATA_PATH = "data/liteyuki"
user_db = DB(os.path.join(DATA_PATH, 'users.ldb')) user_db = DB(os.path.join(DATA_PATH, "users.ldb"))
group_db = DB(os.path.join(DATA_PATH, 'groups.ldb')) group_db = DB(os.path.join(DATA_PATH, "groups.ldb"))
plugin_db = DB(os.path.join(DATA_PATH, 'plugins.ldb')) plugin_db = DB(os.path.join(DATA_PATH, "plugins.ldb"))
common_db = DB(os.path.join(DATA_PATH, 'common.ldb')) common_db = DB(os.path.join(DATA_PATH, "common.ldb"))
class User(LiteModel): class User(LiteModel):
user_id: str = Field(str(), alias='user_id') user_id: str = Field(str(), alias="user_id")
username: str = Field(str(), alias='username') username: str = Field(str(), alias="username")
profile: dict[str, str] = Field(dict(), alias='profile') profile: dict[str, str] = Field(dict(), alias="profile")
enabled_plugins: list[str] = Field(list(), alias='enabled_plugins') enabled_plugins: list[str] = Field(list(), alias="enabled_plugins")
disabled_plugins: list[str] = Field(list(), alias='disabled_plugins') disabled_plugins: list[str] = Field(list(), alias="disabled_plugins")
class GroupChat(LiteModel): class GroupChat(LiteModel):
# Group是一个关键字所以这里用GroupChat # Group是一个关键字所以这里用GroupChat
group_id: str = Field(str(), alias='group_id') group_id: str = Field(str(), alias="group_id")
group_name: str = Field(str(), alias='group_name') group_name: str = Field(str(), alias="group_name")
enabled_plugins: list[str] = Field([], alias='enabled_plugins') enabled_plugins: list[str] = Field([], alias="enabled_plugins")
disabled_plugins: list[str] = Field([], alias='disabled_plugins') disabled_plugins: list[str] = Field([], alias="disabled_plugins")
class InstalledPlugin(LiteModel): class InstalledPlugin(LiteModel):
module_name: str = Field(str(), alias='module_name') module_name: str = Field(str(), alias="module_name")
version: str = Field(str(), alias='version') version: str = Field(str(), alias="version")
class GlobalPlugin(LiteModel): class GlobalPlugin(LiteModel):
module_name: str = Field(str(), alias='module_name') module_name: str = Field(str(), alias="module_name")
enabled: bool = Field(True, alias='enabled') enabled: bool = Field(True, alias="enabled")
def auto_migrate(): def auto_migrate():

View File

@ -1,20 +1,11 @@
import copy
import json
import os import os
import pickle import pickle
import sqlite3 import sqlite3
import types
from types import NoneType from types import NoneType
from collections.abc import Iterable
from pydantic import BaseModel, Field
from typing import Any from typing import Any
LOG_OUT = True import pydantic
from pydantic import BaseModel
def log(*args, **kwargs):
if LOG_OUT:
print(*args, **kwargs)
class LiteModel(BaseModel): class LiteModel(BaseModel):
@ -85,7 +76,12 @@ class Database:
elif model.TABLE_NAME not in table_list: elif model.TABLE_NAME not in table_list:
raise ValueError(f"数据模型 {model.__class__.__name__} 的表 {model.TABLE_NAME} 不存在,请先迁移") raise ValueError(f"数据模型 {model.__class__.__name__} 的表 {model.TABLE_NAME} 不存在,请先迁移")
else: else:
self._save(model.model_dump(by_alias=True)) if pydantic.__version__ < "1.8.2":
# 兼容pydantic 1.8.2以下版本
model_dict = model.dict(by_alias=True)
else:
model_dict = model.model_dump(by_alias=True)
self._save(model_dict)
def _save(self, obj: Any) -> Any: def _save(self, obj: Any) -> Any:
# obj = copy.deepcopy(obj) # obj = copy.deepcopy(obj)

View File

@ -30,14 +30,14 @@ def load_from_lang(file_path: str, lang_code: str = None):
""" """
try: try:
if lang_code is None: if lang_code is None:
lang_code = os.path.basename(file_path).split('.')[0] lang_code = os.path.basename(file_path).split(".")[0]
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, "r", encoding="utf-8") as file:
data = {} data = {}
for line in file: for line in file:
line = line.strip() line = line.strip()
if not line or line.startswith('#'): # 空行或注释 if not line or line.startswith("#"): # 空行或注释
continue continue
key, value = line.split('=', 1) key, value = line.split("=", 1)
data[key.strip()] = value.strip() data[key.strip()] = value.strip()
if lang_code not in _language_data: if lang_code not in _language_data:
_language_data[lang_code] = {} _language_data[lang_code] = {}
@ -56,8 +56,8 @@ def load_from_json(file_path: str, lang_code: str = None):
""" """
try: try:
if lang_code is None: if lang_code is None:
lang_code = os.path.basename(file_path).split('.')[0] lang_code = os.path.basename(file_path).split(".")[0]
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, "r", encoding="utf-8") as file:
data = json.load(file) data = json.load(file)
if lang_code not in _language_data: if lang_code not in _language_data:
_language_data[lang_code] = {} _language_data[lang_code] = {}
@ -77,9 +77,9 @@ def load_from_dir(dir_path: str):
try: try:
file_path = os.path.join(dir_path, file) file_path = os.path.join(dir_path, file)
if os.path.isfile(file_path): if os.path.isfile(file_path):
if file.endswith('.lang'): if file.endswith(".lang"):
load_from_lang(file_path) load_from_lang(file_path)
elif file.endswith('.json'): elif file.endswith(".json"):
load_from_json(file_path) load_from_json(file_path)
except Exception as e: except Exception as e:
nonebot.logger.error(f"Failed to load language data from {file}: {e}") nonebot.logger.error(f"Failed to load language data from {file}: {e}")
@ -140,7 +140,7 @@ def get_user_lang(user_id: str) -> Language:
username="Unknown" username="Unknown"
)) ))
return Language(user.profile.get('lang', config.get("default_language", get_system_lang_code()))) return Language(user.profile.get("lang", config.get("default_language", get_system_lang_code())))
def get_system_lang_code() -> str: def get_system_lang_code() -> str:

View File

@ -61,11 +61,11 @@ def init_log():
show_icon = config.get("log_icon", True) show_icon = config.get("log_icon", True)
lang = Language(config.get("default_language", get_system_lang_code())) lang = Language(config.get("default_language", get_system_lang_code()))
debug = lang.get('log.debug', default="==DEBUG") debug = lang.get("log.debug", default="==DEBUG")
info = lang.get('log.info', default="===INFO") info = lang.get("log.info", default="===INFO")
success = lang.get('log.success', default="SUCCESS") success = lang.get("log.success", default="SUCCESS")
warning = lang.get('log.warning', default="WARNING") warning = lang.get("log.warning", default="WARNING")
error = lang.get('log.error', default="==ERROR") error = lang.get("log.error", default="==ERROR")
logger.level("DEBUG", color="<blue>", icon=f"{'*️⃣' if show_icon else ''}{debug}") logger.level("DEBUG", color="<blue>", icon=f"{'*️⃣' if show_icon else ''}{debug}")
logger.level("INFO", color="<white>", icon=f"{'' if show_icon else ''}{info}") logger.level("INFO", color="<white>", icon=f"{'' if show_icon else ''}{info}")

View File

@ -1,14 +1,16 @@
from urllib.parse import quote
import nonebot import nonebot
from nonebot.adapters.onebot import v11, v12 from nonebot.adapters.onebot import v11, v12
from typing import Any from typing import Any
from .tools import de_escape, encode_url from .tools import encode_url
from .ly_typing import T_Bot, T_MessageEvent from .ly_typing import T_Bot, T_MessageEvent
async def send_markdown(markdown: str, bot: T_Bot, *, message_type: str = None, session_id: str | int = None, event: T_MessageEvent = None, **kwargs) -> dict[ async def send_markdown(markdown: str, bot: T_Bot, *, message_type: str = None, session_id: str | int = None, event: T_MessageEvent = None, **kwargs) -> dict[
str, Any]: str, Any]:
formatted_md = de_escape(markdown).replace("\n", r"\n").replace("\"", r'\\\"') formatted_md = v11.unescape(markdown).replace("\n", r"\n").replace("\"", r'\\\"')
if event is not None and message_type is None: if event is not None and message_type is None:
message_type = event.message_type message_type = event.message_type
session_id = event.user_id if event.message_type == "private" else event.group_id session_id = event.user_id if event.message_type == "private" else event.group_id
@ -89,7 +91,7 @@ class Markdown:
markdown格式的可点击回调按钮 markdown格式的可点击回调按钮
""" """
return f"[{name}](mqqapi://aio/inlinecmd?command={encode_url(cmd)}&reply={str(reply).lower()}&enter={str(enter).lower()})" return f"[{name}](mqqapi://aio/inlinecmd?command={quote(cmd)}&reply={str(reply).lower()}&enter={str(enter).lower()})"
@staticmethod @staticmethod
def link(name: str, url: str) -> str: def link(name: str, url: str) -> str:

View File

@ -1,207 +0,0 @@
import os
import pickle
import sqlite3
from types import NoneType
from typing import Any
import nonebot
from pydantic import BaseModel, Field
class LiteModel(BaseModel):
"""轻量级模型基类
类型注解统一使用Python3.9的PEP585标准如需使用泛型请使用typing模块的泛型类型
不允许使用id, table_name以及其他SQLite关键字作为字段名不允许使用JSON和ID必须指定默认值且默认值类型必须与字段类型一致
"""
__ID__: int = Field(None, alias='id')
__TABLE_NAME__: str = Field(None, alias='table_name')
class Database:
TYPE_MAPPING = {
int : "INTEGER",
float : "REAL",
str : "TEXT",
bool : "INTEGER",
bytes : "BLOB",
NoneType: "NULL",
dict : "BLOB", # LITEYUKIDICT{key_name}
list : "BLOB", # LITEYUKILIST{key_name}
tuple : "BLOB", # LITEYUKITUPLE{key_name}
set : "BLOB", # LITEYUKISET{key_name}
}
# 基础类型
BASIC_TYPE = [int, float, str, bool, bytes, NoneType]
# 可序列化类型
ITERABLE_TYPE = [dict, list, tuple, set]
LITEYUKI = "LITEYUKI"
# 字段前缀映射,默认基础类型为""
FIELD_PREFIX_MAPPING = {
dict : f"{LITEYUKI}DICT",
list : f"{LITEYUKI}LIST",
tuple : f"{LITEYUKI}TUPLE",
set : f"{LITEYUKI}SET",
type(LiteModel): f"{LITEYUKI}MODEL"
}
def __init__(self, db_name: str):
if not os.path.exists(os.path.dirname(db_name)):
os.makedirs(os.path.dirname(db_name))
self.conn = sqlite3.connect(db_name) # 连接对象
self.conn.row_factory = sqlite3.Row # 以字典形式返回查询结果
self.cursor = self.conn.cursor() # 游标对象
def auto_migrate(self, *args: LiteModel):
"""
自动迁移模型
Args:
*args: 模型类实例化对象支持空默认值不支持嵌套迁移
Returns:
"""
for model in args:
if not model.__TABLE_NAME__:
raise ValueError(f"数据模型{model.__class__.__name__}未提供表名")
# 若无则创建表
self.cursor.execute(
f'CREATE TABLE IF NOT EXISTS {model.__TABLE_NAME__} (id INTEGER PRIMARY KEY AUTOINCREMENT)'
)
# 获取表结构
new_fields, new_stored_types = (
zip(
*[(self._get_stored_field_prefix(model.__getattribute__(field)) + field, self._get_stored_type(model.__getattribute__(field)))
for field in model.__annotations__]
)
)
# 原有的字段列表
existing_fields = self.cursor.execute(f'PRAGMA table_info({model.__TABLE_NAME__})').fetchall()
existing_types = [field['name'] for field in existing_fields]
# 检测缺失字段由于SQLite是动态类型所以不需要检测类型
for n_field, n_type in zip(new_fields, new_stored_types):
if n_field not in existing_types:
nonebot.logger.debug(f'ALTER TABLE {model.__TABLE_NAME__} ADD COLUMN {n_field} {n_type}')
self.cursor.execute(
f'ALTER TABLE {model.__TABLE_NAME__} ADD COLUMN {n_field} {n_type}'
)
# 检测多余字段进行删除
for e_field in existing_types:
if e_field not in new_fields and e_field not in ['id']:
nonebot.logger.debug(f'ALTER TABLE {model.__TABLE_NAME__} DROP COLUMN {e_field}')
self.cursor.execute(
f'ALTER TABLE {model.__TABLE_NAME__} DROP COLUMN {e_field}'
)
self.conn.commit()
def save(self, *args: LiteModel) -> [int | tuple[int, ...]]:
"""
保存或更新模型
Args:
*args: 模型类实例化对象支持空默认值不支持嵌套迁移
Returns:
"""
ids = []
for model in args:
if not model.__TABLE_NAME__:
raise ValueError(f"数据模型{model.__class__.__name__}未提供表名")
if not self.cursor.execute(f'PRAGMA table_info({model.__TABLE_NAME__})').fetchall():
raise ValueError(f"数据表{model.__TABLE_NAME__}不存在,请先迁移{model.__class__.__name__}模型")
stored_fields, stored_values = [], []
for r_field in model.__annotations__:
r_value = model.__getattribute__(r_field)
stored_fields.append(self._get_stored_field_prefix(r_value) + r_field)
if type(r_value) in Database.BASIC_TYPE:
# int str float bool bytes NoneType
stored_values.append(r_value)
elif type(r_value) in Database.ITERABLE_TYPE:
# dict list tuple set
stored_values.append(pickle.dumps(self._flat_save(r_value)))
elif isinstance(r_value, LiteModel):
# LiteModel TABLE_NAME:ID
stored_values.append(f"{r_value.__TABLE_NAME__}:{self.save(r_value)}")
else:
raise ValueError(f"不支持的数据类型{type(r_value)}")
nonebot.logger.debug(f"INSERT OR REPLACE INTO {model.__TABLE_NAME__} ({','.join(stored_fields)}) VALUES ({','.join([_ for _ in stored_values])})")
self.cursor.execute(
f"INSERT OR REPLACE INTO {model.__TABLE_NAME__} ({','.join(stored_fields)}) VALUES ({','.join(['?' for _ in stored_values])})",
stored_values
)
ids.append(self.cursor.lastrowid)
self.conn.commit()
return tuple(ids) if len(ids) > 1 else ids[0]
# 检测id字段是否有1有则更新无则插入
def _flat_save(self, obj) -> Any:
"""扁平化存储
Args:
obj: 需要存储的对象
Returns:
存储的字节流
"""
# TODO 递归扁平化存储
if type(obj) in Database.ITERABLE_TYPE:
for i, item in enumerate(obj) if type(obj) in [list, tuple, set] else obj.items():
if type(item) in Database.BASIC_TYPE:
continue
elif type(item) in Database.ITERABLE_TYPE:
obj[i] = pickle.dumps(self._flat_save(item))
elif isinstance(item, LiteModel):
obj[i] = f"{item.__TABLE_NAME__}:{self.save(item)}"
else:
raise ValueError(f"不支持的数据类型{type(item)}")
else:
raise ValueError(f"不支持的数据类型{type(obj)}")
@staticmethod
def _get_stored_field_prefix(value) -> str:
"""获取存储字段前缀,一定在后加上字段名
LiteModel -> LITEYUKIID
dict -> LITEYUKIDICT
list -> LITEYUKILIST
tuple -> LITEYUKITUPLE
set -> LITEYUKISET
* -> ""
Args:
value: 储存的值
Returns:
Sqlite3存储字段
"""
return Database.FIELD_PREFIX_MAPPING.get(type(value), "")
@staticmethod
def _get_stored_type(value) -> str:
"""获取存储类型
Args:
value: 储存的值
Returns:
Sqlite3存储类型
"""
return Database.TYPE_MAPPING.get(type(value), "TEXT")

View File

@ -35,23 +35,6 @@ def convert_size(size: int, precision: int = 2, add_unit: bool = True, suffix: s
return f"{size:.{precision}f}" return f"{size:.{precision}f}"
def de_escape(text: str) -> str:
str_map = {
"&#91;": "[",
"&#93;": "]",
"&amp;": "&",
"&#44;": ",",
}
for k, v in str_map.items():
text = text.replace(k, v)
return text
def encode_url(text: str) -> str:
return quote(text, safe="")
def keywords_in_text(keywords: list[str], text: str, all_matched: bool) -> bool: def keywords_in_text(keywords: list[str], text: str, all_matched: bool) -> bool:
""" """
检查关键词是否在文本中 检查关键词是否在文本中