Merge remote-tracking branch 'origin/main'

This commit is contained in:
snowy 2024-05-20 23:31:45 +08:00
commit c2593e71c0
24 changed files with 250 additions and 78 deletions

View File

@ -17,7 +17,7 @@ from liteyuki.utils.base.language import get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md, broadcast_to_superusers from liteyuki.utils.message.message import MarkdownMessage as md, broadcast_to_superusers
from liteyuki.utils.base.reloader import Reloader from liteyuki.utils.base.reloader import Reloader
from liteyuki.utils import satori_utils from liteyuki.utils import event as event_utils, satori_utils
from .api import update_liteyuki from .api import update_liteyuki
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
@ -83,7 +83,7 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
"reload": True, "reload": True,
"reload_time": time.time(), "reload_time": time.time(),
"reload_bot_id": bot.self_id, "reload_bot_id": bot.self_id,
"reload_session_type": satori_utils.get_message_type(event), "reload_session_type": event_utils.get_message_type(event),
"reload_session_id": (event.group_id if event.message_type == "group" else event.user_id) if not isinstance(event,satori.event.Event) else event.channel.id, "reload_session_id": (event.group_id if event.message_type == "group" else event.user_id) if not isinstance(event,satori.event.Event) else event.channel.id,
"delta_time": 0 "delta_time": 0
} }
@ -118,7 +118,7 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
).handle() ).handle()
# Satori OK # Satori OK
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher): async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher):
ulang = get_user_lang(str(satori_utils.get_user_id(event))) ulang = get_user_lang(str(event_utils.get_user_id(event)))
stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig()) stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig())
if result.subcommands.get("set"): if result.subcommands.get("set"):
key, value = result.subcommands.get("set").args.get("key"), result.subcommands.get("set").args.get("value") key, value = result.subcommands.get("set").args.get("key"), result.subcommands.get("set").args.get("value")
@ -168,7 +168,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher
async def _(event: T_MessageEvent, matcher: Matcher): async def _(event: T_MessageEvent, matcher: Matcher):
global markdown_image global markdown_image
# 切换图片模式False以图片形式发送True以markdown形式发送 # 切换图片模式False以图片形式发送True以markdown形式发送
ulang = get_user_lang(str(satori_utils.get_user_id(event))) ulang = get_user_lang(str(event_utils.get_user_id(event)))
stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig()) stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig())
stored_config.config["markdown_image"] = not stored_config.config.get("markdown_image", False) stored_config.config["markdown_image"] = not stored_config.config.get("markdown_image", False)
markdown_image = stored_config.config["markdown_image"] markdown_image = stored_config.config["markdown_image"]

View File

@ -5,7 +5,7 @@ import aiofiles
import nonebot.plugin import nonebot.plugin
from nonebot.adapters import satori from nonebot.adapters import satori
from liteyuki.utils import satori_utils from liteyuki.utils import event as event_utils
from liteyuki.utils.base.data import LiteModel from liteyuki.utils.base.data import LiteModel
from liteyuki.utils.base.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db from liteyuki.utils.base.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db
from liteyuki.utils.base.ly_typing import T_MessageEvent from liteyuki.utils.base.ly_typing import T_MessageEvent
@ -140,12 +140,12 @@ def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: b
Returns: Returns:
""" """
if satori_utils.get_message_type(event) == "group": if event_utils.get_message_type(event) == "group":
session = group_db.where_one(Group(), "group_id = ?", str(satori_utils.get_group_id(event)), session = group_db.where_one(Group(), "group_id = ?", str(event_utils.get_group_id(event)),
default=Group(group_id=str(satori_utils.get_group_id(event)))) default=Group(group_id=str(event_utils.get_group_id(event))))
else: else:
session = user_db.where_one(User(), "user_id = ?", str(satori_utils.get_user_id(event)), session = user_db.where_one(User(), "user_id = ?", str(event_utils.get_user_id(event)),
default=User(user_id=str(satori_utils.get_user_id(event)))) default=User(user_id=str(event_utils.get_user_id(event))))
default_enable = get_plugin_default_enable(plugin_name) default_enable = get_plugin_default_enable(plugin_name)
if default_enable: if default_enable:
if enable: if enable:
@ -158,12 +158,12 @@ def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: b
else: else:
session.enabled_plugins.remove(plugin_name) session.enabled_plugins.remove(plugin_name)
if satori_utils.get_message_type(event) == "group": if event_utils.get_message_type(event) == "group":
__group_data[str(satori_utils.get_group_id(event))] = session __group_data[str(event_utils.get_group_id(event))] = session
print(session) print(session)
group_db.save(session) group_db.save(session)
else: else:
__user_data[str(satori_utils.get_user_id(event))] = session __user_data[str(event_utils.get_user_id(event))] = session
user_db.save(session) user_db.save(session)

