import pytest from nonebug import App from utils import load_plugin, make_fake_event, make_fake_message @pytest.mark.asyncio async def test_matcher(app: App, load_plugin): from plugins.matcher.matcher_process import ( test_got, test_handle, test_preset, test_combine, test_receive, test_overload, ) message = make_fake_message()("text") event = make_fake_event(_message=message)() message_next = make_fake_message()("text_next") event_next = make_fake_event(_message=message_next)() assert len(test_handle.handlers) == 1 async with app.test_matcher(test_handle) as ctx: bot = ctx.create_bot() ctx.receive_event(bot, event) ctx.should_call_send(event, "send", "result", at_sender=True) ctx.should_finished() assert len(test_got.handlers) == 1 async with app.test_matcher(test_got) as ctx: bot = ctx.create_bot() ctx.receive_event(bot, event) ctx.should_call_send(event, "prompt key1", "result1") ctx.receive_event(bot, event) ctx.should_call_send(event, "prompt key2", "result2") ctx.receive_event(bot, event) ctx.should_call_send(event, "reject", "result3", at_sender=True) ctx.should_rejected() ctx.receive_event(bot, event_next) assert len(test_receive.handlers) == 1 async with app.test_matcher(test_receive) as ctx: bot = ctx.create_bot() ctx.receive_event(bot, event) ctx.receive_event(bot, event) ctx.receive_event(bot, event) ctx.should_call_send(event, "pause", "result", at_sender=True) ctx.should_paused() assert len(test_receive.handlers) == 1 async with app.test_matcher(test_combine) as ctx: bot = ctx.create_bot() ctx.receive_event(bot, event) ctx.receive_event(bot, event) ctx.receive_event(bot, event) ctx.should_rejected() ctx.receive_event(bot, event_next) ctx.should_rejected() ctx.receive_event(bot, event_next) ctx.should_rejected() ctx.receive_event(bot, event_next) assert len(test_preset.handlers) == 2 async with app.test_matcher(test_preset) as ctx: bot = ctx.create_bot() ctx.receive_event(bot, event) ctx.receive_event(bot, event) ctx.should_rejected() ctx.receive_event(bot, event_next) assert len(test_overload.handlers) == 2 async with app.test_matcher(test_overload) as ctx: bot = ctx.create_bot() ctx.receive_event(bot, event) ctx.should_finished() @pytest.mark.asyncio async def test_type_updater(app: App, load_plugin): from plugins.matcher.matcher_type import ( test_type_updater, test_custom_updater, ) event = make_fake_event()() assert test_type_updater.type == "test" async with app.test_api() as ctx: bot = ctx.create_bot() matcher = test_type_updater() new_type = await matcher.update_type(bot, event) assert new_type == "message" assert test_custom_updater.type == "test" async with app.test_api() as ctx: bot = ctx.create_bot() matcher = test_custom_updater() new_type = await matcher.update_type(bot, event) assert new_type == "custom" @pytest.mark.asyncio async def test_permission_updater(app: App, load_plugin): from nonebot.permission import User from plugins.matcher.matcher_permission import ( default_permission, test_custom_updater, test_permission_updater, ) event = make_fake_event(_session_id="test")() assert test_permission_updater.permission is default_permission async with app.test_api() as ctx: bot = ctx.create_bot() matcher = test_permission_updater() new_perm = await matcher.update_permission(bot, event) assert len(new_perm.checkers) == 1 checker = list(new_perm.checkers)[0].call assert isinstance(checker, User) assert checker.users == ("test",) assert checker.perm is default_permission assert test_custom_updater.permission is default_permission async with app.test_api() as ctx: bot = ctx.create_bot() matcher = test_custom_updater() new_perm = await matcher.update_permission(bot, event) assert new_perm is default_permission @pytest.mark.asyncio async def test_run(app: App): from nonebot.matcher import Matcher, matchers assert not matchers event = make_fake_event()() async def reject(): await Matcher.reject() test_reject = Matcher.new(handlers=[reject]) async with app.test_api() as ctx: bot = ctx.create_bot() await test_reject().run(bot, event, {}) assert len(matchers[0]) == 1 assert len(matchers[0][0].handlers) == 1 del matchers[0] async def pause(): await Matcher.pause() test_pause = Matcher.new(handlers=[pause]) async with app.test_api() as ctx: bot = ctx.create_bot() await test_pause().run(bot, event, {}) assert len(matchers[0]) == 1 assert len(matchers[0][0].handlers) == 0