add rule permission reflected operation support (#872)

Feature: 添加 Rule, Permission 反向位运算支持
This commit is contained in:
Ju4tCode 2022-03-17 21:11:37 +08:00 committed by GitHub
parent 06f8dde33c
commit 02de6fd266
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 196 additions and 2 deletions

View File

@ -85,7 +85,7 @@ class Permission:
) )
return any(results) return any(results)
def __and__(self, other) -> NoReturn: def __and__(self, other: object) -> NoReturn:
raise RuntimeError("And operation between Permissions is not allowed.") raise RuntimeError("And operation between Permissions is not allowed.")
def __or__( def __or__(
@ -98,6 +98,16 @@ class Permission:
else: else:
return Permission(*self.checkers, other) return Permission(*self.checkers, other)
def __ror__(
self, other: Optional[Union["Permission", T_PermissionChecker]]
) -> "Permission":
if other is None:
return self
elif isinstance(other, Permission):
return Permission(*other.checkers, *self.checkers)
else:
return Permission(other, *self.checkers)
class User: class User:
"""检查当前事件是否属于指定会话 """检查当前事件是否属于指定会话

View File

@ -91,5 +91,13 @@ class Rule:
else: else:
return Rule(*self.checkers, other) return Rule(*self.checkers, other)
def __or__(self, other) -> NoReturn: def __rand__(self, other: Optional[Union["Rule", T_RuleChecker]]) -> "Rule":
if other is None:
return self
elif isinstance(other, Rule):
return Rule(*other.checkers, *self.checkers)
else:
return Rule(other, *self.checkers)
def __or__(self, other: object) -> NoReturn:
raise RuntimeError("Or operation between rules is not allowed.") raise RuntimeError("Or operation between rules is not allowed.")

View File

@ -1,3 +1,4 @@
LOG_LEVEL=TRACE LOG_LEVEL=TRACE
NICKNAME=["test"] NICKNAME=["test"]
SUPERUSERS=["test", "fake:faketest"]
CONFIG_FROM_ENV= CONFIG_FROM_ENV=

165
tests/test_permission.py Normal file
View File

@ -0,0 +1,165 @@
import pytest
from nonebug import App
from utils import make_fake_event
@pytest.mark.asyncio
async def test_permission(app: App):
from nonebot.permission import Permission
from nonebot.exception import SkippedException
async def falsy():
return False
async def truthy():
return True
async def skipped() -> bool:
raise SkippedException
def _is_eq(a: Permission, b: Permission) -> bool:
return {d.call for d in a.checkers} == {d.call for d in b.checkers}
assert _is_eq(Permission(truthy) | None, Permission(truthy))
assert _is_eq(Permission(truthy) | falsy, Permission(truthy, falsy))
assert _is_eq(Permission(truthy) | Permission(falsy), Permission(truthy, falsy))
assert _is_eq(None | Permission(truthy), Permission(truthy))
assert _is_eq(truthy | Permission(falsy), Permission(truthy, falsy))
event = make_fake_event()()
async with app.test_api() as ctx:
bot = ctx.create_bot()
assert await Permission(falsy)(bot, event) == False
assert await Permission(truthy)(bot, event) == True
assert await Permission(skipped)(bot, event) == False
assert await Permission(truthy, falsy)(bot, event) == True
assert await Permission(truthy, skipped)(bot, event) == True
@pytest.mark.asyncio
@pytest.mark.parametrize(
"type,expected",
[
("message", True),
("notice", False),
],
)
async def test_message(
app: App,
type: str,
expected: bool,
):
from nonebot.permission import MESSAGE, Message
dependent = list(MESSAGE.checkers)[0]
checker = dependent.call
assert isinstance(checker, Message)
event = make_fake_event(_type=type)()
assert await dependent(event=event) == expected
@pytest.mark.asyncio
@pytest.mark.parametrize(
"type,expected",
[
("message", False),
("notice", True),
],
)
async def test_notice(
app: App,
type: str,
expected: bool,
):
from nonebot.permission import NOTICE, Notice
dependent = list(NOTICE.checkers)[0]
checker = dependent.call
assert isinstance(checker, Notice)
event = make_fake_event(_type=type)()
assert await dependent(event=event) == expected
@pytest.mark.asyncio
@pytest.mark.parametrize(
"type,expected",
[
("message", False),
("request", True),
],
)
async def test_request(
app: App,
type: str,
expected: bool,
):
from nonebot.permission import REQUEST, Request
dependent = list(REQUEST.checkers)[0]
checker = dependent.call
assert isinstance(checker, Request)
event = make_fake_event(_type=type)()
assert await dependent(event=event) == expected
@pytest.mark.asyncio
@pytest.mark.parametrize(
"type,expected",
[
("message", False),
("meta_event", True),
],
)
async def test_metaevent(
app: App,
type: str,
expected: bool,
):
from nonebot.permission import METAEVENT, MetaEvent
dependent = list(METAEVENT.checkers)[0]
checker = dependent.call
assert isinstance(checker, MetaEvent)
event = make_fake_event(_type=type)()
assert await dependent(event=event) == expected
@pytest.mark.asyncio
@pytest.mark.parametrize(
"type,user_id,expected",
[
("message", "test", True),
("message", "foo", False),
("message", "faketest", True),
("notice", "test", False),
],
)
async def test_startswith(
app: App,
type: str,
user_id: str,
expected: bool,
):
from nonebot.permission import SUPERUSER, SuperUser
dependent = list(SUPERUSER.checkers)[0]
checker = dependent.call
assert isinstance(checker, SuperUser)
event = make_fake_event(_type=type, _user_id=user_id)()
async with app.test_api() as ctx:
bot = ctx.create_bot()
assert await dependent(bot=bot, event=event) == expected

View File

@ -20,6 +20,16 @@ async def test_rule(app: App):
async def skipped() -> bool: async def skipped() -> bool:
raise SkippedException raise SkippedException
def _is_eq(a: Rule, b: Rule) -> bool:
return {d.call for d in a.checkers} == {d.call for d in b.checkers}
assert _is_eq(Rule(truthy) & None, Rule(truthy))
assert _is_eq(Rule(truthy) & falsy, Rule(truthy, falsy))
assert _is_eq(Rule(truthy) & Rule(falsy), Rule(truthy, falsy))
assert _is_eq(None & Rule(truthy), Rule(truthy))
assert _is_eq(truthy & Rule(falsy), Rule(truthy, falsy))
event = make_fake_event()() event = make_fake_event()()
async with app.test_api() as ctx: async with app.test_api() as ctx: