nonebot2/website/versioned_docs/version-2.4.0/best-practice/database/developer/README.md
noneflow[bot] 5e86d53e0b 🔖 Release 2.4.0
2024-10-31 13:45:33 +00:00

13 KiB
Raw Blame History

开发者指南

开发者指南内容较多,故分为了一个示例以及数个专题。 阅读(并且最好跟随实践)示例后,你将会对使用 nonebot-plugin-orm 开发插件有一个基本的认识。 如果想要更深入地学习关于 SQLAlchemyAlembic 的知识,或者在使用过程中遇到了问题,可以查阅专题以及其官方文档。

示例

模型定义

首先,我们需要设计存储的数据的结构。 例如天气插件,需要存储什么地方 (location)天气是什么 (weather)。 其中,一个地方只会有一种天气,而不同地方可能有相同的天气。 所以,我们可以设计出如下的模型:

from nonebot_plugin_orm import Model
from sqlalchemy.orm import Mapped, mapped_column


class Weather(Model):
    location: Mapped[str] = mapped_column(primary_key=True)
    weather: Mapped[str]

其中,primary_key=True 意味着此列 (location) 是主键,即内容是唯一的且非空的。 每一个模型必须有至少一个主键。

我们可以用以下代码检查模型生成的数据库模式是否正确:

from sqlalchemy.schema import CreateTable

print(CreateTable(Weather.__table__))
CREATE TABLE weather_weather (
        location VARCHAR NOT NULL,
        weather VARCHAR NOT NULL,
        CONSTRAINT pk_weather_weather PRIMARY KEY (location)
)

可以注意到表名是 weather_weather 而不是 Weather 或者 weather。 这是因为 nonebot-plugin-orm 会自动为模型生成一个表名,规则是:<插件模块名>_<类名小写>

你也可以通过指定 __tablename__ 属性来自定义表名:

class Weather(Model):
    __tablename__ = "weather"
    ...
CREATE TABLE weather (
    ...
)

但是,并不推荐你这么做,因为这可能会导致不同插件间的表名重复,引发冲突。 特别是当你会发布插件时,你并不知道其他插件会不会使用相同的表名。

首次迁移

我们成功定义了模型,现在启动机器人试试吧:

$ nb run
01-02 15:04:05 [SUCCESS] nonebot | NoneBot is initializing...
01-02 15:04:05 [ERROR] nonebot_plugin_orm | 启动检查失败
01-02 15:04:05 [ERROR] nonebot | Application startup failed. Exiting.
Traceback (most recent call last):
  ...
click.exceptions.UsageError: 检测到新的升级操作:
[('add_table',
  Table('weather', MetaData(), Column('location', String(), table=<weather>, primary_key=True, nullable=False), Column('weather', String(), table=<weather>, nullable=False), schema=None))]

咦,发生了什么? nonebot-plugin-orm 试图阻止我们启动机器人。 原来是我们定义了模型,但是数据库中并没有对应的表,这会导致插件不能正常运行。 所以,我们需要迁移数据库。

首先,我们需要创建一个迁移脚本:

nb orm revision -m "first revision" --branch-label weather

其中,-m 参数是迁移脚本的描述,--branch-label 参数是迁移脚本的分支,一般为插件模块名。 执行命令过后,出现了一个 weather/migrations 目录,其中有一个 xxxxxxxxxxxx_first_revision.py 文件:

weather
├── __init__.py
├── config.py
└── migrations
    └── xxxxxxxxxxxx_first_revision.py

这就是我们创建的迁移脚本,它记录了数据库模式的变化。 我们可以查看一下它的内容:

"""first revision

迁移 ID: xxxxxxxxxxxx
父迁移:
创建时间: 2006-01-02 15:04:05.999999

"""

from __future__ import annotations

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op

revision: str = "xxxxxxxxxxxx"
down_revision: str | Sequence[str] | None = None
branch_labels: str | Sequence[str] | None = ("weather",)
depends_on: str | Sequence[str] | None = None