View File

@ -1,4 +1,5 @@
from liteyuki.utils import satori_utils import nonebot
from nonebot.message import event_preprocessor from nonebot.message import event_preprocessor
# from nonebot_plugin_alconna.typings import Event # from nonebot_plugin_alconna.typings import Event
from liteyuki.utils.base.ly_typing import T_MessageEvent from liteyuki.utils.base.ly_typing import T_MessageEvent
@ -10,13 +11,11 @@ from liteyuki.plugins.liteyuki_status.counter_for_satori import satori_counter
@event_preprocessor @event_preprocessor
async def pre_handle(event: Event): async def pre_handle(event: Event):
print("UPDATE_USER")
print(event.__dict__)
if isinstance(event, satori.MessageEvent): if isinstance(event, satori.MessageEvent):
if event.user.id == event.self_id: if event.user.id == event.self_id:
satori_counter.msg_sent += 1 satori_counter.msg_sent += 1
else: else:
satori_counter.msg_received += 1 satori_counter.msg_received += 1
if event.user.name is not None: if event.user.name is not None:
await satori_utils.user_infos.put(event.user) if await satori_utils.user_infos.put(event.user):
print(event.user) nonebot.logger.info(f"Satori user {event.user.name}<{event.user.id}> updated")

View File

@ -1,9 +1,9 @@
from nonebot import Bot, require from nonebot import Bot, require
from liteyuki.utils.message.npl import convert_duration, convert_time_to_seconds from liteyuki.utils.message.npl import convert_duration, convert_time_to_seconds
from .stat_api import * from .stat_api import *
from ...utils import satori_utils from liteyuki.utils import event as event_utils
from ...utils.base.language import Language from liteyuki.utils.base.language import Language
from ...utils.base.ly_typing import T_MessageEvent from liteyuki.utils.base.ly_typing import T_MessageEvent
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
@ -45,7 +45,7 @@ stat_msg = on_alconna(
@stat_msg.assign("message") @stat_msg.assign("message")
async def _(result: Arparma, event: T_MessageEvent, bot: Bot): async def _(result: Arparma, event: T_MessageEvent, bot: Bot):
ulang = Language(satori_utils.get_user_id(event)) ulang = Language(event_utils.get_user_id(event))
try: try:
duration = convert_time_to_seconds(result.other_args.get("duration", "2d")) # 秒数 duration = convert_time_to_seconds(result.other_args.get("duration", "2d")) # 秒数
@ -58,7 +58,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: Bot):
bot_id = result.other_args.get("bot_id") bot_id = result.other_args.get("bot_id")
if group_id in ["current", "c"]: if group_id in ["current", "c"]:
group_id = str(satori_utils.get_group_id(event)) group_id = str(event_utils.get_group_id(event))
if group_id in ["all", "a"]: if group_id in ["all", "a"]:
group_id = "all" group_id = "all"

View File

@ -4,24 +4,27 @@ from nonebot import require
from nonebot.message import event_postprocessor from nonebot.message import event_postprocessor
from liteyuki.utils.base.data import Database, LiteModel from liteyuki.utils.base.data import Database, LiteModel
from liteyuki.utils.base.ly_typing import v11, satori from liteyuki.utils.base.ly_typing import v11, v12, satori
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
from .common import MessageEventModel, msg_db from .common import MessageEventModel, msg_db
from liteyuki.utils import satori_utils from liteyuki.utils import event as event_utils
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
@event_postprocessor
async def general_event_monitor(bot: T_Bot, event: T_MessageEvent): async def general_event_monitor(bot: T_Bot, event: T_MessageEvent):
if isinstance(bot, satori.Bot): print("POST PROCESS")
return await satori_event_monitor(bot, event) # if isinstance(bot, satori.Bot):
elif isinstance(bot, v11.Bot): # print("POST PROCESS SATORI EVENT")
return await onebot_v11_event_monitor(bot, event) # return await satori_event_monitor(bot, event)
# elif isinstance(bot, v11.Bot):
# print("POST PROCESS V11 EVENT")
# return await onebot_v11_event_monitor(bot, event)
@event_postprocessor
async def onebot_v11_event_monitor(bot: v11.Bot, event: v11.MessageEvent): async def onebot_v11_event_monitor(bot: v11.Bot, event: v11.MessageEvent):
if event.message_type == "group": if event.message_type == "group":
event: v11.GroupMessageEvent event: v11.GroupMessageEvent
@ -44,6 +47,30 @@ async def onebot_v11_event_monitor(bot: v11.Bot, event: v11.MessageEvent):
msg_db.save(mem) msg_db.save(mem)
@event_postprocessor
async def onebot_v12_event_monitor(bot: v12.Bot, event: v12.MessageEvent):
if event.message_type == "group":
event: v12.GroupMessageEvent
group_id = str(event.group_id)
else:
group_id = ""
mem = MessageEventModel(
time=int(time.time()),
bot_id=bot.self_id,
adapter="onebot.v12",
group_id=group_id,
user_id=str(event.user_id),
message_id=str(event.message_id),
message=event.message,
message_text=event.raw_message,
message_type=event.message_type,
)
msg_db.save(mem)
@event_postprocessor
async def satori_event_monitor(bot: satori.Bot, event: satori.MessageEvent): async def satori_event_monitor(bot: satori.Bot, event: satori.MessageEvent):
if event.guild is not None: if event.guild is not None:
event: satori.MessageEvent event: satori.MessageEvent
@ -60,6 +87,6 @@ async def satori_event_monitor(bot: satori.Bot, event: satori.MessageEvent):
message_id=str(event.message.id), message_id=str(event.message.id),
message=event.message, message=event.message,
message_text=event.message.content, message_text=event.message.content,
message_type=satori_utils.get_message_type(event), message_type=event_utils.get_message_type(event),
) )
msg_db.save(mem) msg_db.save(mem)

View File

@ -170,7 +170,7 @@ async def get_bots_data(self_id: str = "0") -> dict:
statistics = status.get("stat", {}) statistics = status.get("stat", {})
app_name = version_info.get("app_name", "UnknownImplementation") app_name = version_info.get("app_name", "UnknownImplementation")
if app_name in ["Lagrange.OneBot", "LLOneBot", "Shamrock"]: if app_name in ["Lagrange.OneBot", "LLOneBot", "Shamrock", "NapCat.Onebot"]:
icon = f"https://q.qlogo.cn/g?b=qq&nk={bot_id}&s=640" icon = f"https://q.qlogo.cn/g?b=qq&nk={bot_id}&s=640"
elif isinstance(bot, satori.Bot): elif isinstance(bot, satori.Bot):
app_name = "Satori" app_name = "Satori"

View File

