nonebot2/nonebot/internal/permission.py

188 lines
5.5 KiB
Python
Raw Normal View History

import asyncio
from typing_extensions import Self
from contextlib import AsyncExitStack
2022-09-08 10:37:16 +08:00
from typing import Set, Tuple, Union, NoReturn, Optional
from nonebot.dependencies import Dependent
from nonebot.utils import run_coro_with_catch
from nonebot.exception import SkippedException
from nonebot.typing import T_DependencyCache, T_PermissionChecker
2022-02-06 17:08:11 +08:00
from .adapter import Bot, Event
from .params import BotParam, EventParam, DependParam, DefaultParam
class Permission:
"""{ref}`nonebot.matcher.Matcher` 权限类。
当事件传递时 {ref}`nonebot.matcher.Matcher` 运行前进行检查
参数:
checkers: PermissionChecker
用法:
```python
Permission(async_function) | sync_function
# 等价于
Permission(async_function, sync_function)
```
"""
__slots__ = ("checkers",)
HANDLER_PARAM_TYPES = [
DependParam,
BotParam,
EventParam,
DefaultParam,
]
def __init__(self, *checkers: Union[T_PermissionChecker, Dependent[bool]]) -> None:
self.checkers: Set[Dependent[bool]] = {
checker
if isinstance(checker, Dependent)
else Dependent[bool].parse(
call=checker, allow_types=self.HANDLER_PARAM_TYPES
)
for checker in checkers
}
"""存储 `PermissionChecker`"""
def __repr__(self) -> str:
return f"Permission({', '.join(repr(checker) for checker in self.checkers)})"
async def __call__(
self,
bot: Bot,
event: Event,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
) -> bool:
"""检查是否满足某个权限。
参数:
bot: Bot 对象
event: Event 对象
stack: 异步上下文栈
dependency_cache: 依赖缓存
"""
if not self.checkers:
return True
results = await asyncio.gather(
*(
run_coro_with_catch(
checker(
bot=bot,
event=event,
stack=stack,
dependency_cache=dependency_cache,
),
(SkippedException,),
False,
)
for checker in self.checkers
),
)
return any(results)
def __and__(self, other: object) -> NoReturn:
raise RuntimeError("And operation between Permissions is not allowed.")
def __or__(
self, other: Optional[Union["Permission", T_PermissionChecker]]
) -> "Permission":
if other is None:
return self
elif isinstance(other, Permission):
return Permission(*self.checkers, *other.checkers)
else:
return Permission(*self.checkers, other)
def __ror__(
self, other: Optional[Union["Permission", T_PermissionChecker]]
) -> "Permission":
if other is None:
return self
elif isinstance(other, Permission):
return Permission(*other.checkers, *self.checkers)
else:
return Permission(other, *self.checkers)
class User:
"""检查当前事件是否属于指定会话。
参数:
users: 会话 ID 元组
perm: 需同时满足的权限
"""
__slots__ = ("users", "perm")
def __init__(
self, users: Tuple[str, ...], perm: Optional[Permission] = None
) -> None:
self.users = users
self.perm = perm
def __repr__(self) -> str:
return (
f"User(users={self.users}"
+ (f", permission={self.perm})" if self.perm else "")
+ ")"
)
async def __call__(self, bot: Bot, event: Event) -> bool:
try:
session = event.get_session_id()
except Exception:
return False
return bool(
session in self.users and (self.perm is None or await self.perm(bot, event))
)
@classmethod
def _clean_permission(cls, perm: Permission) -> Optional[Permission]:
if len(perm.checkers) == 1 and isinstance(
user_perm := tuple(perm.checkers)[0].call, cls
):
return user_perm.perm
return perm
@classmethod
def from_event(cls, event: Event, perm: Optional[Permission] = None) -> Self:
"""从事件中获取会话 ID。
如果 `perm` 中仅有 `User` 类型的权限检查函数则会去除原有的会话 ID 限制
参数:
event: Event 对象
perm: 需同时满足的权限
"""
return cls((event.get_session_id(),), perm=perm and cls._clean_permission(perm))
@classmethod
def from_permission(cls, *users: str, perm: Optional[Permission] = None) -> Self:
"""指定会话与权限。
如果 `perm` 中仅有 `User` 类型的权限检查函数则会去除原有的会话 ID 限制
参数:
users: 会话白名单
perm: 需同时满足的权限
"""
return cls(users, perm=perm and cls._clean_permission(perm))
def USER(*users: str, perm: Optional[Permission] = None):
"""匹配当前事件属于指定会话。
如果 `perm` 中仅有 `User` 类型的权限检查函数则会去除原有检查函数的会话 ID 限制
参数:
user: 会话白名单
perm: 需要同时满足的权限
"""
return Permission(User.from_permission(*users, perm=perm))