nonebot2/nonebot/permission.py

223 lines
5.7 KiB
Python
Raw Normal View History

r"""
Permission
==========

Each ``Matcher`` owns a ``Permission``, which is a collection of ``PermissionChecker`` objects; the ``Matcher`` keeps running as soon as any one ``PermissionChecker`` evaluates to ``True``.

\:\:\:tip Tip
A ``PermissionChecker`` may be either an async function or a sync function.
\:\:\:
"""
2020-08-17 16:09:41 +08:00
import asyncio
from contextlib import AsyncExitStack
from typing import (
Any,
Set,
Dict,
Tuple,
Union,
Callable,
NoReturn,
Optional,
2021-12-06 22:19:05 +08:00
Coroutine,
)
2020-08-17 16:09:41 +08:00
from nonebot import params
2021-11-14 18:51:23 +08:00
from nonebot.adapters import Bot, Event
from nonebot.dependencies import Dependent
from nonebot.exception import SkippedException
2021-12-16 23:22:25 +08:00
from nonebot.typing import T_Handler, T_DependencyCache, T_PermissionChecker
2020-12-06 02:30:19 +08:00
2020-08-17 16:09:41 +08:00
2021-12-06 22:19:05 +08:00
async def _run_coro_with_catch(coro: Coroutine[Any, Any, Any]):
    """Await ``coro`` and hand back its result, mapping a skip to ``False``.

    A ``SkippedException`` raised while awaiting ``coro`` is swallowed and
    reported as ``False``; any other exception propagates to the caller.
    """
    try:
        outcome = await coro
    except SkippedException:
        outcome = False
    return outcome
2020-08-17 16:09:41 +08:00
class Permission:
    """
    :Description:

      Rule class of a ``Matcher``; it is checked when an event is delivered, before the ``Matcher`` runs.

    :Example:

    .. code-block:: python

        Permission(async_function) | sync_function
        # is equivalent to
        from nonebot.utils import run_sync
        Permission(async_function, run_sync(sync_function))
    """

    __slots__ = ("checkers",)

    # Parameter kinds handed to ``Dependent.parse`` when wrapping a raw checker.
    HANDLER_PARAM_TYPES = [
        params.DependParam,
        params.BotParam,
        params.EventParam,
        params.DefaultParam,
    ]

    def __init__(self, *checkers: Union[T_PermissionChecker, Dependent[bool]]) -> None:
        """
        :Parameters:

          * ``*checkers: Union[T_PermissionChecker, Dependent[bool]]``: PermissionChecker
        """
        # Checkers that already are ``Dependent`` instances are stored as-is;
        # everything else is wrapped through ``Dependent[bool].parse``.
        self.checkers: Set[Dependent[bool]] = {
            item
            if isinstance(item, Dependent)
            else Dependent[bool].parse(
                call=item, allow_types=self.HANDLER_PARAM_TYPES
            )
            for item in checkers
        }
        """
        :Description:

          Stored ``PermissionChecker`` set

        :Type:

          * ``Set[Dependent[bool]]``
        """

    async def __call__(
        self,
        bot: Bot,
        event: Event,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ) -> bool:
        """
        :Description:

          Check whether a permission is satisfied. An empty ``Permission`` always passes; otherwise all checkers are awaited together and the result is ``True`` when at least one of them returns a truthy value.

        :Parameters:

          * ``bot: Bot``: Bot object
          * ``event: Event``: Event object
          * ``stack: Optional[AsyncExitStack]``: async context stack
          * ``dependency_cache: Optional[T_DependencyCache]``: dependency cache

        :Returns:

          - ``bool``
        """
        if not self.checkers:
            return True

        # Each checker call is wrapped so that a ``SkippedException`` counts
        # as a failed check instead of aborting the whole gather.
        pending = [
            _run_coro_with_catch(
                checker(
                    bot=bot,
                    event=event,
                    stack=stack,
                    dependency_cache=dependency_cache,
                )
            )
            for checker in self.checkers
        ]
        outcomes = await asyncio.gather(*pending)
        return any(outcomes)

    def __and__(self, other) -> NoReturn:
        """Reject ``&``: combining permissions with "and" is not supported."""
        raise RuntimeError("And operation between Permissions is not allowed.")

    def __or__(
        self, other: Optional[Union["Permission", T_PermissionChecker]]
    ) -> "Permission":
        """Combine with ``other`` using ``|``.

        ``None`` leaves this permission unchanged (the same object is
        returned); another ``Permission`` contributes all of its checkers;
        anything else is treated as a single checker. In the latter two cases
        a new ``Permission`` is built.
        """
        if other is None:
            return self
        if isinstance(other, Permission):
            return Permission(*self.checkers, *other.checkers)
        return Permission(*self.checkers, other)