@ -4,8 +4,8 @@ from liteyuki.utils.base.resource import get_path
from liteyuki.utils.message.html_tool import template2image from liteyuki.utils.message.html_tool import template2image
from liteyuki.utils.base.language import get_user_lang from liteyuki.utils.base.language import get_user_lang
from .api import * from .api import *
from ...utils import satori_utils from liteyuki.utils import event as event_utils
from ...utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma, UniMessage from nonebot_plugin_alconna import on_alconna, Alconna, Args, Subcommand, Arparma, UniMessage
@ -28,7 +28,7 @@ status_alc = on_alconna(
@status_alc.handle() @status_alc.handle()
async def _(event: T_MessageEvent, bot: T_Bot): async def _(event: T_MessageEvent, bot: T_Bot):
ulang = get_user_lang(satori_utils.get_user_id(event)) ulang = get_user_lang(event_utils.get_user_id(event))
if ulang.lang_code in status_card_cache: if ulang.lang_code in status_card_cache:
image = status_card_cache[ulang.lang_code] image = status_card_cache[ulang.lang_code]
else: else:

View File

@ -9,7 +9,7 @@ from liteyuki.utils.base.language import Language, change_user_lang, get_all_lan
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md from liteyuki.utils.message.message import MarkdownMessage as md
from .const import representative_timezones_list from .const import representative_timezones_list
from ...utils import satori_utils from liteyuki.utils import event as event_utils
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import Alconna, Args, Arparma, Subcommand, on_alconna from nonebot_plugin_alconna import Alconna, Args, Arparma, Subcommand, on_alconna
@ -42,13 +42,13 @@ class Profile(LiteModel):
@profile_alc.handle() @profile_alc.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot): async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
user: User = user_db.where_one(User(), "user_id = ?", satori_utils.get_user_id(event), user: User = user_db.where_one(User(), "user_id = ?", event_utils.get_user_id(event),
default=User(user_id=str(satori_utils.get_user_id(event)))) default=User(user_id=str(event_utils.get_user_id(event))))
ulang = get_user_lang(str(satori_utils.get_user_id(event))) ulang = get_user_lang(str(event_utils.get_user_id(event)))
if result.subcommands.get("set"): if result.subcommands.get("set"):
if result.subcommands["set"].args.get("value"): if result.subcommands["set"].args.get("value"):
# 对合法性进行校验后设置 # 对合法性进行校验后设置
r = set_profile(result.args["key"], result.args["value"], str(satori_utils.get_user_id(event))) r = set_profile(result.args["key"], result.args["value"], str(event_utils.get_user_id(event)))
if r: if r:
user.profile[result.args["key"]] = result.args["value"] user.profile[result.args["key"]] = result.args["value"]
user_db.save(user) # 数据库保存 user_db.save(user) # 数据库保存

View File

