🚧 create processor

This commit is contained in:
yanyongyu 2021-11-12 20:55:59 +08:00
parent c454cf0874
commit f7eadb48b5
9 changed files with 205 additions and 269 deletions

View File

@ -1,2 +0,0 @@
from .models import Depends as Depends
from .utils import get_dependent as get_dependent

View File

@ -1,89 +0,0 @@
from enum import Enum
from typing import Any, List, Callable, Optional
from pydantic.fields import Required, FieldInfo, ModelField
from nonebot.utils import get_name
class Depends:
def __init__(self,
dependency: Optional[Callable[..., Any]] = None,
*,
use_cache: bool = True) -> None:
self.dependency = dependency
self.use_cache = use_cache
def __repr__(self) -> str:
dep = get_name(self.dependency)
cache = "" if self.use_cache else ", use_cache=False"
return f"{self.__class__.__name__}({dep}{cache})"
class Dependent:
def __init__(self,
*,
func: Optional[Callable[..., Any]] = None,
name: Optional[str] = None,
bot_param: Optional[ModelField] = None,
event_param: Optional[ModelField] = None,
state_param: Optional[ModelField] = None,
matcher_param: Optional[ModelField] = None,
simple_params: Optional[List[ModelField]] = None,
dependencies: Optional[List["Dependent"]] = None,
use_cache: bool = True) -> None:
self.func = func
self.name = name
self.bot_param = bot_param
self.event_param = event_param
self.state_param = state_param
self.matcher_param = matcher_param
self.simple_params = simple_params or []
self.dependencies = dependencies or []
self.use_cache = use_cache
self.cache_key = (self.func,)
class ParamTypes(Enum):
BOT = "bot"
EVENT = "event"
STATE = "state"
MATCHER = "matcher"
SIMPLE = "simple"
class Param(FieldInfo):
in_: ParamTypes
def __init__(self, default: Any):
super().__init__(default=default)
def __repr__(self) -> str:
return f"{self.__class__.__name__}"
class BotParam(Param):
in_ = ParamTypes.BOT
class EventParam(Param):
in_ = ParamTypes.EVENT
class StateParam(Param):
in_ = ParamTypes.STATE
class MatcherParam(Param):
in_ = ParamTypes.MATCHER
class SimpleParam(Param):
in_ = ParamTypes.SIMPLE
def __init__(self, default: Any):
if default is Required:
raise ValueError("SimpleParam should be given a default value")
super().__init__(default)

View File

