2021-03-18 19:16:18 +08:00
2021-07-26 12:14:29 +08:00
== == == == == ==
2021-03-18 19:16:18 +08:00
"""This module wraps event handler functions to support features such as dynamic parameters."""
import inspect
from typing import Any , List , Dict , Type , Union , Optional , TYPE_CHECKING
from typing import ForwardRef , _eval_type # type: ignore
from nonebot . log import logger
from nonebot . typing import T_Handler , T_State
from nonebot . matcher import Matcher
from nonebot . adapters import Bot , Event
class HandlerMeta(type):
    """Metaclass of ``Handler`` that gives handler classes a readable ``repr``.

    A class that defines ``func`` (together with the ``*_type`` attributes
    declared below) renders as
    ``<Handler name(bot: ..., event: ..., state: ..., matcher: ...)>``.
    A class without ``func`` keeps the default class ``repr`` instead of
    raising ``AttributeError``.
    """

    if TYPE_CHECKING:
        # Declarations for static type checkers only; nothing below is
        # evaluated or assigned at runtime.
        func: T_Handler
        signature: inspect.Signature
        bot_type: Type["Bot"]
        event_type: Optional[Type["Event"]]
        state_type: Optional[T_State]
        matcher_type: Optional[Type["Matcher"]]

    def __repr__(self) -> str:
        """Return the handler-style description of the class.

        Falls back to ``type.__repr__`` when the class has no ``func``
        attribute, so that printing such a class never fails.
        """
        func = getattr(self, "func", None)
        if func is None:
            return super().__repr__()
        # Not every callable has ``__name__`` (e.g. ``functools.partial``).
        name = getattr(func, "__name__", repr(func))
        return (f"<Handler {name}(bot: {self.bot_type}, "
                f"event: {self.event_type}, state: {self.state_type}, "
                f"matcher: {self.matcher_type})>")

    def __str__(self) -> str:
        """Return the same text as ``repr``."""
        return repr(self)
