# nonebot2/none/command.py

2018-06-15 06:58:24 +08:00
import re
2018-06-25 16:50:34 +08:00
import asyncio
from datetime import datetime
2018-06-25 21:14:27 +08:00
from typing import (
Tuple, Union, Callable, Iterable, Dict, Any, Optional, List, Sequence
)
2018-06-15 06:58:24 +08:00
2018-06-15 15:00:58 +08:00
from aiocqhttp import CQHttp, Error as CQHttpError
from aiocqhttp.message import Message
2018-06-15 06:58:24 +08:00
2018-06-15 10:40:53 +08:00
from . import permissions as perm
2018-06-25 15:22:59 +08:00
from .helpers import context_source
2018-06-25 21:14:27 +08:00
from .expression import render
2018-06-15 06:58:24 +08:00
2018-06-15 10:40:53 +08:00
# Command tree.
#   key:   one segment of a command name
#   value: a nested dict (subtree) or a leaf Command object
_registry = {}  # type: Dict[str, Any]

# Alias table.
#   key:   alias text
#   value: the name tuple that identifies a command
_aliases = {}  # type: Dict[str, Tuple[str, ...]]

# Sessions currently in progress.
#   key:   context source
#   value: Session object
_sessions = {}  # type: Dict[Any, Any]
2018-06-25 12:41:12 +08:00
2018-06-15 06:58:24 +08:00
class Command:
    """
    A registered command: the handler coroutine function, the permission
    mask required to invoke it, and an optional argument parser that is
    awaited before the handler.
    """

    __slots__ = ('name', 'func', 'permission', 'args_parser_func')

    def __init__(self, name: Tuple[str],
                 func: Callable,
                 permission: int):
        self.name, self.func, self.permission = name, func, permission
        # no parser until one is assigned to this attribute
        self.args_parser_func = None

    async def run(self, session, *, permission: int = None) -> bool:
        """
        Run the command for the given session if the sender is allowed to.

        :param session: session object handed to the parser and the handler
        :param permission: permission mask of the sender; when omitted it
                           is obtained from calculate_permission()
        :return: True if the handler was run, False if there is no handler
                 or the permission masks do not overlap
        """
        if permission is None:
            permission = await calculate_permission(session.bot, session.ctx)

        handler = self.func
        if not handler or not permission & self.permission:
            return False

        parser = self.args_parser_func
        if parser:
            await parser(session)
        await handler(session)
        return True
2018-06-15 10:40:53 +08:00
2018-06-25 12:41:12 +08:00
async def calculate_permission(bot: CQHttp, ctx: Dict[str, Any]) -> int:
    """
    Build the permission bit mask of the sender described by a context.

    :param bot: bot object; its config.SUPERUSERS is consulted and, for
                non-anonymous group messages, get_group_member_info()
                is called with the context as keyword arguments
    :param ctx: message context
    :return: bitwise OR of the matching permission flags
    """
    granted = 0
    if ctx['user_id'] in bot.config.SUPERUSERS:
        granted |= perm.IS_SUPERUSER

    message_type = ctx['message_type']
    if message_type == 'private':
        by_sub_type = {
            'friend': perm.IS_PRIVATE_FRIEND,
            'group': perm.IS_PRIVATE_GROUP,
            'discuss': perm.IS_PRIVATE_DISCUSS,
            'other': perm.IS_PRIVATE_OTHER,
        }
        # an unrecognised sub type contributes no flag
        granted |= by_sub_type.get(ctx['sub_type'], 0)
    elif message_type == 'group':
        granted |= perm.IS_GROUP_MEMBER
        if ctx['anonymous']:
            # anonymous senders are never looked up as members
            return granted
        try:
            member_info = await bot.get_group_member_info(**ctx)
            if member_info:
                by_role = {
                    'owner': perm.IS_GROUP_OWNER,
                    'admin': perm.IS_GROUP_ADMIN,
                }
                granted |= by_role.get(member_info['role'], 0)
        except CQHttpError:
            # lookup failed: the sender keeps the plain member flag only
            pass
    elif message_type == 'discuss':
        granted |= perm.IS_DISCUSS
    return granted