def upgrade(name: str = "") -> None:
    if name:
        return
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "weather_weather",
        sa.Column("location", sa.String(), nullable=False),
        sa.Column("weather", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")),
        info={"bind_key": "weather"},
    )
    # ### end Alembic commands ###


def downgrade(name: str = "") -> None:
    if name:
        return
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("weather_weather")
    # ### end Alembic commands ###

可以注意到脚本的主体部分(其余是模版代码,请勿修改)是:

# ### commands auto generated by Alembic - please adjust! ###
op.create_table(  # CREATE TABLE
    "weather_weather",  # weather_weather
    sa.Column("location", sa.String(), nullable=False),  # location VARCHAR NOT NULL,
    sa.Column("weather", sa.String(), nullable=False),  # weather VARCHAR NOT NULL,
    sa.PrimaryKeyConstraint("location", name=op.f("pk_weather_weather")),  # CONSTRAINT pk_weather_weather PRIMARY KEY (location)
    info={"bind_key": "weather"},
)
# ### end Alembic commands ###
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("weather_weather")  # DROP TABLE weather_weather;
# ### end Alembic commands ###

虽然我们不是很懂这些代码的意思,但是可以注意到它们几乎与 SQL 语句 (DDL) 一一对应。 显然,它们是用来创建和删除表的。

我们还可以注意到,upgrade()downgrade() 函数中的代码是互逆的。 也就是说,执行一次 upgrade() 函数,再执行一次 downgrade() 函数后,数据库的模式就会回到原来的状态。

这就是迁移脚本的作用:记录数据库模式的变化,以便我们在不同的环境中(例如开发环境和生产环境)可复现地可逆地同步数据库模式,正如 git 对我们的代码做的事情那样。

对了,不要忘记还有一段注释:commands auto generated by Alembic - please adjust!。 它在提醒我们,这些代码是由 Alembic 自动生成的,我们应该检查它们,并且根据需要进行调整。

:::caution 注意 迁移脚本冗长且繁琐,我们一般不会手写它们,而是由 Alembic 自动生成。 一般情况下Alembic 足够智能,可以正确地生成迁移脚本。 但是,在复杂或有歧义的情况下,我们可能需要手动调整迁移脚本。 所以,永远要检查迁移脚本,并且在开发环境中测试!

迁移脚本中任何一处错误都足以使数据付之东流! :::

确定迁移脚本正确后,我们就可以执行迁移脚本,将数据库模式同步到数据库中:

nb orm upgrade

现在,我们可以正常启动机器人了。

开发过程中,我们可能会频繁地修改模型,这意味着我们需要频繁地创建并执行迁移脚本,非常繁琐。 实际上,此时我们不在乎数据安全,只需要数据库模式与模型定义一致即可。 所以,我们可以关闭 nonebot-plugin-orm 的启动检查:

ALEMBIC_STARTUP_CHECK=false

现在,每次启动机器人时,数据库模式会自动与模型定义同步,无需手动迁移。

会话管理

我们已经成功定义了模型,并且迁移了数据库,现在可以开始使用数据库了……吗? 并不能,因为模型只是数据结构的定义,并不能通过它操作数据(如果你曾经使用过 Tortoise ORM,可能会知道 await Weather.get(location="上海") 这样的面向对象编程。 但是 SQLAlchemy 不同,选择了命令式编程)。 我们需要使用会话操作数据:

from nonebot import on_command
from nonebot.adapters import Message
from nonebot.params import CommandArg
from nonebot_plugin_orm import async_scoped_session

weather = on_command("天气")


@weather.handle()
async def _(session: async_scoped_session, args: Message = CommandArg()):
    location = args.extract_plain_text()

    if wea := await session.get(Weather, location):
        await weather.finish(f"今天{location}的天气是{wea.weather}")

    await weather.finish(f"未查询到{location}的天气")

我们通过 session: async_scoped_session 依赖注入获得了一个会话,然后使用 await session.get(Weather, location) 查询数据库。 async_scoped_session 是一个有作用域限制的会话,作用域为当前事件、当前事件响应器。 会话产生的模型实例(例如此处的 wea := await session.get(Weather, location))作用域与会话相同。

