support custom response

This commit is contained in:
StarHeartHunt 2021-06-10 21:52:20 +08:00 committed by yanyongyu
parent ca31ec5fe3
commit c0d78449be
25 changed files with 365 additions and 7542 deletions

View File

@ -4,475 +4,3 @@ sidebarDepth: 0
---
# NoneBot.adapters 模块
## 协议适配基类
各协议请继承以下基类,并使用 `driver.register_adapter` 注册适配器
## _class_ `Bot`
基类:`abc.ABC`
Bot 基类。用于处理上报消息,并提供 API 调用接口。
### `driver`
Driver 对象
### `config`
Config 配置对象
### `_calling_api_hook`
* **类型**
`Set[T_CallingAPIHook]`
* **说明**
call_api 时执行的函数
### `_called_api_hook`
* **类型**
`Set[T_CalledAPIHook]`
* **说明**
call_api 后执行的函数
### _abstract_ `__init__(connection_type, self_id, *, websocket=None)`
* **参数**
* `connection_type: str`: http 或者 websocket
* `self_id: str`: 机器人 ID
* `websocket: Optional[WebSocket]`: Websocket 连接对象
### `connection_type`
连接类型
### `self_id`
机器人 ID
### `websocket`
Websocket 连接对象
### _abstract property_ `type`
Adapter 类型
### _classmethod_ `register(driver, config)`
* **说明**
register 方法会在 driver.register_adapter 时被调用,用于初始化相关配置
### _abstract async classmethod_ `check_permission(driver, connection_type, headers, body)`
* **说明**
检查连接请求是否合法的函数,如果合法则返回当前连接 `唯一标识符`,通常为机器人 ID如果不合法则抛出 `RequestDenied` 异常。
* **参数**
* `driver: Driver`: Driver 对象
* `connection_type: str`: 连接类型
* `headers: dict`: 请求头
* `body: Optional[bytes]`: 请求数据WebSocket 连接该部分为 None
* **返回**
* `str`: 连接唯一标识符
* **异常**
* `RequestDenied`: 请求非法
### _abstract async_ `handle_message(message)`
* **说明**
处理上报消息的函数,转换为 `Event` 事件后调用 `nonebot.message.handle_event` 进一步处理事件。
* **参数**
* `message: dict`: 收到的上报消息
### _abstract async_ `_call_api(api, **data)`
* **说明**
`adapter` 实际调用 api 的逻辑实现函数,实现该方法以调用 api。
* **参数**
* `api: str`: API 名称
* `**data`: API 数据
### _async_ `call_api(api, **data)`
* **说明**
调用机器人 API 接口,可以通过该函数或直接通过 bot 属性进行调用
* **参数**
* `api: str`: API 名称
* `**data`: API 数据
* **示例**
```python
await bot.call_api("send_msg", message="hello world")
await bot.send_msg(message="hello world")
```
### _abstract async_ `send(event, message, **kwargs)`
* **说明**
调用机器人基础发送消息接口
* **参数**
* `event: Event`: 上报事件
* `message: Union[str, Message, MessageSegment]`: 要发送的消息
* `**kwargs`
## _class_ `MessageSegment`
基类:`abc.ABC`, `Mapping`
消息段基类
### `type`
* 类型: `str`
* 说明: 消息段类型
### `data`
* 类型: `Dict[str, Union[str, list]]`
* 说明: 消息段数据
## _class_ `Message`
基类:`List`[`nonebot.adapters._base.T_MessageSegment`], `abc.ABC`
消息数组
### `__init__(message=None, *args, **kwargs)`
* **参数**
* `message: Union[str, list, dict, MessageSegment, Message, Any]`: 消息内容
### `append(obj)`
* **说明**
添加一个消息段到消息数组末尾
* **参数**
* `obj: Union[str, MessageSegment]`: 要添加的消息段
### `extend(obj)`
* **说明**
拼接一个消息数组或多个消息段到消息数组末尾
* **参数**
* `obj: Union[Message, Iterable[MessageSegment]]`: 要添加的消息数组
### `reduce()`
* **说明**
缩减消息数组,即按 MessageSegment 的实现拼接相邻消息段
### `extract_plain_text()`
* **说明**
提取消息内纯文本消息
## _class_ `Event`
基类:`abc.ABC`, `pydantic.main.BaseModel`
Event 基类。提供获取关键信息的方法,其余信息可直接获取。
### _abstract_ `get_type()`
* **说明**
获取事件类型的方法,类型通常为 NoneBot 内置的四种类型。
* **返回**
* `Literal["message", "notice", "request", "meta_event"]`
* 其他自定义 `str`
### _abstract_ `get_event_name()`
* **说明**
获取事件名称的方法。
* **返回**
* `str`
### _abstract_ `get_event_description()`
* **说明**
获取事件描述的方法,通常为事件具体内容。
* **返回**
* `str`
### `get_log_string()`
* **说明**
获取事件日志信息的方法,通常你不需要修改这个方法,只有当希望 NoneBot 隐藏该事件日志时,可以抛出 `NoLogException` 异常。
* **返回**
* `str`
* **异常**
* `NoLogException`
### _abstract_ `get_user_id()`
* **说明**
获取事件主体 id 的方法,通常是用户 id 。
* **返回**
* `str`
### _abstract_ `get_session_id()`
* **说明**
获取会话 id 的方法,用于判断当前事件属于哪一个会话,通常是用户 id、群组 id 组合。
* **返回**
* `str`
### _abstract_ `get_message()`
* **说明**
获取事件消息内容的方法。
* **返回**
* `Message`
### `get_plaintext()`
* **说明**
获取消息纯文本的方法,通常不需要修改,默认通过 `get_message().extract_plain_text` 获取。
* **返回**
* `str`
### _abstract_ `is_tome()`
* **说明**
获取事件是否与机器人有关的方法。
* **返回**
* `bool`

View File

