添加Bot和状态支持,重构Caller类以实现依赖注入;新增获取设备信息和运行代码的功能

This commit is contained in:
远野千束(神羽) 2024-12-15 18:27:30 +08:00
parent 5fc4140cf7
commit eb5dcb443d
3 changed files with 107 additions and 14 deletions

View File

@ -17,10 +17,11 @@ from azure.ai.inference.models import (
) )
from azure.core.credentials import AzureKeyCredential from azure.core.credentials import AzureKeyCredential
from nonebot import get_driver, logger, on_command, on_message from nonebot import get_driver, logger, on_command, on_message
from nonebot.adapters import Event, Message from nonebot.adapters import Bot, Event, Message
from nonebot.params import CommandArg from nonebot.params import CommandArg
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from nonebot.rule import Rule, to_me from nonebot.rule import Rule, to_me
from nonebot.typing import T_State
from nonebot_plugin_alconna import MsgTarget, UniMessage, UniMsg, on_alconna from nonebot_plugin_alconna import MsgTarget, UniMessage, UniMsg, on_alconna
from nonebot_plugin_marshoai.plugin.func_call.caller import get_function_calls from nonebot_plugin_marshoai.plugin.func_call.caller import get_function_calls
@ -203,7 +204,13 @@ async def refresh_data():
@marsho_at.handle() @marsho_at.handle()
@marsho_cmd.handle() @marsho_cmd.handle()
async def marsho(target: MsgTarget, event: Event, text: Optional[UniMsg] = None): async def marsho(
target: MsgTarget,
event: Event,
bot: Bot,
state: T_State,
text: Optional[UniMsg] = None,
):
global target_list global target_list
if event.get_message().extract_plain_text() and ( if event.get_message().extract_plain_text() and (
not text not text
@ -330,8 +337,13 @@ async def marsho(target: MsgTarget, event: Event, text: Optional[UniMsg] = None)
tool_call.function.name tool_call.function.name
): ):
logger.debug(f"调用插件函数 {tool_call.function.name}") logger.debug(f"调用插件函数 {tool_call.function.name}")
# 权限检查,规则检查 TODO
# 实现依赖注入检查函数参数及参数注解类型对Event类型的参数进行注入 # 实现依赖注入检查函数参数及参数注解类型对Event类型的参数进行注入
caller.event = event caller.event, caller.bot, caller.state = (
event,
bot,
state,
)
func_return = await caller.call(**function_args) func_return = await caller.call(**function_args)
else: else:
logger.error(f"未找到函数 {tool_call.function.name}") logger.error(f"未找到函数 {tool_call.function.name}")

View File

@ -2,7 +2,10 @@ import inspect
from typing import Any from typing import Any
from nonebot import logger from nonebot import logger
from nonebot.adapters import Event from nonebot.adapters import Bot, Event
from nonebot.permission import Permission
from nonebot.rule import Rule
from nonebot.typing import T_State
from ..typing import ASYNC_FUNCTION_CALL_FUNC, F from ..typing import ASYNC_FUNCTION_CALL_FUNC, F
from .utils import async_wrap, is_coroutine_callable from .utils import async_wrap, is_coroutine_callable
@ -17,23 +20,36 @@ class Caller:
self.func: ASYNC_FUNCTION_CALL_FUNC | None = None self.func: ASYNC_FUNCTION_CALL_FUNC | None = None
self._parameters: dict[str, Any] = {} self._parameters: dict[str, Any] = {}
"""依赖注入的参数""" """依赖注入的参数"""
self.bot: Bot | None = None
self.event: Event | None = None self.event: Event | None = None
self.state: T_State | None = None
self._permission: Permission | None = None
self._rule: Rule | None = None
def params(self, **kwargs: Any) -> "Caller": def params(self, **kwargs: Any) -> "Caller":
self._parameters.update(kwargs) self._parameters.update(kwargs)
return self return self
def param(self, name: str, param: Any) -> "Caller": def permission(self, permission: Permission) -> "Caller":
"""设置一个函数参数 self._permission = self._permission or permission
return self
Args: async def pre_check(self) -> tuple[bool, str]:
name (str): 参数名 if self.bot is None or self.event is None:
param (Any): 参数对象 return False, "Context is None"
if self._permission and not await self._permission(self.bot, self.event):
return False, "Permission Denied 权限不足"
Returns: if self.state is None:
Caller: Caller对象 return False, "State is None"
""" if self._rule and not await self._rule(self.bot, self.event, self.state):
self._parameters[name] = param return False, "Rule Denied 规则不匹配"
return True, ""
def rule(self, rule: Rule) -> "Caller":
self._rule = self._rule and rule
return self return self
def name(self, name: str) -> "Caller": def name(self, name: str) -> "Caller":
@ -113,12 +129,19 @@ class Caller:
def set_event(self, event: Event): def set_event(self, event: Event):
self.event = event self.event = event
def set_bot(self, bot: Bot):
self.bot = bot
async def call(self, *args: Any, **kwargs: Any) -> Any: async def call(self, *args: Any, **kwargs: Any) -> Any:
"""调用函数 """调用函数
Returns: Returns:
Any: 函数返回值 Any: 函数返回值
""" """
y, r = await self.pre_check()
if not y:
return r
if self.func is None: if self.func is None:
raise ValueError("未注册函数对象") raise ValueError("未注册函数对象")
sig = inspect.signature(self.func) sig = inspect.signature(self.func)
@ -127,11 +150,18 @@ class Caller:
param.annotation, Event param.annotation, Event
): ):
kwargs[name] = self.event kwargs[name] = self.event
if issubclass(param.annotation, Caller) or isinstance( if issubclass(param.annotation, Caller) or isinstance(
param.annotation, Caller param.annotation, Caller
): ):
kwargs[name] = self kwargs[name] = self
if issubclass(param.annotation, Bot) or isinstance(param.annotation, Bot):
kwargs[name] = self.bot
if param.annotation == T_State:
kwargs[name] = self.state
# 检查形参是否有默认值或传入若没有则用parameters中的默认值填充 # 检查形参是否有默认值或传入若没有则用parameters中的默认值填充
for name, param in sig.parameters.items(): for name, param in sig.parameters.items():
if name not in kwargs: if name not in kwargs:

