🔀 Merge pull request #224

New: Import hook
Fix: literal event type
This commit is contained in:
Ju4tCode 2021-02-21 11:09:52 +08:00 committed by GitHub
commit 7d3b397ff4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1038 additions and 730 deletions

View File

@ -10,6 +10,12 @@ jobs:
steps:
- uses: actions/checkout@v2
if: github.event_name == 'push'
- uses: actions/checkout@v2
if: github.event_name != 'push'
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Get yarn cache directory path
id: yarn-cache-dir-path
@ -46,7 +52,7 @@ jobs:
publish-dir: './docs/.vuepress/dist'
production-branch: master
github-token: ${{ secrets.GITHUB_TOKEN }}
deploy-message: 'Deploy ${{ env.BRANCH_NAME }}@${{ github.sha }}'
deploy-message: 'Deploy ${{ env.DEPLOY_NAME }}@${{ github.sha }}'
enable-commit-comment: false
alias: ${{ env.DEPLOY_NAME }}
env:

View File

@ -17,6 +17,16 @@ sidebarDepth: 0
Bot 基类。用于处理上报消息,并提供 API 调用接口。
### `driver`
Driver 对象
### `config`
Config 配置对象
### _abstract_ `__init__(connection_type, self_id, *, websocket=None)`
@ -33,34 +43,35 @@ Bot 基类。用于处理上报消息,并提供 API 调用接口。
### _abstract async_ `call_api(api, **data)`
### `connection_type`
连接类型
### `self_id`
机器人 ID
### `websocket`
Websocket 连接对象
### _abstract property_ `type`
Adapter 类型
### _classmethod_ `register(driver, config)`
* **说明**
调用机器人 API 接口,可以通过该函数或直接通过 bot 属性进行调用
register 方法会在 driver.register_adapter 时被调用,用于初始化相关配置
* **参数**
* `api: str`: API 名称
* `**data`: API 数据
* **示例**
```python
await bot.call_api("send_msg", message="hello world")
await bot.send_msg(message="hello world")
```
### _abstract async classmethod_ `check_permission(driver, connection_type, headers, body)`
@ -116,15 +127,34 @@ await bot.send_msg(message="hello world")
### _classmethod_ `register(driver, config)`
### _abstract async_ `call_api(api, **data)`
* **说明**
register 方法会在 driver.register_adapter 时被调用,用于初始化相关配置
调用机器人 API 接口,可以通过该函数或直接通过 bot 属性进行调用
* **参数**
* `api: str`: API 名称
* `**data`: API 数据
* **示例**
```python
await bot.call_api("send_msg", message="hello world")
await bot.send_msg(message="hello world")
```
### _abstract async_ `send(event, message, **kwargs)`
@ -147,167 +177,29 @@ await bot.send_msg(message="hello world")
### _abstract property_ `type`
## _class_ `MessageSegment`
Adapter 类型
基类:`abc.ABC`
消息段基类
## _class_ `Event`
基类:`abc.ABC`, `pydantic.main.BaseModel`
### `type`
Event 基类。提供获取关键信息的方法,其余信息可直接获取。
* 类型: `str`
### _abstract_ `get_event_description()`
* 说明: 消息段类型
* **说明**
获取事件描述的方法,通常为事件具体内容。
### `data`
* 类型: `Dict[str, Union[str, list]]`
* **返回**
* `str`
### _abstract_ `get_event_name()`
* **说明**
获取事件名称的方法。
* **返回**
* `str`
### `get_log_string()`
* **说明**
获取事件日志信息的方法,通常你不需要修改这个方法,只有当希望 NoneBot 隐藏该事件日志时,可以抛出 `NoLogException` 异常。
* **返回**
* `str`
* **异常**
* `NoLogException`
### _abstract_ `get_message()`
* **说明**
获取事件消息内容的方法。
* **返回**
* `Message`
### `get_plaintext()`
* **说明**
获取消息纯文本的方法,通常不需要修改,默认通过 `get_message().extract_plain_text` 获取。
* **返回**
* `str`
### _abstract_ `get_session_id()`
* **说明**
获取会话 id 的方法,用于判断当前事件属于哪一个会话,通常是用户 id、群组 id 组合。
* **返回**
* `str`
### _abstract_ `get_type()`
* **说明**
获取事件类型的方法,类型通常为 NoneBot 内置的四种类型。
* **返回**
* `Literal["message", "notice", "request", "meta_event"]`
### _abstract_ `get_user_id()`
* **说明**
获取事件主体 id 的方法,通常是用户 id 。
* **返回**
* `str`
### _abstract_ `is_tome()`
* **说明**
获取事件是否与机器人有关的方法。
* **返回**
* `bool`
* 说明: 消息段数据
## _class_ `Message`
@ -359,15 +251,6 @@ Event 基类。提供获取关键信息的方法,其余信息可直接获取
### `extract_plain_text()`
* **说明**
提取消息内纯文本消息
### `reduce()`
@ -377,8 +260,170 @@ Event 基类。提供获取关键信息的方法,其余信息可直接获取
## _class_ `MessageSegment`
### `extract_plain_text()`
基类:`abc.ABC`
消息段基类
* **说明**
提取消息内纯文本消息
## _class_ `Event`
基类:`abc.ABC`, `pydantic.main.BaseModel`
Event 基类。提供获取关键信息的方法,其余信息可直接获取。
### _abstract_ `get_type()`
* **说明**
获取事件类型的方法,类型通常为 NoneBot 内置的四种类型。
* **返回**
* `Literal["message", "notice", "request", "meta_event"]`
* `str`
### _abstract_ `get_event_name()`
* **说明**
获取事件名称的方法。
* **返回**
* `str`
### _abstract_ `get_event_description()`
* **说明**
获取事件描述的方法,通常为事件具体内容。
* **返回**
* `str`
### `get_log_string()`
* **说明**
获取事件日志信息的方法,通常你不需要修改这个方法,只有当希望 NoneBot 隐藏该事件日志时,可以抛出 `NoLogException` 异常。
* **返回**
* `str`
* **异常**
* `NoLogException`
### _abstract_ `get_user_id()`
* **说明**
获取事件主体 id 的方法,通常是用户 id 。
* **返回**
* `str`
### _abstract_ `get_session_id()`
* **说明**
获取会话 id 的方法,用于判断当前事件属于哪一个会话,通常是用户 id、群组 id 组合。
* **返回**
* `str`
### _abstract_ `get_message()`
* **说明**
获取事件消息内容的方法。
* **返回**
* `Message`
### `get_plaintext()`
* **说明**
获取消息纯文本的方法,通常不需要修改,默认通过 `get_message().extract_plain_text` 获取。
* **返回**
* `str`
### _abstract_ `is_tome()`
* **说明**
获取事件是否与机器人有关的方法。
* **返回**
* `bool`

