nonebot2/nonebot/permission.py
2022-01-12 19:24:01 +08:00

199 lines
5.3 KiB
Python

"""
## 权限
每个 `Matcher` 拥有一个 `Permission` ,其中是 `PermissionChecker` 的集合,只要有一个 `PermissionChecker` 检查结果为 `True` 时就会继续运行。
::: tip 提示
`PermissionChecker` 既可以是 async function 也可以是 sync function
:::
"""
import asyncio
from contextlib import AsyncExitStack
from typing import Any, Set, Tuple, Union, NoReturn, Optional, Coroutine
from nonebot.adapters import Bot, Event
from nonebot.dependencies import Dependent
from nonebot.exception import SkippedException
from nonebot.typing import T_Handler, T_DependencyCache, T_PermissionChecker
from nonebot.params import (
BotParam,
EventType,
EventParam,
DependParam,
DefaultParam,
)
async def _run_coro_with_catch(coro: Coroutine[Any, Any, Any]):
    """Await ``coro`` and hand back its result.

    A `SkippedException` raised while awaiting is swallowed and reported as
    ``False``; any other exception propagates to the caller.
    """
    try:
        outcome = await coro
    except SkippedException:
        # A skipped checker counts as "did not grant".
        return False
    return outcome
class Permission:
    """Permission of a `Matcher`, checked before the `Matcher` runs.

    A `Permission` stores a set of `PermissionChecker`s. The check passes as
    soon as at least one of them returns a truthy value.

    Usage:

    ```python
    Permission(async_function) | sync_function
    # equivalent to
    from nonebot.utils import run_sync
    Permission(async_function, run_sync(sync_function))
    ```

    The reflected form (`sync_function | Permission(async_function)`) is
    supported as well.
    """

    __slots__ = ("checkers",)

    # Parameter kinds offered to `Dependent.parse` when wrapping a checker.
    HANDLER_PARAM_TYPES = [
        DependParam,
        BotParam,
        EventParam,
        DefaultParam,
    ]

    def __init__(self, *checkers: Union[T_PermissionChecker, Dependent[bool]]) -> None:
        """
        Parameters:
            *checkers: PermissionChecker callables, or already parsed
                `Dependent` objects (stored unchanged).
        """
        # Stored `PermissionChecker`s; plain callables are parsed into
        # `Dependent[bool]` first.
        self.checkers: Set[Dependent[bool]] = {
            checker
            if isinstance(checker, Dependent)
            else Dependent[bool].parse(
                call=checker, allow_types=self.HANDLER_PARAM_TYPES
            )
            for checker in checkers
        }

    async def __call__(
        self,
        bot: Bot,
        event: Event,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ) -> bool:
        """Check whether the permission is satisfied.

        Parameters:
            bot: Bot object
            event: Event object
            stack: async exit stack forwarded to every checker
            dependency_cache: dependency cache forwarded to every checker

        Returns:
            `True` when there are no checkers at all, or when at least one
            checker returns a truthy value. A checker that raises
            `SkippedException` counts as `False`.
        """
        if not self.checkers:
            return True
        # All checkers run concurrently; the results are combined afterwards.
        results = await asyncio.gather(
            *(
                _run_coro_with_catch(
                    checker(
                        bot=bot,
                        event=event,
                        stack=stack,
                        dependency_cache=dependency_cache,
                    )
                )
                for checker in self.checkers
            ),
        )
        return any(results)

    def __and__(self, other) -> NoReturn:
        """Reject `permission & other`; permissions can only be OR-ed."""
        raise RuntimeError("And operation between Permissions is not allowed.")

    def __or__(
        self, other: Optional[Union["Permission", T_PermissionChecker]]
    ) -> "Permission":
        """Return a permission that passes when `self` or `other` passes.

        `None` leaves the permission unchanged (the same object is returned).
        """
        if other is None:
            return self
        elif isinstance(other, Permission):
            return Permission(*self.checkers, *other.checkers)
        else:
            return Permission(*self.checkers, other)

    def __ror__(
        self, other: Optional[Union["Permission", T_PermissionChecker]]
    ) -> "Permission":
        """Reflected `|`, so `checker | permission` and `None | permission` work.

        Python falls back to this method when the left operand does not
        implement `|` itself; the result mirrors `__or__`.
        """
        if other is None:
            return self
        elif isinstance(other, Permission):
            return Permission(*other.checkers, *self.checkers)
        else:
            return Permission(other, *self.checkers)