View File

@ -1,4 +1,10 @@
import os
import platform
import psutil
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import MessageEvent from nonebot.adapters.onebot.v11 import MessageEvent
from nonebot.permission import SUPERUSER
from nonebot_plugin_marshoai.plugin import ( from nonebot_plugin_marshoai.plugin import (
Integer, Integer,
@ -53,4 +59,49 @@ def get_location() -> str:
@on_function_call(description="获取聊天者个人信息及发送的消息和function call调用参数") @on_function_call(description="获取聊天者个人信息及发送的消息和function call调用参数")
async def get_user_info(e: MessageEvent, c: Caller) -> str: async def get_user_info(e: MessageEvent, c: Caller) -> str:
return f"用户ID: {e.user_id} 用户昵称: {e.sender.nickname} FC调用参数:{c._parameters} 消息内容: {e.raw_message}" return (
f"用户ID: {e.user_id} "
"用户昵称: {e.sender.nickname} "
"FC调用参数:{c._parameters} "
"消息内容: {e.raw_message}"
)
@on_function_call(description="获取设备信息")
def get_device_info() -> str:
"""获取机器人所运行的设备信息"""
# 进行一系列获取设备信息操作...
data = {
"cpu 性能": f"{psutil.cpu_percent()}% {psutil.cpu_freq().current:.2f}MHz {psutil.cpu_count()}线程 {psutil.cpu_count(logical=False)}物理核",
"memory 内存": f"{psutil.virtual_memory().percent}% {psutil.virtual_memory().available / 1024 / 1024 / 1024:.2f}/{psutil.virtual_memory().total / 1024 / 1024 / 1024:.2f}GB",
"swap 交换分区": f"{psutil.swap_memory().percent}% {psutil.swap_memory().used / 1024 / 1024 / 1024:.2f}/{psutil.swap_memory().total / 1024 / 1024 / 1024:.2f}GB",
"cpu 信息": f"{psutil.cpu_stats()}",
"system 系统": f"system: {platform.system()}, version: {platform.version()}, arch: {platform.architecture()}, machine: {platform.machine()}",
}
return str(data)
@on_function_call(description="在设备上运行Python代码,需要超级用户权限").params(
code=String(description="Python代码内容")
).permission(SUPERUSER)
async def run_python_code(code: str, b: Bot, e: MessageEvent) -> str:
"""运行Python代码"""
try:
r = eval(code)
except Exception as e:
return "运行出错: " + str(e)
return "运行成功: " + str(r)
@on_function_call(description="运行shell命令,需要超级用户权限").params(
command=String(description="shell命令内容")
).permission(SUPERUSER)
async def run_shell_command(command: str, b: Bot, e: MessageEvent) -> str:
"""运行shell命令"""
try:
r = os.popen(command).read()
except Exception as e:
return "运行出错: " + str(e)
return "运行成功: " + str(r)