2018-06-25 17:28:10 +08:00
def on_command(name: Union[str, Tuple[str]], *,
               aliases: Iterable = (),
               permission: int = perm.EVERYONE) -> Callable:
    """
    Decorator factory that registers a function as a command handler.

    The decorated function is returned unchanged except that it gains an
    "args_parser" attribute, itself a decorator, which registers an
    argument parser for the command.

    :param name: command name, either a str or a tuple of name segments
    :param aliases: alternative names for the command; a single str is
                    treated as one alias, not as an iterable of characters
    :param permission: permission mask required to run the command
    :return: the decorator
    :raise TypeError: the name is neither a str nor a tuple
    :raise ValueError: the name is empty, or one of its leading segments
                       is already registered as a command
    """
    # A str is iterable too; without this check 'foo' would register the
    # aliases 'f' and 'o' instead of 'foo'.
    alias_names = (aliases,) if isinstance(aliases, str) else aliases

    def deco(func: Callable) -> Callable:
        if not isinstance(name, (str, tuple)):
            raise TypeError('the name of a command must be a str or tuple')
        if not name:
            raise ValueError('the name of a command must not be empty')

        cmd_name = name if isinstance(name, tuple) else (name,)

        # walk down (creating as needed) the subtree for every leading
        # segment; the last segment is the key of the leaf itself
        current_parent = _registry
        for parent_key in cmd_name[:-1]:
            subtree = current_parent.get(parent_key) or {}
            if not isinstance(subtree, dict):
                # a leaf command already occupies this segment; nesting
                # under it would otherwise fail on item assignment below
                raise ValueError(
                    'cannot register command %r: %r is already '
                    'a command' % (cmd_name, parent_key))
            current_parent[parent_key] = subtree
            current_parent = subtree

        cmd = Command(name=cmd_name, func=func, permission=permission)
        current_parent[cmd_name[-1]] = cmd
        for alias in alias_names:
            _aliases[alias] = cmd_name

        def args_parser(parser_func: Callable):
            cmd.args_parser_func = parser_func
            return parser_func

        func.args_parser = args_parser
        return func

    return deco
2018-06-25 22:49:15 +08:00
class CommandGroup:
    """
    A set of commands sharing a common name prefix and, optionally,
    a default permission.
    """

    __slots__ = ('basename', 'permission')

    def __init__(self, name: Union[str, Tuple[str]], permission: int = None):
        """
        :param name: name prefix, either a str or a tuple of segments
        :param permission: default permission for commands of this group;
                           None leaves the default of on_command() in effect
        """
        self.basename = name if isinstance(name, tuple) else (name,)
        self.permission = permission

    def command(self, name: Union[str, Tuple[str]], *,
                aliases: Iterable = None, permission: int = None) -> Callable:
        """
        Decorator factory registering a command under this group's prefix.

        :param name: command name relative to the group, str or tuple
        :param aliases: aliases passed through to on_command() when given
        :param permission: permission for this command; when omitted the
                           group's default permission (if any) is used
        :return: the decorator returned by on_command()
        """
        sub_name = name if isinstance(name, tuple) else (name,)

        kwargs = {}
        if aliases is not None:
            kwargs['aliases'] = aliases
        # fall back to the group-wide default; previously it was stored
        # by __init__ but never applied
        if permission is None:
            permission = self.permission
        if permission is not None:
            kwargs['permission'] = permission
        return on_command(self.basename + sub_name, **kwargs)
2018-06-25 12:41:12 +08:00
def _find_command(name: Union[str, Tuple[str]]) -> Optional[Command]:
    """
    Look a command up in the registry tree.

    :param name: command name, either a str or a tuple of name segments
    :return: the Command registered under exactly this name, or None if
             the name is empty, unknown, runs through a leaf command, or
             is only the prefix of longer command names
    """
    cmd_name = name if isinstance(name, tuple) else (name,)
    if not cmd_name:
        return None

    cmd_tree = _registry
    for part in cmd_name[:-1]:
        cmd_tree = cmd_tree.get(part)
        if not isinstance(cmd_tree, dict):
            # either the segment is missing, or it is a leaf command and
            # cannot have children (e.g. looking up "a.b" when only "a"
            # is registered)
            return None

    cmd = cmd_tree.get(cmd_name[-1])
    # a dict here is a subtree, i.e. the name is just a prefix of real
    # commands; callers expect a Command or nothing
    return cmd if isinstance(cmd, Command) else None