:::caution 注意 此处提到的“会话”指的是 ORM 会话,而非 NoneBot 会话两者的生命周期也是不同的NoneBot 会话的生命周期中可能包含多个事件,不同的事件也会有不同的事件响应器)。 具体而言,就是不要将 ORM 会话和模型实例存储在 NoneBot 会话状态中:

from nonebot.params import ArgPlainText
from nonebot.typing import T_State


@weather.got("location", prompt="请输入地名")
async def _(state: T_State, session: async_scoped_session, location: str = ArgPlainText()):
    wea = await session.get(Weather, location)

    if not wea:
        await weather.finish(f"未查询到{location}的天气")

    state["weather"] = wea  # 不要这么做,除非你知道自己在做什么

当然非要这么做也不是不可以:

@weather.handle()
async def _(state: T_State, session: async_scoped_session):
    # 通过 await session.merge(state["weather"]) 获得了此 ORM 会话中的相应模型实例,
    # 而非直接使用会话状态中的模型实例,
    # 因为先前的 ORM 会话已经关闭了。
    wea = await session.merge(state["weather"])
    await weather.finish(f"今天{state['location']}的天气是{wea.weather}")

:::

当有数据更改时,我们需要提交事务,也要注意会话作用域问题:

from nonebot.params import Depends


async def get_weather(
    session: async_scoped_session, args: Message = CommandArg()
) -> Weather:
    location = args.extract_plain_text()

    if not (wea := await session.get(Weather, location)):
        wea = Weather(location=location, weather="未知")
        session.add(wea)
        # await session.commit()  # 不应该在其他地方提交事务

    return wea


@weather.handle()
async def _(session: async_scoped_session, wea: Weather = Depends(get_weather)):
    await weather.send(f"今天的天气是{wea.weather}")
    await session.commit()  # 而应该在事件响应器结束前提交事务

当然我们也可以获得一个新的会话,不过此时就要手动管理会话了:

from nonebot_plugin_orm import get_session


async def get_weather(location: str) -> str:
    session = get_session()
    async with session.begin():
        wea = await session.get(Weather, location)

        if not wea:
            wea = Weather(location=location, weather="未知")
            session.add(wea)

        return wea.weather


@weather.handle()
async def _(args: Message = CommandArg()):
    wea = await get_weather(args.extract_plain_text())
    await weather.send(f"今天的天气是{wea}")

依赖注入

在上面的示例中,我们都是通过会话获得数据的。 不过,我们也可以通过依赖注入获得数据:

from sqlalchemy import select
from nonebot.params import Depends
from nonebot_plugin_orm import SQLDepends


def extract_arg_plain_text(args: Message = CommandArg()) -> str:
    return args.extract_plain_text()


@weather.handle()
async def _(
    wea: Weather = SQLDepends(
        select(Weather).where(Weather.location == Depends(extract_arg_plain_text))
    ),
):
    await weather.send(f"今天的天气是{wea.weather}")

其中,SQLDepends 是一个特殊的依赖注入,它会根据类型标注和 SQL 语句提供数据SQL 语句中也可以有子依赖。

不同的类型标注也会获得不同形式的数据:

from collections.abc import Sequence

@weather.handle()
async def _(
    weas: Sequence[Weather] = SQLDepends(
        select(Weather).where(Weather.weather == Depends(extract_arg_plain_text))
    ),
):
    await weather.send(f"今天的天气是{weas[0].weather}的城市有{''.join(wea.location for wea in weas)}")

支持的类型标注请参见 依赖注入

我们也可以像 类作为依赖 那样,在类属性中声明子依赖:

from collections.abc import Sequence

class Weather(Model):
    location: Mapped[str] = mapped_column(primary_key=True)
    weather: Mapped[str] = Depends(extract_arg_plain_text)
    # weather: Annotated[Mapped[str], Depends(extract_arg_plain_text)]  # Annotated 支持


@weather.handle()
async def _(weas: Sequence[Weather]):
    await weather.send(
        f"今天的天气是{weas[0].weather}的城市有{''.join(wea.location for wea in weas)}"
    )