@ -1,150 +0,0 @@
import inspect
from typing import Any, Dict, Type, Union, Callable, Optional, ForwardRef
from pydantic import BaseConfig
from pydantic.class_validators import Validator
from pydantic.typing import evaluate_forwardref
from pydantic.schema import get_annotation_from_field_info
from pydantic.fields import Required, FieldInfo, ModelField, UndefinedType
from .models import Param, Depends, Dependent, ParamTypes, SimpleParam
def get_typed_signature(call: Callable[..., Any]) -> inspect.Signature:
signature = inspect.signature(call)
globalns = getattr(call, "__globals__", {})
typed_params = [
inspect.Parameter(
name=param.name,
kind=param.kind,
default=param.default,
annotation=get_typed_annotation(param, globalns),
) for param in signature.parameters.values()
]
typed_signature = inspect.Signature(typed_params)
return typed_signature
def get_typed_annotation(param: inspect.Parameter, globalns: Dict[str,
Any]) -> Any:
annotation = param.annotation
if isinstance(annotation, str):
annotation = ForwardRef(annotation)
annotation = evaluate_forwardref(annotation, globalns, globalns)
return annotation
def get_param_sub_dependent(*, param: inspect.Parameter) -> Dependent:
depends: Depends = param.default
if depends.dependency:
dependency = depends.dependency
else:
dependency = param.annotation
return get_sub_dependant(
depends=depends,
dependency=dependency,
name=param.name,
)
def get_parameterless_sub_dependant(*, depends: Depends) -> Dependent:
assert callable(
depends.dependency
), "A parameter-less dependency must have a callable dependency"
return get_sub_dependant(depends=depends, dependency=depends.dependency)
def get_sub_dependant(
*,
depends: Depends,
dependency: Callable[..., Any],
name: Optional[str] = None,
) -> Dependent:
sub_dependant = get_dependent(
func=dependency,
name=name,
use_cache=depends.use_cache,
)
return sub_dependant
def get_dependent(*,
func: Callable[..., Any],
name: Optional[str] = None,
use_cache: bool = True) -> Dependent:
signature = get_typed_signature(func)
params = signature.parameters
dependent = Dependent(func=func, name=name, use_cache=use_cache)
for param_name, param in params.items():
if isinstance(param.default, Depends):
sub_dependent = get_param_sub_dependent(param=param)
dependent.dependencies.append(sub_dependent)
continue
param_field = get_param_field(param=param,
param_name=param_name,
default_field_info=SimpleParam)
return dependent
def get_param_field(*,
param: inspect.Parameter,
param_name: str,
default_field_info: Type[Param] = Param,
force_type: Optional[ParamTypes] = None,
ignore_default: bool = False) -> ModelField:
default_value = Required
if param.default != param.empty and not ignore_default:
default_value = param.default
if isinstance(default_value, FieldInfo):
field_info = default_value
default_value = field_info.default
if (isinstance(field_info, Param) and
getattr(field_info, "in_", None) is None):
field_info.in_ = default_field_info.in_
if force_type:
field_info.in_ = force_type # type: ignore
else:
field_info = default_field_info(default_value)
required: bool = default_value == Required
annotation: Any = Any
if param.annotation != param.empty:
annotation = param.annotation
annotation = get_annotation_from_field_info(annotation, field_info,
param_name)
if not field_info.alias and getattr(field_info, "convert_underscores",
None):
alias = param.name.replace("_", "-")
else:
alias = field_info.alias or param.name
field = create_field(
name=param.name,
type_=annotation,
default=None if required else default_value,
alias=alias,
required=required,
field_info=field_info,
)
# field.required = required
return field
def create_field(name: str,
type_: Type[Any],
class_validators: Optional[Dict[str, Validator]] = None,
default: Optional[Any] = None,
required: Union[bool, UndefinedType] = False,
model_config: Type[BaseConfig] = BaseConfig,
field_info: Optional[FieldInfo] = None,
alias: Optional[str] = None) -> ModelField:
class_validators = class_validators or {}
field_info = field_info or FieldInfo(None)
return ModelField(name=name,
type_=type_,
class_validators=class_validators,
model_config=model_config,
default=default,
required=required,
alias=alias,
field_info=field_info)

View File

@ -1,27 +0,0 @@
"""
事件处理函数
============
该模块实现事件处理函数的封装以实现动态参数等功能
"""
import inspect
from typing import Optional
from pydantic.typing import evaluate_forwardref
from nonebot.utils import get_name
from nonebot.typing import T_Handler
class Handler:
"""事件处理函数类"""
def __init__(self, func: T_Handler, *, name: Optional[str] = None):
"""装饰事件处理函数以便根据动态参数运行"""
self.func: T_Handler = func
"""
:类型: ``T_Handler``
:说明: 事件处理函数
"""
self.name = get_name(func) if name is None else name

View File

@ -0,0 +1,77 @@
import inspect
from typing import Any, Callable, Optional
from .models import Dependent
from .models import Depends as Depends
from nonebot.adapters import Bot, Event
from .utils import get_typed_signature, generic_check_issubclass
def get_param_sub_dependent(*, param: inspect.Parameter) -> Dependent:
depends: Depends = param.default
if depends.dependency:
dependency = depends.dependency
else:
dependency = param.annotation
return get_sub_dependant(
depends=depends,
dependency=dependency,
name=param.name,
)
def get_parameterless_sub_dependant(*, depends: Depends) -> Dependent:
assert callable(
depends.dependency
), "A parameter-less dependency must have a callable dependency"
return get_sub_dependant(depends=depends, dependency=depends.dependency)
def get_sub_dependant(
*,
depends: Depends,
dependency: Callable[..., Any],
name: Optional[str] = None,
) -> Dependent:
sub_dependant = get_dependent(
func=dependency,
name=name,
use_cache=depends.use_cache,
)
return sub_dependant
def get_dependent(*,
func: Callable[..., Any],
name: Optional[str] = None,
use_cache: bool = True) -> Dependent:
signature = get_typed_signature(func)
params = signature.parameters
dependent = Dependent(func=func, name=name, use_cache=use_cache)
for param_name, param in params.items():
if isinstance(param.default, Depends):
sub_dependent = get_param_sub_dependent(param=param)
dependent.dependencies.append(sub_dependent)
continue
if generic_check_issubclass(param.annotation, Bot):
dependent.bot_param_name = param_name
continue
elif generic_check_issubclass(param.annotation, Event):
dependent.event_param_name = param_name
continue
elif generic_check_issubclass(param.annotation, dict):
dependent.state_param_name = param_name
continue
elif generic_check_issubclass(param.annotation, Matcher):
dependent.matcher_param_name = param_name
continue
raise ValueError(
f"Unknown parameter {param_name} with type {param.annotation}")
return dependent
from .handler import Handler as Handler
from .matcher import Matcher as Matcher

