mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-02-25 11:59:53 +08:00
Update
This commit is contained in:
parent
f5e6641d3f
commit
86115e9e04
@ -9,9 +9,9 @@ from aiocqhttp import CQHttp, Error as CQHttpError
|
|||||||
from aiocqhttp.message import Message
|
from aiocqhttp.message import Message
|
||||||
|
|
||||||
from . import permissions as perm
|
from . import permissions as perm
|
||||||
from .helpers import context_source
|
from ..helpers import context_source
|
||||||
from .expression import render
|
from ..expression import render
|
||||||
from .session import BaseSession
|
from ..session import BaseSession
|
||||||
|
|
||||||
# Key: str (one segment of command name)
|
# Key: str (one segment of command name)
|
||||||
# Value: subtree or a leaf Command object
|
# Value: subtree or a leaf Command object
|
||||||
@ -35,7 +35,14 @@ class Command:
|
|||||||
self.permission = permission
|
self.permission = permission
|
||||||
self.args_parser_func = None
|
self.args_parser_func = None
|
||||||
|
|
||||||
async def run(self, session, *, permission: int = None) -> bool:
|
async def run(self, session, *, permission: Optional[int] = None) -> bool:
|
||||||
|
"""
|
||||||
|
Run the command in a given session.
|
||||||
|
|
||||||
|
:param session: CommandSession object
|
||||||
|
:param permission: the permission the caller owns
|
||||||
|
:return: the command is finished
|
||||||
|
"""
|
||||||
if permission is None:
|
if permission is None:
|
||||||
permission = await _calculate_permission(session.bot, session.ctx)
|
permission = await _calculate_permission(session.bot, session.ctx)
|
||||||
if self.func and permission & self.permission:
|
if self.func and permission & self.permission:
|
||||||
@ -47,6 +54,18 @@ class Command:
|
|||||||
|
|
||||||
|
|
||||||
async def _calculate_permission(bot: CQHttp, ctx: Dict[str, Any]) -> int:
|
async def _calculate_permission(bot: CQHttp, ctx: Dict[str, Any]) -> int:
|
||||||
|
"""
|
||||||
|
Calculate the permission OWNED by the current context.
|
||||||
|
|
||||||
|
This is different from the permission REQUIRED by a command.
|
||||||
|
The result of this function should be made a bit-and with
|
||||||
|
the permission required by some command to check whether
|
||||||
|
the context is allowed to call the command.
|
||||||
|
|
||||||
|
:param bot: CQHttp instance
|
||||||
|
:param ctx: message context
|
||||||
|
:return: the calculated permission value
|
||||||
|
"""
|
||||||
permission = 0
|
permission = 0
|
||||||
if ctx['user_id'] in bot.config.SUPERUSERS:
|
if ctx['user_id'] in bot.config.SUPERUSERS:
|
||||||
permission |= perm.IS_SUPERUSER
|
permission |= perm.IS_SUPERUSER
|
||||||
@ -78,7 +97,7 @@ async def _calculate_permission(bot: CQHttp, ctx: Dict[str, Any]) -> int:
|
|||||||
|
|
||||||
def on_command(name: Union[str, Tuple[str]], *,
|
def on_command(name: Union[str, Tuple[str]], *,
|
||||||
aliases: Iterable = (),
|
aliases: Iterable = (),
|
||||||
permission: int = perm.EVERYONE) -> Callable:
|
permission: int = perm.EVERYBODY) -> Callable:
|
||||||
def deco(func: Callable) -> Callable:
|
def deco(func: Callable) -> Callable:
|
||||||
if not isinstance(name, (str, tuple)):
|
if not isinstance(name, (str, tuple)):
|
||||||
raise TypeError('the name of a command must be a str or tuple')
|
raise TypeError('the name of a command must be a str or tuple')
|
||||||
@ -106,14 +125,24 @@ def on_command(name: Union[str, Tuple[str]], *,
|
|||||||
|
|
||||||
|
|
||||||
class CommandGroup:
|
class CommandGroup:
|
||||||
|
"""
|
||||||
|
Group a set of commands with same name prefix.
|
||||||
|
"""
|
||||||
|
|
||||||
__slots__ = ('basename', 'permission')
|
__slots__ = ('basename', 'permission')
|
||||||
|
|
||||||
def __init__(self, name: Union[str, Tuple[str]], permission: int = None):
|
def __init__(self, name: Union[str, Tuple[str]],
|
||||||
|
permission: Optional[int] = None):
|
||||||
|
"""
|
||||||
|
:param name: name prefix of commands in this group
|
||||||
|
:param permission: default permission
|
||||||
|
"""
|
||||||
self.basename = (name,) if isinstance(name, str) else name
|
self.basename = (name,) if isinstance(name, str) else name
|
||||||
self.permission = permission
|
self.permission = permission
|
||||||
|
|
||||||
def command(self, name: Union[str, Tuple[str]], *,
|
def command(self, name: Union[str, Tuple[str]], *,
|
||||||
aliases: Iterable = None, permission: int = None) -> Callable:
|
aliases: Optional[Iterable] = None,
|
||||||
|
permission: Optional[int] = None) -> Callable:
|
||||||
sub_name = (name,) if isinstance(name, str) else name
|
sub_name = (name,) if isinstance(name, str) else name
|
||||||
name = self.basename + sub_name
|
name = self.basename + sub_name
|
||||||
|
|
||||||
@ -122,12 +151,13 @@ class CommandGroup:
|
|||||||
kwargs['aliases'] = aliases
|
kwargs['aliases'] = aliases
|
||||||
if permission is not None:
|
if permission is not None:
|
||||||
kwargs['permission'] = permission
|
kwargs['permission'] = permission
|
||||||
|
elif self.permission is not None:
|
||||||
|
kwargs['permission'] = self.permission
|
||||||
return on_command(name, **kwargs)
|
return on_command(name, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def _find_command(name: Union[str, Tuple[str]]) -> Optional[Command]:
|
def _find_command(name: Union[str, Tuple[str]]) -> Optional[Command]:
|
||||||
cmd_name = (name,) if isinstance(name, str) else name
|
cmd_name = (name,) if isinstance(name, str) else name
|
||||||
|
|
||||||
if not cmd_name:
|
if not cmd_name:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@ -154,7 +184,7 @@ class CommandSession(BaseSession):
|
|||||||
'images', 'args', 'last_interaction')
|
'images', 'args', 'last_interaction')
|
||||||
|
|
||||||
def __init__(self, bot: CQHttp, ctx: Dict[str, Any], cmd: Command, *,
|
def __init__(self, bot: CQHttp, ctx: Dict[str, Any], cmd: Command, *,
|
||||||
current_arg: str = '', args: Dict[str, Any] = None):
|
current_arg: str = '', args: Optional[Dict[str, Any]] = None):
|
||||||
super().__init__(bot, ctx)
|
super().__init__(bot, ctx)
|
||||||
self.cmd = cmd
|
self.cmd = cmd
|
||||||
self.current_key = None
|
self.current_key = None
|
||||||
@ -180,6 +210,7 @@ class CommandSession(BaseSession):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def is_valid(self) -> bool:
|
def is_valid(self) -> bool:
|
||||||
|
"""Check if the session is expired or not."""
|
||||||
if self.last_interaction and \
|
if self.last_interaction and \
|
||||||
datetime.now() - self.last_interaction > \
|
datetime.now() - self.last_interaction > \
|
||||||
self.bot.config.SESSION_EXPIRE_TIMEOUT:
|
self.bot.config.SESSION_EXPIRE_TIMEOUT:
|
@ -9,7 +9,7 @@ GROUP_ADMIN = 0x0200
|
|||||||
GROUP_OWNER = 0x0400
|
GROUP_OWNER = 0x0400
|
||||||
GROUP = 0x0F00
|
GROUP = 0x0F00
|
||||||
SUPERUSER = 0xF000
|
SUPERUSER = 0xF000
|
||||||
EVERYONE = 0xFFFF
|
EVERYBODY = 0xFFFF
|
||||||
|
|
||||||
IS_NOBODY = 0x0000
|
IS_NOBODY = 0x0000
|
||||||
IS_PRIVATE_FRIEND = PRIVATE_FRIEND
|
IS_PRIVATE_FRIEND = PRIVATE_FRIEND
|
@ -1,6 +1,7 @@
|
|||||||
from aiocqhttp.message import unescape
|
from aiocqhttp.message import unescape
|
||||||
|
|
||||||
from none import on_command, CommandSession, permissions as perm
|
from none import on_command, CommandSession
|
||||||
|
from none.command import permissions as perm
|
||||||
|
|
||||||
|
|
||||||
@on_command('echo')
|
@on_command('echo')
|
||||||
|
@ -2,6 +2,7 @@ import re
|
|||||||
|
|
||||||
from none.default_config import *
|
from none.default_config import *
|
||||||
|
|
||||||
|
HOST = '0.0.0.0'
|
||||||
SECRET = 'abc'
|
SECRET = 'abc'
|
||||||
|
|
||||||
SUPERUSERS = {1002647525}
|
SUPERUSERS = {1002647525}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from none.command import CommandSession, CommandGroup
|
from none.command import CommandSession, CommandGroup, permissions as perm
|
||||||
|
|
||||||
from . import expressions as expr
|
from . import expressions as expr
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user