@ -11,43 +11,46 @@ from liteyuki.utils.base.data_manager import User, user_db
from liteyuki.utils.base.language import Language, get_user_lang from liteyuki.utils.base.language import Language, get_user_lang
from liteyuki.utils.base.resource import get_path from liteyuki.utils.base.resource import get_path
from liteyuki.utils.message.html_tool import template2image from liteyuki.utils.message.html_tool import template2image
from ...utils import satori_utils from liteyuki.utils import event as event_utils
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, MultiVar, Arparma from nonebot_plugin_alconna import on_alconna, Alconna, Args, MultiVar, Arparma, UniMessage
wx_alc = on_alconna(
@on_alconna(
aliases={"天气"}, aliases={"天气"},
command=Alconna( command=Alconna(
"weather", "weather",
Args["keywords", MultiVar(str), []], Args["keywords", MultiVar(str), []],
), ),
).handle() )
@wx_alc.handle()
async def _(result: Arparma, event: T_MessageEvent, matcher: Matcher): async def _(result: Arparma, event: T_MessageEvent, matcher: Matcher):
"""await alconna.send("weather", city)""" """await alconna.send("weather", city)"""
kws = result.main_args.get("keywords") kws = result.main_args.get("keywords")
image = await get_weather_now_card(matcher, event, kws) image = await get_weather_now_card(matcher, event, kws)
await wx_alc.finish(UniMessage.image(raw=image))
@on_endswith(("天气", "weather")).handle()
async def _(event: T_MessageEvent, matcher: Matcher):
"""await alconna.send("weather", city)"""
# kws = event.message.extract_plain_text()
kws = event.get_plaintext()
image = await get_weather_now_card(matcher, event, [kws.replace("天气", "").replace("weather", "")], False)
if isinstance(event, satori.event.Event): if isinstance(event, satori.event.Event):
await matcher.finish(satori.MessageSegment.image(raw=image, mime="image/png")) await matcher.finish(satori.MessageSegment.image(raw=image, mime="image/png"))
else: else:
await matcher.finish(MessageSegment.image(image)) await matcher.finish(MessageSegment.image(image))
@on_endswith(("天气", "weather")).handle()
async def _(event: T_MessageEvent, matcher: Matcher):
"""await alconna.send("weather", city)"""
kws = event.message.extract_plain_text()
image = await get_weather_now_card(matcher, event, [kws.replace("天气", "").replace("weather", "")], False)
await matcher.finish(MessageSegment.image(image))
async def get_weather_now_card(matcher: Matcher, event: T_MessageEvent, keyword: list[str], tip: bool = True): async def get_weather_now_card(matcher: Matcher, event: T_MessageEvent, keyword: list[str], tip: bool = True):
ulang = get_user_lang(satori_utils.get_user_id(event)) ulang = get_user_lang(event_utils.get_user_id(event))
qw_lang = get_qw_lang(ulang.lang_code) qw_lang = get_qw_lang(ulang.lang_code)
key = get_config("weather_key") key = get_config("weather_key")
is_dev = get_memory_data("weather.is_dev", True) is_dev = get_memory_data("weather.is_dev", True)
user: User = user_db.where_one(User(), "user_id = ?", satori_utils.get_user_id(event), default=User()) user: User = user_db.where_one(User(), "user_id = ?", event_utils.get_user_id(event), default=User())
# params # params
unit = user.profile.get("unit", "m") unit = user.profile.get("unit", "m")
stored_location = user.profile.get("location", None) stored_location = user.profile.get("location", None)

View File

@ -0,0 +1,14 @@
from . import (
satori,
onebot
)
def init(config: dict):
onebot.init()
satori.init(config)
def register():
onebot.register()
satori.register()

View File

@ -0,0 +1,12 @@
import nonebot
from nonebot.adapters.onebot import v11, v12
def init():
pass
def register():
driver = nonebot.get_driver()
driver.register_adapter(v11.Adapter)
driver.register_adapter(v12.Adapter)

View File

@ -0,0 +1,26 @@
import json
import os
import nonebot
from nonebot.adapters import satori
def init(config: dict):
if config.get("satori", None) is None:
nonebot.logger.info("Satori config not found, skip Satori init.")
return None
satori_config = config.get("satori")
if not satori_config.get("enable", False):
nonebot.logger.info("Satori not enabled, skip Satori init.")
return None
if os.getenv("SATORI_CLIENTS", None) is not None:
nonebot.logger.info("Satori clients already set in environment variable, skip.")
os.environ["SATORI_CLIENTS"] = json.dumps(satori_config.get("hosts", []), ensure_ascii=False)
config['satori_clients'] = satori_config.get("hosts", [])
return
def register():
if os.getenv("SATORI_CLIENTS", None) is not None:
driver = nonebot.get_driver()
driver.register_adapter(satori.Adapter)

View File