@ -5,673 +5,16 @@ sidebarDepth: 0
# NoneBot.adapters.cqhttp 模块
## CQHTTP (OneBot) v11 协议适配
协议详情请看: [CQHTTP](https://github.com/howmanybots/onebot/blob/master/README.md) | [OneBot](https://github.com/howmanybots/onebot/blob/master/README.md)
# NoneBot.adapters.cqhttp.config 模块
## _class_ `Config`
CQHTTP 配置类
* **配置项**
* `access_token` / `cqhttp_access_token`: CQHTTP 协议授权令牌
* `secret` / `cqhttp_secret`: CQHTTP HTTP 上报数据签名口令
# NoneBot.adapters.cqhttp.utils 模块
## `escape(s, *, escape_comma=True)`
* **说明**
对字符串进行 CQ 码转义。
* **参数**
* `s: str`: 需要转义的字符串
* `escape_comma: bool`: 是否转义逗号(`,`)。
## `unescape(s)`
* **说明**
对字符串进行 CQ 码去转义。
* **参数**
* `s: str`: 需要转义的字符串
# NoneBot.adapters.cqhttp.exception 模块
## _exception_ `ActionFailed`
基类:[`nonebot.exception.ActionFailed`](../exception.md#nonebot.exception.ActionFailed), `nonebot.adapters.cqhttp.exception.CQHTTPAdapterException`
* **说明**
API 请求返回错误信息。
* **参数**
* `retcode: Optional[int]`: 错误码
## _exception_ `NetworkError`
基类:[`nonebot.exception.NetworkError`](../exception.md#nonebot.exception.NetworkError), `nonebot.adapters.cqhttp.exception.CQHTTPAdapterException`
* **说明**
网络错误。
* **参数**
* `retcode: Optional[int]`: 错误码
# NoneBot.adapters.cqhttp.bot 模块
## _async_ `_check_reply(bot, event)`
* **说明**
检查消息中存在的回复,去除并赋值 `event.reply`, `event.to_me`
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
## `_check_at_me(bot, event)`
* **说明**
检查消息开头或结尾是否存在 @机器人,去除并赋值 `event.to_me`
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
## `_check_nickname(bot, event)`
* **说明**
检查消息开头是否存在,去除并赋值 `event.to_me`
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
## `_handle_api_result(result)`
* **说明**
处理 API 请求返回值。
* **参数**
* `result: Optional[Dict[str, Any]]`: API 返回数据
* **返回**
* `Any`: API 调用返回数据
* **异常**
* `ActionFailed`: API 调用失败
## _class_ `Bot`
基类:[`nonebot.adapters._base.Bot`](README.md#nonebot.adapters._base.Bot)
CQHTTP 协议 Bot 适配。继承属性参考 [BaseBot](./#class-basebot) 。
### _property_ `type`
* 返回: `"cqhttp"`
### _async classmethod_ `check_permission(driver, connection_type, headers, body)`
* **说明**
CQHTTP (OneBot) 协议鉴权。参考 [鉴权](https://github.com/howmanybots/onebot/blob/master/v11/specs/communication/authorization.md)
### _async_ `handle_message(message)`
* **说明**
调用 [_check_reply](#async-check-reply-bot-event), [_check_at_me](#check-at-me-bot-event), [_check_nickname](#check-nickname-bot-event) 处理事件并转换为 [Event](#class-event)
### _async_ `call_api(api, **data)`
* **说明**
调用 CQHTTP 协议 API
* **参数**
* `api: str`: API 名称
* `**data: Any`: API 参数
* **返回**
* `Any`: API 调用返回数据
* **异常**
* `NetworkError`: 网络错误
* `ActionFailed`: API 调用失败
### _async_ `send(event, message, at_sender=False, **kwargs)`
* **说明**
根据 `event` 向触发事件的主体发送消息。
* **参数**
* `event: Event`: Event 对象
* `message: Union[str, Message, MessageSegment]`: 要发送的消息
* `at_sender: bool`: 是否 @ 事件主体
* `**kwargs`: 覆盖默认参数
* **返回**
* `Any`: API 调用返回数据
* **异常**
* `ValueError`: 缺少 `user_id`, `group_id`
* `NetworkError`: 网络错误
* `ActionFailed`: API 调用失败
# NoneBot.adapters.cqhttp.message 模块
## _class_ `MessageSegment`
基类:`abc.ABC`, `Mapping`
CQHTTP 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
### `is_text()`
### _static_ `anonymous(ignore_failure=None)`
### _static_ `at(user_id)`
### _static_ `contact(type_, id)`
### _static_ `contact_group(group_id)`
### _static_ `contact_user(user_id)`
### _static_ `dice()`
### _static_ `face(id_)`
### _static_ `forward(id_)`
### _static_ `image(file, type_=None, cache=True, proxy=True, timeout=None)`
### _static_ `json(data)`
### _static_ `location(latitude, longitude, title=None, content=None)`
### _static_ `music(type_, id_)`
### _static_ `music_custom(url, audio, title, content=None, img_url=None)`
### _static_ `node(id_)`
### _static_ `node_custom(user_id, nickname, content)`
### _static_ `poke(type_, id_)`
### _static_ `record(file, magic=None, cache=None, proxy=None, timeout=None)`
### _static_ `reply(id_)`
### _static_ `rps()`
### _static_ `shake()`
### _static_ `share(url='', title='', content=None, image=None)`
### _static_ `text(text)`
### _static_ `video(file, cache=None, proxy=None, timeout=None)`
### _static_ `xml(data)`
### `type`
* 类型: `str`
* 说明: 消息段类型
### `data`
* 类型: `Dict[str, Union[str, list]]`
* 说明: 消息段数据
## _class_ `Message`
基类:[`nonebot.adapters._base.Message`](README.md#nonebot.adapters._base.Message)[`nonebot.adapters.cqhttp.message.MessageSegment`]
CQHTTP 协议 Message 适配。
### `extract_plain_text()`
# NoneBot.adapters.cqhttp.permission 模块
## `PRIVATE`
* **说明**: 匹配任意私聊消息类型事件
## `PRIVATE_FRIEND`
* **说明**: 匹配任意好友私聊消息类型事件
## `PRIVATE_GROUP`
* **说明**: 匹配任意群临时私聊消息类型事件
## `PRIVATE_OTHER`
* **说明**: 匹配任意其他私聊消息类型事件
## `GROUP`
* **说明**: 匹配任意群聊消息类型事件
## `GROUP_MEMBER`
* **说明**: 匹配任意群员群聊消息类型事件
:::warning 警告
该权限通过 event.sender 进行判断且不包含管理员以及群主!
:::
## `GROUP_ADMIN`
* **说明**: 匹配任意群管理员群聊消息类型事件
## `GROUP_OWNER`
* **说明**: 匹配任意群主群聊消息类型事件
# NoneBot.adapters.cqhttp.event 模块
## _class_ `Event`
基类:[`nonebot.adapters._base.Event`](README.md#nonebot.adapters._base.Event)
CQHTTP 协议事件,字段与 CQHTTP 一致。各事件字段参考 [CQHTTP 文档](https://github.com/howmanybots/onebot/blob/master/README.md)
## _class_ `MessageEvent`
基类:`nonebot.adapters.cqhttp.event.Event`
消息事件
### `to_me`
* **说明**
消息是否与机器人有关
* **类型**
`bool`
### `reply`
* **说明**
消息中提取的回复消息,内容为 `get_msg` API 返回结果
* **类型**
`Optional[Reply]`
## _class_ `PrivateMessageEvent`
基类:`nonebot.adapters.cqhttp.event.MessageEvent`
私聊消息
## _class_ `GroupMessageEvent`
基类:`nonebot.adapters.cqhttp.event.MessageEvent`
群消息
## _class_ `NoticeEvent`
基类:`nonebot.adapters.cqhttp.event.Event`
通知事件
## _class_ `GroupUploadNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
群文件上传事件
## _class_ `GroupAdminNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
群管理员变动
## _class_ `GroupDecreaseNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
群成员减少事件
## _class_ `GroupIncreaseNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
群成员增加事件
## _class_ `GroupBanNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
群禁言事件
## _class_ `FriendAddNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
好友添加事件
## _class_ `GroupRecallNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
群消息撤回事件
## _class_ `FriendRecallNoticeEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
好友消息撤回事件
## _class_ `NotifyEvent`
基类:`nonebot.adapters.cqhttp.event.NoticeEvent`
提醒事件
## _class_ `PokeNotifyEvent`
基类:`nonebot.adapters.cqhttp.event.NotifyEvent`
戳一戳提醒事件
## _class_ `LuckyKingNotifyEvent`
基类:`nonebot.adapters.cqhttp.event.NotifyEvent`
群红包运气王提醒事件
## _class_ `HonorNotifyEvent`
基类:`nonebot.adapters.cqhttp.event.NotifyEvent`
群荣誉变更提醒事件
## _class_ `RequestEvent`
基类:`nonebot.adapters.cqhttp.event.Event`
请求事件
## _class_ `FriendRequestEvent`
基类:`nonebot.adapters.cqhttp.event.RequestEvent`
加好友请求事件
## _class_ `GroupRequestEvent`
基类:`nonebot.adapters.cqhttp.event.RequestEvent`
加群请求/邀请事件
## _class_ `MetaEvent`
基类:`nonebot.adapters.cqhttp.event.Event`
元事件
## _class_ `LifecycleMetaEvent`
基类:`nonebot.adapters.cqhttp.event.MetaEvent`
生命周期元事件
## _class_ `HeartbeatMetaEvent`
基类:`nonebot.adapters.cqhttp.event.MetaEvent`
心跳元事件
## `get_event_model(event_name)`
* **说明**
根据事件名获取对应 `Event Model``FallBack Event Model` 列表
* **返回**
* `List[Type[Event]]`

View File

@ -5,330 +5,12 @@ sidebarDepth: 0
# NoneBot.adapters.ding 模块
## 钉钉群机器人 协议适配
协议详情请看: [钉钉文档](https://ding-doc.dingtalk.com/document#/org-dev-guide/elzz1p)
# NoneBot.adapters.ding.config 模块
## _class_ `Config`
钉钉配置类
* **配置项**
* `access_token` / `ding_access_token`: 钉钉令牌
* `secret` / `ding_secret`: 钉钉 HTTP 上报数据签名口令
# NoneBot.adapters.ding.exception 模块
## _exception_ `DingAdapterException`
基类:[`nonebot.exception.AdapterException`](../exception.md#nonebot.exception.AdapterException)
* **说明**
钉钉 Adapter 错误基类
## _exception_ `ActionFailed`
基类:[`nonebot.exception.ActionFailed`](../exception.md#nonebot.exception.ActionFailed), `nonebot.adapters.ding.exception.DingAdapterException`
* **说明**
API 请求返回错误信息。
* **参数**
* `errcode: Optional[int]`: 错误码
* `errmsg: Optional[str]`: 错误信息
## _exception_ `NetworkError`
基类:[`nonebot.exception.NetworkError`](../exception.md#nonebot.exception.NetworkError), `nonebot.adapters.ding.exception.DingAdapterException`
* **说明**
网络错误。
* **参数**
* `retcode: Optional[int]`: 错误码
## _exception_ `SessionExpired`
基类:`nonebot.adapters.ding.exception.ApiNotAvailable`, `nonebot.adapters.ding.exception.DingAdapterException`
* **说明**
发消息的 session 已经过期。
# NoneBot.adapters.ding.bot 模块
## _class_ `Bot`
基类:[`nonebot.adapters._base.Bot`](README.md#nonebot.adapters._base.Bot)
钉钉 协议 Bot 适配。继承属性参考 [BaseBot](./#class-basebot) 。
### _property_ `type`
* 返回: `"ding"`
### _async classmethod_ `check_permission(driver, connection_type, headers, body)`
* **说明**
钉钉协议鉴权。参考 [鉴权](https://ding-doc.dingtalk.com/doc#/serverapi2/elzz1p)
### _async_ `call_api(api, event=None, **data)`
* **说明**
调用 钉钉 协议 API
* **参数**
* `api: str`: API 名称
* `event: Optional[MessageEvent]`: Event 对象
* `**data: Any`: API 参数
* **返回**
* `Any`: API 调用返回数据
* **异常**
* `NetworkError`: 网络错误
* `ActionFailed`: API 调用失败
### _async_ `send(event, message, at_sender=False, webhook=None, secret=None, **kwargs)`
* **说明**
根据 `event` 向触发事件的主体发送消息。
* **参数**
* `event: Event`: Event 对象
* `message: Union[str, Message, MessageSegment]`: 要发送的消息
* `at_sender: bool`: 是否 @ 事件主体
* `webhook: Optional[str]`: 该条消息将调用的 webhook 地址。不传则将使用 sessionWebhook若其也不存在该条消息不发送使用自定义 webhook 时注意你设置的安全方式如加关键词IP地址加签等等。
* `secret: Optional[str]`: 如果你使用自定义的 webhook 地址,推荐使用加签方式对消息进行验证,将 机器人安全设置页面加签一栏下面显示的SEC开头的字符串 传入这个参数即可。
* `**kwargs`: 覆盖默认参数
* **返回**
* `Any`: API 调用返回数据
* **异常**
* `ValueError`: 缺少 `user_id`, `group_id`
* `NetworkError`: 网络错误
* `ActionFailed`: API 调用失败
# NoneBot.adapters.ding.message 模块
## _class_ `MessageSegment`
基类:`abc.ABC`, `Mapping`
钉钉 协议 MessageSegment 适配。具体方法参考协议消息段类型或源码。
### _static_ `atAll()`
@全体
### _static_ `atMobiles(*mobileNumber)`
@指定手机号人员
### _static_ `atDingtalkIds(*dingtalkIds)`
@指定 id@ 默认会在消息段末尾。
所以你可以在消息中使用 @{senderId} 占位,发送出去之后 @ 就会出现在占位的位置:
``python
message = MessageSegment.text(f"@{event.senderId},你好")
message += MessageSegment.atDingtalkIds(event.senderId)
``
### _static_ `text(text)`
发送 `text` 类型消息
### _static_ `image(picURL)`
发送 `image` 类型消息
### _static_ `extension(dict_)`
"标记 text 文本的 extension 属性,需要与 text 消息段相加。
### _static_ `code(code_language, code)`
"发送 code 消息段
### _static_ `markdown(title, text)`
发送 `markdown` 类型消息
### _static_ `actionCardSingleBtn(title, text, singleTitle, singleURL)`
发送 `actionCardSingleBtn` 类型消息
### _static_ `actionCardMultiBtns(title, text, btns, hideAvatar=False, btnOrientation='1')`
发送 `actionCardMultiBtn` 类型消息
* **参数**
* `btnOrientation`: 0按钮竖直排列 1按钮横向排列
* `btns`: [{ "title": title, "actionURL": actionURL }, ...]
### _static_ `feedCard(links)`
发送 `feedCard` 类型消息
* **参数**
* `links`: [{ "title": xxx, "messageURL": xxx, "picURL": xxx }, ...]
## _class_ `Message`
基类:[`nonebot.adapters._base.Message`](README.md#nonebot.adapters._base.Message)[`nonebot.adapters.ding.message.MessageSegment`]
钉钉 协议 Message 适配。
# NoneBot.adapters.ding.event 模块
## _class_ `Event`
基类:[`nonebot.adapters._base.Event`](README.md#nonebot.adapters._base.Event)
钉钉协议事件。各事件字段参考 [钉钉文档](https://ding-doc.dingtalk.com/document#/org-dev-guide/elzz1p)
## _class_ `ConversationType`
基类:`str`, `enum.Enum`
An enumeration.
## _class_ `MessageEvent`
基类:`nonebot.adapters.ding.event.Event`
消息事件
## _class_ `PrivateMessageEvent`
基类:`nonebot.adapters.ding.event.MessageEvent`
私聊消息事件
## _class_ `GroupMessageEvent`
基类:`nonebot.adapters.ding.event.MessageEvent`
群消息事件

File diff suppressed because it is too large Load Diff

View File

@ -4,429 +4,3 @@ sidebarDepth: 0
---
# NoneBot.drivers 模块
## 后端驱动适配基类
各驱动请继承以下基类
## _class_ `Driver`
基类:`abc.ABC`
Driver 基类。
### `_adapters`
* **类型**
`Dict[str, Type[Bot]]`
* **说明**
已注册的适配器列表
### `_bot_connection_hook`
* **类型**
`Set[T_BotConnectionHook]`
* **说明**
Bot 连接建立时执行的函数
### `_bot_disconnection_hook`
* **类型**
`Set[T_BotDisconnectionHook]`
* **说明**
Bot 连接断开时执行的函数
### _abstract_ `__init__(env, config)`
* **参数**
* `env: Env`: 包含环境信息的 Env 对象
* `config: Config`: 包含配置信息的 Config 对象
### `env`
* **类型**
`str`
* **说明**
环境名称
### `config`
* **类型**
`Config`
* **说明**
配置对象
### `_clients`
* **类型**
`Dict[str, Bot]`
* **说明**
已连接的 Bot
### _property_ `bots`
* **类型**
`Dict[str, Bot]`
* **说明**
获取当前所有已连接的 Bot
### `register_adapter(name, adapter, **kwargs)`
* **说明**
注册一个协议适配器
* **参数**
* `name: str`: 适配器名称,用于在连接时进行识别
* `adapter: Type[Bot]`: 适配器 Class
### _abstract property_ `type`
驱动类型名称
### _abstract property_ `logger`
驱动专属 logger 日志记录器
### _abstract_ `run(host=None, port=None, *args, **kwargs)`
* **说明**
启动驱动框架
* **参数**
* `host: Optional[str]`: 驱动绑定 IP
* `post: Optional[int]`: 驱动绑定端口
* `*args`
* `**kwargs`
### _abstract_ `on_startup(func)`
注册一个在驱动启动时运行的函数
### _abstract_ `on_shutdown(func)`
注册一个在驱动停止时运行的函数
### `on_bot_connect(func)`
* **说明**
装饰一个函数使他在 bot 通过 WebSocket 连接成功时执行。
* **函数参数**
* `bot: Bot`: 当前连接上的 Bot 对象
### `on_bot_disconnect(func)`
* **说明**
装饰一个函数使他在 bot 通过 WebSocket 连接断开时执行。
* **函数参数**
* `bot: Bot`: 当前连接上的 Bot 对象
### `_bot_connect(bot)`
在 WebSocket 连接成功后,调用该函数来注册 bot 对象
### `_bot_disconnect(bot)`
在 WebSocket 连接断开后,调用该函数来注销 bot 对象
## _class_ `ReverseDriver`
基类:`nonebot.drivers.Driver`
Reverse Driver 基类。将后端框架封装,以满足适配器使用。
### _abstract property_ `server_app`
驱动 APP 对象
### _abstract property_ `asgi`
驱动 ASGI 对象
### _abstract async_ `_handle_http(*args, **kwargs)`
用于处理 HTTP 类型请求的函数
### _abstract async_ `_handle_ws_reverse(*args, **kwargs)`
用于处理 WebSocket 类型请求的函数
## _class_ `HTTPRequest`
基类:`object`
HTTP 请求封装。参考 [asgi http scope](https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope)。
### _property_ `type`
Always http
### _property_ `scope`
Raw scope from asgi.
The connection scope information, a dictionary that
contains at least a type key specifying the protocol that is incoming.
### _property_ `http_version`
One of "1.0", "1.1" or "2".
### _property_ `method`
The HTTP method name, uppercased.
### _property_ `schema`
URL scheme portion (likely "http" or "https").
Optional (but must not be empty); default is "http".
### _property_ `path`
HTTP request target excluding any query string,
with percent-encoded sequences and UTF-8 byte sequences
decoded into characters.
### _property_ `query_string`
URL portion after the ?, percent-encoded.
### _property_ `headers`
An iterable of [name, value] two-item iterables,
where name is the header name, and value is the header value.
Order of header values must be preserved from the original HTTP request;
order of header names is not important.
Duplicates are possible and must be preserved in the message as received.
Header names must be lowercased.
### _property_ `body`
Body of the request.
Optional; if missing defaults to b"".
If more_body is set, treat as start of body and concatenate on further chunks.
## _class_ `HTTPResponse`
基类:`object`
HTTP 响应封装。参考 [asgi http scope](https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope)。
### `status`
HTTP status code.
### `headers`
An iterable of [name, value] two-item iterables,
where name is the header name,
and value is the header value.
Order must be preserved in the HTTP response.
Header names must be lowercased.
Optional; if missing defaults to an empty list.
### `body`
HTTP body content.
Optional; if missing defaults to None.
### _property_ `type`
Always http
## _class_ `WebSocket`
基类:`object`
WebSocket 连接封装。参考 [asgi websocket scope](https://asgi.readthedocs.io/en/latest/specs/www.html#websocket-connection-scope)。
### _abstract_ `__init__(websocket)`
* **参数**
* `websocket: Any`: WebSocket 连接对象
### _property_ `websocket`
WebSocket 连接对象
### _abstract property_ `closed`
* **类型**
`bool`
* **说明**
连接是否已经关闭
### _abstract async_ `accept()`
接受 WebSocket 连接请求
### _abstract async_ `close(code)`
关闭 WebSocket 连接请求
### _abstract async_ `receive()`
接收一条 WebSocket 信息
### _abstract async_ `send(data)`
发送一条 WebSocket 信息

View File

@ -4,132 +4,3 @@ sidebarDepth: 0
---
# NoneBot.drivers.fastapi 模块
## FastAPI 驱动适配
后端使用方法请参考: [FastAPI 文档](https://fastapi.tiangolo.com/)
## _class_ `Config`
基类:`pydantic.env_settings.BaseSettings`
FastAPI 驱动框架设置,详情参考 FastAPI 文档
### `fastapi_openapi_url`
* **类型**
`Optional[str]`
* **说明**
`openapi.json` 地址,默认为 `None` 即关闭
### `fastapi_docs_url`
* **类型**
`Optional[str]`
* **说明**
`swagger` 地址,默认为 `None` 即关闭
### `fastapi_redoc_url`
* **类型**
`Optional[str]`
* **说明**
`redoc` 地址,默认为 `None` 即关闭
### `fastapi_reload_dirs`
* **类型**
`List[str]`
* **说明**
`debug` 模式下重载监控文件夹列表,默认为 uvicorn 默认值
## _class_ `Driver`
基类:[`nonebot.drivers.ReverseDriver`](README.md#nonebot.drivers.ReverseDriver)
FastAPI 驱动框架
* **上报地址**
* `/{adapter name}/`: HTTP POST 上报
* `/{adapter name}/http/`: HTTP POST 上报
* `/{adapter name}/ws`: WebSocket 上报
* `/{adapter name}/ws/`: WebSocket 上报
### _property_ `type`
驱动名称: `fastapi`
### _property_ `server_app`
`FastAPI APP` 对象
### _property_ `asgi`
`FastAPI APP` 对象
### _property_ `logger`
fastapi 使用的 logger
### `on_startup(func)`
参考文档: [Events](https://fastapi.tiangolo.com/advanced/events/#startup-event)
### `on_shutdown(func)`
参考文档: [Events](https://fastapi.tiangolo.com/advanced/events/#startup-event)
### `run(host=None, port=None, *, app=None, **kwargs)`
使用 `uvicorn` 启动 FastAPI

View File

@ -4,59 +4,3 @@ sidebarDepth: 0
---
# NoneBot.drivers.quart 模块
## Quart 驱动适配
后端使用方法请参考: [Quart 文档](https://pgjones.gitlab.io/quart/index.html)
## _class_ `Driver`
基类:[`nonebot.drivers.ReverseDriver`](README.md#nonebot.drivers.ReverseDriver)
Quart 驱动框架
* **上报地址**
* `/{adapter name}/http`: HTTP POST 上报
* `/{adapter name}/ws`: WebSocket 上报
### _property_ `type`
驱动名称: `quart`
### _property_ `server_app`
`Quart` 对象
### _property_ `asgi`
`Quart` 对象
### _property_ `logger`
fastapi 使用的 logger
### `on_startup(func)`
参考文档: [Startup and Shutdown](https://pgjones.gitlab.io/quart/how_to_guides/startup_shutdown.html)
### `on_shutdown(func)`
参考文档: [Startup and Shutdown](https://pgjones.gitlab.io/quart/how_to_guides/startup_shutdown.html)
### `run(host=None, port=None, *, app=None, **kwargs)`
使用 `uvicorn` 启动 Quart

View File

@ -4,211 +4,3 @@ sidebarDepth: 0
---
# NoneBot.exception 模块
## 异常
下列文档中的异常是所有 NoneBot 运行时可能会抛出的。
这些异常并非所有需要用户处理,在 NoneBot 内部运行时被捕获,并进行对应操作。
## _exception_ `NoneBotException`
基类:`Exception`
* **说明**
所有 NoneBot 发生的异常基类。
## _exception_ `IgnoredException`
基类:`nonebot.exception.NoneBotException`
* **说明**
指示 NoneBot 应该忽略该事件。可由 PreProcessor 抛出。
* **参数**
* `reason`: 忽略事件的原因
## _exception_ `ParserExit`
基类:`nonebot.exception.NoneBotException`
* **说明**
`shell command` 处理消息失败时返回的异常
* **参数**
* `status`
* `message`
## _exception_ `PausedException`
基类:`nonebot.exception.NoneBotException`
* **说明**
指示 NoneBot 结束当前 `Handler` 并等待下一条消息后继续下一个 `Handler`
可用于用户输入新信息。
* **用法**
可以在 `Handler` 中通过 `Matcher.pause()` 抛出。
## _exception_ `RejectedException`
基类:`nonebot.exception.NoneBotException`
* **说明**
指示 NoneBot 结束当前 `Handler` 并等待下一条消息后重新运行当前 `Handler`
可用于用户重新输入。
* **用法**
可以在 `Handler` 中通过 `Matcher.reject()` 抛出。
## _exception_ `FinishedException`
基类:`nonebot.exception.NoneBotException`
* **说明**
指示 NoneBot 结束当前 `Handler` 且后续 `Handler` 不再被运行。
可用于结束用户会话。
* **用法**
可以在 `Handler` 中通过 `Matcher.finish()` 抛出。
## _exception_ `StopPropagation`
基类:`nonebot.exception.NoneBotException`
* **说明**
指示 NoneBot 终止事件向下层传播。
* **用法**
`Matcher.block == True` 时抛出。
## _exception_ `RequestDenied`
基类:`nonebot.exception.NoneBotException`
* **说明**
Bot 连接请求不合法。
* **参数**
* `status_code: int`: HTTP 状态码
* `reason: str`: 拒绝原因
## _exception_ `AdapterException`
基类:`nonebot.exception.NoneBotException`
* **说明**
代表 `Adapter` 抛出的异常,所有的 `Adapter` 都要在内部继承自这个 `Exception`
* **参数**
* `adapter_name: str`: 标识 adapter
## _exception_ `NoLogException`
基类:`Exception`
* **说明**
指示 NoneBot 对当前 `Event` 进行处理但不显示 Log 信息,可在 `get_log_string` 时抛出
## _exception_ `ApiNotAvailable`
基类:`nonebot.exception.AdapterException`
* **说明**
在 API 连接不可用时抛出。
## _exception_ `NetworkError`
基类:`nonebot.exception.AdapterException`
* **说明**
在网络出现问题时抛出,如: API 请求地址不正确, API 请求无返回或返回状态非正常等。
## _exception_ `ActionFailed`
基类:`nonebot.exception.AdapterException`
* **说明**
API 请求成功返回数据,但 API 操作失败。

View File

@ -4,108 +4,3 @@ sidebarDepth: 0
---
# NoneBot.handler 模块
## 事件处理函数
该模块实现事件处理函数的封装,以实现动态参数等功能。
## _class_ `Handler`
基类:`object`
事件处理函数类
### `__init__(func)`
装饰事件处理函数以便根据动态参数运行
### `func`
* **类型**
`T_Handler`
* **说明**
事件处理函数
### `signature`
* **类型**
`inspect.Signature`
* **说明**
事件处理函数签名
### _property_ `bot_type`
* **类型**
`Union[Type["Bot"], inspect.Parameter.empty]`
* **说明**
事件处理函数接受的 Bot 对象类型
### _property_ `event_type`
* **类型**
`Optional[Union[Type[Event], inspect.Parameter.empty]]`
* **说明**
事件处理函数接受的 event 类型 / 不需要 event 参数
### _property_ `state_type`
* **类型**
`Optional[Union[T_State, inspect.Parameter.empty]]`
* **说明**
事件处理函数是否接受 state 参数
### _property_ `matcher_type`
* **类型**
`Optional[Union[Type["Matcher"], inspect.Parameter.empty]]`
* **说明**
事件处理函数是否接受 matcher 参数

View File

@ -4,583 +4,3 @@ sidebarDepth: 0
---
# NoneBot.matcher 模块
## 事件响应器
该模块实现事件响应器的创建与运行,并提供一些快捷方法来帮助用户更好的与机器人进行对话 。
## `matchers`
* **类型**
`Dict[int, List[Type[Matcher]]]`
* **说明**
用于存储当前所有的事件响应器
## _class_ `Matcher`
基类:`object`
事件响应器类
### `module`
* **类型**
`Optional[ModuleType]`
* **说明**
事件响应器所在模块
### `plugin_name`
* **类型**
`Optional[str]`
* **说明**
事件响应器所在插件名
### `module_name`
* **类型**
`Optional[str]`
* **说明**
事件响应器所在模块名
### `module_prefix`
* **类型**
`Optional[str]`
* **说明**
事件响应器所在模块前缀
### `type`
* **类型**
`str`
* **说明**
事件响应器类型
### `rule`
* **类型**
`Rule`
* **说明**
事件响应器匹配规则
### `permission`
* **类型**
`Permission`
* **说明**
事件响应器触发权限
### `priority`
* **类型**
`int`
* **说明**
事件响应器优先级
### `block`
* **类型**
`bool`
* **说明**
事件响应器是否阻止事件传播
### `temp`
* **类型**
`bool`
* **说明**
事件响应器是否为临时
### `expire_time`
* **类型**
`Optional[datetime]`
* **说明**
事件响应器过期时间点
### `_default_state`
* **类型**
`T_State`
* **说明**
事件响应器默认状态
### `_default_state_factory`
* **类型**
`Optional[T_State]`
* **说明**
事件响应器默认工厂函数
### `_default_parser`
* **类型**
`Optional[T_ArgsParser]`
* **说明**
事件响应器默认参数解析函数
### `_default_type_updater`
* **类型**
`Optional[T_TypeUpdater]`
* **说明**
事件响应器类型更新函数
### `_default_permission_updater`
* **类型**
`Optional[T_PermissionUpdater]`
* **说明**
事件响应器权限更新函数
### `__init__()`
实例化 Matcher 以便运行
### `handlers`
* **类型**
`List[Handler]`
* **说明**
事件响应器拥有的事件处理函数列表
### _classmethod_ `new(type_='', rule=None, permission=None, handlers=None, temp=False, priority=1, block=False, *, module=None, expire_time=None, default_state=None, default_state_factory=None, default_parser=None, default_type_updater=None, default_permission_updater=None)`
* **说明**
创建一个新的事件响应器,并存储至 [matchers](#matchers)
* **参数**
* `type_: str`: 事件响应器类型,与 `event.get_type()` 一致时触发,空字符串表示任意
* `rule: Optional[Rule]`: 匹配规则
* `permission: Optional[Permission]`: 权限
* `handlers: Optional[List[T_Handler]]`: 事件处理函数列表
* `temp: bool`: 是否为临时事件响应器,即触发一次后删除
* `priority: int`: 响应优先级
* `block: bool`: 是否阻止事件向更低优先级的响应器传播
* `module: Optional[str]`: 事件响应器所在模块名称
* `default_state: Optional[T_State]`: 默认状态 `state`
* `default_state_factory: Optional[T_StateFactory]`: 默认状态 `state` 的工厂函数
* `expire_time: Optional[datetime]`: 事件响应器最终有效时间点,过时即被删除
* **返回**
* `Type[Matcher]`: 新的事件响应器类
### _async classmethod_ `check_perm(bot, event)`
* **说明**
检查是否满足触发权限
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: 上报事件
* **返回**
* `bool`: 是否满足权限
### _async classmethod_ `check_rule(bot, event, state)`
* **说明**
检查是否满足匹配规则
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: 上报事件
* `state: T_State`: 当前状态
* **返回**
* `bool`: 是否满足匹配规则
### _classmethod_ `args_parser(func)`
* **说明**
装饰一个函数来更改当前事件响应器的默认参数解析函数
* **参数**
* `func: T_ArgsParser`: 参数解析函数
### _classmethod_ `type_updater(func)`
* **说明**
装饰一个函数来更改当前事件响应器的默认响应事件类型更新函数
* **参数**
* `func: T_TypeUpdater`: 响应事件类型更新函数
### _classmethod_ `permission_updater(func)`
* **说明**
装饰一个函数来更改当前事件响应器的默认会话权限更新函数
* **参数**
* `func: T_PermissionUpdater`: 会话权限更新函数
### _classmethod_ `handle()`
* **说明**
装饰一个函数来向事件响应器直接添加一个处理函数
* **参数**
* 无
### _classmethod_ `receive()`
* **说明**
装饰一个函数来指示 NoneBot 在接收用户新的一条消息后继续运行该函数
* **参数**
* 无
### _classmethod_ `got(key, prompt=None, args_parser=None)`
* **说明**
装饰一个函数来指示 NoneBot 当要获取的 `key` 不存在时接收用户新的一条消息并经过 `ArgsParser` 处理后再运行该函数,如果 `key` 已存在则直接继续运行
* **参数**
* `key: str`: 参数名
* `prompt: Optional[Union[str, Message, MessageSegment]]`: 在参数不存在时向用户发送的消息
* `args_parser: Optional[T_ArgsParser]`: 可选参数解析函数,空则使用默认解析函数
### _async classmethod_ `send(message, **kwargs)`
* **说明**
发送一条消息给当前交互用户
* **参数**
* `message: Union[str, Message, MessageSegment]`: 消息内容
* `**kwargs`: 其他传递给 `bot.send` 的参数,请参考对应 adapter 的 bot 对象 api
### _async classmethod_ `finish(message=None, **kwargs)`
* **说明**
发送一条消息给当前交互用户并结束当前事件响应器
* **参数**
* `message: Union[str, Message, MessageSegment]`: 消息内容
* `**kwargs`: 其他传递给 `bot.send` 的参数,请参考对应 adapter 的 bot 对象 api
### _async classmethod_ `pause(prompt=None, **kwargs)`
* **说明**
发送一条消息给当前交互用户并暂停事件响应器,在接收用户新的一条消息后继续下一个处理函数
* **参数**
* `prompt: Union[str, Message, MessageSegment]`: 消息内容
* `**kwargs`: 其他传递给 `bot.send` 的参数,请参考对应 adapter 的 bot 对象 api
### _async classmethod_ `reject(prompt=None, **kwargs)`
* **说明**
发送一条消息给当前交互用户并暂停事件响应器,在接收用户新的一条消息后重新运行当前处理函数
* **参数**
* `prompt: Union[str, Message, MessageSegment]`: 消息内容
* `**kwargs`: 其他传递给 `bot.send` 的参数,请参考对应 adapter 的 bot 对象 api
### `stop_propagation()`
* **说明**
阻止事件传播

View File

@ -4,140 +4,3 @@ sidebarDepth: 0
---
# NoneBot.message 模块
## 事件处理
NoneBot 内部处理并按优先级分发事件给所有事件响应器,提供了多个插槽以进行事件的预处理等。
## `event_preprocessor(func)`
* **说明**
事件预处理。装饰一个函数,使它在每次接收到事件并分发给各响应器之前执行。
* **参数**
事件预处理函数接收三个参数。
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
* `state: T_State`: 当前 State
## `event_postprocessor(func)`
* **说明**
事件后处理。装饰一个函数,使它在每次接收到事件并分发给各响应器之后执行。
* **参数**
事件后处理函数接收三个参数。
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
* `state: T_State`: 当前事件运行前 State
## `run_preprocessor(func)`
* **说明**
运行预处理。装饰一个函数,使它在每次事件响应器运行前执行。
* **参数**
运行预处理函数接收四个参数。
* `matcher: Matcher`: 当前要运行的事件响应器
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
* `state: T_State`: 当前 State
## `run_postprocessor(func)`
* **说明**
运行后处理。装饰一个函数,使它在每次事件响应器运行后执行。
* **参数**
运行后处理函数接收五个参数。
* `matcher: Matcher`: 运行完毕的事件响应器
* `exception: Optional[Exception]`: 事件响应器运行错误(如果存在)
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
* `state: T_State`: 当前 State
## _async_ `handle_event(bot, event)`
* **说明**
处理一个事件。调用该函数以实现分发事件。
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
* **示例**
```python
import asyncio
asyncio.create_task(handle_event(bot, event))
```

View File

@ -4,275 +4,3 @@ sidebarDepth: 0
---
# NoneBot 模块
## 快捷导入
为方便使用,`nonebot` 模块从子模块导入了部分内容
* `on_message` => `nonebot.plugin.on_message`
* `on_notice` => `nonebot.plugin.on_notice`
* `on_request` => `nonebot.plugin.on_request`
* `on_metaevent` => `nonebot.plugin.on_metaevent`
* `on_startswith` => `nonebot.plugin.on_startswith`
* `on_endswith` => `nonebot.plugin.on_endswith`
* `on_keyword` => `nonebot.plugin.on_keyword`
* `on_command` => `nonebot.plugin.on_command`
* `on_shell_command` => `nonebot.plugin.on_shell_command`
* `on_regex` => `nonebot.plugin.on_regex`
* `CommandGroup` => `nonebot.plugin.CommandGroup`
* `Matchergroup` => `nonebot.plugin.MatcherGroup`
* `load_plugin` => `nonebot.plugin.load_plugin`
* `load_plugins` => `nonebot.plugin.load_plugins`
* `load_all_plugins` => `nonebot.plugin.load_all_plugins`
* `load_from_json` => `nonebot.plugin.load_from_json`
* `load_from_toml` => `nonebot.plugin.load_from_toml`
* `load_builtin_plugins` => `nonebot.plugin.load_builtin_plugins`
* `get_plugin` => `nonebot.plugin.get_plugin`
* `get_loaded_plugins` => `nonebot.plugin.get_loaded_plugins`
* `export` => `nonebot.plugin.export`
* `require` => `nonebot.plugin.require`
## `get_driver()`
* **说明**
获取全局 Driver 对象。可用于在计划任务的回调中获取当前 Driver 对象。
* **返回**
* `Driver`: 全局 Driver 对象
* **异常**
* `ValueError`: 全局 Driver 对象尚未初始化 (nonebot.init 尚未调用)
* **用法**
```python
driver = nonebot.get_driver()
```
## `get_app()`
* **说明**
获取全局 Driver 对应 Server App 对象。
* **返回**
* `Any`: Server App 对象
* **异常**
* `ValueError`: 全局 Driver 对象尚未初始化 (nonebot.init 尚未调用)
* **用法**
```python
app = nonebot.get_app()
```
## `get_asgi()`
* **说明**
获取全局 Driver 对应 Asgi 对象。
* **返回**
* `Any`: Asgi 对象
* **异常**
* `ValueError`: 全局 Driver 对象尚未初始化 (nonebot.init 尚未调用)
* **用法**
```python
asgi = nonebot.get_asgi()
```
## `get_bots()`
* **说明**
获取所有通过 ws 连接 NoneBot 的 Bot 对象。
* **返回**
* `Dict[str, Bot]`: 一个以字符串 ID 为键Bot 对象为值的字典
* **异常**
* `ValueError`: 全局 Driver 对象尚未初始化 (nonebot.init 尚未调用)
* **用法**
```python
bots = nonebot.get_bots()
```
## `init(*, _env_file=None, **kwargs)`
* **说明**
初始化 NoneBot 以及 全局 Driver 对象。
NoneBot 将会从 .env 文件中读取环境信息,并使用相应的 env 文件配置。
你也可以传入自定义的 _env_file 来指定 NoneBot 从该文件读取配置。
* **参数**
* `_env_file: Optional[str]`: 配置文件名,默认从 .env.{env_name} 中读取配置
* `**kwargs`: 任意变量,将会存储到 Config 对象里
* **返回**
* `None`
* **用法**
```python
nonebot.init(database=Database(...))
```
## `run(host=None, port=None, *args, **kwargs)`
* **说明**
启动 NoneBot即运行全局 Driver 对象。
* **参数**
* `host: Optional[str]`: 主机名IP若不传入则使用配置文件中指定的值
* `port: Optional[int]`: 端口,若不传入则使用配置文件中指定的值
* `*args`: 传入 Driver.run 的位置参数
* `**kwargs`: 传入 Driver.run 的命名参数
* **返回**
* `None`
* **用法**
```python
nonebot.run(host="127.0.0.1", port=8080)
```

View File

@ -4,134 +4,3 @@ sidebarDepth: 0
---
# NoneBot.permission 模块
## 权限
每个 `Matcher` 拥有一个 `Permission` ,其中是 **异步** `PermissionChecker` 的集合,只要有一个 `PermissionChecker` 检查结果为 `True` 时就会继续运行。
:::tip 提示
`PermissionChecker` 既可以是 async function 也可以是 sync function
:::
## _class_ `Permission`
基类:`object`
* **说明**
`Matcher` 规则类,当事件传递时,在 `Matcher` 运行前进行检查。
* **示例**
```python
Permission(async_function) | sync_function
# 等价于
from nonebot.utils import run_sync
Permission(async_function, run_sync(sync_function))
```
### `__init__(*checkers)`
* **参数**
* `*checkers: Callable[[Bot, Event], Awaitable[bool]]`: **异步** PermissionChecker
### `checkers`
* **说明**
存储 `PermissionChecker`
* **类型**
* `Set[Callable[[Bot, Event], Awaitable[bool]]]`
### _async_ `__call__(bot, event)`
* **说明**
检查是否满足某个权限
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
* **返回**
* `bool`
## `MESSAGE`
* **说明**: 匹配任意 `message` 类型事件,仅在需要同时捕获不同类型事件时使用。优先使用 message type 的 Matcher。
## `NOTICE`
* **说明**: 匹配任意 `notice` 类型事件,仅在需要同时捕获不同类型事件时使用。优先使用 notice type 的 Matcher。
## `REQUEST`
* **说明**: 匹配任意 `request` 类型事件,仅在需要同时捕获不同类型事件时使用。优先使用 request type 的 Matcher。
## `METAEVENT`
* **说明**: 匹配任意 `meta_event` 类型事件,仅在需要同时捕获不同类型事件时使用。优先使用 meta_event type 的 Matcher。
## `USER(*user, perm=None)`
* **说明**
`event``session_id` 在白名单内且满足 perm
* **参数**
* `*user: str`: 白名单
* `perm: Optional[Permission]`: 需要同时满足的权限
## `SUPERUSER`
* **说明**: 匹配任意超级用户消息类型事件

File diff suppressed because it is too large Load Diff

View File

@ -4,263 +4,3 @@ sidebarDepth: 0
---
# NoneBot.rule 模块
## 规则
每个事件响应器 `Matcher` 拥有一个匹配规则 `Rule` ,其中是 **异步** `RuleChecker` 的集合,只有当所有 `RuleChecker` 检查结果为 `True` 时继续运行。
:::tip 提示
`RuleChecker` 既可以是 async function 也可以是 sync function但在最终会被 `nonebot.utils.run_sync` 转换为 async function
:::
## _class_ `Rule`
基类:`object`
* **说明**
`Matcher` 规则类,当事件传递时,在 `Matcher` 运行前进行检查。
* **示例**
```python
Rule(async_function) & sync_function
# 等价于
from nonebot.utils import run_sync
Rule(async_function, run_sync(sync_function))
```
### `__init__(*checkers)`
* **参数**
* `*checkers: Callable[[Bot, Event, T_State], Awaitable[bool]]`: **异步** RuleChecker
### `checkers`
* **说明**
存储 `RuleChecker`
* **类型**
* `Set[Callable[[Bot, Event, T_State], Awaitable[bool]]]`
### _async_ `__call__(bot, event, state)`
* **说明**
检查是否符合所有规则
* **参数**
* `bot: Bot`: Bot 对象
* `event: Event`: Event 对象
* `state: T_State`: 当前 State
* **返回**
* `bool`
## `startswith(msg, ignorecase=False)`
* **说明**
匹配消息开头
* **参数**
* `msg: str`: 消息开头字符串
## `endswith(msg, ignorecase=False)`
* **说明**
匹配消息结尾
* **参数**
* `msg: str`: 消息结尾字符串
## `keyword(*keywords)`
* **说明**
匹配消息关键词
* **参数**
* `*keywords: str`: 关键词
## `command(*cmds)`
* **说明**
命令形式匹配,根据配置里提供的 `command_start`, `command_sep` 判断消息是否为命令。
可以通过 `state["_prefix"]["command"]` 获取匹配成功的命令(例:`("test",)`),通过 `state["_prefix"]["raw_command"]` 获取匹配成功的原始命令文本(例:`"/test"`)。
* **参数**
* `*cmds: Union[str, Tuple[str, ...]]`: 命令内容
* **示例**
使用默认 `command_start`, `command_sep` 配置
命令 `("test",)` 可以匹配:`/test` 开头的消息
命令 `("test", "sub")` 可以匹配”`/test.sub` 开头的消息
:::tip 提示
命令内容与后续消息间无需空格!
:::
## _class_ `ArgumentParser`
基类:`argparse.ArgumentParser`
* **说明**
`shell_like` 命令参数解析器,解析出错时不会退出程序。
## `shell_command(*cmds, parser=None)`
* **说明**
支持 `shell_like` 解析参数的命令形式匹配,根据配置里提供的 `command_start`, `command_sep` 判断消息是否为命令。
可以通过 `state["_prefix"]["command"]` 获取匹配成功的命令(例:`("test",)`),通过 `state["_prefix"]["raw_command"]` 获取匹配成功的原始命令文本(例:`"/test"`)。
可以通过 `state["argv"]` 获取用户输入的原始参数列表
添加 `parser` 参数后, 可以自动处理消息并将结果保存在 `state["args"]` 中。
* **参数**
* `*cmds: Union[str, Tuple[str, ...]]`: 命令内容
* `parser: Optional[ArgumentParser]`: `nonebot.rule.ArgumentParser` 对象
* **示例**
使用默认 `command_start`, `command_sep` 配置,更多示例参考 `argparse` 标准库文档。
```python
from nonebot.rule import ArgumentParser
parser = ArgumentParser()
parser.add_argument("-a", action="store_true")
rule = shell_command("ls", parser=parser)
```
:::tip 提示
命令内容与后续消息间无需空格!
:::
## `regex(regex, flags=0)`
* **说明**
根据正则表达式进行匹配。
可以通过 `state["_matched"]` `state["_matched_groups"]` `state["_matched_dict"]`
获取正则表达式匹配成功的文本。
* **参数**
* `regex: str`: 正则表达式
* `flags: Union[int, re.RegexFlag]`: 正则标志
:::tip 提示
正则表达式匹配使用 search 而非 match如需从头匹配请使用 `r"^xxx"` 来确保匹配开头
:::
## `to_me()`
* **说明**
通过 `event.is_tome()` 判断事件是否与机器人有关
* **参数**
* 无

View File

@ -11,13 +11,14 @@ from copy import copy
from functools import reduce, partial
from typing_extensions import Protocol
from dataclasses import dataclass, field
from typing import (Any, Set, List, Dict, Union, TypeVar, Mapping, Optional,
Iterable, Awaitable, TYPE_CHECKING)
from typing import (Any, Set, List, Dict, Tuple, Union, TypeVar, Mapping,
Optional, Iterable, Awaitable, TYPE_CHECKING)
from pydantic import BaseModel
from nonebot.log import logger
from nonebot.utils import DataclassEncoder
from nonebot.drivers import HTTPConnection, HTTPResponse
from nonebot.typing import T_CallingAPIHook, T_CalledAPIHook
if TYPE_CHECKING:
@ -51,12 +52,7 @@ class Bot(abc.ABC):
:说明: call_api 后执行的函数
"""
@abc.abstractmethod
def __init__(self,
connection_type: str,
self_id: str,
*,
websocket: Optional["WebSocket"] = None):
def __init__(self, self_id: str, request: HTTPConnection):
"""
:参数:
@ -64,12 +60,10 @@ class Bot(abc.ABC):
* ``self_id: str``: 机器人 ID
* ``websocket: Optional[WebSocket]``: Websocket 连接对象
"""
self.connection_type = connection_type
"""连接类型"""
self.self_id = self_id
self.self_id: str = self_id
"""机器人 ID"""
self.websocket = websocket
"""Websocket 连接对象"""
self.request: HTTPConnection = request
"""连接信息"""
def __getattr__(self, name: str) -> _ApiCall:
return partial(self.call_api, name)
@ -92,8 +86,9 @@ class Bot(abc.ABC):
@classmethod
@abc.abstractmethod
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict, body: Optional[bytes]) -> str:
async def check_permission(
cls, driver: "Driver", request: HTTPConnection
) -> Tuple[Optional[str], Optional[HTTPResponse]]:
"""
:说明:
@ -108,7 +103,8 @@ class Bot(abc.ABC):
:返回:
- ``str``: 连接唯一标识符
- ``str``: 连接唯一标识符``None`` 代表连接不合法
- ``HTTPResponse``: HTTP 上报响应
:异常:
@ -117,7 +113,7 @@ class Bot(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
async def handle_message(self, message: dict):
async def handle_message(self, message: bytes):
"""
:说明:
@ -125,7 +121,7 @@ class Bot(abc.ABC):
:参数:
* ``message: dict``: 收到的上报消息
* ``message: bytes``: 收到的上报消息
"""
raise NotImplementedError

View File

@ -7,8 +7,8 @@
import abc
import asyncio
from typing import (Any, Set, List, Dict, Type, Tuple, Optional, Callable,
MutableMapping, TYPE_CHECKING)
from dataclasses import dataclass, field
from typing import Set, Dict, Type, Optional, Callable, TYPE_CHECKING
from nonebot.log import logger
from nonebot.config import Env, Config
@ -47,12 +47,12 @@ class Driver(abc.ABC):
* ``env: Env``: 包含环境信息的 Env 对象
* ``config: Config``: 包含配置信息的 Config 对象
"""
self.env = env.environment
self.env: str = env.environment
"""
:类型: ``str``
:说明: 环境名称
"""
self.config = config
self.config: Config = config
"""
:类型: ``Config``
:说明: 配置对象
@ -231,143 +231,101 @@ class ReverseDriver(Driver):
raise NotImplementedError
class HTTPRequest:
@dataclass
class HTTPConnection(abc.ABC):
http_version: str
"""One of `"1.0"`, `"1.1"` or `"2"`."""
scheme: str
"""URL scheme portion (likely `"http"` or `"https"`)."""
path: str
"""
HTTP request target excluding any query string,
with percent-encoded sequences and UTF-8 byte sequences
decoded into characters.
"""
query_string: bytes = b""
""" URL portion after the `?`, percent-encoded."""
headers: Dict[str, str] = field(default_factory=dict)
"""A dict of name-value pairs,
where name is the header name, and value is the header value.
Order of header values must be preserved from the original HTTP request;
order of header names is not important.
Header names must be lowercased.
"""
@property
@abc.abstractmethod
def type(self) -> str:
"""Connection type."""
raise NotImplementedError
@dataclass
class HTTPRequest(HTTPConnection):
"""HTTP 请求封装。参考 `asgi http scope`_。
.. _asgi http scope:
https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope
"""
method: str = "GET"
"""The HTTP method name, uppercased."""
body: bytes = b""
"""Body of the request.
def __init__(self, scope: MutableMapping[str, Any]):
self._scope = scope
Optional; if missing defaults to b"".
"""
@property
def type(self) -> str:
"""Always `http`"""
"""Always ``http``"""
return "http"
@property
def scope(self) -> MutableMapping[str, Any]:
"""Raw scope from asgi.
The connection scope information, a dictionary that
contains at least a `type` key specifying the protocol that is incoming.
"""
return self._scope
@property
def http_version(self) -> str:
"""One of `"1.0"`, `"1.1"` or `"2"`."""
raise self.scope["http_version"]
@property
def method(self) -> str:
"""The HTTP method name, uppercased."""
raise self.scope["method"]
@property
def schema(self) -> str:
"""
URL scheme portion (likely `"http"` or `"https"`).
Optional (but must not be empty); default is `"http"`.
"""
raise self.scope["schema"]
@property
def path(self) -> str:
"""
HTTP request target excluding any query string,
with percent-encoded sequences and UTF-8 byte sequences
decoded into characters.
"""
return self.scope["path"]
@property
def query_string(self) -> bytes:
""" URL portion after the `?`, percent-encoded."""
return self.scope["query_string"]
@property
def headers(self) -> List[Tuple[bytes, bytes]]:
"""An iterable of [name, value] two-item iterables,
where name is the header name, and value is the header value.
Order of header values must be preserved from the original HTTP request;
order of header names is not important.
Duplicates are possible and must be preserved in the message as received.
Header names must be lowercased.
"""
return list(self.scope["headers"])
@property
def body(self) -> bytes:
"""Body of the request.
Optional; if missing defaults to b"".
If more_body is set, treat as start of body and concatenate on further chunks.
"""
return self.scope["body"]
@dataclass
class HTTPResponse:
"""HTTP 响应封装。参考 `asgi http scope`_。
.. _asgi http scope:
https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope
"""
status: int
"""HTTP status code."""
body: Optional[bytes] = None
"""HTTP body content.
def __init__(self,
status: int,
headers: List[Tuple[bytes, bytes]] = [],
body: Optional[bytes] = None):
self.status: int = status
"""HTTP status code."""
self.headers: List[Tuple[bytes, bytes]] = headers
"""An iterable of [name, value] two-item iterables,
where name is the header name,
and value is the header value.
Optional; if missing defaults to ``None``.
"""
headers: Dict[str, str] = field(default_factory=dict)
"""A dict of name-value pairs,
where name is the header name, and value is the header value.
Order must be preserved in the HTTP response.
Order must be preserved in the HTTP response.
Header names must be lowercased.
Header names must be lowercased.
Optional; if missing defaults to an empty list.
"""
self.body: Optional[bytes] = body
"""HTTP body content.
Optional; if missing defaults to `None`.
"""
Optional; if missing defaults to an empty dict.
"""
@property
def type(self) -> str:
"""Always `http`"""
"""Always ``http``"""
return "http"
class WebSocket:
@dataclass
class WebSocket(HTTPConnection, abc.ABC):
"""WebSocket 连接封装。参考 `asgi websocket scope`_。
.. _asgi websocket scope:
https://asgi.readthedocs.io/en/latest/specs/www.html#websocket-connection-scope
"""
@abc.abstractmethod
def __init__(self, websocket):
"""
:参数:
* ``websocket: Any``: WebSocket 连接对象
"""
self._websocket = websocket
@property
def websocket(self):
"""WebSocket 连接对象"""
return self._websocket
def type(self) -> str:
"""Always ``websocket``"""
return "websocket"
@property
@abc.abstractmethod
@ -389,11 +347,21 @@ class WebSocket:
raise NotImplementedError
@abc.abstractmethod
async def receive(self) -> dict:
"""接收一条 WebSocket 信息"""
async def receive(self) -> str:
"""接收一条 WebSocket text 信息"""
raise NotImplementedError
@abc.abstractmethod
async def send(self, data: dict):
"""发送一条 WebSocket 信息"""
async def receive_bytes(self) -> bytes:
"""接收一条 WebSocket binary 信息"""
raise NotImplementedError
@abc.abstractmethod
async def send(self, data: str):
"""发送一条 WebSocket text 信息"""
raise NotImplementedError
@abc.abstractmethod
async def send_bytes(self, data: bytes):
"""发送一条 WebSocket text 信息"""
raise NotImplementedError

View File

@ -8,23 +8,22 @@ FastAPI 驱动适配
https://fastapi.tiangolo.com/
"""
import json
import asyncio
import logging
from dataclasses import dataclass
from typing import List, Optional, Callable
import uvicorn
from pydantic import BaseSettings
from fastapi.responses import Response
from fastapi import status, Request, FastAPI, HTTPException
from starlette.websockets import WebSocketDisconnect, WebSocket as FastAPIWebSocket
from starlette.websockets import (WebSocketState, WebSocketDisconnect, WebSocket
as FastAPIWebSocket)
from nonebot.log import logger
from nonebot.typing import overrides
from nonebot.utils import DataclassEncoder
from nonebot.exception import RequestDenied
from nonebot.config import Env, Config as NoneBotConfig
from nonebot.drivers import ReverseDriver, WebSocket as BaseWebSocket
from nonebot.drivers import ReverseDriver, HTTPRequest, WebSocket as BaseWebSocket
class Config(BaseSettings):
@ -179,11 +178,6 @@ class Driver(ReverseDriver):
@overrides(ReverseDriver)
async def _handle_http(self, adapter: str, request: Request):
data = await request.body()
data_dict = json.loads(data.decode())
if not isinstance(data_dict, dict):
logger.warning("Data received is invalid")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
if adapter not in self._adapters:
logger.warning(
@ -194,27 +188,34 @@ class Driver(ReverseDriver):
# 创建 Bot 对象
BotClass = self._adapters[adapter]
headers = dict(request.headers)
try:
x_self_id = await BotClass.check_permission(self, "http", headers,
data)
except RequestDenied as e:
raise HTTPException(status_code=e.status_code,
detail=e.reason) from None
http_request = HTTPRequest(request.scope["http_version"],
request.url.scheme, request.url.path,
request.scope["query_string"],
dict(request.headers), request.method, data)
x_self_id, response = await BotClass.check_permission(
self, http_request)
if not x_self_id:
raise HTTPException(response and response.status or 401,
response.body)
if x_self_id in self._clients:
logger.warning("There's already a reverse websocket connection,"
"so the event may be handled twice.")
bot = BotClass("http", x_self_id)
bot = BotClass(x_self_id, http_request)
asyncio.create_task(bot.handle_message(data_dict))
return Response("", 204)
asyncio.create_task(bot.handle_message(data))
return Response(response and response.body,
response and response.status or 200)
@overrides(ReverseDriver)
async def _handle_ws_reverse(self, adapter: str,
websocket: FastAPIWebSocket):
ws = WebSocket(websocket)
ws = WebSocket(websocket.scope.get("http_version",
"1.1"), websocket.url.scheme,
websocket.url.path, websocket.scope["query_string"],
dict(websocket.headers), websocket)
if adapter not in self._adapters:
logger.warning(
@ -225,11 +226,9 @@ class Driver(ReverseDriver):
# Create Bot Object
BotClass = self._adapters[adapter]
headers = dict(websocket.headers)
try:
x_self_id = await BotClass.check_permission(self, "websocket",
headers, None)
except RequestDenied:
x_self_id, _ = await BotClass.check_permission(self, ws)
if not x_self_id:
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
return
@ -240,7 +239,7 @@ class Driver(ReverseDriver):
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
return
bot = BotClass("websocket", x_self_id, websocket=ws)
bot = BotClass(x_self_id, ws)
await ws.accept()
logger.opt(colors=True).info(
@ -251,54 +250,51 @@ class Driver(ReverseDriver):
try:
while not ws.closed:
data = await ws.receive()
try:
data = await ws.receive()
except WebSocketDisconnect:
logger.error("WebSocket disconnected by peer.")
break
except Exception as e:
logger.opt(exception=e).error(
"Error when receiving data from websocket.")
break
if not data:
continue
asyncio.create_task(bot.handle_message(data))
asyncio.create_task(bot.handle_message(data.encode()))
finally:
self._bot_disconnect(bot)
@dataclass
class WebSocket(BaseWebSocket):
def __init__(self, websocket: FastAPIWebSocket):
super().__init__(websocket)
self._closed = False
websocket: FastAPIWebSocket = None # type: ignore
@property
@overrides(BaseWebSocket)
def closed(self):
return self._closed
return (self.websocket.client_state == WebSocketState.DISCONNECTED or
self.websocket.application_state == WebSocketState.DISCONNECTED)
@overrides(BaseWebSocket)
async def accept(self):
await self.websocket.accept()
self._closed = False
@overrides(BaseWebSocket)
async def close(self, code: int = status.WS_1000_NORMAL_CLOSURE):
await self.websocket.close(code=code)
self._closed = True
@overrides(BaseWebSocket)
async def receive(self) -> Optional[dict]:
data = None
try:
data = await self.websocket.receive_json()
if not isinstance(data, dict):
data = None
raise ValueError
except ValueError:
logger.warning("Received an invalid json message.")
except WebSocketDisconnect:
self._closed = True
logger.error("WebSocket disconnected by peer.")
return data
async def receive(self) -> str:
return await self.websocket.receive_text()
@overrides(BaseWebSocket)
async def send(self, data: dict) -> None:
text = json.dumps(data, cls=DataclassEncoder)
await self.websocket.send({"type": "websocket.send", "text": text})
async def receive_bytes(self) -> bytes:
return await self.websocket.receive_bytes()
@overrides(BaseWebSocket)
async def send(self, data: str) -> None:
await self.websocket.send({"type": "websocket.send", "text": data})
@overrides(BaseWebSocket)
async def send_bytes(self, data: bytes) -> None:
await self.websocket.send({"type": "websocket.send", "bytes": data})

View File

@ -9,24 +9,22 @@ Quart 驱动适配
"""
import asyncio
from json.decoder import JSONDecodeError
from typing import Any, Callable, Coroutine, Dict, Optional, Type, TypeVar
from typing import List, TypeVar, Callable, Coroutine, Optional
import uvicorn
from pydantic import BaseSettings
from nonebot.config import Config as NoneBotConfig
from nonebot.config import Env
from nonebot.drivers import ReverseDriver, WebSocket as BaseWebSocket
from nonebot.exception import RequestDenied
from nonebot.log import logger
from nonebot.typing import overrides
from nonebot.config import Env, Config as NoneBotConfig
from nonebot.drivers import ReverseDriver, HTTPRequest, WebSocket as BaseWebSocket
try:
from quart import Quart, Request, Response
from quart import Websocket as QuartWebSocket
from quart import exceptions
from quart import request as _request
from quart import websocket as _websocket
from quart import Quart, Request, Response
from quart import Websocket as QuartWebSocket
except ImportError:
raise ValueError(
'Please install Quart by using `pip install nonebot2[quart]`')
@ -34,6 +32,25 @@ except ImportError:
_AsyncCallable = TypeVar("_AsyncCallable", bound=Callable[..., Coroutine])
class Config(BaseSettings):
"""
Quart 驱动框架设置
"""
quart_reload_dirs: List[str] = []
"""
:类型:
``List[str]``
:说明:
``debug`` 模式下重载监控文件夹列表默认为 uvicorn 默认值
"""
class Config:
extra = "ignore"
class Driver(ReverseDriver):
"""
Quart 驱动框架
@ -48,18 +65,20 @@ class Driver(ReverseDriver):
def __init__(self, env: Env, config: NoneBotConfig):
super().__init__(env, config)
self.quart_config = Config(**config.dict())
self._server_app = Quart(self.__class__.__qualname__)
self._server_app.add_url_rule('/<adapter>/http',
methods=['POST'],
self._server_app.add_url_rule("/<adapter>/http",
methods=["POST"],
view_func=self._handle_http)
self._server_app.add_websocket('/<adapter>/ws',
self._server_app.add_websocket("/<adapter>/ws",
view_func=self._handle_ws_reverse)
@property
@overrides(ReverseDriver)
def type(self) -> str:
"""驱动名称: ``quart``"""
return 'quart'
return "quart"
@property
@overrides(ReverseDriver)
@ -76,17 +95,21 @@ class Driver(ReverseDriver):
@property
@overrides(ReverseDriver)
def logger(self):
"""fastapi 使用的 logger"""
"""Quart 使用的 logger"""
return self._server_app.logger
@overrides(ReverseDriver)
def on_startup(self, func: _AsyncCallable) -> _AsyncCallable:
"""参考文档: `Startup and Shutdown <https://pgjones.gitlab.io/quart/how_to_guides/startup_shutdown.html>`_"""
"""参考文档: `Startup and Shutdown`_
.. _Startup and Shutdown:
https://pgjones.gitlab.io/quart/how_to_guides/startup_shutdown.html
"""
return self.server_app.before_serving(func) # type: ignore
@overrides(ReverseDriver)
def on_shutdown(self, func: _AsyncCallable) -> _AsyncCallable:
"""参考文档: `Startup and Shutdown <https://pgjones.gitlab.io/quart/how_to_guides/startup_shutdown.html>`_"""
"""参考文档: `Startup and Shutdown`_"""
return self.server_app.after_serving(func) # type: ignore
@overrides(ReverseDriver)
@ -121,6 +144,7 @@ class Driver(ReverseDriver):
host=host or str(self.config.host),
port=port or self.config.port,
reload=bool(app) and self.config.debug,
reload_dirs=self.quart_config.quart_reload_dirs or None,
debug=self.config.debug,
log_config=LOGGING_CONFIG,
**kwargs)
@ -128,11 +152,7 @@ class Driver(ReverseDriver):
@overrides(ReverseDriver)
async def _handle_http(self, adapter: str):
request: Request = _request
try:
data: Dict[str, Any] = await request.get_json()
except Exception as e:
raise exceptions.BadRequest()
data: bytes = await request.get_data() # type: ignore
if adapter not in self._adapters:
logger.warning(f'Unknown adapter {adapter}. '
@ -140,25 +160,32 @@ class Driver(ReverseDriver):
raise exceptions.NotFound()
BotClass = self._adapters[adapter]
headers = {k: v for k, v in request.headers.items(lower=True)}
http_request = HTTPRequest(request.http_version, request.scheme,
request.path, request.query_string,
dict(request.headers), request.method, data)
try:
self_id = await BotClass.check_permission(self, 'http', headers,
data)
except RequestDenied as e:
raise exceptions.HTTPException(status_code=e.status_code,
description=e.reason,
name='Request Denied')
self_id, response = await BotClass.check_permission(self, http_request)
if not self_id:
raise exceptions.HTTPException(
response and response.status or 401,
description=(response and response.body or b"").decode(),
name="Request Denied")
if self_id in self._clients:
logger.warning("There's already a reverse websocket connection,"
"so the event may be handled twice.")
bot = BotClass('http', self_id)
bot = BotClass(self_id, http_request)
asyncio.create_task(bot.handle_message(data))
return Response('', 204)
return Response(response and response.body or "",
response and response.status or 200)
@overrides(ReverseDriver)
async def _handle_ws_reverse(self, adapter: str):
websocket: QuartWebSocket = _websocket
ws = WebSocket(websocket.http_version, websocket.scheme,
websocket.path, websocket.query_string,
dict(websocket.headers), websocket)
if adapter not in self._adapters:
logger.warning(
f'Unknown adapter {adapter}. Please register the adapter before use.'
@ -166,19 +193,23 @@ class Driver(ReverseDriver):
raise exceptions.NotFound()
BotClass = self._adapters[adapter]
headers = {k: v for k, v in websocket.headers.items(lower=True)}
try:
self_id = await BotClass.check_permission(self, 'websocket',
headers, None)
except RequestDenied as e:
raise exceptions.HTTPException(status_code=e.status_code,
description=e.reason,
name='Request Denied')
self_id, response = await BotClass.check_permission(self, ws)
if not self_id:
raise exceptions.HTTPException(
response and response.status or 401,
description=(response and response.body or b"").decode(),
name="Request Denied")
if self_id in self._clients:
logger.warning("There's already a reverse websocket connection,"
"so the event may be handled twice.")
ws = WebSocket(websocket)
bot = BotClass('websocket', self_id, websocket=ws)
logger.opt(colors=True).warning(
"There's already a reverse websocket connection, "
f"<y>{adapter.upper()} Bot {self_id}</y> ignored.")
raise exceptions.HTTPException(403,
description="Client already exists",
name="Request Denied")
bot = BotClass(self_id, ws)
await ws.accept()
logger.opt(colors=True).info(
f"WebSocket Connection from <y>{adapter.upper()} "
@ -187,52 +218,51 @@ class Driver(ReverseDriver):
try:
while not ws.closed:
data = await ws.receive()
if data is None:
continue
asyncio.create_task(bot.handle_message(data))
try:
data = await ws.receive()
except asyncio.CancelledError:
logger.warning("WebSocket disconnected by peer.")
break
except Exception as e:
logger.opt(exception=e).error(
"Error when receiving data from websocket.")
break
asyncio.create_task(bot.handle_message(data.encode()))
finally:
self._bot_disconnect(bot)
class WebSocket(BaseWebSocket):
@overrides(BaseWebSocket)
def __init__(self, websocket: QuartWebSocket):
super().__init__(websocket)
self._closed = False
@property
@overrides(BaseWebSocket)
def websocket(self) -> QuartWebSocket:
return self._websocket
websocket: QuartWebSocket = None # type: ignore
@property
@overrides(BaseWebSocket)
def closed(self):
return self._closed
# FIXME
return False
@overrides(BaseWebSocket)
async def accept(self):
await self.websocket.accept()
self._closed = False
@overrides(BaseWebSocket)
async def close(self):
self._closed = True
# FIXME
pass
@overrides(BaseWebSocket)
async def receive(self) -> Optional[Dict[str, Any]]:
data: Optional[Dict[str, Any]] = None
try:
data = await self.websocket.receive_json()
except JSONDecodeError:
logger.warning('Received an invalid json message.')
except asyncio.CancelledError:
self._closed = True
logger.warning('WebSocket disconnected by peer.')
return data
async def receive(self) -> str:
return await self.websocket.receive() # type: ignore
@overrides(BaseWebSocket)
async def send(self, data: dict):
await self.websocket.send_json(data)
async def receive_bytes(self) -> bytes:
return await self.websocket.receive() # type: ignore
@overrides(BaseWebSocket)
async def send(self, data: str):
await self.websocket.send(data)
@overrides(BaseWebSocket)
async def send_bytes(self, data: bytes):
await self.websocket.send(data)

View File

@ -115,29 +115,6 @@ class StopPropagation(NoneBotException):
pass
class RequestDenied(NoneBotException):
"""
:说明:
Bot 连接请求不合法
:参数:
* ``status_code: int``: HTTP 状态码
* ``reason: str``: 拒绝原因
"""
def __init__(self, status_code: int, reason: str):
self.status_code = status_code
self.reason = reason
def __repr__(self):
return f"<RequestDenied, status_code={self.status_code}, reason={self.reason}>"
def __str__(self):
return self.__repr__()
class AdapterException(NoneBotException):
"""
:说明:

View File

@ -3,14 +3,15 @@ import sys
import hmac
import json
import asyncio
from typing import Any, Dict, Union, Optional, TYPE_CHECKING
from typing import Any, Dict, Tuple, Union, Optional, TYPE_CHECKING
import httpx
from nonebot.log import logger
from nonebot.typing import overrides
from nonebot.message import handle_event
from nonebot.utils import DataclassEncoder
from nonebot.adapters import Bot as BaseBot
from nonebot.exception import RequestDenied
from nonebot.drivers import Driver, HTTPConnection, HTTPRequest, HTTPResponse, WebSocket
from .utils import log, escape
from .config import Config as CQHTTPConfig
@ -20,7 +21,6 @@ from .exception import NetworkError, ApiNotAvailable, ActionFailed
if TYPE_CHECKING:
from nonebot.config import Config
from nonebot.drivers import Driver, WebSocket
def get_auth_bearer(access_token: Optional[str] = None) -> Optional[str]:
@ -28,7 +28,7 @@ def get_auth_bearer(access_token: Optional[str] = None) -> Optional[str]:
return None
scheme, _, param = access_token.partition(" ")
if scheme.lower() not in ["bearer", "token"]:
raise RequestDenied(401, "Not authenticated")
return None
return param
@ -225,14 +225,6 @@ class Bot(BaseBot):
"""
cqhttp_config: CQHTTPConfig
def __init__(self,
connection_type: str,
self_id: str,
*,
websocket: Optional["WebSocket"] = None):
super().__init__(connection_type, self_id, websocket=websocket)
@property
@overrides(BaseBot)
def type(self) -> str:
@ -242,84 +234,84 @@ class Bot(BaseBot):
return "cqhttp"
@classmethod
def register(cls, driver: "Driver", config: "Config"):
def register(cls, driver: Driver, config: "Config"):
super().register(driver, config)
cls.cqhttp_config = CQHTTPConfig(**config.dict())
@classmethod
@overrides(BaseBot)
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict, body: Optional[bytes]) -> str:
async def check_permission(
cls, driver: Driver,
request: HTTPConnection) -> Tuple[Optional[str], HTTPResponse]:
"""
:说明:
CQHTTP (OneBot) 协议鉴权参考 `鉴权 <https://github.com/howmanybots/onebot/blob/master/v11/specs/communication/authorization.md>`_
"""
x_self_id = headers.get("x-self-id")
x_signature = headers.get("x-signature")
token = get_auth_bearer(headers.get("authorization"))
x_self_id = request.headers.get("x-self-id")
x_signature = request.headers.get("x-signature")
token = get_auth_bearer(request.headers.get("authorization"))
cqhttp_config = CQHTTPConfig(**driver.config.dict())
# 检查连接方式
if connection_type not in ["http", "websocket"]:
log("WARNING", "Unsupported connection type")
raise RequestDenied(405, "Unsupported connection type")
# 检查self_id
if not x_self_id:
log("WARNING", "Missing X-Self-ID Header")
raise RequestDenied(400, "Missing X-Self-ID Header")
return None, HTTPResponse(400, b"Missing X-Self-ID Header")
# 检查签名
secret = cqhttp_config.secret
if secret and connection_type == "http":
if secret and isinstance(request, HTTPRequest):
if not x_signature:
log("WARNING", "Missing Signature Header")
raise RequestDenied(401, "Missing Signature")
sig = hmac.new(secret.encode("utf-8"), body, "sha1").hexdigest()
return None, HTTPResponse(401, b"Missing Signature")
sig = hmac.new(secret.encode("utf-8"), request.body,
"sha1").hexdigest()
if x_signature != "sha1=" + sig:
log("WARNING", "Signature Header is invalid")
raise RequestDenied(403, "Signature is invalid")
return None, HTTPResponse(403, b"Signature is invalid")
access_token = cqhttp_config.access_token
if access_token and access_token != token and connection_type == "websocket":
if access_token and access_token != token and isinstance(
request, WebSocket):
log(
"WARNING", "Authorization Header is invalid"
if token else "Missing Authorization Header")
raise RequestDenied(
403, "Authorization Header is invalid"
if token else "Missing Authorization Header")
return str(x_self_id)
return None, HTTPResponse(
403, b"Authorization Header is invalid"
if token else b"Missing Authorization Header")
return str(x_self_id), HTTPResponse(204, b'')
@overrides(BaseBot)
async def handle_message(self, message: dict):
async def handle_message(self, message: bytes):
"""
:说明:
调用 `_check_reply <#async-check-reply-bot-event>`_, `_check_at_me <#check-at-me-bot-event>`_, `_check_nickname <#check-nickname-bot-event>`_ 处理事件并转换为 `Event <#class-event>`_
"""
if not message:
data = json.loads(message)
if not data:
return
if "post_type" not in message:
ResultStore.add_result(message)
if "post_type" not in data:
ResultStore.add_result(data)
return
try:
post_type = message['post_type']
detail_type = message.get(f"{post_type}_type")
post_type = data['post_type']
detail_type = data.get(f"{post_type}_type")
detail_type = f".{detail_type}" if detail_type else ""
sub_type = message.get("sub_type")
sub_type = data.get("sub_type")
sub_type = f".{sub_type}" if sub_type else ""
models = get_event_model(post_type + detail_type + sub_type)
for model in models:
try:
event = model.parse_obj(message)
event = model.parse_obj(data)
break
except Exception as e:
log("DEBUG", "Event Parser Error", e)
else:
event = Event.parse_obj(message)
event = Event.parse_obj(data)
# Check whether user is calling me
await _check_reply(self, event)
@ -329,25 +321,28 @@ class Bot(BaseBot):
await handle_event(self, event)
except Exception as e:
logger.opt(colors=True, exception=e).error(
f"<r><bg #f8bbd0>Failed to handle event. Raw: {message}</bg #f8bbd0></r>"
f"<r><bg #f8bbd0>Failed to handle event. Raw: {data}</bg #f8bbd0></r>"
)
@overrides(BaseBot)
async def _call_api(self, api: str, **data) -> Any:
log("DEBUG", f"Calling API <y>{api}</y>")
if self.connection_type == "websocket":
if isinstance(self.request, WebSocket):
seq = ResultStore.get_seq()
await self.websocket.send({
"action": api,
"params": data,
"echo": {
"seq": seq
}
})
json_data = json.dumps(
{
"action": api,
"params": data,
"echo": {
"seq": seq
}
},
cls=DataclassEncoder)
await self.request.send(json_data)
return _handle_api_result(await ResultStore.fetch(
seq, self.config.api_timeout))
elif self.connection_type == "http":
elif isinstance(self.request, HTTPRequest):
api_root = self.config.api_root.get(self.self_id)
if not api_root:
raise ApiNotAvailable
@ -431,7 +426,7 @@ class Bot(BaseBot):
message, str) else message
msg = message if isinstance(message, Message) else Message(message)
at_sender = at_sender and getattr(event, "user_id", None)
at_sender = at_sender and bool(getattr(event, "user_id", None))
params = {}
if getattr(event, "user_id", None):
@ -449,8 +444,7 @@ class Bot(BaseBot):
raise ValueError("Cannot guess message type to reply!")
if at_sender and params["message_type"] != "private":
params["message"] = MessageSegment.at(params["user_id"]) + \
MessageSegment.text(" ") + msg
params["message"] = MessageSegment.at(params["user_id"]) + " " + msg
else:
params["message"] = msg
return await self.send_msg(**params)

View File

@ -1,16 +1,17 @@
import json
import urllib.parse
from datetime import datetime
import time
from typing import Any, Union, Optional, TYPE_CHECKING
from datetime import datetime
from typing import Any, Tuple, Union, Optional, TYPE_CHECKING
import httpx
from nonebot.log import logger
from nonebot.typing import overrides
from nonebot.message import handle_event
from nonebot.adapters import Bot as BaseBot
from nonebot.exception import RequestDenied
from nonebot.drivers import Driver, HTTPConnection, HTTPRequest, HTTPResponse
from .utils import calc_hmac_base64, log
from .config import Config as DingConfig
@ -20,7 +21,6 @@ from .event import MessageEvent, PrivateMessageEvent, GroupMessageEvent, Convers
if TYPE_CHECKING:
from nonebot.config import Config
from nonebot.drivers import Driver
SEND = "send"
@ -31,10 +31,6 @@ class Bot(BaseBot):
"""
ding_config: DingConfig
def __init__(self, connection_type: str, self_id: str, **kwargs):
super().__init__(connection_type, self_id, **kwargs)
@property
def type(self) -> str:
"""
@ -43,57 +39,61 @@ class Bot(BaseBot):
return "ding"
@classmethod
def register(cls, driver: "Driver", config: "Config"):
def register(cls, driver: Driver, config: "Config"):
super().register(driver, config)
cls.ding_config = DingConfig(**config.dict())
@classmethod
@overrides(BaseBot)
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict, body: Optional[bytes]) -> str:
async def check_permission(
cls, driver: Driver,
request: HTTPConnection) -> Tuple[Optional[str], HTTPResponse]:
"""
:说明:
钉钉协议鉴权参考 `鉴权 <https://ding-doc.dingtalk.com/doc#/serverapi2/elzz1p>`_
"""
timestamp = headers.get("timestamp")
sign = headers.get("sign")
timestamp = request.headers.get("timestamp")
sign = request.headers.get("sign")
# 检查连接方式
if connection_type not in ["http"]:
raise RequestDenied(
405, "Unsupported connection type, available type: `http`")
if not isinstance(request, HTTPRequest):
return None, HTTPResponse(
405, b"Unsupported connection type, available type: `http`")
# 检查 timestamp
if not timestamp:
raise RequestDenied(400, "Missing `timestamp` Header")
return None, HTTPResponse(400, b"Missing `timestamp` Header")
# 检查 sign
secret = cls.ding_config.secret
if secret:
if not sign:
log("WARNING", "Missing Signature Header")
raise RequestDenied(400, "Missing `sign` Header")
return None, HTTPResponse(400, b"Missing `sign` Header")
sign_base64 = calc_hmac_base64(str(timestamp), secret)
if sign != sign_base64.decode('utf-8'):
log("WARNING", "Signature Header is invalid")
raise RequestDenied(403, "Signature is invalid")
return None, HTTPResponse(403, b"Signature is invalid")
else:
log("WARNING", "Ding signature check ignored!")
return json.loads(body.decode())["chatbotUserId"]
return (json.loads(request.body.decode())["chatbotUserId"],
HTTPResponse(204, b''))
@overrides(BaseBot)
async def handle_message(self, message: dict):
if not message:
async def handle_message(self, message: bytes):
data = json.loads(message)
if not data:
return
# 判断消息类型,生成不同的 Event
try:
conversation_type = message["conversationType"]
conversation_type = data["conversationType"]
if conversation_type == ConversationType.private:
event = PrivateMessageEvent.parse_obj(message)
event = PrivateMessageEvent.parse_obj(data)
elif conversation_type == ConversationType.group:
event = GroupMessageEvent.parse_obj(message)
event = GroupMessageEvent.parse_obj(data)
else:
raise ValueError("Unsupported conversation type")
except Exception as e:
@ -104,7 +104,7 @@ class Bot(BaseBot):
await handle_event(self, event)
except Exception as e:
logger.opt(colors=True, exception=e).error(
f"<r><bg #f8bbd0>Failed to handle event. Raw: {message}</bg #f8bbd0></r>"
f"<r><bg #f8bbd0>Failed to handle event. Raw: {data}</bg #f8bbd0></r>"
)
return

View File

@ -1,8 +1,8 @@
"""
r"""
Mirai-API-HTTP 协议适配
============================
协议详情请看: `mirai-api-http 文档`_
协议详情请看: `mirai-api-http 文档`_
\:\:\: tip
该Adapter目前仍然处在早期实验性阶段, 并未经过充分测试

View File

@ -5,11 +5,11 @@ from typing import Any, Dict, List, NoReturn, Optional, Tuple, Union
import httpx
from nonebot.adapters import Bot as BaseBot
from nonebot.config import Config
from nonebot.drivers import Driver, WebSocket
from nonebot.exception import ApiNotAvailable, RequestDenied
from nonebot.typing import overrides
from nonebot.adapters import Bot as BaseBot
from nonebot.exception import ApiNotAvailable
from nonebot.drivers import Driver, HTTPConnection, HTTPResponse, WebSocket
from .config import Config as MiraiConfig
from .event import Event, FriendMessage, GroupMessage, TempMessage
@ -140,7 +140,7 @@ class SessionManager:
class Bot(BaseBot):
"""
r"""
mirai-api-http 协议 Bot 适配
\:\:\: warning
@ -151,14 +151,6 @@ class Bot(BaseBot):
"""
@overrides(BaseBot)
def __init__(self,
connection_type: str,
self_id: str,
*,
websocket: Optional[WebSocket] = None):
super().__init__(connection_type, self_id, websocket=websocket)
@property
@overrides(BaseBot)
def type(self) -> str:
@ -166,7 +158,8 @@ class Bot(BaseBot):
@property
def alive(self) -> bool:
return not self.websocket.closed
assert isinstance(self.request, WebSocket)
return not self.request.closed
@property
def api(self) -> SessionManager:
@ -177,27 +170,26 @@ class Bot(BaseBot):
@classmethod
@overrides(BaseBot)
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict, body: Optional[bytes]) -> str:
if connection_type == 'ws':
raise RequestDenied(
status_code=501,
reason='Websocket connection is not implemented')
self_id: Optional[str] = headers.get('bot')
async def check_permission(
cls, driver: Driver,
request: HTTPConnection) -> Tuple[Optional[str], HTTPResponse]:
if isinstance(request, WebSocket):
return None, HTTPResponse(
501, b'Websocket connection is not implemented')
self_id: Optional[str] = request.headers.get('bot')
if self_id is None:
raise RequestDenied(status_code=400,
reason='Header `Bot` is required.')
return None, HTTPResponse(400, b'Header `Bot` is required.')
self_id = str(self_id).strip()
await SessionManager.new(
int(self_id),
host=cls.mirai_config.host, # type: ignore
port=cls.mirai_config.port, #type: ignore
auth_key=cls.mirai_config.auth_key) # type: ignore
return self_id
return self_id, HTTPResponse(204, b'')
@classmethod
@overrides(BaseBot)
def register(cls, driver: "Driver", config: "Config"):
def register(cls, driver: Driver, config: "Config"):
cls.mirai_config = MiraiConfig(**config.dict())
if (cls.mirai_config.auth_key and cls.mirai_config.host and
cls.mirai_config.port) is None:
@ -224,7 +216,7 @@ class Bot(BaseBot):
@overrides(BaseBot)
async def call_api(self, api: str, **data) -> NoReturn:
"""
r"""
\:\:\: danger
由于Mirai的HTTP API特殊性, 该API暂时无法实现
\:\:\:

View File

@ -1,18 +1,16 @@
import asyncio
import json
import asyncio
from dataclasses import dataclass
from ipaddress import IPv4Address
from typing import (Any, Callable, Coroutine, Dict, NoReturn, Optional, Set,
TypeVar)
from typing import Any, Set, Dict, Tuple, TypeVar, Optional, Callable, Coroutine
import httpx
import websockets
from nonebot.config import Config
from nonebot.drivers import Driver
from nonebot.drivers import WebSocket as BaseWebSocket
from nonebot.exception import RequestDenied
from nonebot.log import logger
from nonebot.config import Config
from nonebot.typing import overrides
from nonebot.drivers import Driver, HTTPConnection, HTTPResponse, WebSocket as BaseWebSocket
from .bot import SessionManager, Bot
@ -21,7 +19,9 @@ WebsocketHandler_T = TypeVar('WebsocketHandler_T',
bound=WebsocketHandlerFunction)
@dataclass
class WebSocket(BaseWebSocket):
websocket: websockets.WebSocketClientProtocol = None # type: ignore
@classmethod
async def new(cls, *, host: IPv4Address, port: int,
@ -37,24 +37,26 @@ class WebSocket(BaseWebSocket):
self.event_handlers: Set[WebsocketHandlerFunction] = set()
super().__init__(websocket)
@property
@overrides(BaseWebSocket)
def websocket(self) -> websockets.WebSocketClientProtocol:
return self._websocket
@property
@overrides(BaseWebSocket)
def closed(self) -> bool:
return self.websocket.closed
@overrides(BaseWebSocket)
async def send(self, data: Dict[str, Any]):
return await self.websocket.send(json.dumps(data))
async def send(self, data: str):
return await self.websocket.send(data)
@overrides(BaseWebSocket)
async def receive(self) -> Dict[str, Any]:
received = await self.websocket.recv()
return json.loads(received)
async def send_bytes(self, data: str):
return await self.websocket.send(data)
@overrides(BaseWebSocket)
async def receive(self) -> str:
return await self.websocket.recv() # type: ignore
@overrides(BaseWebSocket)
async def receive_bytes(self) -> bytes:
return await self.websocket.recv() # type: ignore
async def _dispatcher(self):
while not self.closed:
@ -93,11 +95,6 @@ class WebsocketBot(Bot):
mirai-api-http 正向 Websocket 协议 Bot 适配
"""
@overrides(Bot)
def __init__(self, connection_type: str, self_id: str, *,
websocket: WebSocket):
super().__init__(connection_type, self_id, websocket=websocket)
@property
@overrides(Bot)
def type(self) -> str:
@ -105,7 +102,8 @@ class WebsocketBot(Bot):
@property
def alive(self) -> bool:
return not self.websocket.closed
assert isinstance(self.request, WebSocket)
return not self.request.closed
@property
def api(self) -> SessionManager:
@ -115,16 +113,14 @@ class WebsocketBot(Bot):
@classmethod
@overrides(Bot)
async def check_permission(cls, driver: "Driver", connection_type: str,
headers: dict,
body: Optional[bytes]) -> NoReturn:
raise RequestDenied(
status_code=501,
reason=f'Connection {connection_type} not implented')
async def check_permission(
cls, driver: Driver,
request: HTTPConnection) -> Tuple[None, HTTPResponse]:
return None, HTTPResponse(501, b'Connection not implented')
@classmethod
@overrides(Bot)
def register(cls, driver: "Driver", config: "Config", qq: int):
def register(cls, driver: Driver, config: "Config", qq: int):
"""
:说明: