mirror of
synced 2025-03-11 07:49:34 +08:00
460 lines
12 KiB
460 lines
12 KiB
各协议请继承以下基类,并使用 ``driver.register_adapter`` 注册适配器
import abc
from functools import reduce, partial
from dataclasses import dataclass, field
from typing import Any, Dict, Union, TypeVar, Optional, Callable, Iterable, Awaitable, Generic, TYPE_CHECKING
from pydantic import BaseModel
from nonebot.config import Config
from nonebot.drivers import Driver, WebSocket
class Bot(abc.ABC):
Bot 基类。用于处理上报消息,并提供 API 调用接口。
def __init__(self,
driver: "Driver",
connection_type: str,
config: Config,
self_id: str,
websocket: Optional["WebSocket"] = None):
* ``driver: Driver``: Driver 对象
* ``connection_type: str``: http 或者 websocket
* ``config: Config``: Config 对象
* ``self_id: str``: 机器人 ID
* ``websocket: Optional[WebSocket]``: Websocket 连接对象
self.driver = driver
"""Driver 对象"""
self.connection_type = connection_type
self.config = config
"""Config 配置对象"""
self.self_id = self_id
"""机器人 ID"""
self.websocket = websocket
"""Websocket 连接对象"""
def __getattr__(self, name: str) -> Callable[..., Awaitable[Any]]:
return partial(self.call_api, name)
def type(self) -> str:
"""Adapter 类型"""
raise NotImplementedError
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict, body: Optional[dict]) -> str:
检查连接请求是否合法的函数,如果合法则返回当前连接 ``唯一标识符``,通常为机器人 ID;如果不合法则抛出 ``RequestDenied`` 异常。
* ``driver: Driver``: Driver 对象
* ``connection_type: str``: 连接类型
* ``headers: dict``: 请求头
* ``body: Optional[dict]``: 请求数据,WebSocket 连接该部分为空
- ``str``: 连接唯一标识符
- ``RequestDenied``: 请求非法
raise NotImplementedError
async def handle_message(self, message: dict):
处理上报消息的函数,转换为 ``Event`` 事件后调用 ``nonebot.message.handle_event`` 进一步处理事件。
* ``message: dict``: 收到的上报消息
raise NotImplementedError
async def call_api(self, api: str, **data):
调用机器人 API 接口,可以通过该函数或直接通过 bot 属性进行调用
* ``api: str``: API 名称
* ``**data``: API 数据
.. code-block:: python
await bot.call_api("send_msg", message="hello world"})
await bot.send_msg(message="hello world")
raise NotImplementedError
async def send(self, event: "BaseEvent",
message: Union[str, "BaseMessage",
"BaseMessageSegment"], **kwargs):
* ``event: Event``: 上报事件
* ``message: Union[str, Message, MessageSegment]``: 要发送的消息
* ``**kwargs``
raise NotImplementedError
T = TypeVar("T", bound=BaseModel)
class Event(abc.ABC, Generic[T]):
Event 基类。提供上报信息的关键信息,其余信息可从原始上报消息获取。
def __init__(self, raw_event: Union[dict, T]):
* ``raw_event: Union[dict, T]``: 原始上报消息
self._raw_event = raw_event
def __repr__(self) -> str:
return f"<Event {self.self_id}: {self.name} {self.time}>"
def raw_event(self) -> Union[dict, T]:
return self._raw_event
def id(self) -> int:
"""事件 ID"""
raise NotImplementedError
def name(self) -> str:
raise NotImplementedError
def self_id(self) -> str:
"""机器人 ID"""
raise NotImplementedError
def time(self) -> int:
raise NotImplementedError
def type(self) -> str:
raise NotImplementedError
def type(self, value) -> None:
raise NotImplementedError
def detail_type(self) -> str:
raise NotImplementedError
def detail_type(self, value) -> None:
raise NotImplementedError
def sub_type(self) -> Optional[str]:
raise NotImplementedError
def sub_type(self, value) -> None:
raise NotImplementedError
def user_id(self) -> Optional[int]:
"""触发事件的主体 ID"""
raise NotImplementedError
def user_id(self, value) -> None:
raise NotImplementedError
def group_id(self) -> Optional[int]:
"""触发事件的主体群 ID"""
raise NotImplementedError
def group_id(self, value) -> None:
raise NotImplementedError
def to_me(self) -> Optional[bool]:
raise NotImplementedError
def to_me(self, value) -> None:
raise NotImplementedError
def message(self) -> Optional["BaseMessage"]:
raise NotImplementedError
def message(self, value) -> None:
raise NotImplementedError
def reply(self) -> Optional[dict]:
raise NotImplementedError
def reply(self, value) -> None:
raise NotImplementedError
def raw_message(self) -> Optional[str]:
raise NotImplementedError
def raw_message(self, value) -> None:
raise NotImplementedError
def plain_text(self) -> Optional[str]:
raise NotImplementedError
def sender(self) -> Optional[dict]:
raise NotImplementedError
def sender(self, value) -> None:
raise NotImplementedError
class MessageSegment(abc.ABC):
type: str
- 类型: ``str``
- 说明: 消息段类型
data: Dict[str, Any] = field(default_factory=lambda: {})
- 类型: ``Dict[str, Union[str, list]]``
- 说明: 消息段数据
def __str__(self):
"""该消息段所代表的 str,在命令匹配部分使用"""
raise NotImplementedError
def __add__(self, other):
if isinstance(other, str):
elif isinstance(other, MessageSegment):
注意:不能返回 self,需要返回一个新生成的对象
raise NotImplementedError
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
return setattr(self, key, value)
def get(self, key, default=None):
return getattr(self, key, default)
def text(cls, text: str) -> "BaseMessageSegment":
return cls("text", {"text": text})
class Message(list, abc.ABC):
def __init__(self,
message: Union[str, dict, list, BaseModel, BaseMessageSegment,
"BaseMessage"] = None,
* ``message: Union[str, dict, list, BaseModel, MessageSegment, Message]``: 消息内容
super().__init__(*args, **kwargs)
if isinstance(message, (str, dict, list, BaseModel)):
elif isinstance(message, BaseMessage):
elif isinstance(message, BaseMessageSegment):
def __str__(self):
return ''.join((str(seg) for seg in self))
def _construct(
msg: Union[str, dict, list,
BaseModel]) -> Iterable[BaseMessageSegment]:
raise NotImplementedError
def __add__(
self, other: Union[str, BaseMessageSegment,
"BaseMessage"]) -> "BaseMessage":
result = self.__class__(self)
if isinstance(other, str):
elif isinstance(other, BaseMessageSegment):
elif isinstance(other, BaseMessage):
return result
def __radd__(self, other: Union[str, BaseMessageSegment, "BaseMessage"]):
result = self.__class__(other)
return result.__add__(self)
def append(self, obj: Union[str, BaseMessageSegment]) -> "BaseMessage":
* ``obj: Union[str, MessageSegment]``: 要添加的消息段
if isinstance(obj, BaseMessageSegment):
elif isinstance(obj, str):
raise ValueError(f"Unexpected type: {type(obj)} {obj}")
return self
def extend(
self, obj: Union["BaseMessage",
Iterable[BaseMessageSegment]]) -> "BaseMessage":
* ``obj: Union[Message, Iterable[MessageSegment]]``: 要添加的消息数组
for segment in obj:
return self
def reduce(self) -> None:
缩减消息数组,即按 MessageSegment 的实现拼接相邻消息段
index = 0
while index < len(self):
if index > 0 and self[
index - 1].type == "text" and self[index].type == "text":
self[index - 1] += self[index]
del self[index]
index += 1
def extract_plain_text(self) -> str:
def _concat(x: str, y: BaseMessageSegment) -> str:
return f"{x} {y}" if y.type == "text" else x
plain_text = reduce(_concat, self, "")
return plain_text[1:] if plain_text else plain_text