@ -1,4 +1,5 @@
import os import os
from typing import List
import nonebot import nonebot
import yaml import yaml
@ -11,12 +12,26 @@ from ..message.tools import random_hex_string
config = {} # 全局配置,确保加载后读取 config = {} # 全局配置,确保加载后读取
class SatoriNodeConfig(BaseModel):
host: str = ""
port: str = "5500"
path: str = ""
token: str = ""
class SatoriConfig(BaseModel):
comment: str = "These features are still in development. Do not enable in production environment."
enable: bool = False
hosts: List[SatoriNodeConfig] = [SatoriNodeConfig()]
class BasicConfig(BaseModel): class BasicConfig(BaseModel):
host: str = "127.0.0.1" host: str = "127.0.0.1"
port: int = 20216 port: int = 20216
superusers: list[str] = [] superusers: list[str] = []
command_start: list[str] = ["/", ""] command_start: list[str] = ["/", ""]
nickname: list[str] = [f"LiteyukiBot-{random_hex_string(6)}"] nickname: list[str] = [f"LiteyukiBot-{random_hex_string(6)}"]
satori: SatoriConfig = SatoriConfig()
def load_from_yaml(file: str) -> dict: def load_from_yaml(file: str) -> dict:

View File

@ -4,5 +4,5 @@ from nonebot.adapters import satori
T_Bot = v11.Bot | v12.Bot | satori.Bot T_Bot = v11.Bot | v12.Bot | satori.Bot
T_GroupMessageEvent = v11.GroupMessageEvent | v12.GroupMessageEvent T_GroupMessageEvent = v11.GroupMessageEvent | v12.GroupMessageEvent
T_PrivateMessageEvent = v11.PrivateMessageEvent | v12.PrivateMessageEvent T_PrivateMessageEvent = v11.PrivateMessageEvent | v12.PrivateMessageEvent
T_MessageEvent = v11.MessageEvent | v12.MessageEvent # | satori.MessageEvent T_MessageEvent = v11.MessageEvent | v12.MessageEvent | satori.MessageEvent
T_Message = v11.Message | v12.Message | satori.Message T_Message = v11.Message | v12.Message | satori.Message

View File

@ -0,0 +1,6 @@
from .auto_set_env import auto_set_env
def init(config: dict):
auto_set_env(config)
return

View File

@ -0,0 +1,21 @@
import os
import dotenv
import nonebot
from .defines import *
def auto_set_env(config: dict):
dotenv.load_dotenv(".env")
if os.getenv("DRIVER", None) is not None:
print(os.getenv("DRIVER"))
nonebot.logger.info("Driver already set in environment variable, skip auto configure.")
return
if config.get("satori", {'enable': False}).get("enable", False):
os.environ["DRIVER"] = get_driver_string(ASGI_DRIVER, HTTPX_DRIVER, WEBSOCKETS_DRIVER)
nonebot.logger.info("Enable Satori, set driver to ASGI+HTTPX+WEBSOCKETS")
else:
os.environ["DRIVER"] = get_driver_string(ASGI_DRIVER)
nonebot.logger.info("Disable Satori, set driver to ASGI")
return

View File

@ -0,0 +1,17 @@
ASGI_DRIVER = "~fastapi"
HTTPX_DRIVER = "~httpx"
WEBSOCKETS_DRIVER = "~websockets"
def get_driver_string(*argv):
output_string = ""
if ASGI_DRIVER in argv:
output_string += ASGI_DRIVER
for arg in argv:
if arg != ASGI_DRIVER:
output_string = f"{output_string}+{arg}"
return output_string
def get_driver_full_string(*argv):
return f"DRIVER={get_driver_string(argv)}"

View File

@ -0,0 +1 @@
from .get_info import *

View File

@ -15,3 +15,10 @@ def get_group_id(event: T_MessageEvent):
return event.guild.id return event.guild.id
else: else:
return event.group_id return event.group_id
def get_message_type(event: T_MessageEvent) -> str:
if isinstance(event, satori.event.Event):
return "private" if event.guild is None else "group"
else:
return event.message_type

View File

@ -1,5 +1,3 @@
from .user_info import user_infos from .user_info import user_infos
from .get_message_type import get_message_type
from .event_tools import *
from .count_friends import count_friends from .count_friends import count_friends
from .count_groups import count_groups from .count_groups import count_groups

