Feature: 为子依赖添加 PEP593 Annotated 支持 (#1832)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Johnny Hsieh 2023-03-22 20:54:46 +08:00 committed by GitHub
parent 9afaf3d516
commit 3e92fccd4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 13 deletions

View File

@ -1,8 +1,10 @@
import asyncio import asyncio
import inspect import inspect
from typing_extensions import Annotated
from contextlib import AsyncExitStack, contextmanager, asynccontextmanager from contextlib import AsyncExitStack, contextmanager, asynccontextmanager
from typing import TYPE_CHECKING, Any, Type, Tuple, Literal, Callable, Optional, cast from typing import TYPE_CHECKING, Any, Type, Tuple, Literal, Callable, Optional, cast
from pydantic.typing import get_args, get_origin
from pydantic.fields import Required, Undefined, ModelField from pydantic.fields import Required, Undefined, ModelField
from nonebot.dependencies.utils import check_field_type from nonebot.dependencies.utils import check_field_type
@ -78,21 +80,33 @@ class DependParam(Param):
def _check_param( def _check_param(
cls, param: inspect.Parameter, allow_types: Tuple[Type[Param], ...] cls, param: inspect.Parameter, allow_types: Tuple[Type[Param], ...]
) -> Optional["DependParam"]: ) -> Optional["DependParam"]:
if isinstance(param.default, DependsInner): type_annotation, depends_inner = param.annotation, None
dependency: T_Handler if get_origin(param.annotation) is Annotated:
if param.default.dependency is None: type_annotation, *extra_args = get_args(param.annotation)
assert param.annotation is not param.empty, "Dependency cannot be empty" depends_inner = next(
dependency = param.annotation (x for x in extra_args if isinstance(x, DependsInner)), None
else:
dependency = param.default.dependency
sub_dependent = Dependent[Any].parse(
call=dependency,
allow_types=allow_types,
)
return cls(
Required, use_cache=param.default.use_cache, dependent=sub_dependent
) )
depends_inner = (
param.default if isinstance(param.default, DependsInner) else depends_inner
)
if depends_inner is None:
return
dependency: T_Handler
if depends_inner.dependency is None:
assert (
type_annotation is not inspect.Signature.empty
), "Dependency cannot be empty"
dependency = type_annotation
else:
dependency = depends_inner.dependency
sub_dependent = Dependent[Any].parse(
call=dependency,
allow_types=allow_types,
)
return cls(Required, use_cache=depends_inner.use_cache, dependent=sub_dependent)
@classmethod @classmethod
def _check_parameterless( def _check_parameterless(
cls, value: Any, allow_types: Tuple[Type[Param], ...] cls, value: Any, allow_types: Tuple[Type[Param], ...]

View File

@ -1,4 +1,5 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing_extensions import Annotated
from nonebot import on_message from nonebot import on_message
from nonebot.params import Depends from nonebot.params import Depends
@ -47,3 +48,17 @@ async def depends_cache(y: int = Depends(dependency, use_cache=True)):
async def class_depend(c: ClassDependency = Depends()): async def class_depend(c: ClassDependency = Depends()):
return c return c
async def annotated_depend(x: Annotated[int, Depends(dependency)]):
return x
async def annotated_class_depend(c: Annotated[ClassDependency, Depends()]):
return c
async def annotated_prior_depend(
x: Annotated[int, Depends(lambda: 2)] = Depends(dependency)
):
return x

View File

@ -42,6 +42,9 @@ async def test_depend(app: App):
depends, depends,
class_depend, class_depend,
test_depends, test_depends,
annotated_depend,
annotated_class_depend,
annotated_prior_depend,
) )
async with app.test_dependent(depends, allow_types=[DependParam]) as ctx: async with app.test_dependent(depends, allow_types=[DependParam]) as ctx:
@ -63,6 +66,20 @@ async def test_depend(app: App):
async with app.test_dependent(class_depend, allow_types=[DependParam]) as ctx: async with app.test_dependent(class_depend, allow_types=[DependParam]) as ctx:
ctx.should_return(ClassDependency(x=1, y=2)) ctx.should_return(ClassDependency(x=1, y=2))
async with app.test_dependent(annotated_depend, allow_types=[DependParam]) as ctx:
ctx.should_return(1)
async with app.test_dependent(
annotated_prior_depend, allow_types=[DependParam]
) as ctx:
ctx.should_return(1)
assert runned == [1, 1]
async with app.test_dependent(
annotated_class_depend, allow_types=[DependParam]
) as ctx:
ctx.should_return(ClassDependency(x=1, y=2))
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_bot(app: App): async def test_bot(app: App):