2018-06-15 10:40:53 +08:00
2018-06-25 15:22:59 +08:00
class FurtherInteractionNeeded(Exception):
    """
    Signal that a command cannot finish yet.

    session.require_arg() raises it when an argument is missing, telling
    the caller of the command to switch to interactive mode and ask the
    user for that argument.
    """
2018-06-15 10:40:53 +08:00
class Session:
    """
    State of one command invocation, kept across messages while the
    command is still asking the user for arguments.
    """

    # bot               bot object used for sending and for configuration
    # cmd               the Command this session is running
    # ctx               context of the most recent message
    # current_key       key most recently requested through require_arg()
    # current_arg       raw argument text of the most recent message
    # current_arg_text  current_arg passed through
    #                   Message.extract_plain_text()
    # images            urls of the image segments of the recent message
    # args              arguments collected so far
    # last_interaction  None, or the time assigned by the code that keeps
    #                   the session alive for further interaction
    __slots__ = ('bot', 'cmd', 'ctx',
                 'current_key', 'current_arg', 'current_arg_text',
                 'images', 'args', 'last_interaction')

    def __init__(self, bot: CQHttp, cmd: Command, ctx: Dict[str, Any], *,
                 current_arg: str = '', args: Dict[str, Any] = None):
        """
        :param bot: bot object
        :param cmd: command to run in this session
        :param ctx: context of the message that started the session
        :param current_arg: argument text following the command name
        :param args: initial arguments, defaults to an empty dict
        """
        self.bot = bot
        self.cmd = cmd
        self.current_key = None
        self.args = args or {}
        self.last_interaction = None
        # ctx, current_arg, current_arg_text and images all derive from
        # the incoming message; refresh() is the single place doing that
        self.refresh(ctx, current_arg=current_arg)

    def refresh(self, ctx: Dict[str, Any], *, current_arg: str = ''):
        """
        Point the session at a new incoming message.

        :param ctx: context of the new message
        :param current_arg: argument text of the new message
        """
        self.ctx = ctx
        self.current_arg = current_arg
        self.current_arg_text = Message(current_arg).extract_plain_text()
        self.images = [s.data['url'] for s in ctx['message']
                       if s.type == 'image' and 'url' in s.data]

    @property
    def is_valid(self):
        """
        False once more than bot.config.SESSION_EXPIRE_TIMEOUT has passed
        since last_interaction, True otherwise.
        """
        if self.last_interaction is None:
            # never paused for interaction, so nothing to expire
            return True
        timeout = self.bot.config.SESSION_EXPIRE_TIMEOUT
        if timeout is None:
            # a None timeout is treated as "never expires" here instead
            # of failing on the comparison below
            return True
        return datetime.now() - self.last_interaction <= timeout

    def require_arg(self, key: str, prompt: str = None, *,
                    prompt_expr: Union[str, Sequence[str], Callable] = None,
                    interactive: bool = True) -> Any:
        """
        Get an argument with a given key.

        If "interactive" is True, and the argument does not exist
        in the current session, a FurtherInteractionNeeded exception
        will be raised, and the caller of the command will know
        it should keep the session for further interaction with the user.

        If "interactive" is False, missed key will cause a result of None.

        :param key: argument key
        :param prompt: prompt to ask the user
        :param prompt_expr: prompt expression to ask the user; when given
                            it takes precedence over "prompt"
        :param interactive: should enter interactive mode while key missing
        :return: the argument value
        :raise FurtherInteractionNeeded: further interaction is needed
        """
        value = self.args.get(key)
        if value is not None or not interactive:
            return value

        self.current_key = key
        # ask the user for more information
        if prompt_expr is not None:
            prompt = render(prompt_expr, key=key)
        if prompt:
            # This method is synchronous, so the prompt is only scheduled
            # here. With no prompt at all there is nothing to send, and
            # scheduling send(None) would only produce a failing API call.
            asyncio.ensure_future(self.send(prompt))
        raise FurtherInteractionNeeded

    async def send(self,
                   message: Union[str, Dict[str, Any], List[Dict[str, Any]]],
                   *, ignore_failure: bool = True) -> None:
        """
        Send a message through bot.send() using this session's context.

        :param message: message to send
        :param ignore_failure: swallow CQHttpError instead of raising it
        :raise CQHttpError: sending failed and ignore_failure is False
        """
        try:
            await self.bot.send(self.ctx, message)
        except CQHttpError:
            if not ignore_failure:
                raise

    async def send_expr(self,
                        expr: Union[str, Sequence[str], Callable],
                        **kwargs):
        """
        Render an expression with render() and send the result.

        :param expr: expression to render
        :param kwargs: keyword arguments passed to render()
        """
        return await self.send(render(expr, **kwargs))