View File

@ -1,10 +0,0 @@
from nonebot.adapters import satori
from liteyuki.utils.base.ly_typing import T_MessageEvent
def get_message_type(event: T_MessageEvent) -> str:
if isinstance(event, satori.event.Event):
return "private" if event.guild is None else "group"
else:
return event.message_type

View File

@ -28,8 +28,34 @@ class UserInfo:
except KeyError: except KeyError:
return None return None
async def put(self, user: User): async def put(self, user: User) -> bool:
"""
向用户信息数据库中添加/修改一项返回值仅代表数据是否变更不代表操作是否成功
Args:
user: 要加入数据库的用户
Returns: 当数据库中用户信息发生变化时返回 True, 否则返回 False
"""
try:
old_user: User = self.user_infos[str(user.id)]
attr_edited = False
if user.name is not None:
if old_user.name != user.name:
attr_edited = True
self.user_infos[str(user.id)].name = user.name
if user.nick is not None:
if old_user.nick != user.nick:
attr_edited = True
self.user_infos[str(user.id)].nick = user.nick
if user.avatar is not None:
if old_user.avatar != user.avatar:
attr_edited = True
self.user_infos[str(user.id)].avatar = user.avatar
return attr_edited
except KeyError:
self.user_infos[str(user.id)] = user self.user_infos[str(user.id)] = user
return True
def __init__(self): def __init__(self):
pass pass

22
main.py
View File

@ -1,7 +1,7 @@
import nonebot import nonebot
from nonebot.adapters.onebot import v11, v12 from nonebot.adapters.onebot import v11, v12
from nonebot.adapters import satori from nonebot.adapters import satori
from liteyuki.utils import init from liteyuki.utils import init, driver_manager, adapter_manager
from liteyuki.utils.base.config import load_from_yaml from liteyuki.utils.base.config import load_from_yaml
from liteyuki.utils.base.data_manager import StoredConfig, common_db from liteyuki.utils.base.data_manager import StoredConfig, common_db
from liteyuki.utils.base.ly_api import liteyuki_api from liteyuki.utils.base.ly_api import liteyuki_api
@ -11,16 +11,22 @@ if __name__ == "__mp_main__":
store_config: dict = common_db.where_one(StoredConfig(), default=StoredConfig()).config store_config: dict = common_db.where_one(StoredConfig(), default=StoredConfig()).config
static_config = load_from_yaml("config.yml") static_config = load_from_yaml("config.yml")
store_config.update(static_config) store_config.update(static_config)
driver_manager.init(config=store_config)
adapter_manager.init(store_config)
nonebot.init(**store_config) nonebot.init(**store_config)
if not store_config.get("enable_satori", False): adapter_manager.register()
adapters = [v11.Adapter, v12.Adapter]
else:
adapters = [v11.Adapter, v12.Adapter, satori.Adapter]
driver = nonebot.get_driver()
for adapter in adapters: # print(nonebot.get_adapters()['Satori'].__dict__)
driver.register_adapter(adapter)
# if not store_config.get("enable_satori", False):
# adapters = [v11.Adapter, v12.Adapter]
# else:
# adapters = [v11.Adapter, v12.Adapter, satori.Adapter]
# driver = nonebot.get_driver()
# # print(driver.__dict__)
# for adapter in adapters:
# driver.register_adapter(adapter)
try: try:
nonebot.load_plugin("liteyuki.liteyuki_main") nonebot.load_plugin("liteyuki.liteyuki_main")

View File

@ -23,3 +23,7 @@ importlib_metadata~=7.0.2
requests~=2.31.0 requests~=2.31.0
watchdog~=4.0.0 watchdog~=4.0.0
pillow~=10.2.0 pillow~=10.2.0
pip~=23.2.1
fastapi~=0.110.0
python-dotenv~=1.0.1