View File

@ -193,7 +193,7 @@ CQHTTP 配置类
## _class_ `Bot`
基类:[`nonebot.adapters.Bot`](README.md#nonebot.adapters.Bot)
基类:[`nonebot.adapters._base.Bot`](README.md#nonebot.adapters._base.Bot)
CQHTTP 协议 Bot 适配。继承属性参考 [BaseBot](./#class-basebot) 。
@ -307,14 +307,14 @@ CQHTTP 协议 Bot 适配。继承属性参考 [BaseBot](./#class-basebot) 。
## _class_ `MessageSegment`
基类:[`nonebot.adapters.MessageSegment`](README.md#nonebot.adapters.MessageSegment)
基类:[`nonebot.adapters._base.MessageSegment`](README.md#nonebot.adapters._base.MessageSegment)
CQHTTP 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
## _class_ `Message`
基类:[`nonebot.adapters.Message`](README.md#nonebot.adapters.Message)
基类:[`nonebot.adapters._base.Message`](README.md#nonebot.adapters._base.Message)
CQHTTP 协议 Message 适配。
@ -377,7 +377,7 @@ CQHTTP 协议 Message 适配。
## _class_ `Event`
基类:[`nonebot.adapters.Event`](README.md#nonebot.adapters.Event)
基类:[`nonebot.adapters._base.Event`](README.md#nonebot.adapters._base.Event)
CQHTTP 协议事件,字段与 CQHTTP 一致。各事件字段参考 [CQHTTP 文档](https://github.com/howmanybots/onebot/blob/master/README.md)

View File

@ -94,7 +94,7 @@ sidebarDepth: 0
## _class_ `Bot`
基类:[`nonebot.adapters.Bot`](README.md#nonebot.adapters.Bot)
基类:[`nonebot.adapters._base.Bot`](README.md#nonebot.adapters._base.Bot)
钉钉 协议 Bot 适配。继承属性参考 [BaseBot](./#class-basebot) 。
@ -199,7 +199,7 @@ sidebarDepth: 0
## _class_ `MessageSegment`
基类:[`nonebot.adapters.MessageSegment`](README.md#nonebot.adapters.MessageSegment)
基类:[`nonebot.adapters._base.MessageSegment`](README.md#nonebot.adapters._base.MessageSegment)
钉钉 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
@ -283,7 +283,7 @@ message += MessageSegment.atDingtalkIds(event.senderId)
## _class_ `Message`
基类:[`nonebot.adapters.Message`](README.md#nonebot.adapters.Message)
基类:[`nonebot.adapters._base.Message`](README.md#nonebot.adapters._base.Message)
钉钉 协议 Message 适配。
@ -292,7 +292,7 @@ message += MessageSegment.atDingtalkIds(event.senderId)
## _class_ `Event`
基类:[`nonebot.adapters.Event`](README.md#nonebot.adapters.Event)
基类:[`nonebot.adapters._base.Event`](README.md#nonebot.adapters._base.Event)
钉钉协议事件。各事件字段参考 [钉钉文档](https://ding-doc.dingtalk.com/document#/org-dev-guide/elzz1p)

View File

@ -117,7 +117,7 @@ Bot会话管理器, 提供API主动调用接口
## _class_ `Bot`
基类:[`nonebot.adapters.Bot`](README.md#nonebot.adapters.Bot)
基类:[`nonebot.adapters._base.Bot`](README.md#nonebot.adapters._base.Bot)
mirai-api-http 协议 Bot 适配。
@ -722,7 +722,7 @@ mirai-api-http 正向 Websocket 协议 Bot 适配。
## _class_ `MessageSegment`
基类:[`nonebot.adapters.MessageSegment`](README.md#nonebot.adapters.MessageSegment)
基类:[`nonebot.adapters._base.MessageSegment`](README.md#nonebot.adapters._base.MessageSegment)
CQHTTP 协议 MessageSegment 适配。具体方法参考 [mirai-api-http 消息类型](https://github.com/project-mirai/mirai-api-http/blob/master/docs/MessageType.md)
@ -963,7 +963,7 @@ CQHTTP 协议 MessageSegment 适配。具体方法参考 [mirai-api-http 消息
## _class_ `MessageChain`
基类:[`nonebot.adapters.Message`](README.md#nonebot.adapters.Message)
基类:[`nonebot.adapters._base.Message`](README.md#nonebot.adapters._base.Message)
Mirai 协议 Message 适配
@ -1060,7 +1060,7 @@ Mirai 协议 Message 适配
## _class_ `Event`
基类:[`nonebot.adapters.Event`](README.md#nonebot.adapters.Event)
基类:[`nonebot.adapters._base.Event`](README.md#nonebot.adapters._base.Event)
mirai-api-http 协议事件,字段与 mirai-api-http 一致。各事件字段参考 [mirai-api-http 事件类型](https://github.com/project-mirai/mirai-api-http/blob/master/docs/EventType.md)
@ -1486,7 +1486,7 @@ Bot被邀请入群申请
## _class_ `Event`
基类:[`nonebot.adapters.Event`](README.md#nonebot.adapters.Event)
基类:[`nonebot.adapters._base.Event`](README.md#nonebot.adapters._base.Event)
mirai-api-http 协议事件,字段与 mirai-api-http 一致。各事件字段参考 [mirai-api-http 事件类型](https://github.com/project-mirai/mirai-api-http/blob/master/docs/EventType.md)

View File

@ -52,6 +52,9 @@ sidebarDepth: 0
* `load_plugins` => `nonebot.plugin.load_plugins`
* `load_all_plugins` => `nonebot.plugin.load_all_plugins`
* `load_builtin_plugins` => `nonebot.plugin.load_builtin_plugins`

View File

@ -1267,7 +1267,7 @@ def something_else():
* **说明**
使用 `importlib` 加载单个插件,可以是本地插件或是通过 `pip` 安装的插件。
使用 `PluginManager` 加载单个插件,可以是本地插件或是通过 `pip` 安装的插件。
@ -1308,6 +1308,32 @@ def something_else():
## `load_all_plugins(module_path, plugin_dir)`
* **说明**
导入指定列表中的插件以及指定目录下多个插件,以 `_` 开头的插件不会被导入!
* **参数**
* `module_path: Set[str]`: 指定插件集合
* `plugin_dir: Set[str]`: 指定插件路径集合
* **返回**
* `Set[Plugin]`
## `load_builtin_plugins(name='echo')`

View File

@ -6,7 +6,7 @@ sidebarDepth: 0
NoneBot.adapters 模块
=====================
.. automodule:: nonebot.adapters
.. automodule:: nonebot.adapters._base
:members:
:private-members:
:special-members: __init__

View File

@ -18,6 +18,7 @@
- ``Matchergroup`` => ``nonebot.plugin.MatcherGroup``
- ``load_plugin`` => ``nonebot.plugin.load_plugin``
- ``load_plugins`` => ``nonebot.plugin.load_plugins``
- ``load_all_plugins`` => ``nonebot.plugin.load_all_plugins``
- ``load_builtin_plugins`` => ``nonebot.plugin.load_builtin_plugins``
- ``get_plugin`` => ``nonebot.plugin.get_plugin``
- ``get_loaded_plugins`` => ``nonebot.plugin.get_loaded_plugins``
@ -221,5 +222,5 @@ def run(host: Optional[str] = None,
from nonebot.plugin import on_message, on_notice, on_request, on_metaevent, CommandGroup, MatcherGroup
from nonebot.plugin import on_startswith, on_endswith, on_keyword, on_command, on_shell_command, on_regex
from nonebot.plugin import load_plugin, load_plugins, load_builtin_plugins
from nonebot.plugin import load_plugin, load_plugins, load_all_plugins, load_builtin_plugins
from nonebot.plugin import export, require, get_plugin, get_loaded_plugins

View File

@ -1,24 +1,4 @@
"""
协议适配基类
============
各协议请继承以下基类并使用 ``driver.register_adapter`` 注册适配器
"""
import abc
from copy import copy
from typing_extensions import Literal
from functools import reduce, partial
from dataclasses import dataclass, field
from typing import Any, Dict, Union, TypeVar, Mapping, Optional, Callable, Iterable, Iterator, Awaitable, TYPE_CHECKING
from pydantic import BaseModel
from nonebot.utils import DataclassEncoder
if TYPE_CHECKING:
from nonebot.config import Config
from nonebot.drivers import Driver, WebSocket
from typing import Iterable
try:
import pkg_resources
@ -31,454 +11,4 @@ except ImportError:
except Exception:
pass
class Bot(abc.ABC):
"""
Bot 基类用于处理上报消息并提供 API 调用接口
"""
driver: "Driver"
"""Driver 对象"""
config: "Config"
"""Config 配置对象"""
@abc.abstractmethod
def __init__(self,
connection_type: str,
self_id: str,
*,
websocket: Optional["WebSocket"] = None):
"""
:参数:
* ``connection_type: str``: http 或者 websocket
* ``self_id: str``: 机器人 ID
* ``websocket: Optional[WebSocket]``: Websocket 连接对象
"""
self.connection_type = connection_type
"""连接类型"""
self.self_id = self_id
"""机器人 ID"""
self.websocket = websocket
"""Websocket 连接对象"""
def __getattr__(self, name: str) -> Callable[..., Awaitable[Any]]:
return partial(self.call_api, name)
@property
@abc.abstractmethod
def type(self) -> str:
"""Adapter 类型"""
raise NotImplementedError
@classmethod
def register(cls, driver: "Driver", config: "Config"):
"""
:说明:
`register` 方法会在 `driver.register_adapter` 时被调用用于初始化相关配置
"""
cls.driver = driver
cls.config = config
@classmethod
@abc.abstractmethod
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict, body: Optional[dict]) -> str:
"""
:说明:
检查连接请求是否合法的函数如果合法则返回当前连接 ``唯一标识符``通常为机器人 ID如果不合法则抛出 ``RequestDenied`` 异常
:参数:
* ``driver: Driver``: Driver 对象
* ``connection_type: str``: 连接类型
* ``headers: dict``: 请求头
* ``body: Optional[dict]``: 请求数据WebSocket 连接该部分为空
:返回:
- ``str``: 连接唯一标识符
:异常:
- ``RequestDenied``: 请求非法
"""
raise NotImplementedError
@abc.abstractmethod
async def handle_message(self, message: dict):
"""
:说明:
处理上报消息的函数转换为 ``Event`` 事件后调用 ``nonebot.message.handle_event`` 进一步处理事件
:参数:
* ``message: dict``: 收到的上报消息
"""
raise NotImplementedError
@abc.abstractmethod
async def call_api(self, api: str, **data):
"""
:说明:
调用机器人 API 接口可以通过该函数或直接通过 bot 属性进行调用
:参数:
* ``api: str``: API 名称
* ``**data``: API 数据
:示例:
.. code-block:: python
await bot.call_api("send_msg", message="hello world")
await bot.send_msg(message="hello world")
"""
raise NotImplementedError
@abc.abstractmethod
async def send(self, event: "Event",
message: Union[str, "Message", "MessageSegment"], **kwargs):
"""
:说明:
调用机器人基础发送消息接口
:参数:
* ``event: Event``: 上报事件
* ``message: Union[str, Message, MessageSegment]``: 要发送的消息
* ``**kwargs``
"""
raise NotImplementedError
T_Message = TypeVar("T_Message", bound="Message")
T_MessageSegment = TypeVar("T_MessageSegment", bound="MessageSegment")
@dataclass
class MessageSegment(abc.ABC):
"""消息段基类"""
type: str
"""
- 类型: ``str``
- 说明: 消息段类型
"""
data: Dict[str, Any] = field(default_factory=lambda: {})
"""
- 类型: ``Dict[str, Union[str, list]]``
- 说明: 消息段数据
"""
@abc.abstractmethod
def __str__(self: T_MessageSegment) -> str:
"""该消息段所代表的 str在命令匹配部分使用"""
raise NotImplementedError
@abc.abstractmethod
def __add__(self: T_MessageSegment, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
"""你需要在这里实现不同消息段的合并:
比如
if isinstance(other, str):
...
elif isinstance(other, MessageSegment):
...
注意需要返回一个新生成的对象
"""
raise NotImplementedError
@abc.abstractmethod
def __radd__(
self: T_MessageSegment, other: Union[str, dict, list, T_MessageSegment,
T_Message]) -> "T_Message":
"""你需要在这里实现不同消息段的合并:
比如
if isinstance(other, str):
...
elif isinstance(other, MessageSegment):
...
注意需要返回一个新生成的对象
"""
raise NotImplementedError
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
return setattr(self, key, value)
def get(self, key, default=None):
return getattr(self, key, default)
def copy(self: T_MessageSegment) -> T_MessageSegment:
return copy(self)
@abc.abstractmethod
def is_text(self) -> bool:
raise NotImplementedError
class Message(list, abc.ABC):
"""消息数组"""
def __init__(self,
message: Union[str, None, Mapping, Iterable[Mapping],
T_MessageSegment, T_Message, Any] = None,
*args,
**kwargs):
"""
:参数:
* ``message: Union[str, list, dict, MessageSegment, Message, Any]``: 消息内容
"""
super().__init__(*args, **kwargs)
if message is None:
return
elif isinstance(message, Message):
self.extend(message)
elif isinstance(message, MessageSegment):
self.append(message)
else:
self.extend(self._construct(message))
def __str__(self):
return ''.join((str(seg) for seg in self))
@classmethod
def __get_validators__(cls):
yield cls._validate
@classmethod
def _validate(cls, value):
return cls(value)
@staticmethod
@abc.abstractmethod
def _construct(
msg: Union[str, Mapping, Iterable[Mapping], Any]
) -> Iterable[T_MessageSegment]:
raise NotImplementedError
def __add__(self: T_Message, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
result = self.__class__(self)
if isinstance(other, str):
result.extend(self._construct(other))
elif isinstance(other, MessageSegment):
result.append(other)
elif isinstance(other, Message):
result.extend(other)
return result
def __radd__(self: T_Message, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
result = self.__class__(other)
return result.__add__(self)
def __iadd__(self: T_Message, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
if isinstance(other, str):
self.extend(self._construct(other))
elif isinstance(other, MessageSegment):
self.append(other)
elif isinstance(other, Message):
self.extend(other)
return self
def append(self: T_Message, obj: Union[str, T_MessageSegment]) -> T_Message:
"""
:说明:
添加一个消息段到消息数组末尾
:参数:
* ``obj: Union[str, MessageSegment]``: 要添加的消息段
"""
if isinstance(obj, MessageSegment):
super().append(obj)
elif isinstance(obj, str):
self.extend(self._construct(obj))
else:
raise ValueError(f"Unexpected type: {type(obj)} {obj}")
return self
def extend(self: T_Message,
obj: Union[T_Message, Iterable[T_MessageSegment]]) -> T_Message:
"""
:说明:
拼接一个消息数组或多个消息段到消息数组末尾
:参数:
* ``obj: Union[Message, Iterable[MessageSegment]]``: 要添加的消息数组
"""
for segment in obj:
self.append(segment)
return self
def reduce(self: T_Message) -> None:
"""
:说明:
缩减消息数组即按 MessageSegment 的实现拼接相邻消息段
"""
index = 0
while index < len(self):
if index > 0 and self[index -
1].is_text() and self[index].is_text():
self[index - 1] += self[index]
del self[index]
else:
index += 1
def extract_plain_text(self: T_Message) -> str:
"""
:说明:
提取消息内纯文本消息
"""
def _concat(x: str, y: T_MessageSegment) -> str:
return f"{x} {y}" if y.is_text() else x
plain_text = reduce(_concat, self, "")
return plain_text[1:] if plain_text else plain_text
class Event(abc.ABC, BaseModel):
"""Event 基类。提供获取关键信息的方法,其余信息可直接获取。"""
class Config:
extra = "allow"
json_encoders = {Message: DataclassEncoder}
@abc.abstractmethod
def get_type(self) -> Literal["message", "notice", "request", "meta_event"]:
"""
:说明:
获取事件类型的方法类型通常为 NoneBot 内置的四种类型
:返回:
* ``Literal["message", "notice", "request", "meta_event"]``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_name(self) -> str:
"""
:说明:
获取事件名称的方法
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_description(self) -> str:
"""
:说明:
获取事件描述的方法通常为事件具体内容
:返回:
* ``str``
"""
raise NotImplementedError
def __str__(self) -> str:
return f"[{self.get_event_name()}]: {self.get_event_description()}"
def get_log_string(self) -> str:
"""
:说明:
获取事件日志信息的方法通常你不需要修改这个方法只有当希望 NoneBot 隐藏该事件日志时可以抛出 ``NoLogException`` 异常
:返回:
* ``str``
:异常:
- ``NoLogException``
"""
return f"[{self.get_event_name()}]: {self.get_event_description()}"
@abc.abstractmethod
def get_user_id(self) -> str:
"""
:说明:
获取事件主体 id 的方法通常是用户 id
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_session_id(self) -> str:
"""
:说明:
获取会话 id 的方法用于判断当前事件属于哪一个会话通常是用户 id群组 id 组合
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_message(self) -> "Message":
"""
:说明:
获取事件消息内容的方法
:返回:
* ``Message``
"""
raise NotImplementedError
def get_plaintext(self) -> str:
"""
:说明:
获取消息纯文本的方法通常不需要修改默认通过 ``get_message().extract_plain_text`` 获取
:返回:
* ``str``
"""
return self.get_message().extract_plain_text()
@abc.abstractmethod
def is_tome(self) -> bool:
"""
:说明:
获取事件是否与机器人有关的方法
:返回:
* ``bool``
"""
raise NotImplementedError
from ._base import Bot, Event, Message, MessageSegment

474
nonebot/adapters/_base.py Normal file
View File

@ -0,0 +1,474 @@
"""
协议适配基类
============
各协议请继承以下基类并使用 ``driver.register_adapter`` 注册适配器
"""
import abc
from copy import copy
from typing_extensions import Literal
from functools import reduce, partial
from dataclasses import dataclass, field
from typing import Any, Dict, Union, TypeVar, Mapping, Optional, Callable, Iterable, Iterator, Awaitable, TYPE_CHECKING
from pydantic import BaseModel
from nonebot.utils import DataclassEncoder
if TYPE_CHECKING:
from nonebot.config import Config
from nonebot.drivers import Driver, WebSocket
class Bot(abc.ABC):
"""
Bot 基类用于处理上报消息并提供 API 调用接口
"""
driver: "Driver"
"""Driver 对象"""
config: "Config"
"""Config 配置对象"""
@abc.abstractmethod
def __init__(self,
connection_type: str,
self_id: str,
*,
websocket: Optional["WebSocket"] = None):
"""
:参数:
* ``connection_type: str``: http 或者 websocket
* ``self_id: str``: 机器人 ID
* ``websocket: Optional[WebSocket]``: Websocket 连接对象
"""
self.connection_type = connection_type
"""连接类型"""
self.self_id = self_id
"""机器人 ID"""
self.websocket = websocket
"""Websocket 连接对象"""
def __getattr__(self, name: str) -> Callable[..., Awaitable[Any]]:
return partial(self.call_api, name)
@property
@abc.abstractmethod
def type(self) -> str:
"""Adapter 类型"""
raise NotImplementedError
@classmethod
def register(cls, driver: "Driver", config: "Config"):
"""
:说明:
`register` 方法会在 `driver.register_adapter` 时被调用用于初始化相关配置
"""
cls.driver = driver
cls.config = config
@classmethod
@abc.abstractmethod
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict, body: Optional[dict]) -> str:
"""
:说明:
检查连接请求是否合法的函数如果合法则返回当前连接 ``唯一标识符``通常为机器人 ID如果不合法则抛出 ``RequestDenied`` 异常
:参数:
* ``driver: Driver``: Driver 对象
* ``connection_type: str``: 连接类型
* ``headers: dict``: 请求头
* ``body: Optional[dict]``: 请求数据WebSocket 连接该部分为空
:返回:
- ``str``: 连接唯一标识符
:异常:
- ``RequestDenied``: 请求非法
"""
raise NotImplementedError
@abc.abstractmethod
async def handle_message(self, message: dict):
"""
:说明:
处理上报消息的函数转换为 ``Event`` 事件后调用 ``nonebot.message.handle_event`` 进一步处理事件
:参数:
* ``message: dict``: 收到的上报消息
"""
raise NotImplementedError
@abc.abstractmethod
async def call_api(self, api: str, **data):
"""
:说明:
调用机器人 API 接口可以通过该函数或直接通过 bot 属性进行调用
:参数:
* ``api: str``: API 名称
* ``**data``: API 数据
:示例:
.. code-block:: python
await bot.call_api("send_msg", message="hello world")
await bot.send_msg(message="hello world")
"""
raise NotImplementedError
@abc.abstractmethod
async def send(self, event: "Event",
message: Union[str, "Message", "MessageSegment"], **kwargs):
"""
:说明:
调用机器人基础发送消息接口
:参数:
* ``event: Event``: 上报事件
* ``message: Union[str, Message, MessageSegment]``: 要发送的消息
* ``**kwargs``
"""
raise NotImplementedError
T_Message = TypeVar("T_Message", bound="Message")
T_MessageSegment = TypeVar("T_MessageSegment", bound="MessageSegment")
@dataclass
class MessageSegment(abc.ABC):
"""消息段基类"""
type: str
"""
- 类型: ``str``
- 说明: 消息段类型
"""
data: Dict[str, Any] = field(default_factory=lambda: {})
"""
- 类型: ``Dict[str, Union[str, list]]``
- 说明: 消息段数据
"""
@abc.abstractmethod
def __str__(self: T_MessageSegment) -> str:
"""该消息段所代表的 str在命令匹配部分使用"""
raise NotImplementedError
@abc.abstractmethod
def __add__(self: T_MessageSegment, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
"""你需要在这里实现不同消息段的合并:
比如
if isinstance(other, str):
...
elif isinstance(other, MessageSegment):
...
注意需要返回一个新生成的对象
"""
raise NotImplementedError
@abc.abstractmethod
def __radd__(
self: T_MessageSegment, other: Union[str, dict, list, T_MessageSegment,
T_Message]) -> "T_Message":
"""你需要在这里实现不同消息段的合并:
比如
if isinstance(other, str):
...
elif isinstance(other, MessageSegment):
...
注意需要返回一个新生成的对象
"""
raise NotImplementedError
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
return setattr(self, key, value)
def get(self, key, default=None):
return getattr(self, key, default)
def copy(self: T_MessageSegment) -> T_MessageSegment:
return copy(self)
@abc.abstractmethod
def is_text(self) -> bool:
raise NotImplementedError
class Message(list, abc.ABC):
"""消息数组"""
def __init__(self,
message: Union[str, None, Mapping, Iterable[Mapping],
T_MessageSegment, T_Message, Any] = None,
*args,
**kwargs):
"""
:参数:
* ``message: Union[str, list, dict, MessageSegment, Message, Any]``: 消息内容
"""
super().__init__(*args, **kwargs)
if message is None:
return
elif isinstance(message, Message):
self.extend(message)
elif isinstance(message, MessageSegment):
self.append(message)
else:
self.extend(self._construct(message))
def __str__(self):
return ''.join((str(seg) for seg in self))
@classmethod
def __get_validators__(cls):
yield cls._validate
@classmethod
def _validate(cls, value):
return cls(value)
@staticmethod
@abc.abstractmethod
def _construct(
msg: Union[str, Mapping, Iterable[Mapping], Any]
) -> Iterable[T_MessageSegment]:
raise NotImplementedError
def __add__(self: T_Message, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
result = self.__class__(self)
if isinstance(other, str):
result.extend(self._construct(other))
elif isinstance(other, MessageSegment):
result.append(other)
elif isinstance(other, Message):
result.extend(other)
return result
def __radd__(self: T_Message, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
result = self.__class__(other)
return result.__add__(self)
def __iadd__(self: T_Message, other: Union[str, T_MessageSegment,
T_Message]) -> T_Message:
if isinstance(other, str):
self.extend(self._construct(other))
elif isinstance(other, MessageSegment):
self.append(other)
elif isinstance(other, Message):
self.extend(other)
return self
def append(self: T_Message, obj: Union[str, T_MessageSegment]) -> T_Message:
"""
:说明:
添加一个消息段到消息数组末尾
:参数:
* ``obj: Union[str, MessageSegment]``: 要添加的消息段
"""
if isinstance(obj, MessageSegment):
super().append(obj)
elif isinstance(obj, str):
self.extend(self._construct(obj))
else:
raise ValueError(f"Unexpected type: {type(obj)} {obj}")
return self
def extend(self: T_Message,
obj: Union[T_Message, Iterable[T_MessageSegment]]) -> T_Message:
"""
:说明:
拼接一个消息数组或多个消息段到消息数组末尾
:参数:
* ``obj: Union[Message, Iterable[MessageSegment]]``: 要添加的消息数组
"""
for segment in obj:
self.append(segment)
return self
def reduce(self: T_Message) -> None:
"""
:说明:
缩减消息数组即按 MessageSegment 的实现拼接相邻消息段
"""
index = 0
while index < len(self):
if index > 0 and self[index -
1].is_text() and self[index].is_text():
self[index - 1] += self[index]
del self[index]
else:
index += 1
def extract_plain_text(self: T_Message) -> str:
"""
:说明:
提取消息内纯文本消息
"""
def _concat(x: str, y: T_MessageSegment) -> str:
return f"{x} {y}" if y.is_text() else x
plain_text = reduce(_concat, self, "")
return plain_text[1:] if plain_text else plain_text
class Event(abc.ABC, BaseModel):
"""Event 基类。提供获取关键信息的方法,其余信息可直接获取。"""
class Config:
extra = "allow"
json_encoders = {Message: DataclassEncoder}
@abc.abstractmethod
def get_type(self) -> str:
"""
:说明:
获取事件类型的方法类型通常为 NoneBot 内置的四种类型
:返回:
* ``Literal["message", "notice", "request", "meta_event"]``
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_name(self) -> str:
"""
:说明:
获取事件名称的方法
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_event_description(self) -> str:
"""
:说明:
获取事件描述的方法通常为事件具体内容
:返回:
* ``str``
"""
raise NotImplementedError
def __str__(self) -> str:
return f"[{self.get_event_name()}]: {self.get_event_description()}"
def get_log_string(self) -> str:
"""
:说明:
获取事件日志信息的方法通常你不需要修改这个方法只有当希望 NoneBot 隐藏该事件日志时可以抛出 ``NoLogException`` 异常
:返回:
* ``str``
:异常:
- ``NoLogException``
"""
return f"[{self.get_event_name()}]: {self.get_event_description()}"
@abc.abstractmethod
def get_user_id(self) -> str:
"""
:说明:
获取事件主体 id 的方法通常是用户 id
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_session_id(self) -> str:
"""
:说明:
获取会话 id 的方法用于判断当前事件属于哪一个会话通常是用户 id群组 id 组合
:返回:
* ``str``
"""
raise NotImplementedError
@abc.abstractmethod
def get_message(self) -> "Message":
"""
:说明:
获取事件消息内容的方法
:返回:
* ``Message``
"""
raise NotImplementedError
def get_plaintext(self) -> str:
"""
:说明:
获取消息纯文本的方法通常不需要修改默认通过 ``get_message().extract_plain_text`` 获取
:返回:
* ``str``
"""
return self.get_message().extract_plain_text()
@abc.abstractmethod
def is_tome(self) -> bool:
"""
:说明:
获取事件是否与机器人有关的方法
:返回:
* ``bool``
"""
raise NotImplementedError

View File

@ -4,7 +4,6 @@
NoneBot 插件开发提供便携的定义函数
"""
import re
import sys
import pkgutil
@ -21,14 +20,17 @@ from nonebot.permission import Permission
from nonebot.typing import T_State, T_StateFactory, T_Handler, T_RuleChecker
from nonebot.rule import Rule, startswith, endswith, keyword, command, shell_command, ArgumentParser, regex
from .manager import PluginManager
if TYPE_CHECKING:
from nonebot.adapters import Bot, Event, MessageSegment
from nonebot.adapters import Bot, Event
plugins: Dict[str, "Plugin"] = {}
"""
:类型: ``Dict[str, Plugin]``
:说明: 已加载的插件
"""
PLUGIN_NAMESPACE = "nonebot.loaded_plugins"
_export: ContextVar["Export"] = ContextVar("_export")
_tmp_matchers: ContextVar[Set[Type[Matcher]]] = ContextVar("_tmp_matchers")
@ -946,11 +948,37 @@ class MatcherGroup:
return matcher
def _load_plugin(manager: PluginManager, plugin_name: str) -> Optional[Plugin]:
if plugin_name.startswith("_"):
return None
_tmp_matchers.set(set())
_export.set(Export())
if plugin_name in plugins:
return None
try:
module = manager.load_plugin(plugin_name)
for m in _tmp_matchers.get():
m.module = plugin_name
plugin = Plugin(plugin_name, module, _tmp_matchers.get(), _export.get())
plugins[plugin_name] = plugin
logger.opt(
colors=True).info(f'Succeeded to import "<y>{plugin_name}</y>"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{plugin_name}"</bg #f8bbd0></r>')
return None
def load_plugin(module_path: str) -> Optional[Plugin]:
"""
:说明:
使用 ``importlib`` 加载单个插件可以是本地插件或是通过 ``pip`` 安装的插件
使用 ``PluginManager`` 加载单个插件可以是本地插件或是通过 ``pip`` 安装的插件
:参数:
@ -961,34 +989,9 @@ def load_plugin(module_path: str) -> Optional[Plugin]:
- ``Optional[Plugin]``
"""
def _load_plugin(module_path: str) -> Optional[Plugin]:
try:
_tmp_matchers.set(set())
_export.set(Export())
if module_path in plugins:
return plugins[module_path]
elif module_path in sys.modules:
logger.warning(
f"Module {module_path} has been loaded by other plugins! Ignored"
)
return None
module = importlib.import_module(module_path)
for m in _tmp_matchers.get():
m.module = module_path
plugin = Plugin(module_path, module, _tmp_matchers.get(),
_export.get())
plugins[module_path] = plugin
logger.opt(
colors=True).info(f'Succeeded to import "<y>{module_path}</y>"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{module_path}"</bg #f8bbd0></r>'
)
return None
context: Context = copy_context()
return context.run(_load_plugin, module_path)
manager = PluginManager(PLUGIN_NAMESPACE, plugins=[module_path])
return context.run(_load_plugin, manager, module_path)
def load_plugins(*plugin_dir: str) -> Set[Plugin]:
@ -1005,43 +1008,65 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
- ``Set[Plugin]``
"""
loaded_plugins = set()
manager = PluginManager(PLUGIN_NAMESPACE, search_path=plugin_dir)
for plugin_name in manager.list_plugins():
context: Context = copy_context()
result = context.run(_load_plugin, manager, plugin_name)
if result:
loaded_plugins.add(result)
return loaded_plugins
def load_all_plugins(module_path: Set[str],
plugin_dir: Set[str]) -> Set[Plugin]:
"""
:说明:
导入指定列表中的插件以及指定目录下多个插件 ``_`` 开头的插件不会被导入
:参数:
- ``module_path: Set[str]``: 指定插件集合
- ``plugin_dir: Set[str]``: 指定插件路径集合
:返回:
- ``Set[Plugin]``
"""
def _load_plugin(plugin_name: str) -> Optional[Plugin]:
if plugin_name.startswith("_"):
return None
def _load_plugin(module_info) -> Optional[Plugin]:
_tmp_matchers.set(set())
_export.set(Export())
name = module_info.name
if name.startswith("_"):
return None
spec = module_info.module_finder.find_spec(name, None)
if not spec:
logger.warning(
f"Module {name} cannot be loaded! Check module name first.")
elif spec.name in plugins:
return None
elif spec.name in sys.modules:
logger.warning(
f"Module {spec.name} has been loaded by other plugin! Ignored")
if plugin_name in plugins:
return None
try:
module = _load(spec)
module = manager.load_plugin(plugin_name)
for m in _tmp_matchers.get():
m.module = name
plugin = Plugin(name, module, _tmp_matchers.get(), _export.get())
plugins[name] = plugin
logger.opt(colors=True).info(f'Succeeded to import "<y>{name}</y>"')
m.module = plugin_name
plugin = Plugin(plugin_name, module, _tmp_matchers.get(),
_export.get())
plugins[plugin_name] = plugin
logger.opt(
colors=True).info(f'Succeeded to import "<y>{plugin_name}</y>"')
return plugin
except Exception as e:
logger.opt(colors=True, exception=e).error(
f'<r><bg #f8bbd0>Failed to import "{name}"</bg #f8bbd0></r>')
f'<r><bg #f8bbd0>Failed to import "{plugin_name}"</bg #f8bbd0></r>'
)
return None
loaded_plugins = set()
for module_info in pkgutil.iter_modules(plugin_dir):
manager = PluginManager(PLUGIN_NAMESPACE, module_path, plugin_dir)
for plugin_name in manager.list_plugins():
context: Context = copy_context()
result = context.run(_load_plugin, module_info)
result = context.run(_load_plugin, plugin_name)
if result:
loaded_plugins.add(result)
return loaded_plugins

View File

@ -176,6 +176,11 @@ def load_plugins(*plugin_dir: str) -> Set[Plugin]:
...
def load_all_plugins(module_path: Set[str],
plugin_dir: Set[str]) -> Set[Plugin]:
...
def load_builtin_plugins(name: str = ...):
...

181
nonebot/plugin/manager.py Normal file
View File

@ -0,0 +1,181 @@
import sys
import uuid
import pkgutil
import importlib
from hashlib import md5
from types import ModuleType
from collections import Counter
from importlib.abc import MetaPathFinder
from importlib.machinery import PathFinder
from typing import Set, List, Optional, Iterable
_internal_space = ModuleType(__name__ + "._internal")
_internal_space.__path__ = [] # type: ignore
sys.modules[_internal_space.__name__] = _internal_space
_manager_stack: List["PluginManager"] = []
class _NamespaceModule(ModuleType):
"""Simple namespace module to store plugins."""
@property
def __path__(self):
return []
def __getattr__(self, name: str):
try:
return super().__getattr__(name) # type: ignore
except AttributeError:
if name.startswith("__"):
raise
raise RuntimeError("Plugin manager not activated!")
class _InternalModule(ModuleType):
"""Internal module for each plugin manager."""
def __init__(self, prefix: str, plugin_manager: "PluginManager"):
super().__init__(f"{prefix}.{plugin_manager.internal_id}")
self.__plugin_manager__ = plugin_manager
@property
def __path__(self) -> List[str]:
return list(self.__plugin_manager__.search_path)
class PluginManager:
def __init__(self,
namespace: Optional[str] = None,
plugins: Optional[Iterable[str]] = None,
search_path: Optional[Iterable[str]] = None,
*,
id: Optional[str] = None):
self.namespace: Optional[str] = namespace
self.namespace_module: Optional[ModuleType] = self._setup_namespace(
namespace)
self.id: str = id or str(uuid.uuid4())
self.internal_id: str = md5(
((self.namespace or "") + self.id).encode()).hexdigest()
self.internal_module = self._setup_internal_module(self.internal_id)
# simple plugin not in search path
self.plugins: Set[str] = set(plugins or [])
self.search_path: Set[str] = set(search_path or [])
# ensure can be loaded
self.list_plugins()
def _setup_namespace(self,
namespace: Optional[str] = None
) -> Optional[ModuleType]:
if not namespace:
return None
try:
module = importlib.import_module(namespace)
except ImportError:
module = _NamespaceModule(namespace)
if "." in namespace:
parent = importlib.import_module(namespace.rsplit(".", 1)[0])
setattr(parent, namespace.rsplit(".", 1)[1], module)
sys.modules[namespace] = module
return module
def _setup_internal_module(self, internal_id: str) -> ModuleType:
if hasattr(_internal_space, internal_id):
raise RuntimeError("Plugin manager already exists!")
prefix = sys._getframe(2).f_globals.get(
"__name__") or _internal_space.__name__
if not prefix.startswith(_internal_space.__name__):
prefix = _internal_space.__name__
module = _InternalModule(prefix, self)
sys.modules[module.__name__] = module
setattr(_internal_space, internal_id, module)
return module
def __enter__(self):
if self in _manager_stack:
raise RuntimeError("Plugin manager already activated!")
_manager_stack.append(self)
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
_manager_stack.pop()
except IndexError:
pass
def search_plugins(self) -> List[str]:
return [
module_info.name
for module_info in pkgutil.iter_modules(self.search_path)
]
def list_plugins(self) -> Set[str]:
_pre_managers: List[PluginManager]
if self in _manager_stack:
_pre_managers = _manager_stack[:_manager_stack.index(self)]
else:
_pre_managers = _manager_stack[:]
_search_path: Set[str] = set()
for manager in _pre_managers:
_search_path |= manager.search_path
if _search_path & self.search_path:
raise RuntimeError("Duplicate plugin search path!")
_search_plugins = self.search_plugins()
c = Counter([*_search_plugins, *self.plugins])
conflict = [name for name, num in c.items() if num > 1]
if conflict:
raise RuntimeError(
f"More than one plugin named {' / '.join(conflict)}!")
return set(_search_plugins) | self.plugins
def load_plugin(self, name) -> ModuleType:
if name in self.plugins:
return importlib.import_module(name)
if "." in name:
raise ValueError("Plugin name cannot contain '.'")
with self:
return importlib.import_module(f"{self.namespace}.{name}")
def load_all_plugins(self) -> List[ModuleType]:
return [self.load_plugin(name) for name in self.list_plugins()]
def _rewrite_module_name(self, module_name) -> Optional[str]:
if module_name == self.namespace:
return self.internal_module.__name__
elif module_name.startswith(self.namespace + "."):
path = module_name.split(".")
length = self.namespace.count(".") + 1
return f"{self.internal_module.__name__}.{'.'.join(path[length:])}"
elif module_name in self.search_plugins():
return f"{self.internal_module.__name__}.{module_name}"
return None
class PluginFinder(MetaPathFinder):
def find_spec(self, fullname: str, path, target):
if _manager_stack:
index = -1
while -index <= len(_manager_stack):
manager = _manager_stack[index]
newname = manager._rewrite_module_name(fullname)
if newname:
spec = PathFinder.find_spec(newname,
list(manager.search_path),
target)
if spec:
return spec
index -= 1
return None
sys.meta_path.insert(0, PluginFinder())

View File

@ -25,10 +25,10 @@ class Event(BaseEvent):
__event__ = ""
time: int
self_id: int
post_type: Literal["message", "notice", "request", "meta_event"]
post_type: str
@overrides(BaseEvent)
def get_type(self) -> Literal["message", "notice", "request", "meta_event"]:
def get_type(self) -> str:
return self.post_type
@overrides(BaseEvent)

View File

@ -4,6 +4,16 @@ sidebar: auto
# 更新日志
## v2.0.0a11
- 修改 `nonebot` 项目结构,分离所有 `adapter`
- 修改插件加载逻辑,使用 `import hook` (PEP 302)
## v2.0.0a10
- 新增 `Quart Driver` 支持
- 修复 `mirai` 协议适配命令处理以及消息转义
## v2.0.0a9
- 修复 `Message` 消息为 `None` 时的处理错误

View File

@ -25,13 +25,10 @@ driver.register_adapter("mirai", MiraiBot)
# load builtin plugin
nonebot.load_builtin_plugins()
nonebot.load_plugin("nonebot_plugin_apscheduler")
nonebot.load_plugin("nonebot_plugin_test")
# load local plugins
nonebot.load_plugins("test_plugins")
print(nonebot.require("test_export"))
# load all plugins
nonebot.load_all_plugins({"nonebot_plugin_apscheduler", "nonebot_plugin_test"},
{"test_plugins"})
# modify some config / config depends on loaded configs
config = driver.config

View File

@ -0,0 +1,5 @@
import nonebot
from .test_export import export
print(export, nonebot.require("test_export"))