🐛 detect runtime plugin (#1857)

This commit is contained in:
Ju4tCode 2023-03-29 12:22:50 +08:00 committed by GitHub
parent 17c86f7da2
commit 2a2f7b6dce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 163 additions and 85 deletions

View File

@ -28,13 +28,26 @@ from nonebot.rule import (
shell_command,
)
from .plugin import Plugin
from . import get_plugin_by_module_name
from .manager import _current_plugin_chain
def _store_matcher(matcher: Type[Matcher]) -> None:
# only store the matcher defined in the plugin
if plugins := _current_plugin_chain.get():
plugins[-1].matcher.add(matcher)
# only store the matcher defined when plugin loading
if plugin_chain := _current_plugin_chain.get():
plugin_chain[-1].matcher.add(matcher)
def _get_matcher_plugin(depth: int = 1) -> Optional[Plugin]:
# matcher defined when plugin loading
if plugin_chain := _current_plugin_chain.get():
return plugin_chain[-1]
# matcher defined when plugin running
if module := _get_matcher_module(depth + 1):
if plugin := get_plugin_by_module_name(module.__name__):
return plugin
def _get_matcher_module(depth: int = 1) -> Optional[ModuleType]:
@ -71,7 +84,6 @@ def on(
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
plugin_chain = _current_plugin_chain.get()
matcher = Matcher.new(
type,
Rule() & rule,
@ -81,7 +93,7 @@ def on(
priority=priority,
block=block,
handlers=handlers,
plugin=plugin_chain[-1] if plugin_chain else None,
plugin=_get_matcher_plugin(_depth + 1),
module=_get_matcher_module(_depth + 1),
default_state=state,
)

View File

@ -47,7 +47,7 @@ class Plugin:
manager: "PluginManager"
"""导入该插件的插件管理器"""
matcher: Set[Type[Matcher]] = field(default_factory=set)
"""插件定义的 `Matcher`"""
"""插件加载时定义的 `Matcher`"""
parent_plugin: Optional["Plugin"] = None
"""父插件"""
sub_plugins: Set["Plugin"] = field(default_factory=set)

View File

@ -90,3 +90,6 @@ async def overload(event: FakeEvent):
@test_overload.handle()
async def finish():
await test_overload.finish()
test_destroy = on_message()

View File

@ -1,6 +1,8 @@
from typing import Type
from datetime import datetime, timezone
from nonebot.adapters import Event
from nonebot.matcher import Matcher
from nonebot import (
CommandGroup,
MatcherGroup,
@ -50,6 +52,20 @@ matcher_on = on(
)
def matcher_on_factory() -> Type[Matcher]:
return on(
"test",
rule=rule,
permission=permission,
handlers=[handler],
temp=True,
expire_time=expire_time,
priority=priority,
block=True,
state=state,
)
matcher_on_metaevent = on_metaevent(
rule=rule,
handlers=[handler],

View File

@ -79,6 +79,20 @@ async def test_matcher(app: App):
ctx.should_finished()
@pytest.mark.asyncio
async def test_matcher_destroy(app: App):
from plugins.matcher.matcher_process import test_destroy
async with app.test_matcher(test_destroy) as ctx:
assert len(matchers) == 1
assert len(matchers[test_destroy.priority]) == 1
assert matchers[test_destroy.priority][0] is test_destroy
test_destroy.destroy()
assert len(matchers[test_destroy.priority]) == 0
@pytest.mark.asyncio
async def test_type_updater(app: App):
from plugins.matcher.matcher_type import test_type_updater, test_custom_updater

View File

@ -1,8 +1,9 @@
from typing import Type, Optional
from typing import Type, Callable, Optional
import pytest
import nonebot
from nonebot.adapters import Event
from nonebot.typing import T_RuleChecker
from nonebot.matcher import Matcher, matchers
from nonebot.rule import (
@ -18,7 +19,74 @@ from nonebot.rule import (
@pytest.mark.asyncio
async def test_on():
@pytest.mark.parametrize(
"matcher_name, pre_rule_factory, has_permission",
[
pytest.param("matcher_on", None, True),
pytest.param("matcher_on_metaevent", None, False),
pytest.param("matcher_on_message", None, True),
pytest.param("matcher_on_notice", None, False),
pytest.param("matcher_on_request", None, False),
pytest.param(
"matcher_on_startswith", lambda e: StartswithRule(("test",)), True
),
pytest.param("matcher_on_endswith", lambda e: EndswithRule(("test",)), True),
pytest.param("matcher_on_fullmatch", lambda e: FullmatchRule(("test",)), True),
pytest.param("matcher_on_keyword", lambda e: KeywordsRule("test"), True),
pytest.param("matcher_on_command", lambda e: CommandRule([("test",)]), True),
pytest.param(
"matcher_on_shell_command",
lambda e: ShellCommandRule([("test",)], None),
True,
),
pytest.param("matcher_on_regex", lambda e: RegexRule("test"), True),
pytest.param("matcher_on_type", lambda e: IsTypeRule(e), True),
pytest.param("matcher_sub_cmd", lambda e: CommandRule([("test", "sub")]), True),
pytest.param(
"matcher_sub_shell_cmd",
lambda e: ShellCommandRule([("test", "sub")], None),
True,
),
pytest.param("matcher_group_on", None, True),
pytest.param("matcher_group_on_metaevent", None, False),
pytest.param("matcher_group_on_message", None, True),
pytest.param("matcher_group_on_notice", None, False),
pytest.param("matcher_group_on_request", None, False),
pytest.param(
"matcher_group_on_startswith",
lambda e: StartswithRule(("test",)),
True,
),
pytest.param(
"matcher_group_on_endswith",
lambda e: EndswithRule(("test",)),
True,
),
pytest.param(
"matcher_group_on_fullmatch",
lambda e: FullmatchRule(("test",)),
True,
),
pytest.param("matcher_group_on_keyword", lambda e: KeywordsRule("test"), True),
pytest.param(
"matcher_group_on_command",
lambda e: CommandRule([("test",)]),
True,
),
pytest.param(
"matcher_group_on_shell_command",
lambda e: ShellCommandRule([("test",)], None),
True,
),
pytest.param("matcher_group_on_regex", lambda e: RegexRule("test"), True),
pytest.param("matcher_group_on_type", lambda e: IsTypeRule(e), True),
],
)
async def test_on(
matcher_name: str,
pre_rule_factory: Optional[Callable[[Type[Event]], T_RuleChecker]],
has_permission: bool,
):
import plugins.plugin.matchers as module
from plugins.plugin.matchers import (
TestEvent,
@ -26,45 +94,18 @@ async def test_on():
state,
handler,
priority,
matcher_on,
permission,
expire_time,
matcher_on_type,
matcher_sub_cmd,
matcher_group_on,
matcher_on_regex,
matcher_on_notice,
matcher_on_command,
matcher_on_keyword,
matcher_on_message,
matcher_on_request,
matcher_on_endswith,
matcher_on_fullmatch,
matcher_on_metaevent,
matcher_group_on_type,
matcher_on_startswith,
matcher_sub_shell_cmd,
matcher_group_on_regex,
matcher_group_on_notice,
matcher_group_on_command,
matcher_group_on_keyword,
matcher_group_on_message,
matcher_group_on_request,
matcher_on_shell_command,
matcher_group_on_endswith,
matcher_group_on_fullmatch,
matcher_group_on_metaevent,
matcher_group_on_startswith,
matcher_group_on_shell_command,
)
plugin = nonebot.get_plugin("plugin")
matcher = getattr(module, matcher_name)
assert issubclass(matcher, Matcher), f"{matcher_name} should be a Matcher"
pre_rule = pre_rule_factory(TestEvent) if pre_rule_factory else None
plugin = nonebot.get_plugin("plugin")
assert plugin, "plugin should be loaded"
def _check(
matcher: Type[Matcher],
pre_rule: Optional[T_RuleChecker],
has_permission: bool,
):
assert {dependent.call for dependent in matcher.rule.checkers} == (
{pre_rule, rule} if pre_rule else {rule}
)
@ -82,35 +123,27 @@ async def test_on():
assert matcher._default_state == state
assert matcher.plugin is plugin
assert matcher in plugin.matcher
assert matcher.module is module
assert matcher.plugin_name == "plugin"
assert matcher.module_name == "plugins.plugin.matchers"
_check(matcher_on, None, True)
_check(matcher_on_metaevent, None, False)
_check(matcher_on_message, None, True)
_check(matcher_on_notice, None, False)
_check(matcher_on_request, None, False)
_check(matcher_on_startswith, StartswithRule(("test",)), True)
_check(matcher_on_endswith, EndswithRule(("test",)), True)
_check(matcher_on_fullmatch, FullmatchRule(("test",)), True)
_check(matcher_on_keyword, KeywordsRule("test"), True)
_check(matcher_on_command, CommandRule([("test",)]), True)
_check(matcher_on_shell_command, ShellCommandRule([("test",)], None), True)
_check(matcher_on_regex, RegexRule("test"), True)
_check(matcher_on_type, IsTypeRule(TestEvent), True)
_check(matcher_sub_cmd, CommandRule([("test", "sub")]), True)
_check(matcher_sub_shell_cmd, ShellCommandRule([("test", "sub")], None), True)
_check(matcher_group_on, None, True)
_check(matcher_group_on_metaevent, None, False)
_check(matcher_group_on_message, None, True)
_check(matcher_group_on_notice, None, False)
_check(matcher_group_on_request, None, False)
_check(matcher_group_on_startswith, StartswithRule(("test",)), True)
_check(matcher_group_on_endswith, EndswithRule(("test",)), True)
_check(matcher_group_on_fullmatch, FullmatchRule(("test",)), True)
_check(matcher_group_on_keyword, KeywordsRule("test"), True)
_check(matcher_group_on_command, CommandRule([("test",)]), True)
_check(matcher_group_on_shell_command, ShellCommandRule([("test",)], None), True)
_check(matcher_group_on_regex, RegexRule("test"), True)
_check(matcher_group_on_type, IsTypeRule(TestEvent), True)
@pytest.mark.asyncio
async def test_runtime_on():
import plugins.plugin.matchers as module
from plugins.plugin.matchers import matcher_on_factory
matcher = matcher_on_factory()
plugin = nonebot.get_plugin("plugin")
assert plugin, "plugin should be loaded"
try:
assert matcher.plugin is plugin
assert matcher not in plugin.matcher
assert matcher.module is module
assert matcher.plugin_name == "plugin"
assert matcher.module_name == "plugins.plugin.matchers"
finally:
matcher.destroy()