import{_ as i,c as a,ae as n,o as t}from"./chunks/framework.BzDBnRMZ.js";const g=JSON.parse('{"title":"util","description":"","frontmatter":{"title":"util","order":100},"headers":[],"relativePath":"dev/api/util.md","filePath":"zh/dev/api/util.md","lastUpdated":1734175019000}'),h={name:"dev/api/util.md"};function k(l,s,p,e,r,E){return t(),a("div",null,s[0]||(s[0]=[n(`

模块 nonebot_plugin_marshoai.util

var nickname_json

var praises_json

var loaded_target_list


async func get_image_raw_and_type(url: str, timeout: int = 10) -> Optional[tuple[bytes, str]]

说明: 获取图片的二进制数据及其 MIME 类型

参数:

源代码在GitHub上查看
python
async def get_image_raw_and_type(url: str, timeout: int=10) -> Optional[tuple[bytes, str]]:
    """Download an image and report its MIME type.

    Args:
        url: Address of the image to fetch.
        timeout: Timeout value forwarded to the httpx request (default 10).

    Returns:
        A (content, mime_type) tuple when the response status is 200,
        otherwise None. The MIME type comes from the Content-Type response
        header; if that is missing it is guessed from the URL, and if the
        guess also fails 'application/octet-stream' is used.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=_browser_headers, timeout=timeout)
        if response.status_code != 200:
            return None
        content_type = response.headers.get('Content-Type')
        if not content_type:
            content_type = mimetypes.guess_type(url)[0]
        if not content_type:
            # Previously str(None) leaked the literal text 'None' to callers,
            # which then ended up inside data URLs and image metadata.
            content_type = 'application/octet-stream'
        return (response.content, str(content_type))

async func get_image_b64(url: str, timeout: int = 10) -> Optional[str]

说明: 获取图片的base64编码

参数:

源代码在GitHub上查看
python
async def get_image_b64(url: str, timeout: int=10) -> Optional[str]:
    """Fetch an image and return it as a base64 data URL.

    Args:
        url: Address of the image to fetch.
        timeout: Timeout value forwarded to get_image_raw_and_type.

    Returns:
        A string of the form 'data:MIME;base64,PAYLOAD', or None when
        get_image_raw_and_type returns a falsy result.
    """
    fetched = await get_image_raw_and_type(url, timeout)
    if not fetched:
        return None
    raw_bytes = fetched[0]
    mime_type = fetched[1]
    encoded = base64.b64encode(raw_bytes).decode('utf-8')
    return 'data:{};base64,{}'.format(mime_type, encoded)

async func make_chat(client: ChatCompletionsClient, msg: list, model_name: str, tools: Optional[list] = None)

说明: 调用 AI 获取回复

参数:

源代码在GitHub上查看
python
async def make_chat(client: ChatCompletionsClient, msg: list, model_name: str, tools: Optional[list]=None):
    """Request a chat completion through the given ChatCompletionsClient.

    Args:
        client: Client whose complete() coroutine is awaited.
        msg: Message list sent as the messages argument.
        model_name: Model identifier sent as the model argument.
        tools: Optional tool list, forwarded unchanged.

    Returns:
        Whatever client.complete() resolves to.
    """
    # Sampling parameters all come from the plugin configuration.
    sampling = {
        'temperature': config.marshoai_temperature,
        'max_tokens': config.marshoai_max_tokens,
        'top_p': config.marshoai_top_p,
    }
    return await client.complete(messages=msg, model=model_name, tools=tools, **sampling)

async func make_chat_openai(client: AsyncOpenAI, msg: list, model_name: str, tools: Optional[list] = None)

说明: 使用 OpenAI SDK 调用 AI 获取回复

参数:

源代码在GitHub上查看
python
async def make_chat_openai(client: AsyncOpenAI, msg: list, model_name: str, tools: Optional[list]=None):
    """Request a chat completion through an AsyncOpenAI client.

    Args:
        client: Client whose chat.completions.create() coroutine is awaited.
        msg: Message list sent as the messages argument.
        model_name: Model identifier sent as the model argument.
        tools: Optional tool list; a falsy value is replaced by NOT_GIVEN.

    Returns:
        Whatever client.chat.completions.create() resolves to.
    """
    # Falsy configuration values are replaced by NOT_GIVEN, exactly like tools.
    optional_args = {
        'tools': tools or NOT_GIVEN,
        'temperature': config.marshoai_temperature or NOT_GIVEN,
        'max_tokens': config.marshoai_max_tokens or NOT_GIVEN,
        'top_p': config.marshoai_top_p or NOT_GIVEN,
    }
    return await client.chat.completions.create(
        messages=msg,
        model=model_name,
        timeout=config.marshoai_timeout,
        **optional_args,
    )

func get_praises()

源代码在GitHub上查看
python
def get_praises():
    """Return the praises data, loading praises.json on first use.

    The parsed data is cached in the module-level praises_json variable.
    When the data file does not exist yet it is created from
    _praises_init_data before being read.
    """
    global praises_json
    if praises_json is not None:
        return praises_json
    praises_path = store.get_plugin_data_file('praises.json')
    if not praises_path.exists():
        # First run: seed the file with the built-in defaults.
        with open(praises_path, 'w', encoding='utf-8') as handle:
            json.dump(_praises_init_data, handle, ensure_ascii=False, indent=4)
    with open(praises_path, 'r', encoding='utf-8') as handle:
        loaded = json.load(handle)
    praises_json = loaded
    return praises_json

async func refresh_praises_json()

源代码在GitHub上查看
python
async def refresh_praises_json():
    """Reload praises.json from disk into the module-level praises_json.

    When the data file is missing it is first created from
    _praises_init_data. The seed file is written through aiofiles, like the
    read that follows, so the write does not block the event loop.
    """
    global praises_json
    praises_file = store.get_plugin_data_file('praises.json')
    if not praises_file.exists():
        # Serialise before opening so the file is only created once the
        # payload is known to be valid JSON text.
        seed = json.dumps(_praises_init_data, ensure_ascii=False, indent=4)
        async with aiofiles.open(praises_file, 'w', encoding='utf-8') as f:
            await f.write(seed)
    async with aiofiles.open(praises_file, 'r', encoding='utf-8') as f:
        data = json.loads(await f.read())
    praises_json = data

func build_praises() -> str

源代码在GitHub上查看
python
def build_praises() -> str:
    """Render the praises data as a prompt fragment.

    Returns:
        A header line followed by one line per entry of the 'like' list
        (using each entry's 'name' and 'advantages'), joined with the same
        separator string the original used.
    """
    praise_data = get_praises()
    lines = ['你喜欢以下几个人物,他们有各自的优点:']
    lines.extend(
        f"名字:{person['name']},优点:{person['advantages']}"
        for person in praise_data['like']
    )
    return '\\n'.join(lines)

async func save_context_to_json(name: str, context: Any, path: str)

源代码在GitHub上查看
python
async def save_context_to_json(name: str, context: Any, path: str):
    """Write a context object to NAME.json below the plugin data directory.

    Args:
        name: File name without the .json suffix.
        context: JSON-serialisable object to store.
        path: Sub-directory of the plugin data directory; created if missing.

    Raises:
        TypeError or ValueError from json.dumps when context cannot be
        serialised. This now happens before the target file is opened, so an
        existing file is no longer truncated by a failed save.
    """
    context_dir = store.get_plugin_data_dir() / path
    context_dir.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(context, ensure_ascii=False, indent=4)
    # Write through aiofiles, like the loader does, so the coroutine does not
    # block the event loop while the file is written.
    async with aiofiles.open(context_dir / f'{name}.json', 'w', encoding='utf-8') as json_file:
        await json_file.write(payload)

async func load_context_from_json(name: str, path: str) -> list

说明: 从指定路径加载历史记录

源代码在GitHub上查看
python
async def load_context_from_json(name: str, path: str) -> list:
    """Load a stored context from NAME.json below the plugin data directory.

    Args:
        name: File name without the .json suffix.
        path: Sub-directory of the plugin data directory; created if missing.

    Returns:
        The parsed JSON content, or an empty list when the file does not exist.
    """
    context_dir = store.get_plugin_data_dir() / path
    context_dir.mkdir(parents=True, exist_ok=True)
    file_path = context_dir / f'{name}.json'
    if not file_path.exists():
        return []
    async with aiofiles.open(file_path, 'r', encoding='utf-8') as json_file:
        raw_text = await json_file.read()
    return json.loads(raw_text)

async func get_nicknames()

说明: 获取nickname_json, 优先来源于全局变量

源代码在GitHub上查看
python
async def get_nicknames():
    """Return the nickname mapping, preferring the cached module-level value.

    On first use nickname.json is read from the plugin data directory; any
    exception while reading or parsing it results in an empty dict being
    cached instead.
    """
    global nickname_json
    if nickname_json is not None:
        return nickname_json
    nickname_path = store.get_plugin_data_file('nickname.json')
    try:
        async with aiofiles.open(nickname_path, 'r', encoding='utf-8') as handle:
            raw_text = await handle.read()
            nickname_json = json.loads(raw_text)
    except Exception:
        nickname_json = {}
    return nickname_json

async func set_nickname(user_id: str, name: str)

源代码在GitHub上查看
python
async def set_nickname(user_id: str, name: str):
    """Store or remove a user's nickname and refresh the cached mapping.

    Args:
        user_id: Key under which the nickname is stored.
        name: Nickname to store; an empty string removes the entry instead.

    The updated mapping is written back to nickname.json and assigned to the
    module-level nickname_json.
    """
    global nickname_json
    filename = store.get_plugin_data_file('nickname.json')
    if filename.exists():
        async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
            data = json.loads(await f.read())
    else:
        data = {}
    if name == '':
        # An empty name means "forget this user"; pop tolerates a missing key.
        data.pop(user_id, None)
    else:
        data[user_id] = name
    payload = json.dumps(data, ensure_ascii=False, indent=4)
    # Write through aiofiles, like the read above, so the coroutine does not
    # block the event loop while the file is written.
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(payload)
    nickname_json = data

async func refresh_nickname_json()

说明: 强制刷新nickname_json, 刷新全局变量

源代码在GitHub上查看
python
async def refresh_nickname_json():
    """Force a reload of nickname.json into the module-level nickname_json.

    Any exception raised while locating, reading or parsing the file is
    logged as an error; the cached value is left unchanged in that case.
    """
    global nickname_json
    try:
        nickname_path = store.get_plugin_data_file('nickname.json')
        async with aiofiles.open(nickname_path, 'r', encoding='utf-8') as handle:
            raw_text = await handle.read()
            nickname_json = json.loads(raw_text)
    except Exception:
        logger.error('刷新 nickname_json 表错误:无法载入 nickname.json 文件')

func get_prompt()

说明: 获取系统提示词

源代码在GitHub上查看
python
def get_prompt():
    """Build the system prompt message as a dict.

    The result is config.marshoai_prompt followed by the additional prompt,
    then (when enabled in the configuration) the praises text and the
    formatted time prompt, wrapped in a SystemMessage and converted with
    as_dict().
    """
    extras = config.marshoai_additional_prompt
    if config.marshoai_enable_praises:
        extras += build_praises()
    if config.marshoai_enable_time_prompt:
        # Take one timestamp so all three fields describe the same instant.
        current_time = DateTime.now()
        extras += _time_prompt.format(
            date_time=current_time.strftime('%Y年%m月%d日 %H:%M:%S'),
            weekday_name=_weekdays[current_time.weekday()],
            lunar_date=current_time.chinesize.date_hanzify('农历{干支年}{生肖}{月份}{数序日}'),
        )
    system_message = SystemMessage(content=config.marshoai_prompt + extras)
    return system_message.as_dict()

func suggest_solution(errinfo: str) -> str

源代码在GitHub上查看
python
def suggest_solution(errinfo: str) -> str:
    """Map a raw error description to a user-facing hint.

    Args:
        errinfo: Error text to scan for known markers.

    Returns:
        The hint for the first known marker contained in errinfo (markers are
        checked in the order listed below), prefixed with the separator
        string, or an empty string when no marker matches.
    """
    known_errors = (
        ('content_filter', '消息已被内容过滤器过滤。请调整聊天内容后重试。'),
        ('RateLimitReached', '模型达到调用速率限制。请稍等一段时间或联系Bot管理员。'),
        ('tokens_limit_reached', '请求token达到上限。请重置上下文。'),
        ('content_length_limit', '请求体过大。请重置上下文。'),
        ('unauthorized', '访问token无效。请联系Bot管理员。'),
        ('invalid type: parameter messages.content is of type array but should be of type string.', '聊天请求体包含此模型不支持的数据类型。请重置上下文。'),
        ('At most 1 image(s) may be provided in one request.', '此模型只能在上下文中包含1张图片。如果此前的聊天已经发送过图片,请重置上下文。'),
    )
    hint = next((advice for marker, advice in known_errors if marker in errinfo), None)
    if hint is None:
        return ''
    return f'\\n{hint}'

async func get_backup_context(target_id: str, target_private: bool) -> list

说明: 获取历史上下文

源代码在GitHub上查看
python
async def get_backup_context(target_id: str, target_private: bool) -> list:
    """Return the backed-up context for a target the first time it is seen.

    Args:
        target_id: Identifier of the chat target.
        target_private: True for a private chat, False for a group.

    Returns:
        The context loaded from 'contexts/backup' on the first call for a
        given target; an empty list on later calls, because the target has
        already been recorded in loaded_target_list.
    """
    global loaded_target_list
    prefix = 'private_' if target_private else 'group_'
    target_uid = f'{prefix}{target_id}'
    if target_uid in loaded_target_list:
        return []
    loaded_target_list.append(target_uid)
    return await load_context_from_json(f'back_up_context_{target_uid}', 'contexts/backup')

func extract_content_and_think(message: ChatCompletionMessage) -> tuple[str, str | None, ChatCompletionMessage]

说明: 处理 API 返回的消息对象,提取其中的内容和思维链,并返回处理后的消息,思维链,消息对象。

参数:

源代码在GitHub上查看
python
def extract_content_and_think(message: ChatCompletionMessage) -> tuple[str, str | None, ChatCompletionMessage]:
    """Split an API reply into visible content and reasoning text.

    Args:
        message: Reply object; its content attribute is rewritten in place.

    Returns:
        A (content, thinking, message) tuple. thinking is the message's
        reasoning_content attribute when that is present and truthy (the
        attribute is then removed from the message); otherwise it is the
        text of all non-empty think-tag blocks found in the content, joined
        together. content is the original content with every think-tag block
        removed and surrounding whitespace stripped.
    """
    thinking = getattr(message, 'reasoning_content', None)
    if thinking:
        delattr(message, 'reasoning_content')
    else:
        fragments = []
        for block in re.findall('<think>(.*?)</think>', message.content or '', flags=re.DOTALL):
            trimmed = block.strip()
            if trimmed:
                fragments.append(trimmed)
        thinking = '\\n'.join(fragments)
    visible = re.sub('<think>.*?</think>', '', message.content or '', flags=re.DOTALL)
    content = visible.strip()
    message.content = content
    return (content, thinking, message)

var latex_convert


@get_driver().on_bot_connect

async func load_latex_convert()

源代码在GitHub上查看
python
@get_driver().on_bot_connect
async def load_latex_convert():
    """Load the LaTeX converter channel when a bot connects.

    Registered through the driver's on_bot_connect hook; it awaits
    latex_convert.load_channel(None).
    """
    await latex_convert.load_channel(None)

async func get_uuid_back2codeblock(msg: str, code_blank_uuid_map: list[tuple[str, str]])

源代码在GitHub上查看
python
async def get_uuid_back2codeblock(msg: str, code_blank_uuid_map: list[tuple[str, str]]):
    """Substitute placeholders in msg with their mapped replacement text.

    Args:
        msg: Text that may contain placeholders.
        code_blank_uuid_map: (placeholder, replacement) pairs, applied in
            list order.

    Returns:
        msg with every occurrence of each placeholder replaced.
    """
    restored = msg
    for pair in code_blank_uuid_map:
        placeholder, replacement = pair
        restored = restored.replace(placeholder, replacement)
    return restored

async func parse_richtext(msg: str) -> UniMessage

说明: 人工智能给出的回答一般不会包含 HTML 嵌入其中,但是包含图片或者 LaTeX 公式、代码块,都很正常。 这个函数会把这些都以图片形式嵌入消息体。

源代码在GitHub上查看
python
async def parse_richtext(msg: str) -> UniMessage:
    """Convert a reply containing images or LaTeX into a rich UniMessage.

    Text matched by CODE_BLOCK_PATTERN is temporarily replaced by random
    placeholders so that IMG_LATEX_PATTERN is not applied inside code blocks;
    the placeholders are swapped back before any text is emitted.

    Args:
        msg: Raw reply text.

    Returns:
        A UniMessage holding msg unchanged when IMG_LATEX_PATTERN does not
        match; otherwise a UniMessage built from text segments, downloaded
        images and rendered formula images in their original order.
    """
    if not IMG_LATEX_PATTERN.search(msg):
        return UniMessage(msg)
    result_msg = UniMessage()
    code_blank_uuid_map = [(uuid.uuid4().hex, cbp.group()) for cbp in CODE_BLOCK_PATTERN.finditer(msg)]
    last_tag_index = 0
    for rep, torep in code_blank_uuid_map:
        msg = msg.replace(torep, rep)
    for each_find_tag in IMG_LATEX_PATTERN.finditer(msg):
        tag_found = await get_uuid_back2codeblock(each_find_tag.group(), code_blank_uuid_map)
        # Slice by the match's own offsets. Searching msg for tag_found again
        # returned the first occurrence (wrong for repeated tags) and -1 when
        # the restored tag text differs from the placeholder text in msg.
        result_msg.append(TextMsg(await get_uuid_back2codeblock(msg[last_tag_index:each_find_tag.start()], code_blank_uuid_map)))
        last_tag_index = each_find_tag.end()
        if each_find_tag.group(1):
            # Group 1 matched: the tag is handled as an image reference.
            image_description = tag_found[2:tag_found.find(']')]
            image_url = tag_found[tag_found.find('(') + 1:-1]
            if (image_ := (await get_image_raw_and_type(image_url))):
                result_msg.append(ImageMsg(raw=image_[0], mimetype=image_[1], name=image_description + '.png'))
                result_msg.append(TextMsg('({})'.format(image_description)))
            else:
                # Download failed: keep the original tag text.
                result_msg.append(TextMsg(tag_found))
        elif each_find_tag.group(2):
            # Group 2 matched: strip the delimiters and render the formula.
            latex_exp = await get_uuid_back2codeblock(each_find_tag.group().replace('$', '').replace('\\\\(', '').replace('\\\\)', '').replace('\\\\[', '').replace('\\\\]', ''), code_blank_uuid_map)
            latex_generate_ok, latex_generate_result = await latex_convert.generate_png(latex_exp, dpi=300, foreground_colour=config.marshoai_main_colour)
            if latex_generate_ok:
                result_msg.append(ImageMsg(raw=latex_generate_result, mimetype='image/png', name='latex.png'))
            else:
                result_msg.append(TextMsg(latex_exp + '(公式解析失败)'))
                # On failure the result is either an error string or an image.
                if isinstance(latex_generate_result, str):
                    result_msg.append(TextMsg(latex_generate_result))
                else:
                    result_msg.append(ImageMsg(raw=latex_generate_result, mimetype='image/png', name='latex_error.png'))
        else:
            result_msg.append(TextMsg(tag_found + '(未知内容解析失败)'))
    result_msg.append(TextMsg(await get_uuid_back2codeblock(msg[last_tag_index:], code_blank_uuid_map)))
    return result_msg

var thinking

`,90)]))}const y=i(h,[["render",k]]);export{g as __pageData,y as default};