class Message:
    """Permission checker that passes only when the event type is ``"message"``."""

    async def __call__(self, type: str = EventType()) -> bool:
        # `type` is supplied through the `EventType()` parameter default.
        is_message = type == "message"
        return is_message
class Notice:
    """Permission checker that passes only when the event type is ``"notice"``."""

    async def __call__(self, type: str = EventType()) -> bool:
        # `type` is supplied through the `EventType()` parameter default.
        is_notice = type == "notice"
        return is_notice
class Request:
    """Permission checker that passes only when the event type is ``"request"``."""

    async def __call__(self, type: str = EventType()) -> bool:
        # `type` is supplied through the `EventType()` parameter default.
        is_request = type == "request"
        return is_request
class MetaEvent:
    """Permission checker that passes only when the event type is ``"meta_event"``."""

    async def __call__(self, type: str = EventType()) -> bool:
        # `type` is supplied through the `EventType()` parameter default.
        is_meta_event = type == "meta_event"
        return is_meta_event
MESSAGE = Permission(Message())
"""
Matches any event of type `message`. Use it only when one matcher has to
capture several event types at once; otherwise prefer a message-type Matcher.
"""

NOTICE = Permission(Notice())
"""
Matches any event of type `notice`. Use it only when one matcher has to
capture several event types at once; otherwise prefer a notice-type Matcher.
"""

REQUEST = Permission(Request())
"""
Matches any event of type `request`. Use it only when one matcher has to
capture several event types at once; otherwise prefer a request-type Matcher.
"""

METAEVENT = Permission(MetaEvent())
"""
Matches any event of type `meta_event`. Use it only when one matcher has to
capture several event types at once; otherwise prefer a meta_event-type Matcher.
"""
class User:
    """Checker that restricts events to a whitelist of session ids.

    An optional extra `Permission` must be satisfied too when it is given.
    """

    def __init__(
        self, users: Tuple[str, ...], perm: Optional[Permission] = None
    ) -> None:
        self.users, self.perm = users, perm

    async def __call__(self, bot: Bot, event: Event) -> bool:
        # The session id must be whitelisted before anything else is checked.
        if event.get_session_id() not in self.users:
            return False
        # No additional permission configured: the whitelist alone decides.
        if self.perm is None:
            return True
        granted = await self.perm(bot, event)
        return bool(granted)
def USER(*users: str, perm: Optional[Permission] = None):
    """
    Permission that passes when the `session_id` of the `event` is in the
    whitelist and `perm` (if given) is satisfied as well.

    Parameters:
        *users: whitelisted session ids
        perm: permission that must be satisfied in addition
    """
    whitelist_checker = User(users, perm)
    return Permission(whitelist_checker)
class SuperUser:
    """Checker that passes for message events sent by a configured superuser."""

    async def __call__(self, bot: Bot, event: Event) -> bool:
        # Only events of type "message" can qualify.
        if event.get_type() != "message":
            return False
        # First word of the adapter name, lower-cased, used as the id prefix.
        adapter_prefix = bot.adapter.get_name().split(maxsplit=1)[0].lower()
        qualified_id = f"{adapter_prefix}:{event.get_user_id()}"
        if qualified_id in bot.config.superusers:
            return True
        # Compatibility with the old configuration: bare user id, no prefix.
        return event.get_user_id() in bot.config.superusers
SUPERUSER = Permission(SuperUser())
"""
Matches any message-type event sent by a superuser.
"""