Feature: 支持自定义 matchers 存储管理 (#1395)

This commit is contained in:
Ju4tCode 2022-11-21 18:44:55 +08:00 committed by GitHub
parent c783ab5e9b
commit 2922da7b2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 20 deletions

View File

@ -0,0 +1,11 @@
from .manager import MatcherManager as MatcherManager
from .provider import MatcherProvider as MatcherProvider
from .provider import DEFAULT_PROVIDER_CLASS as DEFAULT_PROVIDER_CLASS
matchers = MatcherManager()
from .matcher import Matcher as Matcher
from .matcher import current_bot as current_bot
from .matcher import current_event as current_event
from .matcher import current_handler as current_handler
from .matcher import current_matcher as current_matcher

View File

@ -0,0 +1,104 @@
from typing import (
TYPE_CHECKING,
Any,
List,
Type,
Tuple,
Union,
TypeVar,
Iterator,
KeysView,
Optional,
ItemsView,
ValuesView,
MutableMapping,
overload,
)
from .provider import DEFAULT_PROVIDER_CLASS, MatcherProvider
if TYPE_CHECKING:
from .matcher import Matcher
T = TypeVar("T")
class MatcherManager(MutableMapping[int, List[Type["Matcher"]]]):
"""事件响应器管理器
实现了常用字典操作用于管理事件响应器
"""
def __init__(self):
self.provider: MatcherProvider = DEFAULT_PROVIDER_CLASS({})
def __repr__(self) -> str:
return f"MatcherManager(provider={self.provider!r})"
def __contains__(self, o: object) -> bool:
return o in self.provider
def __iter__(self) -> Iterator[int]:
return iter(self.provider)
def __len__(self) -> int:
return len(self.provider)
def __getitem__(self, key: int) -> List[Type["Matcher"]]:
return self.provider[key]
def __setitem__(self, key: int, value: List[Type["Matcher"]]) -> None:
self.provider[key] = value
def __delitem__(self, key: int) -> None:
del self.provider[key]
def __eq__(self, other: Any) -> bool:
return isinstance(other, MatcherManager) and self.provider == other.provider
def keys(self) -> KeysView[int]:
return self.provider.keys()
def values(self) -> ValuesView[List[Type["Matcher"]]]:
return self.provider.values()
def items(self) -> ItemsView[int, List[Type["Matcher"]]]:
return self.provider.items()
@overload
def get(self, key: int) -> Optional[List[Type["Matcher"]]]:
...
@overload
def get(self, key: int, default: T) -> Union[List[Type["Matcher"]], T]:
...
def get(
self, key: int, default: Optional[T] = None
) -> Optional[Union[List[Type["Matcher"]], T]]:
return self.provider.get(key, default)
def pop(self, key: int) -> List[Type["Matcher"]]:
return self.provider.pop(key)
def popitem(self) -> Tuple[int, List[Type["Matcher"]]]:
return self.provider.popitem()
def clear(self) -> None:
self.provider.clear()
def update(self, __m: MutableMapping[int, List[Type["Matcher"]]]) -> None:
self.provider.update(__m)
def setdefault(
self, key: int, default: List[Type["Matcher"]]
) -> List[Type["Matcher"]]:
return self.provider.setdefault(key, default)
def set_provider(self, provider_class: Type[MatcherProvider]) -> None:
"""设置事件响应器存储器
参数:
provider_class: 事件响应器存储器类
"""
self.provider = provider_class(self.provider)

View File

@ -1,12 +1,10 @@
from types import ModuleType
from contextvars import ContextVar
from collections import defaultdict
from datetime import datetime, timedelta
from contextlib import AsyncExitStack, contextmanager
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Type,
Union,
@ -19,7 +17,16 @@ from typing import (
)
from nonebot.log import logger
from nonebot.internal.rule import Rule
from nonebot.dependencies import Dependent
from nonebot.internal.permission import USER, User, Permission
from nonebot.internal.adapter import (
Bot,
Event,
Message,
MessageSegment,
MessageTemplate,
)
from nonebot.consts import (
ARG_KEY,
RECEIVE_KEY,
@ -42,11 +49,7 @@ from nonebot.exception import (
FinishedException,
RejectedException,
)
from .rule import Rule
from .permission import USER, User, Permission
from .adapter import Bot, Event, Message, MessageSegment, MessageTemplate
from .params import (
from nonebot.internal.params import (
Depends,
ArgParam,
BotParam,
@ -57,13 +60,13 @@ from .params import (
MatcherParam,
)
from . import matchers
if TYPE_CHECKING:
from nonebot.plugin import Plugin
T = TypeVar("T")
matchers: Dict[int, List[Type["Matcher"]]] = defaultdict(list)
"""用于存储当前所有的事件响应器"""
current_bot: ContextVar[Bot] = ContextVar("current_bot")
current_event: ContextVar[Event] = ContextVar("current_event")
current_matcher: ContextVar["Matcher"] = ContextVar("current_matcher")
@ -764,14 +767,3 @@ class Matcher(metaclass=MatcherMeta):
)
except FinishedException:
pass
__autodoc__ = {
"MatcherMeta": False,
"Matcher.get_target": False,
"Matcher.set_target": False,
"Matcher.update_type": False,
"Matcher.update_permission": False,
"Matcher.resolve_reject": False,
"Matcher.simple_run": False,
}

View File

@ -0,0 +1,27 @@
import abc
from collections import defaultdict
from typing import TYPE_CHECKING, List, Type, Mapping, MutableMapping
if TYPE_CHECKING:
from .matcher import Matcher
class MatcherProvider(abc.ABC, MutableMapping[int, List[Type["Matcher"]]]):
"""事件响应器存储器基类
参数:
matchers: 当前存储器中已有的事件响应器
"""
@abc.abstractmethod
def __init__(self, matchers: Mapping[int, List[Type["Matcher"]]]):
raise NotImplementedError
class _DictProvider(defaultdict, MatcherProvider):
def __init__(self, matchers: Mapping[int, List[Type["Matcher"]]]):
super().__init__(list, matchers)
DEFAULT_PROVIDER_CLASS = _DictProvider
"""默认存储器类型"""

View File

@ -9,10 +9,16 @@ from nonebot.internal.matcher import Matcher as Matcher
from nonebot.internal.matcher import matchers as matchers
from nonebot.internal.matcher import current_bot as current_bot
from nonebot.internal.matcher import current_event as current_event
from nonebot.internal.matcher import MatcherManager as MatcherManager
from nonebot.internal.matcher import MatcherProvider as MatcherProvider
from nonebot.internal.matcher import current_handler as current_handler
from nonebot.internal.matcher import current_matcher as current_matcher
from nonebot.internal.matcher import DEFAULT_PROVIDER_CLASS as DEFAULT_PROVIDER_CLASS
__autodoc__ = {
"Matcher": True,
"matchers": True,
"MatcherManager": True,
"MatcherProvider": True,
"DEFAULT_PROVIDER_CLASS": True,
}

View File

@ -0,0 +1,11 @@
import pytest
from nonebug import App
@pytest.mark.asyncio
async def test_manager(app: App, load_plugin):
from nonebot.matcher import DEFAULT_PROVIDER_CLASS, matchers
default_provider = matchers.provider
matchers.set_provider(DEFAULT_PROVIDER_CLASS)
assert matchers.provider == default_provider