View File

@ -0,0 +1,43 @@
"""
事件处理函数
============
该模块实现事件处理函数的封装以实现动态参数等功能
"""
from typing import TYPE_CHECKING, List, Optional
from .models import Depends
from nonebot.utils import get_name
from nonebot.typing import T_State, T_Handler
from . import get_dependent, get_parameterless_sub_dependant
if TYPE_CHECKING:
from .matcher import Matcher
from nonebot.adapters import Bot, Event
class Handler:
"""事件处理函数类"""
def __init__(self,
func: T_Handler,
*,
name: Optional[str] = None,
dependencies: Optional[List[Depends]] = None):
"""装饰事件处理函数以便根据动态参数运行"""
self.func: T_Handler = func
"""
:类型: ``T_Handler``
:说明: 事件处理函数
"""
self.name = get_name(func) if name is None else name
self.dependencies = dependencies or []
self.dependent = get_dependent(func=func)
for depends in self.dependencies[::-1]:
self.dependent.dependencies.insert(
0, get_parameterless_sub_dependant(depends=depends))
def __call__(self, bot: Bot, event: Event, state: T_State,
matcher: "Matcher"):
...

View File

@ -13,9 +13,9 @@ from collections import defaultdict
from typing import (TYPE_CHECKING, Any, Dict, List, Type, Union, Callable,
NoReturn, Optional)
from .handler import Handler
from nonebot.rule import Rule
from nonebot.log import logger
from nonebot.handler import Handler
from nonebot.adapters import MessageTemplate
from nonebot.permission import USER, Permission
from nonebot.exception import (PausedException, StopPropagation,

View File

@ -0,0 +1,41 @@
from typing import Any, List, Callable, Optional
from nonebot.utils import get_name
class Depends:
def __init__(self,
dependency: Optional[Callable[..., Any]] = None,
*,
use_cache: bool = True) -> None:
self.dependency = dependency
self.use_cache = use_cache
def __repr__(self) -> str:
dep = get_name(self.dependency)
cache = "" if self.use_cache else ", use_cache=False"
return f"{self.__class__.__name__}({dep}{cache})"
class Dependent:
def __init__(self,
*,
func: Optional[Callable[..., Any]] = None,
name: Optional[str] = None,
bot_param_name: Optional[str] = None,
event_param_name: Optional[str] = None,
state_param_name: Optional[str] = None,
matcher_param_name: Optional[str] = None,
dependencies: Optional[List["Dependent"]] = None,
use_cache: bool = True) -> None:
self.func = func
self.name = name
self.bot_param_name = bot_param_name
self.event_param_name = event_param_name
self.state_param_name = state_param_name
self.matcher_param_name = matcher_param_name
self.dependencies = dependencies or []
self.use_cache = use_cache
self.cache_key = (self.func,)

View File

@ -0,0 +1,43 @@
import inspect
from typing import Any, Dict, Type, Tuple, Union, Callable
from pydantic.typing import (ForwardRef, GenericAlias, get_args, get_origin,
evaluate_forwardref)
def get_typed_signature(call: Callable[..., Any]) -> inspect.Signature:
signature = inspect.signature(call)
globalns = getattr(call, "__globals__", {})
typed_params = [
inspect.Parameter(
name=param.name,
kind=param.kind,
default=param.default,
annotation=get_typed_annotation(param, globalns),
) for param in signature.parameters.values()
]
typed_signature = inspect.Signature(typed_params)
return typed_signature
def get_typed_annotation(param: inspect.Parameter, globalns: Dict[str,
Any]) -> Any:
annotation = param.annotation
if isinstance(annotation, str):
annotation = ForwardRef(annotation)
annotation = evaluate_forwardref(annotation, globalns, globalns)
return annotation
def generic_check_issubclass(
cls: Any, class_or_tuple: Union[Type[Any], Tuple[Type[Any],
...]]) -> bool:
try:
return isinstance(cls, type) and issubclass(cls, class_or_tuple)
except TypeError:
if get_origin(cls) is Union:
for type_ in get_args(cls):
if not generic_check_issubclass(type_, class_or_tuple):
return False
return True
raise