Merge pull request #37 from expliyh/satori

试着添加了对于 NoneBot-Adapter-Satori 的支持
This commit is contained in:
Snowykami 2024-05-16 23:31:54 +08:00 committed by GitHub
commit 06a109d2b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 481 additions and 209 deletions

1
.gitignore vendored
View File

@ -35,3 +35,4 @@ prompt.txt
# js
**/echarts.js
.env

View File

@ -5,6 +5,7 @@ from typing import Any
import nonebot
import pip
from nonebot import Bot, get_driver, require
from nonebot.adapters import satori
from nonebot.adapters.onebot.v11 import Message, escape, unescape
from nonebot.exception import MockApiException
from nonebot.internal.matcher import Matcher
@ -16,6 +17,7 @@ from liteyuki.utils.base.language import get_user_lang
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md, broadcast_to_superusers
from liteyuki.utils.base.reloader import Reloader
from liteyuki.utils import satori_utils
from .api import update_liteyuki
require("nonebot_plugin_alconna")
@ -35,6 +37,7 @@ markdown_image = common_db.where_one(StoredConfig(), default=StoredConfig()).con
),
permission=SUPERUSER
).handle()
# Satori OK
async def _(bot: T_Bot, matcher: Matcher, result: Arparma):
if result.main_args.get("text"):
await matcher.finish(Message(unescape(result.main_args.get("text"))))
@ -49,9 +52,11 @@ async def _(bot: T_Bot, matcher: Matcher, result: Arparma):
),
permission=SUPERUSER
).handle()
# Satori OK
async def _(bot: T_Bot, event: T_MessageEvent):
# 使用git pull更新
ulang = get_user_lang(str(event.user_id))
ulang = get_user_lang(str(event.user.id if isinstance(event, satori.event.Event) else event.user_id))
success, logs = update_liteyuki()
reply = "Liteyuki updated!\n"
reply += f"```\n{logs}\n```\n"
@ -68,6 +73,7 @@ async def _(bot: T_Bot, event: T_MessageEvent):
),
permission=SUPERUSER
).handle()
# Satori OK
async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
await matcher.send("Liteyuki reloading")
temp_data = common_db.where_one(TempConfig(), default=TempConfig())
@ -77,8 +83,8 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
"reload": True,
"reload_time": time.time(),
"reload_bot_id": bot.self_id,
"reload_session_type": event.message_type,
"reload_session_id" : event.group_id if event.message_type == "group" else event.user_id,
"reload_session_type": satori_utils.get_message_type(event),
"reload_session_id": (event.group_id if event.message_type == "group" else event.user_id) if not isinstance(event,satori.event.Event) else event.channel.id,
"delta_time": 0
}
)
@ -110,8 +116,9 @@ async def _(matcher: Matcher, bot: T_Bot, event: T_MessageEvent):
),
permission=SUPERUSER
).handle()
# Satori OK
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher):
ulang = get_user_lang(str(event.user_id))
ulang = get_user_lang(str(satori_utils.get_user_id(event)))
stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig())
if result.subcommands.get("set"):
key, value = result.subcommands.get("set").args.get("key"), result.subcommands.get("set").args.get("value")
@ -157,15 +164,17 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, matcher: Matcher
),
permission=SUPERUSER
).handle()
# Satori OK
async def _(event: T_MessageEvent, matcher: Matcher):
global markdown_image
# 切换图片模式False以图片形式发送True以markdown形式发送
ulang = get_user_lang(str(event.user_id))
ulang = get_user_lang(str(satori_utils.get_user_id(event)))
stored_config: StoredConfig = common_db.where_one(StoredConfig(), default=StoredConfig())
stored_config.config["markdown_image"] = not stored_config.config.get("markdown_image", False)
markdown_image = stored_config.config["markdown_image"]
common_db.save(stored_config)
await matcher.finish(ulang.get("liteyuki.image_mode_on" if stored_config.config["markdown_image"] else "liteyuki.image_mode_off"))
await matcher.finish(
ulang.get("liteyuki.image_mode_on" if stored_config.config["markdown_image"] else "liteyuki.image_mode_off"))
@on_alconna(
@ -174,6 +183,7 @@ async def _(event: T_MessageEvent, matcher: Matcher):
),
aliases={"轻雪文档"},
).handle()
# Satori OK
async def _(matcher: Matcher):
await matcher.finish("https://bot.liteyuki.icu/usage")
@ -226,7 +236,8 @@ async def _(result: Arparma, bot: T_Bot, event: T_MessageEvent, matcher: Matcher
@Bot.on_calling_api # 图片模式检测
async def test_for_md_image(bot: T_Bot, api: str, data: dict):
# 截获大图发送转换为markdown发送
if api in ["send_msg", "send_private_msg", "send_group_msg"] and markdown_image and data.get("user_id") != bot.self_id:
if api in ["send_msg", "send_private_msg", "send_group_msg"] and markdown_image and data.get(
"user_id") != bot.self_id:
if api == "send_msg" and data.get("message_type") == "private" or api == "send_private_msg":
session_type = "private"
session_id = data.get("user_id")
@ -239,10 +250,12 @@ async def test_for_md_image(bot: T_Bot, api: str, data: dict):
file: str = data["message"][0].data.get("file")
# file:// http:// base64://
if file.startswith("http"):
result = await md.send_md(await md.image_async(file), bot, message_type=session_type, session_id=session_id)
result = await md.send_md(await md.image_async(file), bot, message_type=session_type,
session_id=session_id)
elif file.startswith("file"):
file = file.replace("file://", "")
result = await md.send_image(open(file, "rb").read(), bot, message_type=session_type, session_id=session_id)
result = await md.send_image(open(file, "rb").read(), bot, message_type=session_type,
session_id=session_id)
elif file.startswith("base64"):
file_bytes = base64.b64decode(file.replace("base64://", ""))
result = await md.send_image(file_bytes, bot, message_type=session_type, session_id=session_id)
@ -269,6 +282,8 @@ async def on_shutdown():
@driver.on_bot_connect
async def _(bot: T_Bot):
temp_data = common_db.where_one(TempConfig(), default=TempConfig())
if isinstance(bot, satori.Bot):
await satori_utils.user_infos.load_friends(bot)
# 用于重启计时
if temp_data.data.get("reload", False):
temp_data.data["reload"] = False
@ -279,6 +294,12 @@ async def _(bot: T_Bot):
reload_session_id = temp_data.data.get("reload_session_id", 0)
delta_time = temp_data.data.get("delta_time", 0)
common_db.save(temp_data) # 更新数据
if isinstance(bot,satori.Bot):
await bot.send_message(
channel_id=reload_session_id,
message="Liteyuki reloaded in %.2f s" % delta_time
)
else:
await bot.call_api(
"send_msg",
message_type=reload_session_type,

View File

@ -3,7 +3,9 @@ from typing import Optional
import aiofiles
import nonebot.plugin
from nonebot.adapters import satori
from liteyuki.utils import satori_utils
from liteyuki.utils.base.data import LiteModel
from liteyuki.utils.base.data_manager import GlobalPlugin, Group, User, group_db, plugin_db, user_db
from liteyuki.utils.base.ly_typing import T_MessageEvent
@ -95,16 +97,23 @@ def get_plugin_session_enable(event: T_MessageEvent, plugin_name: str) -> bool:
Returns:
bool: 插件当前状态
"""
if event.message_type == "group":
group_id = str(event.group_id)
if isinstance(event, satori.event.Event):
if event.guild is not None:
message_type = "group"
else:
message_type = "private"
else:
message_type = event.message_type
if message_type == "group":
group_id = str(event.guild.id if isinstance(event, satori.event.Event) else event.group_id)
if group_id not in __group_data:
group: Group = group_db.where_one(Group(), "group_id = ?", group_id, default=Group(group_id=group_id))
__group_data[str(event.group_id)] = group
__group_data[str(group_id)] = group
session = __group_data[group_id]
else:
# session: User = user_db.first(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
user_id = str(event.user_id)
user_id = str(event.user.id if isinstance(event, satori.event.Event) else event.user_id)
if user_id not in __user_data:
user: User = user_db.where_one(User(), "user_id = ?", user_id, default=User(user_id=user_id))
__user_data[user_id] = user
@ -131,10 +140,12 @@ def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: b
Returns:
"""
if event.message_type == "group":
session = group_db.where_one(Group(), "group_id = ?", str(event.group_id), default=Group(group_id=str(event.group_id)))
if satori_utils.get_message_type(event) == "group":
session = group_db.where_one(Group(), "group_id = ?", str(satori_utils.get_group_id(event)),
default=Group(group_id=str(satori_utils.get_group_id(event))))
else:
session = user_db.where_one(User(), "user_id = ?", str(event.user_id), default=User(user_id=str(event.user_id)))
session = user_db.where_one(User(), "user_id = ?", str(satori_utils.get_user_id(event)),
default=User(user_id=str(satori_utils.get_user_id(event))))
default_enable = get_plugin_default_enable(plugin_name)
if default_enable:
if enable:
@ -147,12 +158,12 @@ def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: b
else:
session.enabled_plugins.remove(plugin_name)
if event.message_type == "group":
__group_data[str(event.group_id)] = session
if satori_utils.get_message_type(event) == "group":
__group_data[str(satori_utils.get_group_id(event))] = session
print(session)
group_db.save(session)
else:
__user_data[str(event.user_id)] = session
__user_data[str(satori_utils.get_user_id(event))] = session
user_db.save(session)

View File

@ -0,0 +1,16 @@
from nonebot.plugin import PluginMetadata
from .auto_update import *
__author__ = "expliyh"
__plugin_meta__ = PluginMetadata(
name="Satori 用户数据自动更新(临时措施)",
description="",
usage="",
type="application",
homepage="https://github.com/snowykami/LiteyukiBot",
extra={
"liteyuki": True,
"toggleable" : True,
"default_enable" : True,
}
)

View File

@ -0,0 +1,22 @@
from liteyuki.utils import satori_utils
from nonebot.message import event_preprocessor
# from nonebot_plugin_alconna.typings import Event
from liteyuki.utils.base.ly_typing import T_MessageEvent
from liteyuki.utils import satori_utils
from nonebot.adapters import satori
from nonebot_plugin_alconna.typings import Event
from liteyuki.plugins.liteyuki_status.counter_for_satori import satori_counter
@event_preprocessor
async def pre_handle(event: Event):
print("UPDATE_USER")
print(event.__dict__)
if isinstance(event, satori.MessageEvent):
if event.user.id == event.self_id:
satori_counter.msg_sent += 1
else:
satori_counter.msg_received += 1
if event.user.name is not None:
await satori_utils.user_infos.put(event.user)
print(event.user)

View File

@ -9,7 +9,21 @@ from liteyuki.utils.message.npl import convert_seconds_to_time
from contextvars import ContextVar
async def get_stat_msg_image(duration: int, period: int, group_id: str = None, bot_id: str = None, ulang: Language = Language()) -> bytes:
async def count_msg_by_bot_id(bot_id: str) -> int:
condition = " AND bot_id = ?"
condition_args = [bot_id]
msg_rows = msg_db.where_all(
MessageEventModel(),
condition,
*condition_args
)
return len(msg_rows)
async def get_stat_msg_image(duration: int, period: int, group_id: str = None, bot_id: str = None,
ulang: Language = Language()) -> bytes:
"""
获取统计消息
Args:

View File

@ -1,6 +1,7 @@
from nonebot import Bot, require
from liteyuki.utils.message.npl import convert_duration, convert_time_to_seconds
from .stat_api import *
from ...utils import satori_utils
from ...utils.base.language import Language
from ...utils.base.ly_typing import T_MessageEvent
@ -44,7 +45,7 @@ stat_msg = on_alconna(
@stat_msg.assign("message")
async def _(result: Arparma, event: T_MessageEvent, bot: Bot):
ulang = Language(event.user_id)
ulang = Language(satori_utils.get_user_id(event))
try:
duration = convert_time_to_seconds(result.other_args.get("duration", "2d")) # 秒数
@ -57,7 +58,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: Bot):
bot_id = result.other_args.get("bot_id")
if group_id in ["current", "c"]:
group_id = str(event.group_id)
group_id = str(satori_utils.get_group_id(event))
if group_id in ["all", "a"]:
group_id = "all"

View File

@ -4,17 +4,24 @@ from nonebot import require
from nonebot.message import event_postprocessor
from liteyuki.utils.base.data import Database, LiteModel
from liteyuki.utils.base.ly_typing import v11
from liteyuki.utils.base.ly_typing import v11, satori
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
from .common import MessageEventModel, msg_db
from ...utils import satori_utils
require("nonebot_plugin_alconna")
@event_postprocessor
async def general_event_monitor(bot: T_Bot, event: T_MessageEvent):
if isinstance(bot, satori.Bot):
return await satori_event_monitor(bot, event)
else:
return await onebot_v11_event_monitor(bot, event)
async def onebot_v11_event_monitor(bot: v11.Bot, event: v11.MessageEvent):
if event.message_type == "group":
event: v11.GroupMessageEvent
@ -36,3 +43,26 @@ async def onebot_v11_event_monitor(bot: v11.Bot, event: v11.MessageEvent):
message_type=event.message_type,
)
msg_db.save(mem)
async def satori_event_monitor(bot: satori.Bot, event: satori.MessageEvent):
if event.guild is not None:
event: satori.MessageEvent
group_id = str(event.guild.id)
else:
group_id = ""
mem = MessageEventModel(
time=int(time.time()),
bot_id=bot.self_id,
adapter="satori",
group_id=group_id,
user_id=str(event.user.id),
message_id=str(event.message.id),
message=event.message,
message_text=event.message.content,
message_type=satori_utils.get_message_type(event),
)
msg_db.save(mem)

View File

@ -5,12 +5,16 @@ import nonebot
import psutil
from cpuinfo import cpuinfo
from nonebot import require
from nonebot.adapters import satori
from liteyuki.utils import __NAME__, __VERSION__
from liteyuki.utils.base.config import get_config
from liteyuki.utils.base.data_manager import TempConfig, common_db
from liteyuki.utils.base.language import Language
from liteyuki.utils.base.resource import get_loaded_resource_packs, get_path
from liteyuki.utils.message.html_tool import template2image
from liteyuki.utils import satori_utils
from .counter_for_satori import satori_counter
require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler
@ -77,7 +81,8 @@ async def refresh_status_card():
)
async def generate_status_card(bot: dict, hardware: dict, liteyuki: dict, lang="zh-CN", bot_id="0", use_cache=False) -> bytes:
async def generate_status_card(bot: dict, hardware: dict, liteyuki: dict, lang="zh-CN", bot_id="0",
use_cache=False) -> bytes:
if not use_cache:
return await template2image(
get_path("templates/status.html", abs_path=True),
@ -143,6 +148,16 @@ async def get_bots_data(self_id: str = "0") -> dict:
status = {}
bot_name = bot_id
version_info = {}
if isinstance(bot, satori.Bot):
try:
bot_name = (await satori_utils.user_infos.get(bot.self_id)).name
groups = str(await satori_utils.count_groups(bot))
friends = str(await satori_utils.count_friends(bot))
status = {}
version_info = await bot.get_version_info()
except Exception:
pass
else:
try:
# API fetch
bot_name = (await bot.get_login_info())["nickname"]
@ -157,6 +172,9 @@ async def get_bots_data(self_id: str = "0") -> dict:
app_name = version_info.get("app_name", "UnknownImplementation")
if app_name in ["Lagrange.OneBot", "LLOneBot", "Shamrock"]:
icon = f"https://q.qlogo.cn/g?b=qq&nk={bot_id}&s=640"
elif isinstance(bot, satori.Bot):
app_name = "Satori"
icon = (await bot.login_get()).user.avatar
else:
icon = None
bot_data = {
@ -166,8 +184,8 @@ async def get_bots_data(self_id: str = "0") -> dict:
"protocol_name": protocol_names.get(version_info.get("protocol_name"), "Online"),
"groups": groups,
"friends": friends,
"message_sent" : statistics.get("message_sent", 0),
"message_received": statistics.get("message_received", 0),
"message_sent": satori_counter.msg_sent if isinstance(bot, satori.Bot) else statistics.get("message_sent", 0),
"message_received": satori_counter.msg_received if isinstance(bot, satori.Bot) else statistics.get("message_received", 0),
"app_name": app_name
}
result["bots"].append(bot_data)

View File

@ -0,0 +1,10 @@
class SatoriCounter:
msg_sent: int
msg_received: int
def __init__(self):
self.msg_sent = 0
self.msg_received = 0
satori_counter = SatoriCounter()

View File

@ -4,6 +4,7 @@ from liteyuki.utils.base.resource import get_path
from liteyuki.utils.message.html_tool import template2image
from liteyuki.utils.base.language import get_user_lang
from .api import *
from ...utils import satori_utils
from ...utils.base.ly_typing import T_Bot, T_MessageEvent
require("nonebot_plugin_alconna")
@ -27,7 +28,7 @@ status_alc = on_alconna(
@status_alc.handle()
async def _(event: T_MessageEvent, bot: T_Bot):
ulang = get_user_lang(event.user_id)
ulang = get_user_lang(satori_utils.get_user_id(event))
if ulang.lang_code in status_card_cache:
image = status_card_cache[ulang.lang_code]
else:

View File

@ -52,7 +52,6 @@ def get_uni_set() -> set:
async def pre_handle(event: Event):
try:
user_id = str(event.get_user_id())
except:
return

View File

@ -9,6 +9,7 @@ from liteyuki.utils.base.language import Language, change_user_lang, get_all_lan
from liteyuki.utils.base.ly_typing import T_Bot, T_MessageEvent
from liteyuki.utils.message.message import MarkdownMessage as md
from .const import representative_timezones_list
from ...utils import satori_utils
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import Alconna, Args, Arparma, Subcommand, on_alconna
@ -41,12 +42,13 @@ class Profile(LiteModel):
@profile_alc.handle()
async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
user: User = user_db.where_one(User(), "user_id = ?", event.user_id, default=User(user_id=str(event.user_id)))
ulang = get_user_lang(str(event.user_id))
user: User = user_db.where_one(User(), "user_id = ?", satori_utils.get_user_id(event),
default=User(user_id=str(satori_utils.get_user_id(event))))
ulang = get_user_lang(str(satori_utils.get_user_id(event)))
if result.subcommands.get("set"):
if result.subcommands["set"].args.get("value"):
# 对合法性进行校验后设置
r = set_profile(result.args["key"], result.args["value"], str(event.user_id))
r = set_profile(result.args["key"], result.args["value"], str(satori_utils.get_user_id(event)))
if r:
user.profile[result.args["key"]] = result.args["value"]
user_db.save(user) # 数据库保存

View File

@ -1,4 +1,5 @@
from nonebot import require, on_endswith
from nonebot.adapters import satori
from nonebot.adapters.onebot.v11 import MessageSegment
from nonebot.internal.matcher import Matcher
@ -10,6 +11,7 @@ from liteyuki.utils.base.data_manager import User, user_db
from liteyuki.utils.base.language import Language, get_user_lang
from liteyuki.utils.base.resource import get_path
from liteyuki.utils.message.html_tool import template2image
from ...utils import satori_utils
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import on_alconna, Alconna, Args, MultiVar, Arparma
@ -26,6 +28,9 @@ async def _(result: Arparma, event: T_MessageEvent, matcher: Matcher):
"""await alconna.send("weather", city)"""
kws = result.main_args.get("keywords")
image = await get_weather_now_card(matcher, event, kws)
if isinstance(event, satori.event.Event):
await matcher.finish(satori.MessageSegment.image(raw=image, mime="image/png"))
else:
await matcher.finish(MessageSegment.image(image))
@ -38,11 +43,11 @@ async def _(event: T_MessageEvent, matcher: Matcher):
async def get_weather_now_card(matcher: Matcher, event: T_MessageEvent, keyword: list[str], tip: bool = True):
ulang = get_user_lang(event.user_id)
ulang = get_user_lang(satori_utils.get_user_id(event))
qw_lang = get_qw_lang(ulang.lang_code)
key = get_config("weather_key")
is_dev = get_memory_data("weather.is_dev", True)
user: User = user_db.where_one(User(), "user_id = ?", event.user_id, default=User())
user: User = user_db.where_one(User(), "user_id = ?", satori_utils.get_user_id(event), default=User())
# params
unit = user.profile.get("unit", "m")
stored_location = user.profile.get("location", None)

View File

@ -1,7 +1,8 @@
from nonebot.adapters.onebot import v11, v12
from nonebot.adapters import satori
T_Bot = v11.Bot | v12.Bot
T_GroupMessageEvent = v11.GroupMessageEvent | v12.GroupMessageEvent
T_PrivateMessageEvent = v11.PrivateMessageEvent | v12.PrivateMessageEvent
T_MessageEvent = v11.MessageEvent | v12.MessageEvent
T_Message = v11.Message | v12.Message
T_Bot = v11.Bot | v12.Bot | satori.Bot
T_GroupMessageEvent = v11.GroupMessageEvent | v12.GroupMessageEvent | satori.MessageEvent
T_PrivateMessageEvent = v11.PrivateMessageEvent | v12.PrivateMessageEvent | satori.MessageEvent
T_MessageEvent = v11.MessageEvent | v12.MessageEvent | satori.MessageEvent
T_Message = v11.Message | v12.Message | satori.Message

View File

@ -7,6 +7,7 @@ from PIL import Image
import aiohttp
import nonebot
from nonebot import require
from nonebot.adapters import satori
from nonebot.adapters.onebot import v11
from typing import Any, Type
@ -65,8 +66,17 @@ class MarkdownMessage:
"""
formatted_md = v11.unescape(markdown).replace("\n", r"\n").replace('"', r'\\\"')
if event is not None and message_type is None:
if isinstance(event, satori.event.Event):
message_type = "private" if event.guild is None else "group"
group_id = event.guild.id if event.guild is not None else None
else:
assert event is not None
message_type = event.message_type
session_id = event.user_id if event.message_type == "private" else event.group_id
group_id = event.group_id if message_type == "group" else None
user_id = event.user.id if isinstance(event, satori.event.Event) else event.user_id
session_id = user_id if message_type == "private" else group_id
else:
pass
try:
raise TencentBannedMarkdownError("Tencent banned markdown")
forward_id = await bot.call_api(
@ -117,6 +127,13 @@ class MarkdownMessage:
width=540,
device_scale_factor=4
)
if isinstance(bot, satori.Bot):
msg_seg = satori.MessageSegment.image(raw=md_image_bytes,mime="image/png")
data = await bot.send(
event=event,
message=msg_seg
)
else:
data = await bot.send_msg(
message_type=message_type,
group_id=session_id,
@ -164,7 +181,8 @@ class MarkdownMessage:
if method == 2:
base64_string = base64.b64encode(image).decode("utf-8")
data = await bot.call_api("upload_image", file=f"base64://{base64_string}")
await MarkdownMessage.send_md(MarkdownMessage.image(data, Image.open(io.BytesIO(image)).size), bot, event=event, message_type=message_type,
await MarkdownMessage.send_md(MarkdownMessage.image(data, Image.open(io.BytesIO(image)).size), bot,
event=event, message_type=message_type,
session_id=session_id, **kwargs)
# 其他实现端方案
@ -178,7 +196,8 @@ class MarkdownMessage:
image_url = (await bot.get_msg(message_id=image_message_id))["message"][0]["data"]["url"]
image_size = Image.open(io.BytesIO(image)).size
image_md = MarkdownMessage.image(image_url, image_size)
return await MarkdownMessage.send_md(image_md, bot, message_type=message_type, session_id=session_id, event=event, **kwargs)
return await MarkdownMessage.send_md(image_md, bot, message_type=message_type, session_id=session_id,
event=event, **kwargs)
if data is None:
data = await bot.send_msg(

View File

@ -0,0 +1,5 @@
from .user_info import user_infos
from .get_message_type import get_message_type
from .event_tools import *
from .count_friends import count_friends
from .count_groups import count_groups

View File

@ -0,0 +1,13 @@
from nonebot.adapters import satori
async def count_friends(bot: satori.Bot) -> int:
cnt: int = 0
friend_response = await bot.friend_list()
while friend_response.next is not None:
cnt += len(friend_response.data)
friend_response = await bot.friend_list(next_token=friend_response.next)
cnt += len(friend_response.data)
return cnt - 1

View File

@ -0,0 +1,13 @@
from nonebot.adapters import satori
async def count_groups(bot: satori.Bot) -> int:
cnt: int = 0
group_response = await bot.guild_list()
while group_response.next is not None:
cnt += len(group_response.data)
group_response = await bot.friend_list(next_token=group_response.next)
cnt += len(group_response.data)
return cnt - 1

View File

@ -0,0 +1,17 @@
from nonebot.adapters import satori
from liteyuki.utils.base.ly_typing import T_MessageEvent
def get_user_id(event: T_MessageEvent):
if isinstance(event, satori.event.Event):
return event.user.id
else:
return event.user_id
def get_group_id(event: T_MessageEvent):
if isinstance(event, satori.event.Event):
return event.guild.id
else:
return event.group_id

View File

@ -0,0 +1,10 @@
from nonebot.adapters import satori
from liteyuki.utils.base.ly_typing import T_MessageEvent
def get_message_type(event: T_MessageEvent) -> str:
if isinstance(event, satori.event.Event):
return "private" if event.guild is None else "group"
else:
return event.message_type

View File

@ -0,0 +1,38 @@
import nonebot
from nonebot.adapters import satori
from nonebot.adapters.satori.models import User
class UserInfo:
user_infos: dict = {}
async def load_friends(self, bot: satori.Bot):
nonebot.logger.info("Update user info from friends")
friend_response = await bot.friend_list()
while friend_response.next is not None:
for i in friend_response.data:
i: User = i
self.user_infos[str(i.id)] = i
friend_response = await bot.friend_list(next_token=friend_response.next)
for i in friend_response.data:
i: User = i
self.user_infos[str(i.id)] = i
nonebot.logger.info("Finish update user info")
async def get(self, uid: int | str) -> User | None:
try:
return self.user_infos[str(uid)]
except KeyError:
return None
async def put(self, user: User):
self.user_infos[str(user.id)] = user
def __init__(self):
pass
user_infos = UserInfo()

View File

@ -1,5 +1,6 @@
import nonebot
from nonebot.adapters.onebot import v11, v12
from nonebot.adapters import satori
from liteyuki.utils import init
from liteyuki.utils.base.config import load_from_yaml
from liteyuki.utils.base.data_manager import StoredConfig, common_db
@ -11,7 +12,10 @@ if __name__ == "__mp_main__":
static_config = load_from_yaml("config.yml")
store_config.update(static_config)
nonebot.init(**store_config)
if not store_config['enable_satori']:
adapters = [v11.Adapter, v12.Adapter]
else:
adapters = [v11.Adapter, v12.Adapter, satori.Adapter]
driver = nonebot.get_driver()
for adapter in adapters:

View File

@ -9,6 +9,7 @@ nonebot-plugin-htmlrender~=0.3.1
nonebot-adapter-onebot~=2.4.3
nonebot-plugin-alconna~=0.43.0
nonebot_plugin_apscheduler~=0.4.0
nonebot-adapter-satori~=0.11.5
packaging~=23.1
psutil~=5.9.8
py-cpuinfo~=9.0.0