2018-06-25 15:22:59 +08:00
def _new_command_session(bot: CQHttp,
                         ctx: Dict[str, Any]) -> Optional[Session]:
    """
    Try to parse the message of a context as a command call.

    The message must begin with one of bot.config.COMMAND_START (plain
    strings or compiled regular expressions). The first whitespace
    separated word after that prefix is the command name, resolved first
    as an alias and otherwise split by the separators in
    bot.config.COMMAND_SEP. Whatever follows the name becomes the
    session's current_arg.

    :param bot: bot object
    :param ctx: message context
    :return: a new Session, or None if the message is not a call to
             a known command
    """
    pattern_type = type(re.compile(''))
    msg_text = str(ctx['message']).lstrip()

    for start in bot.config.COMMAND_START:
        if isinstance(start, pattern_type):
            # match() anchors at the beginning: a pattern that only occurs
            # later in the text is not a command prefix, and slicing by
            # the match length would then cut off unrelated characters
            m = start.match(msg_text)
            if m:
                full_command = msg_text[m.end():].lstrip()
                break
        elif isinstance(start, str):
            if msg_text.startswith(start):
                full_command = msg_text[len(start):].lstrip()
                break
    else:
        # it's not a command
        return None

    if not full_command:
        # command is empty
        return None

    cmd_name_text, *cmd_remained = full_command.split(maxsplit=1)

    cmd_name = _aliases.get(cmd_name_text)
    if not cmd_name:
        for sep in bot.config.COMMAND_SEP:
            if isinstance(sep, pattern_type):
                cmd_name = tuple(sep.split(cmd_name_text))
            elif isinstance(sep, str) and sep:
                cmd_name = tuple(cmd_name_text.split(sep))
            else:
                # unsupported separator (str.split rejects an empty one)
                continue
            if len(cmd_name) > 1:
                # this separator actually occurs in the name; otherwise
                # keep trying the remaining separators
                break
        else:
            cmd_name = (cmd_name_text,)

    cmd = _find_command(cmd_name)
    if not cmd:
        return None

    return Session(bot, cmd, ctx, current_arg=''.join(cmd_remained))
2018-06-24 23:00:37 +08:00
2018-06-15 06:58:24 +08:00
2018-06-25 15:22:59 +08:00
async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool:
    """
    Handle a message as a command call or as a reply to a pending session.

    A valid session stored for the context source is refreshed with the
    whole message as its argument; otherwise the message is parsed as a
    new command call. The session is kept only while the command asks
    for further interaction.

    :param bot: bot object
    :param ctx: message context
    :return: False if the message is not a command or the command refused
             to run, True if it ran or is waiting for further interaction
    """
    src = context_source(ctx)

    session = _sessions.get(src)
    if session:
        if session.is_valid:
            session.refresh(ctx, current_arg=str(ctx['message']))
        else:
            # the session is expired, remove it
            _sessions.pop(src, None)
            session = None

    if not session:
        session = _new_command_session(bot, ctx)
        if not session:
            return False
        _sessions[src] = session

    keep_session = False
    try:
        return await session.cmd.run(session)
    except FurtherInteractionNeeded:
        keep_session = True
        session.last_interaction = datetime.now()
        # return True because this step of the session is successful
        return True
    finally:
        if not keep_session:
            # The command finished or failed. Remove the session in both
            # cases, since a session left behind by a failing handler
            # would capture every later message from this source.
            _sessions.pop(src, None)