2021-03-18 19:16:18 +08:00
class Handler(metaclass=HandlerMeta):
    """Wrapper around an event handler function.

    The handler's signature is inspected once, at construction time. When the
    wrapper is called it passes only the arguments the handler declares
    (``bot``, ``event``, ``state``, ``matcher``) and skips the handler when
    its ``bot`` / ``event`` class annotations do not match the incoming
    objects.
    """

    def __init__(self, func: T_Handler):
        """Wrap ``func`` so it can be run with dynamically selected arguments.

        Raises:
            ValueError: if ``func`` declares a parameter other than
                ``bot`` / ``event`` / ``state`` / ``matcher``, or does not
                declare ``bot``.
        """
        # The wrapped event handler function (``T_Handler``).
        self.func: T_Handler = func
        # Signature of the event handler function (``inspect.Signature``).
        self.signature: inspect.Signature = self.get_signature()

    def __repr__(self) -> str:
        """Return ``<Handler name(bot: ..., event: ..., state: ..., matcher: ...)>``."""
        # Not every callable has ``__name__`` (e.g. ``functools.partial``).
        name = getattr(self.func, "__name__", repr(self.func))
        return (f"<Handler {name}(bot: {self.bot_type}, "
                f"event: {self.event_type}, state: {self.state_type}, "
                f"matcher: {self.matcher_type})>")

    def __str__(self) -> str:
        """Return the same text as ``repr``."""
        return repr(self)

    async def __call__(self, matcher: "Matcher", bot: "Bot", event: "Event",
                       state: T_State):
        """Run the wrapped function with the arguments it declares.

        The call is skipped (with a debug log entry) when the handler's
        ``bot`` or ``event`` annotation is a class and the given object is
        not an instance of it.

        Args:
            matcher: passed to the handler as ``matcher`` if declared.
            bot: passed to the handler as ``bot`` if declared.
            event: passed to the handler as ``event`` if declared.
            state: passed to the handler as ``state`` if declared.
        """
        expected_bot = self._class_annotation(self.bot_type)
        if expected_bot is not None and not isinstance(bot, expected_bot):
            logger.debug(
                f"Matcher {matcher} bot type {type(bot)} not match annotation {expected_bot}, ignored"
            )
            return

        expected_event = self._class_annotation(self.event_type)
        if expected_event is not None and not isinstance(event, expected_event):
            logger.debug(
                f"Matcher {matcher} event type {type(event)} not match annotation {expected_event}, ignored"
            )
            return

        available = {
            "bot": bot,
            "event": event,
            "state": state,
            "matcher": matcher,
        }
        accepted = self.signature.parameters
        await self.func(**{
            name: value for name, value in available.items() if name in accepted
        })

    @staticmethod
    def _class_annotation(annotation: Any) -> Optional[type]:
        """Return ``annotation`` if it is a class usable for ``isinstance``.

        ``inspect.Parameter.empty`` is itself a class, so it has to be ruled
        out explicitly; strings, ``None`` and other non-class annotations
        yield ``None`` as well.
        """
        if annotation is inspect.Parameter.empty:
            return None
        if not inspect.isclass(annotation):
            return None
        return annotation

    @property
    def bot_type(self) -> Union[Type["Bot"], inspect.Parameter.empty]:
        """Annotation of the handler's ``bot`` parameter.

        ``inspect.Parameter.empty`` when the parameter is not annotated.
        """
        return self.signature.parameters["bot"].annotation

    @property
    def event_type(
            self) -> Optional[Union[Type["Event"], inspect.Parameter.empty]]:
        """Annotation of the handler's ``event`` parameter.

        ``None`` when the handler does not take an ``event`` parameter.
        """
        if "event" not in self.signature.parameters:
            return None
        return self.signature.parameters["event"].annotation

    @property
    def state_type(self) -> Optional[Union[T_State, inspect.Parameter.empty]]:
        """Annotation of the handler's ``state`` parameter.

        ``None`` when the handler does not take a ``state`` parameter.
        """
        if "state" not in self.signature.parameters:
            return None
        return self.signature.parameters["state"].annotation

    @property
    def matcher_type(
            self) -> Optional[Union[Type["Matcher"], inspect.Parameter.empty]]:
        """Annotation of the handler's ``matcher`` parameter.

        ``None`` when the handler does not take a ``matcher`` parameter.
        """
        if "matcher" not in self.signature.parameters:
            return None
        return self.signature.parameters["matcher"].annotation

    def get_signature(self) -> inspect.Signature:
        """Build the signature used to dispatch arguments to the handler.

        The parameter list is taken from ``self.func`` itself (without
        following ``__wrapped__``). The ``bot`` and ``event`` annotations are
        taken from the function at the end of the ``__wrapped__`` chain, so
        that a decorated handler keeps the type restrictions of the function
        it wraps.

        Raises:
            ValueError: if an unsupported parameter name is declared or the
                ``bot`` parameter is missing.
        """
        wrapped_signature = self._get_typed_signature()
        signature = self._get_typed_signature(False)
        self._check_params(signature)
        self._check_bot_param(signature)
        self._check_bot_param(wrapped_signature)

        # String annotations of the wrapped function must be resolved against
        # the namespace that function was defined in. The ``stop`` predicate
        # mirrors how ``inspect.signature`` follows the wrapper chain.
        innermost = inspect.unwrap(
            self.func, stop=lambda obj: hasattr(obj, "__signature__"))
        wrapped_globals = getattr(innermost, "__globals__", {})

        overrides: Dict[str, Any] = {}
        for name in ("bot", "event"):
            if (name in signature.parameters and
                    name in wrapped_signature.parameters):
                overrides[name] = self._get_typed_annotation(
                    wrapped_signature.parameters[name], wrapped_globals)

        # ``inspect.Parameter`` is immutable: ``replace`` returns a new
        # object, so the signature has to be rebuilt from the replaced
        # parameters rather than mutated in place.
        return signature.replace(parameters=[
            param.replace(annotation=overrides[param.name])
            if param.name in overrides else param
            for param in signature.parameters.values()
        ])

    def update_signature(
        self, **kwargs: Union[None, Type["Bot"], Type["Event"], Type["Matcher"],
                              T_State, inspect.Parameter.empty]
    ) -> None:
        """Replace ``self.signature`` with overridden parameter annotations.

        Each keyword (``bot`` / ``event`` / ``state`` / ``matcher``) becomes a
        positional-or-keyword parameter annotated with the given value.
        Parameters that are not overridden are carried over unchanged, and
        parameters that are neither present nor overridden stay absent. The
        resulting parameters are ordered ``bot``, ``event``, ``state``,
        ``matcher``.

        NOTE(review): a name added here is passed by ``__call__`` even if the
        wrapped function does not accept it; callers are expected to only
        override parameters the function declares.
        """
        current = self.signature.parameters
        params: List[inspect.Parameter] = []
        for name in ("bot", "event", "state", "matcher"):
            if name in kwargs:
                params.append(
                    inspect.Parameter(name,
                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                      annotation=kwargs[name]))
            elif name in current:
                params.append(current[name])
        self.signature = inspect.Signature(params)

    def _get_typed_signature(self,
                             follow_wrapped: bool = True) -> inspect.Signature:
        """Return the signature of ``self.func``.

        Args:
            follow_wrapped: forwarded to ``inspect.signature``. When false,
                string annotations are additionally resolved against the
                globals of ``self.func``; when true, annotations are left
                exactly as declared, because those globals need not be the
                wrapped function's.
        """
        signature = inspect.signature(self.func, follow_wrapped=follow_wrapped)
        globalns = getattr(self.func, "__globals__", {})
        typed_params = [
            inspect.Parameter(
                name=param.name,
                kind=param.kind,
                default=param.default,
                annotation=param.annotation if follow_wrapped else
                self._get_typed_annotation(param, globalns),
            ) for param in signature.parameters.values()
        ]
        return inspect.Signature(typed_params)

    def _get_typed_annotation(self, param: inspect.Parameter,
                              globalns: Dict[str, Any]) -> Any:
        """Resolve a string annotation of ``param`` to the object it names.

        Non-string annotations are returned unchanged. If the string cannot
        be evaluated in ``globalns`` the raw string is returned.
        """
        annotation = param.annotation
        if not isinstance(annotation, str):
            return annotation
        try:
            return _eval_type(ForwardRef(annotation), globalns, globalns)
        except Exception:
            # Deliberately best-effort: an unresolvable forward reference
            # simply stays a string and is not used for type matching.
            return annotation

    def _check_params(self, signature: inspect.Signature):
        """Ensure every parameter is one of the four supported names.

        Raises:
            ValueError: if any other parameter name is declared.
        """
        allowed = {"bot", "event", "state", "matcher"}
        if not set(signature.parameters) <= allowed:
            raise ValueError(
                "Handler param names must in `bot`/`event`/`state`/`matcher`")

    def _check_bot_param(self, signature: inspect.Signature):
        """Ensure the signature declares a ``bot`` parameter.

        Raises:
            ValueError: if ``bot`` is missing.
        """
        if "bot" not in signature.parameters:
            raise ValueError("Handler missing parameter 'bot'")