mirror of
https://github.com/nonebot/nonebot2.git
synced 2024-12-18 17:35:46 +08:00
dfd2096fe5
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Ju4tCode <42488585+yanyongyu@users.noreply.github.com>
379 lines
13 KiB
Markdown
379 lines
13 KiB
Markdown
# 开发者指南
|
||
|
||
开发者指南内容较多,故分为了一个示例以及数个专题。
|
||
阅读(并且最好跟随实践)示例后,你将会对使用 `nonebot-plugin-orm` 开发插件有一个基本的认识。
|
||
如果想要更深入地学习关于 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [Alembic](https://alembic.sqlalchemy.org/) 的知识,或者在使用过程中遇到了问题,可以查阅专题以及其官方文档。
|
||
|
||
## 示例
|
||
|
||
### 模型定义
|
||
|
||
首先,我们需要设计存储的数据的结构。
|
||
例如天气插件,需要存储**什么地方 (`location`)** 的**天气是什么 (`weather`)**。
|
||
其中,一个地方只会有一种天气,而不同地方可能有相同的天气。
|
||
所以,我们可以设计出如下的模型:
|
||
|
||
```python title=weather/__init__.py showLineNumbers
|
||
from nonebot_plugin_orm import Model
|
||
from sqlalchemy.orm import Mapped, mapped_column
|
||
|
||
|
||
class Weather(Model):
|
||
location: Mapped[str] = mapped_column(primary_key=True)
|
||
weather: Mapped[str]
|
||
```
|
||
|
||
其中,`primary_key=True` 意味着此列 (`location`) 是主键,即内容是唯一的且非空的。
|
||
每一个模型必须有至少一个主键。
|
||
|
||
我们可以用以下代码检查模型生成的数据库模式是否正确:
|
||
|
||
```python
|
||
from sqlalchemy.schema import CreateTable
|
||
|
||
print(CreateTable(Weather.__table__))
|
||
```
|
||
|
||
```sql
|
||
CREATE TABLE weather_weather (
|
||
location VARCHAR NOT NULL,
|
||
weather VARCHAR NOT NULL,
|
||
CONSTRAINT pk_weather_weather PRIMARY KEY (location)
|
||
)
|
||
```
|
||
|
||
可以注意到表名是 `weather_weather` 而不是 `Weather` 或者 `weather`。
|
||
这是因为 `nonebot-plugin-orm` 会自动为模型生成一个表名,规则是:`<插件模块名>_<类名小写>`。
|
||
|
||
你也可以通过指定 `__tablename__` 属性来自定义表名:
|
||
|
||
```python {2}
|
||
class Weather(Model):
|
||
__tablename__ = "weather"
|
||
...
|
||
```
|
||
|
||
```sql {1}
|
||
CREATE TABLE weather (
|
||
...
|
||
)
|
||
```
|
||
|
||
但是,并不推荐你这么做,因为这可能会导致不同插件间的表名重复,引发冲突。
|
||
特别是当你会发布插件时,你并不知道其他插件会不会使用相同的表名。
|
||
|
||
### 首次迁移
|
||
|
||
我们成功定义了模型,现在启动机器人试试吧:
|
||
|
||
```shell
|
||
$ nb run
|
||
01-02 15:04:05 [SUCCESS] nonebot | NoneBot is initializing...
|
||
01-02 15:04:05 [ERROR] nonebot_plugin_orm | 启动检查失败
|
||
01-02 15:04:05 [ERROR] nonebot | Application startup failed. Exiting.
|
||
Traceback (most recent call last):
|
||
...
|
||
click.exceptions.UsageError: 检测到新的升级操作:
|
||
[('add_table',
|
||
Table('weather', MetaData(), Column('location', String(), table=<weather>, primary_key=True, nullable=False), Column('weather', String(), table=<weather>, nullable=False), schema=None))]
|
||
```
|
||
|
||
咦,发生了什么?
|
||
`nonebot-plugin-orm` 试图阻止我们启动机器人。
|
||
原来是我们定义了模型,但是数据库中并没有对应的表,这会导致插件不能正常运行。
|
||
所以,我们需要迁移数据库。
|
||
|
||
首先,我们需要创建一个迁移脚本:
|
||
|
||
```shell
|
||
nb orm revision -m "first revision" --branch-label weather
|
||
```
|
||
|
||
其中,`-m` 参数是迁移脚本的描述,`--branch-label` 参数是迁移脚本的分支,一般为插件模块名。
|
||
执行命令过后,出现了一个 `weather/migrations` 目录,其中有一个 `xxxxxxxxxxxx_first_revision.py` 文件:
|
||
|
||
```shell {4,5}
|
||
weather
|
||
├── __init__.py
|
||
├── config.py
|
||
└── migrations
|
||
└── xxxxxxxxxxxx_first_revision.py
|
||
```
|
||
|
||
这就是我们创建的迁移脚本,它记录了数据库模式的变化。
|
||
我们可以查看一下它的内容:
|
||
|
||
```python title=weather/migrations/xxxxxxxxxxxx_first_revision.py {25-33,39-41} showLineNumbers
|
||
"""first revision
|
||
|
||
迁移 ID: xxxxxxxxxxxx
|
||
父迁移:
|
||
创建时间: 2006-01-02 15:04:05.999999
|
||
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from collections.abc import Sequence
|
||
|
||
import sqlalchemy as sa
|
||
from alembic import op
|
||
|
||
revision: str = "xxxxxxxxxxxx"
|
||
down_revision: str | Sequence[str] | None = None
|
||
branch_labels: str | Sequence[str] | None = ("weather",)
|
||
depends_on: str | Sequence[str] | None = None
|
||
|
||
|
||
def upgrade(name: str = "") -> None:
|
||
if name:
|
||
return
|
||
# ### commands auto generated by Alembic - please adjust! ###
|
||
op.create_table(
|
||
"weather_weather",
|
||
sa.Column("location", sa.String(), nullable=False),
|
||
sa.Column("weather", sa.String(), nullable=False),
|
||
sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")),
|
||
info={"bind_key": "weather"},
|
||
)
|
||
# ### end Alembic commands ###
|
||
|
||
|
||
def downgrade(name: str = "") -> None:
|
||
if name:
|
||
return
|
||
# ### commands auto generated by Alembic - please adjust! ###
|
||
op.drop_table("weather_weather")
|
||
# ### end Alembic commands ###
|
||
```
|
||
|
||
可以注意到脚本的主体部分(其余是模版代码,请勿修改)是:
|
||
|
||
```python
|
||
# ### commands auto generated by Alembic - please adjust! ###
|
||
op.create_table( # CREATE TABLE
|
||
"weather_weather", # weather_weather
|
||
sa.Column("location", sa.String(), nullable=False), # location VARCHAR NOT NULL,
|
||
sa.Column("weather", sa.String(), nullable=False), # weather VARCHAR NOT NULL,
|
||
sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")), # CONSTRAINT pk_weather_weather PRIMARY KEY (location)
|
||
info={"bind_key": "weather"},
|
||
)
|
||
# ### end Alembic commands ###
|
||
```
|
||
|
||
```python
|
||
# ### commands auto generated by Alembic - please adjust! ###
|
||
op.drop_table("weather_weather") # DROP TABLE weather_weather;
|
||
# ### end Alembic commands ###
|
||
```
|
||
|
||
虽然我们不是很懂这些代码的意思,但是可以注意到它们几乎与 SQL 语句 (DDL) 一一对应。
|
||
显然,它们是用来创建和删除表的。
|
||
|
||
我们还可以注意到,`upgrade()` 和 `downgrade()` 函数中的代码是**互逆**的。
|
||
也就是说,执行一次 `upgrade()` 函数,再执行一次 `downgrade()` 函数后,数据库的模式就会回到原来的状态。
|
||
|
||
这就是迁移脚本的作用:记录数据库模式的变化,以便我们在不同的环境中(例如开发环境和生产环境)**可复现地**、**可逆地**同步数据库模式,正如 git 对我们的代码做的事情那样。
|
||
|
||
对了,不要忘记还有一段注释:`commands auto generated by Alembic - please adjust!`。
|
||
它在提醒我们,这些代码是由 Alembic 自动生成的,我们应该检查它们,并且根据需要进行调整。
|
||
|
||
:::caution 注意
|
||
迁移脚本冗长且繁琐,我们一般不会手写它们,而是由 Alembic 自动生成。
|
||
一般情况下,Alembic 足够智能,可以正确地生成迁移脚本。
|
||
但是,在复杂或有歧义的情况下,我们可能需要手动调整迁移脚本。
|
||
所以,**永远要检查迁移脚本,并且在开发环境中测试!**
|
||
|
||
**迁移脚本中任何一处错误都足以使数据付之东流!**
|
||
:::
|
||
|
||
确定迁移脚本正确后,我们就可以执行迁移脚本,将数据库模式同步到数据库中:
|
||
|
||
```shell
|
||
nb orm upgrade
|
||
```
|
||
|
||
现在,我们可以正常启动机器人了。
|
||
|
||
开发过程中,我们可能会频繁地修改模型,这意味着我们需要频繁地创建并执行迁移脚本,非常繁琐。
|
||
实际上,此时我们不在乎数据安全,只需要数据库模式与模型定义一致即可。
|
||
所以,我们可以关闭 `nonebot-plugin-orm` 的启动检查:
|
||
|
||
```shell title=.env.dev
|
||
ALEMBIC_STARTUP_CHECK=false
|
||
```
|
||
|
||
现在,每次启动机器人时,数据库模式会自动与模型定义同步,无需手动迁移。
|
||
|
||
### 会话管理
|
||
|
||
我们已经成功定义了模型,并且迁移了数据库,现在可以开始使用数据库了……吗?
|
||
并不能,因为模型只是数据结构的定义,并不能通过它操作数据(如果你曾经使用过 [Tortoise ORM](https://tortoise.github.io/),可能会知道 `await Weather.get(location="上海")` 这样的面向对象编程。
|
||
但是 SQLAlchemy 不同,选择了命令式编程)。
|
||
我们需要使用**会话**操作数据:
|
||
|
||
```python title=weather/__init__.py {10,13} showLineNumbers
|
||
from nonebot import on_command
|
||
from nonebot.adapters import Message
|
||
from nonebot.params import CommandArg
|
||
from nonebot_plugin_orm import async_scoped_session
|
||
|
||
weather = on_command("天气")
|
||
|
||
|
||
@weather.handle()
|
||
async def _(session: async_scoped_session, args: Message = CommandArg()):
|
||
location = args.extract_plain_text()
|
||
|
||
if wea := await session.get(Weather, location):
|
||
await weather.finish(f"今天{location}的天气是{wea.weather}")
|
||
|
||
await weather.finish(f"未查询到{location}的天气")
|
||
```
|
||
|
||
我们通过 `session: async_scoped_session` 依赖注入获得了一个会话,然后使用 `await session.get(Weather, location)` 查询数据库。
|
||
`async_scoped_session` 是一个有作用域限制的会话,作用域为当前事件、当前事件响应器。
|
||
会话产生的模型实例(例如此处的 `wea := await session.get(Weather, location)`)作用域与会话相同。
|
||
|
||
:::caution 注意
|
||
此处提到的“会话”指的是 ORM 会话,而非 [NoneBot 会话](../../../appendices/session-control),两者的生命周期也是不同的(NoneBot 会话的生命周期中可能包含多个事件,不同的事件也会有不同的事件响应器)。
|
||
具体而言,就是不要将 ORM 会话和模型实例存储在 NoneBot 会话状态中:
|
||
|
||
```python {12}
|
||
from nonebot.params import ArgPlainText
|
||
from nonebot.typing import T_State
|
||
|
||
|
||
@weather.got("location", prompt="请输入地名")
|
||
async def _(state: T_State, session: async_scoped_session, location: str = ArgPlainText()):
|
||
wea = await session.get(Weather, location)
|
||
|
||
if not wea:
|
||
await weather.finish(f"未查询到{location}的天气")
|
||
|
||
state["weather"] = wea # 不要这么做,除非你知道自己在做什么
|
||
```
|
||
|
||
当然非要这么做也不是不可以:
|
||
|
||
```python {6}
|
||
@weather.handle()
|
||
async def _(state: T_State, session: async_scoped_session):
|
||
# 通过 await session.merge(state["weather"]) 获得了此 ORM 会话中的相应模型实例,
|
||
# 而非直接使用会话状态中的模型实例,
|
||
# 因为先前的 ORM 会话已经关闭了。
|
||
wea = await session.merge(state["weather"])
|
||
await weather.finish(f"今天{state['location']}的天气是{wea.weather}")
|
||
```
|
||
|
||
:::
|
||
|
||
当有数据更改时,我们需要提交事务,也要注意会话作用域问题:
|
||
|
||
```python title=weather/__init__.py {12,20} showLineNumbers
|
||
from nonebot.params import Depends
|
||
|
||
|
||
async def get_weather(
|
||
session: async_scoped_session, args: Message = CommandArg()
|
||
) -> Weather:
|
||
location = args.extract_plain_text()
|
||
|
||
if not (wea := await session.get(Weather, location)):
|
||
wea = Weather(location=location, weather="未知")
|
||
session.add(wea)
|
||
# await session.commit() # 不应该在其他地方提交事务
|
||
|
||
return wea
|
||
|
||
|
||
@weather.handle()
|
||
async def _(session: async_scoped_session, wea: Weather = Depends(get_weather)):
|
||
await weather.send(f"今天的天气是{wea.weather}")
|
||
await session.commit() # 而应该在事件响应器结束前提交事务
|
||
```
|
||
|
||
当然我们也可以获得一个新的会话,不过此时就要手动管理会话了:
|
||
|
||
```python title=weather/__init__.py {5-6} showLineNumbers
|
||
from nonebot_plugin_orm import get_session
|
||
|
||
|
||
async def get_weather(location: str) -> str:
|
||
session = get_session()
|
||
async with session.begin():
|
||
wea = await session.get(Weather, location)
|
||
|
||
if not wea:
|
||
wea = Weather(location=location, weather="未知")
|
||
session.add(wea)
|
||
|
||
return wea.weather
|
||
|
||
|
||
@weather.handle()
|
||
async def _(args: Message = CommandArg()):
|
||
wea = await get_weather(args.extract_plain_text())
|
||
await weather.send(f"今天的天气是{wea}")
|
||
```
|
||
|
||
### 依赖注入
|
||
|
||
在上面的示例中,我们都是通过会话获得数据的。
|
||
不过,我们也可以通过依赖注入获得数据:
|
||
|
||
```python title=weather/__init__.py {12-14} showLineNumbers
|
||
from sqlalchemy import select
|
||
from nonebot.params import Depends
|
||
from nonebot_plugin_orm import SQLDepends
|
||
|
||
|
||
def extract_arg_plain_text(args: Message = CommandArg()) -> str:
|
||
return args.extract_plain_text()
|
||
|
||
|
||
@weather.handle()
|
||
async def _(
|
||
wea: Weather = SQLDepends(
|
||
select(Weather).where(Weather.location == Depends(extract_arg_plain_text))
|
||
),
|
||
):
|
||
await weather.send(f"今天的天气是{wea.weather}")
|
||
```
|
||
|
||
其中,`SQLDepends` 是一个特殊的依赖注入,它会根据类型标注和 SQL 语句提供数据,SQL 语句中也可以有子依赖。
|
||
|
||
不同的类型标注也会获得不同形式的数据:
|
||
|
||
```python title=weather/__init__.py {5} showLineNumbers
|
||
from collections.abc import Sequence
|
||
|
||
@weather.handle()
|
||
async def _(
|
||
weas: Sequence[Weather] = SQLDepends(
|
||
select(Weather).where(Weather.weather == Depends(extract_arg_plain_text))
|
||
),
|
||
):
|
||
await weather.send(f"今天的天气是{weas[0].weather}的城市有{','.join(wea.location for wea in weas)}")
|
||
```
|
||
|
||
支持的类型标注请参见 [依赖注入](dependency)。
|
||
|
||
我们也可以像 [类作为依赖](../../../advanced/dependency#类作为依赖) 那样,在类属性中声明子依赖:
|
||
|
||
```python title=weather/__init__.py {5-6,10} showLineNumbers
|
||
from collections.abc import Sequence
|
||
|
||
class Weather(Model):
|
||
location: Mapped[str] = mapped_column(primary_key=True)
|
||
weather: Mapped[str] = Depends(extract_arg_plain_text)
|
||
# weather: Annotated[Mapped[str], Depends(extract_arg_plain_text)] # Annotated 支持
|
||
|
||
|
||
@weather.handle()
|
||
async def _(weas: Sequence[Weather]):
|
||
await weather.send(
|
||
f"今天的天气是{weas[0].weather}的城市有{','.join(wea.location for wea in weas)}"
|
||
)
|
||
```
|