移植nb2 localstore插件,实现夸赞名单

This commit is contained in:
Asankilp 2024-10-24 23:52:09 +08:00
parent 70b1a96ad3
commit b09671093d
7 changed files with 343 additions and 74 deletions

View File

@ -58,13 +58,16 @@ _✨ 使用 Azure OpenAI 推理服务的聊天机器人(施工中) ✨_
| MARSHOAI_PROMPT | 否 | 猫娘 Marsho 人设提示词 | Marsho 的基本系统提示词 |
| MARSHOAI_ADDITIONAL_PROMPT | 否 | 无 | Marsho 的扩展系统提示词 |
| MARSHOAI_POKE_SUFFIX | 否 | `揉了揉你的猫耳` | 对 Marsho 所连接的 OneBot 用户进行双击戳一戳时,构建的聊天内容。此配置项为空字符串时,戳一戳响应功能会被禁用。例如,默认值构建的聊天内容将为`*[昵称]揉了揉你的猫耳`。 |
| MARSHOAI_ENABLE_PRAISES | 否 | `true` | 是否启用夸赞名单功能(未实现) |
| MARSHOAI_ENABLE_PRAISES | 否 | `true` | 是否启用夸赞名单功能 |
| MARSHOAI_ENABLE_TIME_PROMPT | 否 | `true` | 是否启用实时更新的日期与时间(精确到秒)与农历日期系统提示词 |
| MARSHOAI_AZURE_ENDPOINT | 否 | `https://models.inference.ai.azure.com` | 调用 Azure OpenAI 服务的 API 终结点 |
| MARSHOAI_TEMPERATURE | 否 | 无 | 进行推理时的温度参数 |
| MARSHOAI_TOP_P | 否 | 无 | 进行推理时的核采样参数 |
| MARSHOAI_MAX_TOKENS | 否 | 无 | 返回消息的最大 token 数 |
## © 版权说明
## ❤ 鸣谢&版权说明
本项目使用了以下项目的代码:
- [nonebot-plugin-localstore](https://github.com/nonebot/plugin-localstore)
"Marsho" logo 由 [@Asankilp](https://github.com/Asankilp) 绘制,基于 [CC BY-NC-SA 4.0](http://creativecommons.org/licenses/by-nc-sa/4.0/) 许可下提供。
"Melobot" logo 由 [@mldkouo](https://github.com/mldkouo) 绘制,版权归属于 [@Meloland](https://github.com/meloland)。

View File

@ -14,7 +14,9 @@ from .config import Config
from .util import *
from .models import MarshoContext
from .checkers import superuser_checker, PokeMarshoChecker
from .localstore import PluginStore
config = Config()
store = PluginStore(PLUGIN_NAME)
model_name = config.marshoai_default_model
context = MarshoContext()
token = config.marshoai_token
@ -24,7 +26,9 @@ client = ChatCompletionsClient(
credential=AzureKeyCredential(token)
)
@on_command(checker=superuser_checker, cmd_start="/", cmd_sep=" ", targets="praises")
async def praises():
await send_text(build_praises())
@on_command(checker=superuser_checker, cmd_start="/", cmd_sep=" ", targets="changemodel")
async def changemodel(args: ParseArgs = Args()):
@ -34,17 +38,11 @@ async def changemodel(args: ParseArgs = Args()):
@on_command(checker=superuser_checker, cmd_start="/", cmd_sep=" ", targets="contexts")
async def contexts(event: Union[GroupMessageEvent, PrivateMessageEvent]):
try:
await send_text(str(context.build(event.group_id, event.is_private)[1:]))
except AttributeError:
await send_text(str(context.build(event.user_id, event.is_private)[1:]))
await send_text(str(context.build(get_target_id(event), event.is_private)[1:]))
@on_start_match("reset")
async def reset(event: Union[GroupMessageEvent, PrivateMessageEvent]):
try:
context.reset(event.group_id, event.is_private)
except AttributeError:
context.reset(event.user_id, event.is_private)
context.reset(get_target_id(event), event.is_private)
await send_text("上下文已重置")
@ -56,16 +54,14 @@ async def marsho_main(event: Union[GroupMessageEvent, PrivateMessageEvent], is_g
if event.text.lstrip("marsho") == "":
await send_text(USAGE)
await send_text(INTRODUCTION)
await send_text(str(store.get_plugin_data_dir()))
return
# await UniMessage(str(text)).send()
try:
is_support_image_model = model_name.lower() in SUPPORT_IMAGE_MODELS
usermsg = [] if is_support_image_model else ""
user_id = event.sender.user_id
try:
target_id = event.group_id
except AttributeError:
target_id = event.user_id
target_id = get_target_id(event)
nickname_prompt = ""
marsho_string_removed = False
for i in event.get_segments("image"):
@ -109,6 +105,7 @@ async def marsho_main(event: Union[GroupMessageEvent, PrivateMessageEvent], is_g
async def poke(event: PokeNotifyEvent, adapter: Adapter): # 尚未实现私聊戳一戳 QwQ
#await adapter.send_custom(str(event.user_id),group_id=event.group_id)
user_id = event.user_id
target_id = get_target_id(event)
# nicknames = await get_nicknames()
# nickname = nicknames.get(user_id, "")
nickname = ""
@ -129,4 +126,4 @@ async def poke(event: PokeNotifyEvent, adapter: Adapter): # 尚未实现私聊
class MarshoAI(Plugin):
version = VERSION
flows = [changemodel,marsho,reset,poke,contexts]
flows = [changemodel,marsho,reset,poke,contexts,praises]

View File

@ -54,6 +54,6 @@ class Config:
if name in self.config_items:
value = os.getenv(name.upper())
return self._convert_string(value) if value is not None else self.config_items[name]
raise AttributeError(f"'Config' object has no attribute '{name}'")
else:
return None

View File

@ -1,5 +1,6 @@
__version__ = "0.3.1"
__version__ = "0.3.1.1"
VERSION = __version__
PLUGIN_NAME = "marshoai"
USAGE: str = f"""MarshoAI-Melobot Beta v{__version__} by Asankilp
用法
marsho <聊天内容> : Marsho 进行对话当模型为 GPT-4o(-mini) 等时可以带上图片进行对话
@ -7,6 +8,8 @@ USAGE: str = f"""MarshoAI-Melobot Beta v{__version__} by Asankilp
超级用户命令(均需要加上命令前缀使用):
/changemodel <模型名> : 切换全局 AI 模型
/contexts : 返回当前会话的上下文列表 当上下文包含图片时不要使用此命令
/praises : 返回夸赞名单的提示词
本AI的回答"按原样"提供不提供任何担保AI也会犯错请仔细甄别回答的准确性"""

View File

@ -0,0 +1,110 @@
# 对nonebot-plugin-localstore的简单重新实现
from pathlib import Path
from typing import Callable, Optional
from typing_extensions import ParamSpec
from ..config import Config
from .source import user_data_dir, user_cache_dir, user_config_dir
plugin_config = Config()
P = ParamSpec("P")
APP_NAME = "melobot"
BASE_CACHE_DIR = (
user_cache_dir(APP_NAME).resolve()
if plugin_config.localstore_cache_dir is None
else plugin_config.localstore_cache_dir.resolve()
)
BASE_CONFIG_DIR = (
user_config_dir(APP_NAME).resolve()
if plugin_config.localstore_config_dir is None
else plugin_config.localstore_config_dir.resolve()
)
BASE_DATA_DIR = (
user_data_dir(APP_NAME).resolve()
if plugin_config.localstore_data_dir is None
else plugin_config.localstore_data_dir.resolve()
)
def _ensure_dir(path: Path) -> None:
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
elif not path.is_dir():
raise RuntimeError(f"{path} is not a directory")
def _auto_create_dir(func: Callable[P, Path]) -> Callable[P, Path]:
def wrapper(*args: P.args, **kwargs: P.kwargs) -> Path:
path = func(*args, **kwargs)
_ensure_dir(path)
return path
return wrapper
@_auto_create_dir
def get_cache_dir(plugin_name: Optional[str]) -> Path:
return BASE_CACHE_DIR / plugin_name if plugin_name else BASE_CACHE_DIR
def get_cache_file(plugin_name: Optional[str], filename: str) -> Path:
return get_cache_dir(plugin_name) / filename
@_auto_create_dir
def get_config_dir(plugin_name: Optional[str]) -> Path:
return BASE_CONFIG_DIR / plugin_name if plugin_name else BASE_CONFIG_DIR
def get_config_file(plugin_name: Optional[str], filename: str) -> Path:
return get_config_dir(plugin_name) / filename
@_auto_create_dir
def get_data_dir(plugin_name: Optional[str]) -> Path:
return BASE_DATA_DIR / plugin_name if plugin_name else BASE_DATA_DIR
def get_data_file(plugin_name: Optional[str], filename: str) -> Path:
return get_data_dir(plugin_name) / filename
class PluginStore():
def __init__(self, name: str):
self.name = name
def _get_plugin_path(self, base_dir: Path, plugin: str) -> Path:
return base_dir.joinpath(plugin)
@_auto_create_dir
def get_plugin_cache_dir(self) -> Path:
plugin = self.name
return self._get_plugin_path(BASE_CACHE_DIR, plugin)
def get_plugin_cache_file(self, filename: str) -> Path:
return self.get_plugin_cache_dir() / filename
@_auto_create_dir
def get_plugin_config_dir(self) -> Path:
plugin = self.name
return self._get_plugin_path(BASE_CONFIG_DIR, plugin)
def get_plugin_config_file(self, filename: str) -> Path:
return self.get_plugin_config_dir() / filename
@_auto_create_dir
def get_plugin_data_dir(self) -> Path:
plugin = self.name
return self._get_plugin_path(BASE_DATA_DIR, plugin)
def get_plugin_data_file(self, filename: str) -> Path:
return self.get_plugin_data_dir() / filename

View File

@ -0,0 +1,146 @@
import os
import sys
from pathlib import Path
from typing import Literal
WINDOWS = sys.platform.startswith("win") or (sys.platform == "cli" and os.name == "nt")
def user_cache_dir(appname: str) -> Path:
r"""
Return full path to the user-specific cache dir for this application.
"appname" is the name of application.
Typical user cache directories are:
macOS: ~/Library/Caches/<AppName>
Unix: ~/.cache/<AppName> (XDG default)
Windows: C:\Users\<username>\AppData\Local\<AppName>\Cache
On Windows the only suggestion in the MSDN docs is that local settings go
in the `CSIDL_LOCAL_APPDATA` directory. This is identical to the
non-roaming app data dir (the default returned by `user_data_dir`). Apps
typically put cache data somewhere *under* the given dir here. Some
examples:
...\Mozilla\Firefox\Profiles\<ProfileName>\Cache
...\Acme\SuperApp\Cache\1.0
OPINION: This function appends "Cache" to the `CSIDL_LOCAL_APPDATA` value.
"""
if WINDOWS:
return _get_win_folder("CSIDL_LOCAL_APPDATA") / appname / "Cache"
elif sys.platform == "darwin":
return Path("~/Library/Caches").expanduser() / appname
else:
return Path(os.getenv("XDG_CACHE_HOME", "~/.cache")).expanduser() / appname
def user_data_dir(appname: str, roaming: bool = False) -> Path:
r"""
Return full path to the user-specific data dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"roaming" (boolean, default False) can be set True to use the Windows
roaming appdata directory. That means that for users on a Windows
network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
macOS: ~/Library/Application Support/<AppName>
Unix: ~/.local/share/<AppName> # or in
$XDG_DATA_HOME, if defined
Win XP (not roaming): C:\Documents and Settings\<username>\ ...
...Application Data\<AppName>
Win XP (roaming): C:\Documents and Settings\<username>\Local ...
...Settings\Application Data\<AppName>
Win 7 (not roaming): C:\Users\<username>\AppData\Local\<AppName>
Win 7 (roaming): C:\Users\<username>\AppData\Roaming\<AppName>
For Unix, we follow the XDG spec and support $XDG_DATA_HOME.
That means, by default "~/.local/share/<AppName>".
"""
if WINDOWS:
const = "CSIDL_APPDATA" if roaming else "CSIDL_LOCAL_APPDATA"
return Path(_get_win_folder(const)) / appname
elif sys.platform == "darwin":
return Path("~/Library/Application Support/").expanduser() / appname
else:
return Path(os.getenv("XDG_DATA_HOME", "~/.local/share")).expanduser() / appname
def user_config_dir(appname: str, roaming: bool = True) -> Path:
"""Return full path to the user-specific config dir for this application.
"appname" is the name of application.
If None, just the system directory is returned.
"roaming" (boolean, default True) can be set False to not use the
Windows roaming appdata directory. That means that for users on a
Windows network setup for roaming profiles, this user data will be
sync'd on login. See
<http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
for a discussion of issues.
Typical user data directories are:
macOS: same as user_data_dir
Unix: ~/.config/<AppName>
Win *: same as user_data_dir
For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
That means, by default "~/.config/<AppName>".
"""
if WINDOWS:
return user_data_dir(appname, roaming=roaming)
elif sys.platform == "darwin":
return user_data_dir(appname)
else:
return Path(os.getenv("XDG_CONFIG_HOME", "~/.config")).expanduser() / appname
# -- Windows support functions --
def _get_win_folder_from_registry(
csidl_name: Literal["CSIDL_APPDATA", "CSIDL_COMMON_APPDATA", "CSIDL_LOCAL_APPDATA"]
) -> Path:
"""
This is a fallback technique at best. I'm not sure if using the
registry for this guarantees us the correct answer for all CSIDL_*
names.
"""
import winreg
shell_folder_name = {
"CSIDL_APPDATA": "AppData",
"CSIDL_COMMON_APPDATA": "Common AppData",
"CSIDL_LOCAL_APPDATA": "Local AppData",
}[csidl_name]
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders",
)
directory, _type = winreg.QueryValueEx(key, shell_folder_name)
return Path(directory)
def _get_win_folder_with_ctypes(
csidl_name: Literal["CSIDL_APPDATA", "CSIDL_COMMON_APPDATA", "CSIDL_LOCAL_APPDATA"]
) -> Path:
csidl_const = {
"CSIDL_APPDATA": 26,
"CSIDL_COMMON_APPDATA": 35,
"CSIDL_LOCAL_APPDATA": 28,
}[csidl_name]
buf = ctypes.create_unicode_buffer(1024)
ctypes.windll.shell32.SHGetFolderPathW(None, csidl_const, None, 0, buf)
# Downgrade to short path name if have highbit chars. See
# <http://bugs.activestate.com/show_bug.cgi?id=85099>.
has_high_char = any(ord(c) > 255 for c in buf)
if has_high_char:
buf2 = ctypes.create_unicode_buffer(1024)
if ctypes.windll.kernel32.GetShortPathNameW(buf.value, buf2, 1024):
buf = buf2
return Path(buf.value)
if WINDOWS:
try:
import ctypes
_get_win_folder = _get_win_folder_with_ctypes
except ImportError:
_get_win_folder = _get_win_folder_from_registry

View File

@ -5,11 +5,15 @@ import json
import httpx
from datetime import datetime
from zhDateTime import DateTime
from pathlib import Path
from .localstore import PluginStore
from typing import Union
from azure.ai.inference.aio import ChatCompletionsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from melobot.protocols.onebot.v11.adapter.event import GroupMessageEvent, PrivateMessageEvent
from .config import Config
from .constants import PLUGIN_NAME
config = Config()
store = PluginStore(PLUGIN_NAME)
async def get_image_b64(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
@ -38,68 +42,74 @@ async def make_chat(client: ChatCompletionsClient, msg, model_name: str):
max_tokens=config.marshoai_max_tokens,
top_p=config.marshoai_top_p
)
# def get_praises():
# praises_file = store.get_plugin_data_file("praises.json") # 夸赞名单文件使用localstore存储
# if not os.path.exists(praises_file):
# init_data = {
# "like": [
# {"name":"Asankilp","advantages":"赋予了Marsho猫娘人格使用vim与vscode为Marsho写了许多代码使Marsho更加可爱"}
# ]
# }
# with open(praises_file,"w",encoding="utf-8") as f:
# json.dump(init_data,f,ensure_ascii=False,indent=4)
# with open(praises_file,"r",encoding="utf-8") as f:
# data = json.load(f)
# return data
def get_praises():
praises_file = store.get_plugin_data_file("praises.json") # 夸赞名单文件使用localstore存储
if not os.path.exists(praises_file):
init_data = {
"like": [
{"name":"Asankilp","advantages":"赋予了Marsho猫娘人格使用vim与vscode为Marsho写了许多代码使Marsho更加可爱"}
]
}
with open(praises_file,"w",encoding="utf-8") as f:
json.dump(init_data,f,ensure_ascii=False,indent=4)
with open(praises_file,"r",encoding="utf-8") as f:
data = json.load(f)
return data
# def build_praises():
# praises = get_praises()
# result = ["你喜欢以下几个人物,他们有各自的优点:"]
# for item in praises["like"]:
# result.append(f"名字:{item['name']},优点:{item['advantages']}")
# return "\n".join(result)
def build_praises():
praises = get_praises()
result = ["你喜欢以下几个人物,他们有各自的优点:"]
for item in praises["like"]:
result.append(f"名字:{item['name']},优点:{item['advantages']}")
return "\n".join(result)
# async def save_context_to_json(name: str, context: str):
# context_dir = store.get_plugin_data_dir() / "contexts"
# os.makedirs(context_dir, exist_ok=True)
# file_path = os.path.join(context_dir, f"{name}.json")
# with open(file_path, 'w', encoding='utf-8') as json_file:
# json.dump(context, json_file, ensure_ascii=False, indent=4)
async def save_context_to_json(name: str, context: str):
context_dir = store.get_plugin_data_dir() / "contexts"
os.makedirs(context_dir, exist_ok=True)
file_path = os.path.join(context_dir, f"{name}.json")
with open(file_path, 'w', encoding='utf-8') as json_file:
json.dump(context, json_file, ensure_ascii=False, indent=4)
# async def load_context_from_json(name: str):
# context_dir = store.get_plugin_data_dir() / "contexts"
# os.makedirs(context_dir, exist_ok=True)
# file_path = os.path.join(context_dir, f"{name}.json")
# try:
# with open(file_path, 'r', encoding='utf-8') as json_file:
# return json.load(json_file)
# except FileNotFoundError:
# return []
# async def set_nickname(user_id: str, name: str):
# filename = store.get_plugin_data_file("nickname.json")
# if not os.path.exists(filename):
# data = {}
# else:
# with open(filename,'r') as f:
# data = json.load(f)
# data[user_id] = name
# with open(filename, 'w') as f:
# json.dump(data, f, ensure_ascii=False, indent=4)
async def load_context_from_json(name: str):
context_dir = store.get_plugin_data_dir() / "contexts"
os.makedirs(context_dir, exist_ok=True)
file_path = os.path.join(context_dir, f"{name}.json")
try:
with open(file_path, 'r', encoding='utf-8') as json_file:
return json.load(json_file)
except FileNotFoundError:
return []
async def set_nickname(user_id: str, name: str):
filename = store.get_plugin_data_file("nickname.json")
if not os.path.exists(filename):
data = {}
else:
with open(filename,'r') as f:
data = json.load(f)
data[user_id] = name
with open(filename, 'w') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
# async def get_nicknames():
# filename = store.get_plugin_data_file("nickname.json")
# try:
# with open(filename, 'r', encoding='utf-8') as f:
# return json.load(f)
# except FileNotFoundError:
# return {}
async def get_nicknames():
filename = store.get_plugin_data_file("nickname.json")
try:
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
return {}
def get_target_id(event: Union[GroupMessageEvent, PrivateMessageEvent]):
try:
return event.group_id
except AttributeError:
return event.user_id
def get_prompt():
prompts = ""
prompts += config.marshoai_additional_prompt
# if config.marshoai_enable_praises:
# praises_prompt = build_praises()
# prompts += praises_prompt
if config.marshoai_enable_praises:
praises_prompt = build_praises()
prompts += praises_prompt
if config.marshoai_enable_time_prompt:
current_time = datetime.now().strftime('%Y.%m.%d %H:%M:%S')
current_lunar_date = DateTime.now().to_lunar().date_hanzify()[5:] #库更新之前使用切片