2020-08-17 16:09:41 +08:00
class Message:
    """Checker that passes only when ``event.get_type()`` is ``"message"``."""

    async def __call__(self, event: Event) -> bool:
        event_type = event.get_type()
        return event_type == "message"
2020-08-17 16:09:41 +08:00
class Notice:
    """Checker that passes only when ``event.get_type()`` is ``"notice"``."""

    async def __call__(self, event: Event) -> bool:
        event_type = event.get_type()
        return event_type == "notice"
2020-08-17 16:09:41 +08:00
class Request:
    """Checker that passes only when ``event.get_type()`` is ``"request"``."""

    async def __call__(self, event: Event) -> bool:
        event_type = event.get_type()
        return event_type == "request"
2020-08-17 16:09:41 +08:00
class MetaEvent:
    """Checker that passes only when ``event.get_type()`` is ``"meta_event"``."""

    async def __call__(self, event: Event) -> bool:
        event_type = event.get_type()
        return event_type == "meta_event"
2020-08-17 16:09:41 +08:00
MESSAGE = Permission(Message())
"""
- **Description**: Matches any ``message`` type event. Use it only when events of different types must be captured at the same time; otherwise prefer a message type Matcher.
"""

NOTICE = Permission(Notice())
"""
- **Description**: Matches any ``notice`` type event. Use it only when events of different types must be captured at the same time; otherwise prefer a notice type Matcher.
"""

REQUEST = Permission(Request())
"""
- **Description**: Matches any ``request`` type event. Use it only when events of different types must be captured at the same time; otherwise prefer a request type Matcher.
"""

METAEVENT = Permission(MetaEvent())
"""
- **Description**: Matches any ``meta_event`` type event. Use it only when events of different types must be captured at the same time; otherwise prefer a meta_event type Matcher.
"""
2020-08-17 16:09:41 +08:00
class User:
    """Checker that passes when the event's session id is in ``users`` and,
    if an extra ``perm`` was supplied, that permission passes as well.
    """

    def __init__(
        self, users: Tuple[str, ...], perm: Optional[Permission] = None
    ) -> None:
        self.users = users
        self.perm = perm

    async def __call__(self, bot: Bot, event: Event) -> bool:
        # Not whitelisted: the extra permission is never consulted.
        if event.get_session_id() not in self.users:
            return False
        # Whitelisted and nothing further to check.
        if self.perm is None:
            return True
        return bool(await self.perm(bot, event))
def USER(*users: str, perm: Optional[Permission] = None):
    """
    :Description:

      The ``session_id`` of the ``event`` is in the whitelist and ``perm`` is satisfied.

    :Parameters:

      * ``*users: str``: whitelist
      * ``perm: Optional[Permission]``: permission that must be satisfied as well
    """
    whitelist_checker = User(users, perm)
    return Permission(whitelist_checker)
2020-08-17 16:09:41 +08:00
class SuperUser:
    """Checker that passes for ``"message"`` events whose user id is listed in
    ``bot.config.superusers``.
    """

    async def __call__(self, bot: Bot, event: Event) -> bool:
        # Any other event type is rejected before the user id is requested.
        if event.get_type() != "message":
            return False
        return event.get_user_id() in bot.config.superusers
2020-12-10 02:13:25 +08:00
SUPERUSER = Permission(SuperUser())
"""
- **Description**: Matches any message type event sent by a superuser
"""