nonebot2/nonebot/adapters/_base.py

552 lines
14 KiB
Python
Raw Normal View History

2021-02-19 15:24:52 +08:00
"""
协议适配基类
============
各协议请继承以下基类并使用 ``driver.register_adapter`` 注册适配器
"""
import abc
2021-03-31 16:51:09 +08:00
import asyncio
2021-06-14 19:52:35 +08:00
from copy import deepcopy
from functools import partial
2021-08-27 14:46:15 +08:00
from typing_extensions import Protocol
2021-08-27 15:08:26 +08:00
from dataclasses import dataclass, field, asdict
2021-08-27 14:46:15 +08:00
from typing import (Any, Set, List, Dict, Type, Tuple, Union, TypeVar, Mapping,
Generic, Optional, Iterable)
2021-02-19 15:24:52 +08:00
from pydantic import BaseModel
from nonebot.log import logger
2021-08-27 14:46:15 +08:00
from nonebot.config import Config
from nonebot.utils import DataclassEncoder
2021-08-27 14:46:15 +08:00
from nonebot.typing import T_CallingAPIHook, T_CalledAPIHook
from nonebot.drivers import Driver, HTTPConnection, HTTPResponse
from ._formatter import MessageFormatter
2021-02-19 15:24:52 +08:00
2021-03-31 16:51:09 +08:00
class _ApiCall(Protocol):
2021-06-14 19:52:35 +08:00
async def __call__(self, **kwargs: Any) -> Any:
2021-03-31 16:51:09 +08:00
...
2021-02-19 15:24:52 +08:00
class Bot(abc.ABC):
"""
Bot 基类用于处理上报消息并提供 API 调用接口
"""
2021-06-14 19:52:35 +08:00
driver: Driver
2021-02-19 15:24:52 +08:00
"""Driver 对象"""
2021-06-14 19:52:35 +08:00
config: Config
2021-02-19 15:24:52 +08:00
"""Config 配置对象"""
2021-04-01 20:23:55 +08:00
_calling_api_hook: Set[T_CallingAPIHook] = set()
2021-03-31 16:51:09 +08:00
"""
:类型: ``Set[T_CallingAPIHook]``
:说明: call_api 时执行的函数
"""
2021-04-01 20:23:55 +08:00
_called_api_hook: Set[T_CalledAPIHook] = set()
"""
:类型: ``Set[T_CalledAPIHook]``
:说明: call_api 后执行的函数
"""
2021-02-19 15:24:52 +08:00
2021-06-10 21:52:20 +08:00
def __init__(self, self_id: str, request: HTTPConnection):
2021-02-19 15:24:52 +08:00
"""
:参数:
* ``self_id: str``: 机器人 ID
2021-06-14 19:52:35 +08:00
* ``request: HTTPConnection``: request 连接对象
2021-02-19 15:24:52 +08:00
"""
2021-06-10 21:52:20 +08:00
self.self_id: str = self_id
2021-02-19 15:24:52 +08:00
"""机器人 ID"""
2021-06-10 21:52:20 +08:00
self.request: HTTPConnection = request
"""连接信息"""
2021-02-19 15:24:52 +08:00
2021-03-31 16:51:09 +08:00
def __getattr__(self, name: str) -> _ApiCall:
2021-02-19 15:24:52 +08:00
return partial(self.call_api, name)
@property
@abc.abstractmethod
def type(self) -> str:
"""Adapter 类型"""
raise NotImplementedError
@classmethod
def register(cls, driver: Driver, config: Config, **kwargs):
2021-02-19 15:24:52 +08:00
"""
:说明:
``register`` 方法会在 ``driver.register_adapter`` 时被调用用于初始化相关配置
2021-02-19 15:24:52 +08:00
"""
cls.driver = driver
cls.config = config
@classmethod
@abc.abstractmethod
2021-06-10 21:52:20 +08:00
async def check_permission(
2021-06-14 19:52:35 +08:00
cls, driver: Driver, request: HTTPConnection
2021-06-10 21:52:20 +08:00
) -> Tuple[Optional[str], Optional[HTTPResponse]]:
2021-02-19 15:24:52 +08:00
"""
:说明:
检查连接请求是否合法的函数如果合法则返回当前连接 ``唯一标识符``通常为机器人 ID如果不合法则抛出 ``RequestDenied`` 异常
:参数:
* ``driver: Driver``: Driver 对象
2021-06-14 19:52:35 +08:00
* ``request: HTTPConnection``: request 请求详情
2021-02-19 15:24:52 +08:00
:返回:
2021-06-14 19:52:35 +08:00
- ``Optional[str]``: 连接唯一标识符``None`` 代表连接不合法
- ``Optional[HTTPResponse]``: HTTP 上报响应
2021-02-19 15:24:52 +08:00
"""
raise NotImplementedError
@abc.abstractmethod
2021-06-10 21:52:20 +08:00
async def handle_message(self, message: bytes):
2021-02-19 15:24:52 +08:00
"""
:说明:
处理上报消息的函数转换为 ``Event`` 事件后调用 ``nonebot.message.handle_event`` 进一步处理事件
:参数:
2021-06-10 21:52:20 +08:00
* ``message: bytes``: 收到的上报消息
2021-02-19 15:24:52 +08:00
"""
raise NotImplementedError
@abc.abstractmethod
2021-03-31 16:51:09 +08:00
async def _call_api(self, api: str, **data) -> Any:
"""
:说明:
``adapter`` 实际调用 api 的逻辑实现函数实现该方法以调用 api
:参数:
* ``api: str``: API 名称
* ``**data``: API 数据
"""
raise NotImplementedError
async def call_api(self, api: str, **data: Any) -> Any:
2021-02-19 15:24:52 +08:00
"""
:说明:
调用机器人 API 接口可以通过该函数或直接通过 bot 属性进行调用
:参数:
* ``api: str``: API 名称
* ``**data``: API 数据
:示例:
.. code-block:: python
await bot.call_api("send_msg", message="hello world")
await bot.send_msg(message="hello world")
"""
2021-04-01 20:23:55 +08:00
coros = list(map(lambda x: x(self, api, data), self._calling_api_hook))
2021-03-31 16:51:09 +08:00
if coros:
try:
logger.debug("Running CallingAPI hooks...")
await asyncio.gather(*coros)
except Exception as e:
logger.opt(colors=True, exception=e).error(
"<r><bg #f8bbd0>Error when running CallingAPI hook. "
"Running cancelled!</bg #f8bbd0></r>")
2021-04-01 20:23:55 +08:00
exception = None
result = None
try:
2021-05-24 16:49:27 +08:00
result = await self._call_api(api, **data)
2021-04-01 20:23:55 +08:00
except Exception as e:
exception = e
coros = list(
map(lambda x: x(self, exception, api, data, result),
self._called_api_hook))
if coros:
try:
logger.debug("Running CalledAPI hooks...")
await asyncio.gather(*coros)
except Exception as e:
logger.opt(colors=True, exception=e).error(
"<r><bg #f8bbd0>Error when running CalledAPI hook. "
"Running cancelled!</bg #f8bbd0></r>")
2021-03-31 16:51:09 +08:00
2021-04-01 20:23:55 +08:00
if exception:
raise exception
return result
2021-02-19 15:24:52 +08:00
@abc.abstractmethod
2021-03-24 00:28:37 +08:00
async def send(self, event: "Event", message: Union[str, "Message",
"MessageSegment"],
**kwargs) -> Any:
2021-02-19 15:24:52 +08:00
"""
:说明:
调用机器人基础发送消息接口
:参数:
* ``event: Event``: 上报事件
* ``message: Union[str, Message, MessageSegment]``: 要发送的消息
* ``**kwargs``
"""
raise NotImplementedError
2021-03-31 16:51:09 +08:00
@classmethod
def on_calling_api(cls, func: T_CallingAPIHook) -> T_CallingAPIHook:
2021-06-14 19:52:35 +08:00
"""
:说明:
调用 api 预处理
:参数:
* ``bot: Bot``: 当前 bot 对象
* ``api: str``: 调用的 api 名称
* ``data: Dict[str, Any]``: api 调用的参数字典
"""
2021-04-01 20:23:55 +08:00
cls._calling_api_hook.add(func)
return func
@classmethod
def on_called_api(cls, func: T_CalledAPIHook) -> T_CalledAPIHook:
2021-06-14 19:52:35 +08:00
"""
:说明:
调用 api 后处理
:参数:
* ``bot: Bot``: 当前 bot 对象
* ``exception: Optional[Exception]``: 调用 api 时发生的错误
* ``api: str``: 调用的 api 名称
* ``data: Dict[str, Any]``: api 调用的参数字典
* ``result: Any``: api 调用的返回
"""
2021-04-01 20:23:55 +08:00
cls._called_api_hook.add(func)
2021-03-31 16:51:09 +08:00
return func
2021-02-19 15:24:52 +08:00
2021-06-17 01:07:19 +08:00
T = TypeVar("T")
2021-06-18 01:23:13 +08:00
TMS = TypeVar("TMS", covariant=True)
2021-06-17 01:07:19 +08:00
TM = TypeVar("TM", bound="Message")
2021-02-19 15:24:52 +08:00
@dataclass
2021-06-17 01:07:19 +08:00
class MessageSegment(Mapping, abc.ABC, Generic[TM]):
2021-02-19 15:24:52 +08:00
"""消息段基类"""
type: str
"""
- 类型: ``str``
- 说明: 消息段类型
"""
data: Dict[str, Any] = field(default_factory=lambda: {})
"""
- 类型: ``Dict[str, Union[str, list]]``
- 说明: 消息段数据
"""
2021-06-14 19:52:35 +08:00
@classmethod
@abc.abstractmethod
2021-06-17 01:07:19 +08:00
def get_message_class(cls) -> Type[TM]:
2021-06-14 19:52:35 +08:00
raise NotImplementedError
2021-02-19 15:24:52 +08:00
@abc.abstractmethod
2021-03-02 14:35:02 +08:00
def __str__(self) -> str:
2021-02-19 15:24:52 +08:00
"""该消息段所代表的 str在命令匹配部分使用"""
raise NotImplementedError
2021-03-02 14:35:02 +08:00
def __len__(self) -> int:
return len(str(self))
2021-06-17 01:07:19 +08:00
def __ne__(self: T, other: T) -> bool:
2021-03-02 14:35:02 +08:00
return not self == other
2021-06-17 01:07:19 +08:00
def __add__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
2021-06-19 15:18:57 +08:00
return self.get_message_class()(self) + other # type: ignore
2021-02-19 15:24:52 +08:00
2021-06-17 01:07:19 +08:00
def __radd__(self, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
2021-06-19 15:18:57 +08:00
return self.get_message_class()(other) + self # type: ignore
2021-02-19 15:24:52 +08:00
2021-06-14 19:52:35 +08:00
def __getitem__(self, key: str):
return getattr(self, key)
2021-02-19 15:24:52 +08:00
2021-06-14 19:52:35 +08:00
def __setitem__(self, key: str, value: Any):
return setattr(self, key, value)
2021-02-19 15:24:52 +08:00
2021-03-02 14:35:02 +08:00
def __iter__(self):
yield from asdict(self).keys()
2021-03-02 14:35:02 +08:00
2021-06-14 19:52:35 +08:00
def __contains__(self, key: Any) -> bool:
return key in asdict(self).keys()
2021-03-02 14:35:02 +08:00
2021-06-14 19:52:35 +08:00
def get(self, key: str, default: Any = None):
2021-02-19 15:24:52 +08:00
return getattr(self, key, default)
2021-03-02 14:35:02 +08:00
def keys(self):
return asdict(self).keys()
2021-03-02 14:35:02 +08:00
def values(self):
return asdict(self).values()
2021-03-02 14:35:02 +08:00
def items(self):
return asdict(self).items()
2021-03-02 14:35:02 +08:00
2021-06-17 01:07:19 +08:00
def copy(self: T) -> T:
2021-06-14 19:52:35 +08:00
return deepcopy(self)
2021-02-19 15:24:52 +08:00
@abc.abstractmethod
def is_text(self) -> bool:
raise NotImplementedError
2021-06-17 01:07:19 +08:00
class Message(List[TMS], abc.ABC):
2021-02-19 15:24:52 +08:00
"""消息数组"""
2021-06-17 01:07:19 +08:00
def __init__(self: TM,
message: Union[str, None, Mapping, Iterable[Mapping], TMS, TM,
Any] = None,
2021-02-19 15:24:52 +08:00
*args,
**kwargs):
"""
:参数:
* ``message: Union[str, list, dict, MessageSegment, Message, Any]``: 消息内容
"""
super().__init__(*args, **kwargs)
if message is None:
return
elif isinstance(message, Message):
self.extend(message)
elif isinstance(message, MessageSegment):
self.append(message)
else:
self.extend(self._construct(message))
@classmethod
2021-08-27 15:08:26 +08:00
def template(cls: Type[TM], format_string: str) -> MessageFormatter[TM]:
return MessageFormatter(cls, format_string)
2021-06-14 19:52:35 +08:00
@classmethod
@abc.abstractmethod
2021-06-17 01:07:19 +08:00
def get_segment_class(cls) -> Type[TMS]:
2021-06-14 19:52:35 +08:00
raise NotImplementedError
2021-02-19 15:24:52 +08:00
def __str__(self):
2021-06-14 19:52:35 +08:00
return "".join(str(seg) for seg in self)
2021-02-19 15:24:52 +08:00
@classmethod
def __get_validators__(cls):
yield cls._validate
@classmethod
def _validate(cls, value):
return cls(value)
@staticmethod
@abc.abstractmethod
def _construct(
2021-06-17 01:07:19 +08:00
msg: Union[str, Mapping, Iterable[Mapping], Any]) -> Iterable[TMS]:
2021-02-19 15:24:52 +08:00
raise NotImplementedError
2021-06-17 01:07:19 +08:00
def __add__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
2021-06-14 19:52:35 +08:00
result = self.copy()
result += other
2021-02-19 15:24:52 +08:00
return result
2021-06-17 01:07:19 +08:00
def __radd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
2021-06-19 15:18:57 +08:00
result = self.__class__(other) # type: ignore
2021-06-14 19:52:35 +08:00
return result + self
2021-02-19 15:24:52 +08:00
2021-06-17 01:07:19 +08:00
def __iadd__(self: TM, other: Union[str, Mapping, Iterable[Mapping]]) -> TM:
2021-06-14 19:52:35 +08:00
if isinstance(other, MessageSegment):
2021-02-19 15:24:52 +08:00
self.append(other)
elif isinstance(other, Message):
self.extend(other)
2021-06-14 19:52:35 +08:00
else:
self.extend(self._construct(other))
2021-02-19 15:24:52 +08:00
return self
2021-06-17 01:07:19 +08:00
def append(self: TM, obj: Union[str, TMS]) -> TM:
2021-02-19 15:24:52 +08:00
"""
:说明:
添加一个消息段到消息数组末尾
:参数:
* ``obj: Union[str, MessageSegment]``: 要添加的消息段
"""
if isinstance(obj, MessageSegment):
2021-06-14 19:52:35 +08:00
super(Message, self).append(obj)
2021-02-19 15:24:52 +08:00
elif isinstance(obj, str):
self.extend(self._construct(obj))
else:
raise ValueError(f"Unexpected type: {type(obj)} {obj}")
return self
2021-06-17 01:07:19 +08:00
def extend(self: TM, obj: Union[TM, Iterable[TMS]]) -> TM:
2021-02-19 15:24:52 +08:00
"""
:说明:
拼接一个消息数组或多个消息段到消息数组末尾
:参数:
* ``obj: Union[Message, Iterable[MessageSegment]]``: 要添加的消息数组
"""
for segment in obj:
self.append(segment)
return self
2021-06-17 01:07:19 +08:00
def copy(self: TM) -> TM:
2021-06-14 19:52:35 +08:00
return deepcopy(self)
2021-02-19 15:24:52 +08:00
2021-06-17 01:07:19 +08:00
def extract_plain_text(self: "Message[MessageSegment]") -> str:
2021-02-19 15:24:52 +08:00
"""
:说明:
提取消息内纯文本消息
"""
2021-06-14 19:52:35 +08:00
return "".join(str(seg) for seg in self if seg.is_text())
2021-02-19 15:24:52 +08:00
class Event(abc.ABC, BaseModel):
"""Event 基类。提供获取关键信息的方法,其余信息可直接获取。"""
class Config:
extra = "allow"
json_encoders = {Message: DataclassEncoder}
@abc.abstractmethod
def get_type(self) -> str:
2021-02-19 15:24:52 +08:00
"""
:说明:
获取事件类型的方法类型通常为 NoneBot 内置的四种类型
:返回:
* ``Literal["message", "notice", "request", "meta_event"]``
* 其他自定义 ``str``
2021-02-19 15:24:52 +08:00
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_name(self) -> str:
"""
:说明:
获取事件名称的方法
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_description(self) -> str:
"""
:说明:
获取事件描述的方法通常为事件具体内容
:返回:
* ``str``
"""
raise NotImplementedError
def __str__(self) -> str:
return f"[{self.get_event_name()}]: {self.get_event_description()}"
def get_log_string(self) -> str:
"""
:说明:
获取事件日志信息的方法通常你不需要修改这个方法只有当希望 NoneBot 隐藏该事件日志时可以抛出 ``NoLogException`` 异常
:返回:
* ``str``
:异常:
- ``NoLogException``
"""
return f"[{self.get_event_name()}]: {self.get_event_description()}"
@abc.abstractmethod
def get_user_id(self) -> str:
"""
:说明:
获取事件主体 id 的方法通常是用户 id
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_session_id(self) -> str:
"""
:说明:
获取会话 id 的方法用于判断当前事件属于哪一个会话通常是用户 id群组 id 组合
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_message(self) -> Message:
2021-02-19 15:24:52 +08:00
"""
:说明:
获取事件消息内容的方法
:返回:
* ``Message``
"""
raise NotImplementedError
def get_plaintext(self) -> str:
"""
:说明:
获取消息纯文本的方法通常不需要修改默认通过 ``get_message().extract_plain_text`` 获取
:返回:
* ``str``
"""
return self.get_message().extract_plain_text()
@abc.abstractmethod
def is_tome(self) -> bool:
"""
:说明:
获取事件是否与机器人有关的方法
:返回:
* ``bool``
"""
raise NotImplementedError