From b09671093d68c06aa69e3897c8b7855e844876fb Mon Sep 17 00:00:00 2001 From: Asankilp Date: Thu, 24 Oct 2024 23:52:09 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E6=A4=8Dnb2=20localstore=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E5=AE=9E=E7=8E=B0=E5=A4=B8=E8=B5=9E=E5=90=8D?= =?UTF-8?q?=E5=8D=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 7 +- marshoai/__plugin__.py | 25 +++--- marshoai/config.py | 4 +- marshoai/constants.py | 5 +- marshoai/localstore/__init__.py | 110 ++++++++++++++++++++++++ marshoai/localstore/source.py | 146 ++++++++++++++++++++++++++++++++ marshoai/util.py | 120 ++++++++++++++------------ 7 files changed, 343 insertions(+), 74 deletions(-) create mode 100644 marshoai/localstore/__init__.py create mode 100644 marshoai/localstore/source.py diff --git a/README.md b/README.md index 720d4f4..c74da38 100644 --- a/README.md +++ b/README.md @@ -58,13 +58,16 @@ _✨ 使用 Azure OpenAI 推理服务的聊天机器人(施工中) ✨_ | MARSHOAI_PROMPT | 否 | 猫娘 Marsho 人设提示词 | Marsho 的基本系统提示词 | | MARSHOAI_ADDITIONAL_PROMPT | 否 | 无 | Marsho 的扩展系统提示词 | | MARSHOAI_POKE_SUFFIX | 否 | `揉了揉你的猫耳` | 对 Marsho 所连接的 OneBot 用户进行双击戳一戳时,构建的聊天内容。此配置项为空字符串时,戳一戳响应功能会被禁用。例如,默认值构建的聊天内容将为`*[昵称]揉了揉你的猫耳`。 | -| MARSHOAI_ENABLE_PRAISES | 否 | `true` | 是否启用夸赞名单功能(未实现) | +| MARSHOAI_ENABLE_PRAISES | 否 | `true` | 是否启用夸赞名单功能 | | MARSHOAI_ENABLE_TIME_PROMPT | 否 | `true` | 是否启用实时更新的日期与时间(精确到秒)与农历日期系统提示词 | | MARSHOAI_AZURE_ENDPOINT | 否 | `https://models.inference.ai.azure.com` | 调用 Azure OpenAI 服务的 API 终结点 | | MARSHOAI_TEMPERATURE | 否 | 无 | 进行推理时的温度参数 | | MARSHOAI_TOP_P | 否 | 无 | 进行推理时的核采样参数 | | MARSHOAI_MAX_TOKENS | 否 | 无 | 返回消息的最大 token 数 | -## © 版权说明 +## ❤ 鸣谢&版权说明 +本项目使用了以下项目的代码: +- [nonebot-plugin-localstore](https://github.com/nonebot/plugin-localstore) + "Marsho" logo 由 [@Asankilp](https://github.com/Asankilp) 绘制,基于 [CC BY-NC-SA 4.0](http://creativecommons.org/licenses/by-nc-sa/4.0/) 许可下提供。 "Melobot" logo 由 [@mldkouo](https://github.com/mldkouo) 绘制,版权归属于 [@Meloland](https://github.com/meloland)。 \ No newline at end of file diff --git a/marshoai/__plugin__.py b/marshoai/__plugin__.py index c961cf4..e0b24dc 100644 --- a/marshoai/__plugin__.py +++ b/marshoai/__plugin__.py @@ -14,7 +14,9 @@ from .config import Config from .util import * from .models import MarshoContext from .checkers import superuser_checker, PokeMarshoChecker +from .localstore import PluginStore config = Config() +store = PluginStore(PLUGIN_NAME) model_name = config.marshoai_default_model context = MarshoContext() token = config.marshoai_token @@ -24,7 +26,9 @@ client = ChatCompletionsClient( credential=AzureKeyCredential(token) ) - +@on_command(checker=superuser_checker, cmd_start="/", cmd_sep=" ", targets="praises") +async def praises(): + await send_text(build_praises()) @on_command(checker=superuser_checker, cmd_start="/", cmd_sep=" ", targets="changemodel") async def changemodel(args: ParseArgs = Args()): @@ -34,17 +38,11 @@ async def changemodel(args: ParseArgs = Args()): @on_command(checker=superuser_checker, cmd_start="/", cmd_sep=" ", targets="contexts") async def contexts(event: Union[GroupMessageEvent, PrivateMessageEvent]): - try: - await send_text(str(context.build(event.group_id, event.is_private)[1:])) - except AttributeError: - await send_text(str(context.build(event.user_id, event.is_private)[1:])) + await send_text(str(context.build(get_target_id(event), event.is_private)[1:])) @on_start_match("reset") async def reset(event: Union[GroupMessageEvent, PrivateMessageEvent]): - try: - context.reset(event.group_id, event.is_private) - except AttributeError: - context.reset(event.user_id, event.is_private) + context.reset(get_target_id(event), event.is_private) await send_text("上下文已重置") @@ -56,16 +54,14 @@ async def marsho_main(event: Union[GroupMessageEvent, PrivateMessageEvent], is_g if event.text.lstrip("marsho") == "": await send_text(USAGE) await send_text(INTRODUCTION) + await send_text(str(store.get_plugin_data_dir())) return # await UniMessage(str(text)).send() try: is_support_image_model = model_name.lower() in SUPPORT_IMAGE_MODELS usermsg = [] if is_support_image_model else "" user_id = event.sender.user_id - try: - target_id = event.group_id - except AttributeError: - target_id = event.user_id + target_id = get_target_id(event) nickname_prompt = "" marsho_string_removed = False for i in event.get_segments("image"): @@ -109,6 +105,7 @@ async def marsho_main(event: Union[GroupMessageEvent, PrivateMessageEvent], is_g async def poke(event: PokeNotifyEvent, adapter: Adapter): # 尚未实现私聊戳一戳 QwQ #await adapter.send_custom(str(event.user_id),group_id=event.group_id) user_id = event.user_id + target_id = get_target_id(event) # nicknames = await get_nicknames() # nickname = nicknames.get(user_id, "") nickname = "" @@ -129,4 +126,4 @@ async def poke(event: PokeNotifyEvent, adapter: Adapter): # 尚未实现私聊 class MarshoAI(Plugin): version = VERSION - flows = [changemodel,marsho,reset,poke,contexts] + flows = [changemodel,marsho,reset,poke,contexts,praises] diff --git a/marshoai/config.py b/marshoai/config.py index dc72de5..d611a09 100644 --- a/marshoai/config.py +++ b/marshoai/config.py @@ -54,6 +54,6 @@ class Config: if name in self.config_items: value = os.getenv(name.upper()) return self._convert_string(value) if value is not None else self.config_items[name] - - raise AttributeError(f"'Config' object has no attribute '{name}'") + else: + return None diff --git a/marshoai/constants.py b/marshoai/constants.py index 9afc5ee..918b679 100644 --- a/marshoai/constants.py +++ b/marshoai/constants.py @@ -1,5 +1,6 @@ -__version__ = "0.3.1" +__version__ = "0.3.1.1" VERSION = __version__ +PLUGIN_NAME = "marshoai" USAGE: str = f"""MarshoAI-Melobot Beta v{__version__} by Asankilp 用法: marsho <聊天内容> : 与 Marsho 进行对话。当模型为 GPT-4o(-mini) 等时,可以带上图片进行对话。 @@ -7,6 +8,8 @@ USAGE: str = f"""MarshoAI-Melobot Beta v{__version__} by Asankilp 超级用户命令(均需要加上命令前缀使用): /changemodel <模型名> : 切换全局 AI 模型。 /contexts : 返回当前会话的上下文列表。 ※当上下文包含图片时,不要使用此命令。 + /praises : 返回夸赞名单的提示词。 + ※本AI的回答"按原样"提供,不提供任何担保。AI也会犯错,请仔细甄别回答的准确性。""" diff --git a/marshoai/localstore/__init__.py b/marshoai/localstore/__init__.py new file mode 100644 index 0000000..ed83da1 --- /dev/null +++ b/marshoai/localstore/__init__.py @@ -0,0 +1,110 @@ +# 对nonebot-plugin-localstore的简单重新实现 +from pathlib import Path +from typing import Callable, Optional +from typing_extensions import ParamSpec + + +from ..config import Config +from .source import user_data_dir, user_cache_dir, user_config_dir + +plugin_config = Config() + +P = ParamSpec("P") + +APP_NAME = "melobot" +BASE_CACHE_DIR = ( + user_cache_dir(APP_NAME).resolve() + if plugin_config.localstore_cache_dir is None + else plugin_config.localstore_cache_dir.resolve() +) +BASE_CONFIG_DIR = ( + user_config_dir(APP_NAME).resolve() + if plugin_config.localstore_config_dir is None + else plugin_config.localstore_config_dir.resolve() +) +BASE_DATA_DIR = ( + user_data_dir(APP_NAME).resolve() + if plugin_config.localstore_data_dir is None + else plugin_config.localstore_data_dir.resolve() +) + + +def _ensure_dir(path: Path) -> None: + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + elif not path.is_dir(): + raise RuntimeError(f"{path} is not a directory") + + +def _auto_create_dir(func: Callable[P, Path]) -> Callable[P, Path]: + def wrapper(*args: P.args, **kwargs: P.kwargs) -> Path: + path = func(*args, **kwargs) + _ensure_dir(path) + return path + + return wrapper + + +@_auto_create_dir +def get_cache_dir(plugin_name: Optional[str]) -> Path: + return BASE_CACHE_DIR / plugin_name if plugin_name else BASE_CACHE_DIR + + +def get_cache_file(plugin_name: Optional[str], filename: str) -> Path: + return get_cache_dir(plugin_name) / filename + + +@_auto_create_dir +def get_config_dir(plugin_name: Optional[str]) -> Path: + return BASE_CONFIG_DIR / plugin_name if plugin_name else BASE_CONFIG_DIR + + +def get_config_file(plugin_name: Optional[str], filename: str) -> Path: + return get_config_dir(plugin_name) / filename + + +@_auto_create_dir +def get_data_dir(plugin_name: Optional[str]) -> Path: + return BASE_DATA_DIR / plugin_name if plugin_name else BASE_DATA_DIR + + +def get_data_file(plugin_name: Optional[str], filename: str) -> Path: + return get_data_dir(plugin_name) / filename + + +class PluginStore(): + def __init__(self, name: str): + self.name = name + + def _get_plugin_path(self, base_dir: Path, plugin: str) -> Path: + return base_dir.joinpath(plugin) + + + @_auto_create_dir + def get_plugin_cache_dir(self) -> Path: + plugin = self.name + return self._get_plugin_path(BASE_CACHE_DIR, plugin) + + + def get_plugin_cache_file(self, filename: str) -> Path: + return self.get_plugin_cache_dir() / filename + + + @_auto_create_dir + def get_plugin_config_dir(self) -> Path: + plugin = self.name + return self._get_plugin_path(BASE_CONFIG_DIR, plugin) + + + def get_plugin_config_file(self, filename: str) -> Path: + return self.get_plugin_config_dir() / filename + + + @_auto_create_dir + def get_plugin_data_dir(self) -> Path: + plugin = self.name + return self._get_plugin_path(BASE_DATA_DIR, plugin) + + + def get_plugin_data_file(self, filename: str) -> Path: + return self.get_plugin_data_dir() / filename diff --git a/marshoai/localstore/source.py b/marshoai/localstore/source.py new file mode 100644 index 0000000..86136be --- /dev/null +++ b/marshoai/localstore/source.py @@ -0,0 +1,146 @@ +import os +import sys +from pathlib import Path +from typing import Literal + +WINDOWS = sys.platform.startswith("win") or (sys.platform == "cli" and os.name == "nt") + + +def user_cache_dir(appname: str) -> Path: + r""" + Return full path to the user-specific cache dir for this application. + "appname" is the name of application. + Typical user cache directories are: + macOS: ~/Library/Caches/ + Unix: ~/.cache/ (XDG default) + Windows: C:\Users\\AppData\Local\\Cache + On Windows the only suggestion in the MSDN docs is that local settings go + in the `CSIDL_LOCAL_APPDATA` directory. This is identical to the + non-roaming app data dir (the default returned by `user_data_dir`). Apps + typically put cache data somewhere *under* the given dir here. Some + examples: + ...\Mozilla\Firefox\Profiles\\Cache + ...\Acme\SuperApp\Cache\1.0 + OPINION: This function appends "Cache" to the `CSIDL_LOCAL_APPDATA` value. + """ + if WINDOWS: + return _get_win_folder("CSIDL_LOCAL_APPDATA") / appname / "Cache" + elif sys.platform == "darwin": + return Path("~/Library/Caches").expanduser() / appname + else: + return Path(os.getenv("XDG_CACHE_HOME", "~/.cache")).expanduser() / appname + + +def user_data_dir(appname: str, roaming: bool = False) -> Path: + r""" + Return full path to the user-specific data dir for this application. + "appname" is the name of application. + If None, just the system directory is returned. + "roaming" (boolean, default False) can be set True to use the Windows + roaming appdata directory. That means that for users on a Windows + network setup for roaming profiles, this user data will be + sync'd on login. See + + for a discussion of issues. + Typical user data directories are: + macOS: ~/Library/Application Support/ + Unix: ~/.local/share/ # or in + $XDG_DATA_HOME, if defined + Win XP (not roaming): C:\Documents and Settings\\ ... + ...Application Data\ + Win XP (roaming): C:\Documents and Settings\\Local ... + ...Settings\Application Data\ + Win 7 (not roaming): C:\Users\\AppData\Local\ + Win 7 (roaming): C:\Users\\AppData\Roaming\ + For Unix, we follow the XDG spec and support $XDG_DATA_HOME. + That means, by default "~/.local/share/". + """ + if WINDOWS: + const = "CSIDL_APPDATA" if roaming else "CSIDL_LOCAL_APPDATA" + return Path(_get_win_folder(const)) / appname + elif sys.platform == "darwin": + return Path("~/Library/Application Support/").expanduser() / appname + else: + return Path(os.getenv("XDG_DATA_HOME", "~/.local/share")).expanduser() / appname + + +def user_config_dir(appname: str, roaming: bool = True) -> Path: + """Return full path to the user-specific config dir for this application. + "appname" is the name of application. + If None, just the system directory is returned. + "roaming" (boolean, default True) can be set False to not use the + Windows roaming appdata directory. That means that for users on a + Windows network setup for roaming profiles, this user data will be + sync'd on login. See + + for a discussion of issues. + Typical user data directories are: + macOS: same as user_data_dir + Unix: ~/.config/ + Win *: same as user_data_dir + For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME. + That means, by default "~/.config/". + """ + if WINDOWS: + return user_data_dir(appname, roaming=roaming) + elif sys.platform == "darwin": + return user_data_dir(appname) + else: + return Path(os.getenv("XDG_CONFIG_HOME", "~/.config")).expanduser() / appname + + +# -- Windows support functions -- +def _get_win_folder_from_registry( + csidl_name: Literal["CSIDL_APPDATA", "CSIDL_COMMON_APPDATA", "CSIDL_LOCAL_APPDATA"] +) -> Path: + """ + This is a fallback technique at best. I'm not sure if using the + registry for this guarantees us the correct answer for all CSIDL_* + names. + """ + import winreg + + shell_folder_name = { + "CSIDL_APPDATA": "AppData", + "CSIDL_COMMON_APPDATA": "Common AppData", + "CSIDL_LOCAL_APPDATA": "Local AppData", + }[csidl_name] + + key = winreg.OpenKey( + winreg.HKEY_CURRENT_USER, + r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders", + ) + directory, _type = winreg.QueryValueEx(key, shell_folder_name) + return Path(directory) + + +def _get_win_folder_with_ctypes( + csidl_name: Literal["CSIDL_APPDATA", "CSIDL_COMMON_APPDATA", "CSIDL_LOCAL_APPDATA"] +) -> Path: + csidl_const = { + "CSIDL_APPDATA": 26, + "CSIDL_COMMON_APPDATA": 35, + "CSIDL_LOCAL_APPDATA": 28, + }[csidl_name] + + buf = ctypes.create_unicode_buffer(1024) + ctypes.windll.shell32.SHGetFolderPathW(None, csidl_const, None, 0, buf) + + # Downgrade to short path name if have highbit chars. See + # . + has_high_char = any(ord(c) > 255 for c in buf) + if has_high_char: + buf2 = ctypes.create_unicode_buffer(1024) + if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024): + buf = buf2 + + return Path(buf.value) + + +if WINDOWS: + try: + import ctypes + + _get_win_folder = _get_win_folder_with_ctypes + except ImportError: + _get_win_folder = _get_win_folder_from_registry diff --git a/marshoai/util.py b/marshoai/util.py index 6048da7..5d2b581 100644 --- a/marshoai/util.py +++ b/marshoai/util.py @@ -5,11 +5,15 @@ import json import httpx from datetime import datetime from zhDateTime import DateTime -from pathlib import Path +from .localstore import PluginStore +from typing import Union from azure.ai.inference.aio import ChatCompletionsClient from azure.ai.inference.models import SystemMessage, UserMessage +from melobot.protocols.onebot.v11.adapter.event import GroupMessageEvent, PrivateMessageEvent from .config import Config +from .constants import PLUGIN_NAME config = Config() +store = PluginStore(PLUGIN_NAME) async def get_image_b64(url): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' @@ -38,68 +42,74 @@ async def make_chat(client: ChatCompletionsClient, msg, model_name: str): max_tokens=config.marshoai_max_tokens, top_p=config.marshoai_top_p ) -# def get_praises(): -# praises_file = store.get_plugin_data_file("praises.json") # 夸赞名单文件使用localstore存储 -# if not os.path.exists(praises_file): -# init_data = { -# "like": [ -# {"name":"Asankilp","advantages":"赋予了Marsho猫娘人格,使用vim与vscode为Marsho写了许多代码,使Marsho更加可爱"} -# ] -# } -# with open(praises_file,"w",encoding="utf-8") as f: -# json.dump(init_data,f,ensure_ascii=False,indent=4) -# with open(praises_file,"r",encoding="utf-8") as f: -# data = json.load(f) -# return data +def get_praises(): + praises_file = store.get_plugin_data_file("praises.json") # 夸赞名单文件使用localstore存储 + if not os.path.exists(praises_file): + init_data = { + "like": [ + {"name":"Asankilp","advantages":"赋予了Marsho猫娘人格,使用vim与vscode为Marsho写了许多代码,使Marsho更加可爱"} + ] + } + with open(praises_file,"w",encoding="utf-8") as f: + json.dump(init_data,f,ensure_ascii=False,indent=4) + with open(praises_file,"r",encoding="utf-8") as f: + data = json.load(f) + return data -# def build_praises(): -# praises = get_praises() -# result = ["你喜欢以下几个人物,他们有各自的优点:"] -# for item in praises["like"]: -# result.append(f"名字:{item['name']},优点:{item['advantages']}") -# return "\n".join(result) +def build_praises(): + praises = get_praises() + result = ["你喜欢以下几个人物,他们有各自的优点:"] + for item in praises["like"]: + result.append(f"名字:{item['name']},优点:{item['advantages']}") + return "\n".join(result) -# async def save_context_to_json(name: str, context: str): -# context_dir = store.get_plugin_data_dir() / "contexts" -# os.makedirs(context_dir, exist_ok=True) -# file_path = os.path.join(context_dir, f"{name}.json") -# with open(file_path, 'w', encoding='utf-8') as json_file: -# json.dump(context, json_file, ensure_ascii=False, indent=4) +async def save_context_to_json(name: str, context: str): + context_dir = store.get_plugin_data_dir() / "contexts" + os.makedirs(context_dir, exist_ok=True) + file_path = os.path.join(context_dir, f"{name}.json") + with open(file_path, 'w', encoding='utf-8') as json_file: + json.dump(context, json_file, ensure_ascii=False, indent=4) -# async def load_context_from_json(name: str): -# context_dir = store.get_plugin_data_dir() / "contexts" -# os.makedirs(context_dir, exist_ok=True) -# file_path = os.path.join(context_dir, f"{name}.json") -# try: -# with open(file_path, 'r', encoding='utf-8') as json_file: -# return json.load(json_file) -# except FileNotFoundError: -# return [] -# async def set_nickname(user_id: str, name: str): -# filename = store.get_plugin_data_file("nickname.json") -# if not os.path.exists(filename): -# data = {} -# else: -# with open(filename,'r') as f: -# data = json.load(f) -# data[user_id] = name -# with open(filename, 'w') as f: -# json.dump(data, f, ensure_ascii=False, indent=4) +async def load_context_from_json(name: str): + context_dir = store.get_plugin_data_dir() / "contexts" + os.makedirs(context_dir, exist_ok=True) + file_path = os.path.join(context_dir, f"{name}.json") + try: + with open(file_path, 'r', encoding='utf-8') as json_file: + return json.load(json_file) + except FileNotFoundError: + return [] +async def set_nickname(user_id: str, name: str): + filename = store.get_plugin_data_file("nickname.json") + if not os.path.exists(filename): + data = {} + else: + with open(filename,'r') as f: + data = json.load(f) + data[user_id] = name + with open(filename, 'w') as f: + json.dump(data, f, ensure_ascii=False, indent=4) -# async def get_nicknames(): -# filename = store.get_plugin_data_file("nickname.json") -# try: -# with open(filename, 'r', encoding='utf-8') as f: -# return json.load(f) -# except FileNotFoundError: -# return {} +async def get_nicknames(): + filename = store.get_plugin_data_file("nickname.json") + try: + with open(filename, 'r', encoding='utf-8') as f: + return json.load(f) + except FileNotFoundError: + return {} + +def get_target_id(event: Union[GroupMessageEvent, PrivateMessageEvent]): + try: + return event.group_id + except AttributeError: + return event.user_id def get_prompt(): prompts = "" prompts += config.marshoai_additional_prompt - # if config.marshoai_enable_praises: - # praises_prompt = build_praises() - # prompts += praises_prompt + if config.marshoai_enable_praises: + praises_prompt = build_praises() + prompts += praises_prompt if config.marshoai_enable_time_prompt: current_time = datetime.now().strftime('%Y.%m.%d %H:%M:%S') current_lunar_date = DateTime.now().to_lunar().date_hanzify()[5:] #库更新之前使用切片