nonebot2/nonebot/command/__init__.py

671 lines
23 KiB
Python
Raw Normal View History

2018-06-25 16:50:34 +08:00
import asyncio
2018-07-02 16:54:29 +08:00
import re
import shlex
2018-06-25 16:50:34 +08:00
from datetime import datetime
2018-06-25 21:14:27 +08:00
from typing import (
2019-01-25 21:55:07 +08:00
Tuple, Union, Callable, Iterable, Any, Optional, List, Dict,
Awaitable
2018-06-25 21:14:27 +08:00
)
2018-06-15 06:58:24 +08:00
2019-01-25 00:14:30 +08:00
from nonebot import NoneBot, permission as perm
2019-01-25 19:11:18 +08:00
from nonebot.command.argfilter import ValidateError
2019-01-25 00:14:30 +08:00
from nonebot.helpers import context_id, send, render_expression
from nonebot.log import logger
from nonebot.message import Message
from nonebot.session import BaseSession
from nonebot.typing import (
2019-01-25 19:11:18 +08:00
Context_T, CommandName_T, CommandArgs_T, Message_T, State_T,
Filter_T
2018-10-16 01:03:50 +08:00
)
2018-06-15 06:58:24 +08:00
2019-01-21 22:01:47 +08:00
# Registry tree of every command registered through ``on_command``;
# ``_find_command`` walks it segment by segment.
# key: one segment of command name
# value: subtree or a leaf Command object
_registry = {}  # type: Dict[str, Union[Dict, Command]]

# Alias table filled by ``on_command`` and consulted by ``parse_command``
# before the command name is split by separators.
# key: alias
# value: real command name
_aliases = {}  # type: Dict[str, CommandName_T]

# Sessions kept between messages (stored by ``_real_run_command``).
# key: context id
# value: CommandSession object
_sessions = {}  # type: Dict[str, CommandSession]

# Type of a command handler (also used for arguments parsers): a callable
# taking the CommandSession; its return type is not constrained.
CommandHandler_T = Callable[['CommandSession'], Any]
2018-06-25 12:41:12 +08:00
2018-06-15 06:58:24 +08:00
class Command:
    """
    A registered command: its name, its handler function, the conditions
    under which it may run and an optional command-level arguments parser.
    """
    __slots__ = ('name', 'func', 'permission', 'only_to_me', 'privileged',
                 'args_parser_func')

    def __init__(self, *,
                 name: CommandName_T,
                 func: CommandHandler_T,
                 permission: int,
                 only_to_me: bool,
                 privileged: bool):
        self.name = name
        self.func = func
        self.permission = permission
        self.only_to_me = only_to_me
        self.privileged = privileged
        # set later through CommandFunc.args_parser() or on_command()
        self.args_parser_func: Optional[CommandHandler_T] = None

    async def run(self, session, *,
                  check_perm: bool = True,
                  dry: bool = False) -> bool:
        """
        Run the command in a given session.

        The pending argument (if any) is first processed, either by the
        argument-level filters of the session or by the command-level
        arguments parser, then the handler function is awaited.

        :param session: CommandSession object
        :param check_perm: should check permission before running
        :param dry: just check any prerequisite, without actually running
        :return: the command is finished (or can be run, given dry == True)
        """
        permitted = (not check_perm) or await self._check_perm(session)
        if not (self.func and permitted):
            return False
        if dry:
            return True

        has_arg_filters = session.current_arg_filters is not None \
            and session.current_key is not None
        if has_arg_filters:
            # argument-level filters are given, use them
            await self._apply_arg_filters(session)
        else:
            # fallback to command-level args_parser_func
            await self._parse_args(session)

        await self.func(session)
        return True

    async def _apply_arg_filters(self, session) -> None:
        """
        Pass the current argument through every argument-level filter
        in order, then store the final value in the session state.

        When a filter raises ValidateError, the session is paused with
        the failure message of the error (or the default one from config).
        """
        value = session.current_arg
        for arg_filter in session.current_arg_filters:
            try:
                filtered = arg_filter(value)
                if isinstance(filtered, Awaitable):
                    filtered = await filtered
                value = filtered
            except ValidateError as e:
                # validation failed
                failure_message = e.message
                if failure_message is None:
                    config = session.bot.config
                    failure_message = render_expression(
                        config.DEFAULT_VALIDATION_FAILURE_EXPRESSION
                    )
                # noinspection PyProtectedMember
                session.pause(failure_message,
                              **session._current_send_kwargs)

        # passed all filters
        session.state[session.current_key] = value

    async def _parse_args(self, session) -> None:
        """
        Await the command-level arguments parser (if there is one), then
        make sure the argument being asked for ends up in the session state.
        """
        if self.args_parser_func:
            await self.args_parser_func(session)

        key = session.current_key
        if key is not None and key not in session.state:
            # args_parser_func didn't set state, here we set it
            session.state[key] = session.current_arg

    async def _check_perm(self, session) -> bool:
        """
        Check whether the context of the session is permitted to call
        this command.

        :param session: CommandSession object
        :return: the session has the permission
        """
        required = self.permission
        return await perm.check_permission(session.bot, session.ctx, required)
2018-06-15 10:40:53 +08:00
class CommandFunc:
    """
    Callable wrapper around a command handler, keeping a reference to
    the Command object it was registered as.
    """
    __slots__ = ('cmd', 'func')

    def __init__(self, cmd: Command, func: CommandHandler_T):
        self.cmd, self.func = cmd, func

    def __call__(self, session: 'CommandSession') -> Any:
        # calling the wrapper simply forwards to the wrapped handler
        handler = self.func
        return handler(session)

    def args_parser(self, parser_func: CommandHandler_T) -> CommandHandler_T:
        """
        Decorator that installs the decorated function as the arguments
        parser of the wrapped command. The function is returned unchanged.
        """
        command = self.cmd
        command.args_parser_func = parser_func
        return parser_func
2018-10-16 01:03:50 +08:00
def on_command(name: Union[str, CommandName_T], *,
               aliases: Iterable[str] = (),
               permission: int = perm.EVERYBODY,
               only_to_me: bool = True,
               privileged: bool = False,
               shell_like: bool = False) -> Callable:
    """
    Decorator to register a function as a command.

    The decorated function is replaced by a CommandFunc wrapper, which
    is still callable and offers the ``args_parser`` decorator.

    :param name: command name (e.g. 'echo' or ('random', 'number'))
    :param aliases: aliases of command name, for convenient access;
                    a single str is treated as one alias
    :param permission: permission required by the command
    :param only_to_me: only handle messages to me
    :param privileged: can be run even when there is already a session
    :param shell_like: use shell-like syntax to split arguments
    :raises TypeError: (when decorating) name is neither a str nor a tuple
    :raises ValueError: (when decorating) name is empty
    """

    def deco(func: CommandHandler_T) -> CommandHandler_T:
        if not isinstance(name, (str, tuple)):
            raise TypeError('the name of a command must be a str or tuple')
        if not name:
            raise ValueError('the name of a command must not be empty')

        cmd_name = (name,) if isinstance(name, str) else name

        cmd = Command(name=cmd_name, func=func, permission=permission,
                      only_to_me=only_to_me, privileged=privileged)
        if shell_like:
            async def shell_like_args_parser(session):
                session.args['argv'] = shlex.split(session.current_arg)

            cmd.args_parser_func = shell_like_args_parser

        # walk (and create when missing) the subtrees for the parent
        # segments, then put the command at the leaf
        current_parent = _registry
        for parent_key in cmd_name[:-1]:
            current_parent[parent_key] = current_parent.get(parent_key) or {}
            current_parent = current_parent[parent_key]
        current_parent[cmd_name[-1]] = cmd

        # a str is itself an iterable of characters, so a bare str must be
        # wrapped, otherwise every single character becomes an alias
        alias_names = (aliases,) if isinstance(aliases, str) else aliases
        for alias in alias_names:
            _aliases[alias] = cmd_name

        return CommandFunc(cmd, func)

    return deco
2018-10-16 01:03:50 +08:00
def _find_command(name: Union[str, CommandName_T]) -> Optional[Command]:
    """
    Look up a registered command in the registry tree.

    :param name: command name, as a str or as a tuple of name segments
    :return: the Command object, or None if no command has that name
    """
    segments = (name,) if isinstance(name, str) else name
    if not segments:
        return None

    *parents, leaf = segments
    node = _registry
    for segment in parents:
        # every parent segment must lead to a subtree
        node = node.get(segment)
        if not isinstance(node, dict):
            return None

    found = node.get(leaf)
    if isinstance(found, Command):
        return found
    return None
2018-06-15 10:40:53 +08:00
2019-01-25 00:14:30 +08:00
class _PauseException(Exception):
    """
    Signal raised by session.pause(): the command session should be
    suspended so that the user can be asked for some arguments.
    """
2018-07-06 19:07:02 +08:00
class _FinishException(Exception):
    """
    Signal raised by session.finish(): the command session should be
    stopped and removed.
    """

    def __init__(self, result: bool = True):
        """
        :param result: whether the command was called successfully
        """
        self.result = result
2018-07-06 19:07:02 +08:00
2018-07-24 23:59:45 +08:00
class SwitchException(Exception):
    """
    Signal raised by session.switch(): the command session should be
    stopped and replaced with a new one (going through handle_message()
    again).

    Because the new context message is handled by handle_message(),
    that function has to be notified, so this exception is meant to
    propagate all the way up to handle_message().
    """

    def __init__(self, new_ctx_message: Message):
        """
        :param new_ctx_message: new message which should be placed in context
        """
        self.new_ctx_message = new_ctx_message
2018-06-27 22:05:12 +08:00
class CommandSession(BaseSession):
    """
    Session of one command call, possibly spanning several messages
    while the command asks the user for arguments.
    """
    __slots__ = ('cmd',
                 'current_key', 'current_arg_filters', '_current_send_kwargs',
                 'current_arg', '_current_arg_text', '_current_arg_images',
                 '_state', '_last_interaction', '_running')

    def __init__(self, bot: NoneBot, ctx: Context_T, cmd: Command, *,
                 current_arg: str = '', args: Optional[CommandArgs_T] = None):
        super().__init__(bot, ctx)
        self.cmd = cmd  # Command object

        # unique key of the argument that is currently requesting (asking)
        self.current_key: Optional[str] = None

        # filters and send() keyword arguments given to the latest get()
        self.current_arg_filters: Optional[List[Filter_T]] = None
        self._current_send_kwargs: Dict[str, Any] = {}

        # current argument, actually filled by refresh() below
        self.current_arg: str = ''  # with potential CQ codes
        self._current_arg_text = None
        self._current_arg_images = None
        self.refresh(ctx, current_arg=current_arg)

        self._state: State_T = {}
        if args:
            self._state.update(args)

        self._last_interaction = None  # last interaction time of this session
        self._running = False

    @property
    def state(self) -> State_T:
        """
        State of the session.

        It holds all named arguments and any other temporary values
        whose scope is this session.
        """
        return self._state

    @property
    def args(self) -> CommandArgs_T:
        """Deprecated. Use `session.state` instead."""
        return self.state

    @property
    def running(self) -> bool:
        """Whether the command is being run in this session right now."""
        return self._running

    @running.setter
    def running(self, value) -> None:
        stopping = self._running is True and value is False
        if stopping:
            # change status from running to not running, record the time
            self._last_interaction = datetime.now()
        self._running = value

    @property
    def is_valid(self) -> bool:
        """Check if the session is expired or not."""
        timeout = self.bot.config.SESSION_EXPIRE_TIMEOUT
        last_interaction = self._last_interaction
        if not timeout or not last_interaction:
            # no expiration configured, or never interacted yet
            return True
        return datetime.now() - last_interaction <= timeout

    @property
    def is_first_run(self) -> bool:
        """Whether no run of this session has stopped yet."""
        return self._last_interaction is None

    @property
    def current_arg_text(self) -> str:
        """
        Plain text part in the current argument, without any CQ codes.
        """
        text = self._current_arg_text
        if text is None:
            # computed on first access, reset by refresh()
            text = Message(self.current_arg).extract_plain_text()
            self._current_arg_text = text
        return text

    @property
    def current_arg_images(self) -> List[str]:
        """
        Images (as list of urls) in the current argument.
        """
        if self._current_arg_images is None:
            # computed on first access, reset by refresh()
            urls = []
            for segment in Message(self.current_arg):
                if segment.type == 'image' and 'url' in segment.data:
                    urls.append(segment.data['url'])
            self._current_arg_images = urls
        return self._current_arg_images

    @property
    def argv(self) -> List[str]:
        """
        Shell-like argument list, similar to sys.argv.

        Only available while shell_like is True in on_command decorator,
        otherwise an empty list is returned.
        """
        return self.state.get('argv', [])

    def refresh(self, ctx: Context_T, *, current_arg: str = '') -> None:
        """
        Refill the session with a new message context.

        :param ctx: new message context
        :param current_arg: new command argument as a string
        """
        self.ctx = ctx
        self.current_arg = current_arg
        # drop the values derived from the previous argument
        self._current_arg_text = self._current_arg_images = None

    def get(self, key: str, *,
            prompt: Optional[Message_T] = None,
            arg_filters: Optional[List[Filter_T]] = None,
            **kwargs) -> Any:
        """
        Get an argument with a given key.

        If the argument does not exist in the current session,
        a pause exception will be raised, and the caller of
        the command will know it should keep the session for
        further interaction with the user.

        :param key: argument key
        :param prompt: prompt to ask the user
        :param arg_filters: argument filters for the next user input
        :param kwargs: extra keyword arguments passed to send()
        :return: the argument value
        """
        state = self.state
        if key in state:
            return state[key]

        # remember what is being asked for, for the next run of the command
        self.current_key = key
        self.current_arg_filters = arg_filters
        self._current_send_kwargs = kwargs
        self.pause(prompt, **kwargs)

    def get_optional(self, key: str,
                     default: Optional[Any] = None) -> Optional[Any]:
        """
        Simply get a argument with given key.

        Deprecated. Use `session.state.get()` instead.
        """
        return self.state.get(key, default)

    def pause(self, message: Optional[Message_T] = None, **kwargs) -> None:
        """Pause the session for further interaction."""
        if message:
            # sending is scheduled, not awaited
            asyncio.ensure_future(self.send(message, **kwargs))
        raise _PauseException()

    def finish(self, message: Optional[Message_T] = None, **kwargs) -> None:
        """Finish the session."""
        if message:
            # sending is scheduled, not awaited
            asyncio.ensure_future(self.send(message, **kwargs))
        raise _FinishException()

    def switch(self, new_ctx_message: Message_T) -> None:
        """
        Finish the session and switch to a new (fake) message context.

        The user may send another command (or another intention as natural
        language) when interacting with the current session. In this case,
        the session may not understand what the user is saying, so it
        should call this method and pass in that message, then NoneBot will
        handle the situation properly.
        """
        if self.is_first_run:
            # if calling this method during first run,
            # we think the command is not handled
            raise _FinishException(result=False)

        if isinstance(new_ctx_message, Message):
            raise SwitchException(new_ctx_message)
        raise SwitchException(Message(new_ctx_message))
2018-06-25 15:22:59 +08:00
def parse_command(bot: NoneBot,
                  cmd_string: str) -> Tuple[Optional[Command], Optional[str]]:
    """
    Parse a command string (typically from a message).

    The longest matching COMMAND_START is stripped, the first word is
    resolved either as an alias or by splitting it with COMMAND_SEP,
    and the resulting name is looked up among the registered commands.

    :param bot: NoneBot instance
    :param cmd_string: command string
    :return: (Command object, current arg string),
             or (None, None) if it is not a known command
    """
    logger.debug(f'Parsing command: {cmd_string}')

    pattern_type = type(re.compile(''))

    # loop through COMMAND_START to find the longest matched start
    matched_start = None
    for start in bot.config.COMMAND_START:
        if isinstance(start, pattern_type):
            m = start.search(cmd_string)
            candidate = m.group(0) if m and m.start(0) == 0 else None
        elif isinstance(start, str):
            candidate = start if cmd_string.startswith(start) else None
        else:
            candidate = None

        if candidate is None:
            continue
        if matched_start is None or len(candidate) > len(matched_start):
            # a longer start, use it
            matched_start = candidate

    if matched_start is None:
        # it's not a command
        logger.debug('It\'s not a command')
        return None, None

    logger.debug(f'Matched command start: '
                 f'{matched_start}{"(empty)" if not matched_start else ""}')

    full_command = cmd_string[len(matched_start):].lstrip()
    if not full_command:
        # command is empty
        return None, None

    cmd_name_text, *cmd_remained = full_command.split(maxsplit=1)

    cmd_name = _aliases.get(cmd_name_text)
    if not cmd_name:
        # not an alias,
        # loop through COMMAND_SEP to find the most optimized split
        for sep in bot.config.COMMAND_SEP:
            if isinstance(sep, pattern_type):
                split_name = tuple(sep.split(cmd_name_text))
            elif isinstance(sep, str):
                split_name = tuple(cmd_name_text.split(sep))
            else:
                continue

            if not cmd_name or len(split_name) > len(cmd_name):
                # a more optimized split, use it
                cmd_name = split_name

        if not cmd_name:
            cmd_name = (cmd_name_text,)

    logger.debug(f'Split command name: {cmd_name}')
    cmd = _find_command(cmd_name)
    if not cmd:
        logger.debug(f'Command {cmd_name} not found')
        return None, None

    logger.debug(f'Command {cmd.name} found, function: {cmd.func}')
    return cmd, ''.join(cmd_remained)
2018-06-24 23:00:37 +08:00
2018-06-15 06:58:24 +08:00
2018-10-16 01:03:50 +08:00
async def handle_command(bot: NoneBot, ctx: Context_T) -> bool:
    """
    Handle a message as a command.

    This function is typically called by "handle_message".

    A privileged command always gets a fresh session and never touches
    the kept ones. Otherwise the message goes to the session kept for
    this context when there is a valid one, else to a new session of
    the command parsed from the message.

    :param bot: NoneBot instance
    :param ctx: message context
    :return: the message is handled as a command
    """
    cmd, current_arg = parse_command(bot, str(ctx['message']).lstrip())

    privileged = cmd and cmd.privileged
    if privileged and cmd.only_to_me and not ctx['to_me']:
        # a privileged command not sent to me is not treated as privileged
        privileged = False
    disable_interaction = privileged

    if privileged:
        logger.debug(f'Command {cmd.name} is a privileged command')

    ctx_id = context_id(ctx)

    session = None
    if not privileged:
        # wait for 1.5 seconds (at most) if the current session is running
        for _ in range(5):
            kept = _sessions.get(ctx_id)
            if not (kept and kept.running):
                break
            await asyncio.sleep(0.3)
        session = _sessions.get(ctx_id)

    check_perm = True
    if session:
        if session.running:
            logger.warning(f'There is a session of command '
                           f'{session.cmd.name} running, notify the user')
            asyncio.ensure_future(send(
                bot, ctx,
                render_expression(bot.config.SESSION_RUNNING_EXPRESSION)
            ))
            # pretend we are successful, so that NLP won't handle it
            return True

        if session.is_valid:
            logger.debug(f'Session of command {session.cmd.name} exists')
            # since it's in a session, the user must be talking to me
            ctx['to_me'] = True
            session.refresh(ctx, current_arg=str(ctx['message']))
            # there is no need to check permission for existing session
            check_perm = False
        else:
            # the session is expired, remove it
            logger.debug(f'Session of command {session.cmd.name} is expired')
            _sessions.pop(ctx_id, None)
            session = None

    if not session:
        if not cmd:
            logger.debug('Not a known command, ignored')
            return False
        if cmd.only_to_me and not ctx['to_me']:
            logger.debug('Not to me, ignored')
            return False
        session = CommandSession(bot, ctx, cmd, current_arg=current_arg)
        logger.debug(f'New session of command {session.cmd.name} created')

    return await _real_run_command(session, ctx_id,
                                   check_perm=check_perm,
                                   disable_interaction=disable_interaction)
2018-07-01 17:51:01 +08:00
2018-10-16 01:03:50 +08:00
async def call_command(bot: NoneBot, ctx: Context_T,
                       name: Union[str, CommandName_T], *,
                       current_arg: str = '',
                       args: Optional[CommandArgs_T] = None,
                       check_perm: bool = True,
                       disable_interaction: bool = False) -> bool:
    """
    Call a command internally.

    This function is typically called by some other commands
    or "handle_natural_language" when handling NLPResult object.

    Note: If disable_interaction is not True, after calling this function,
    any previous command session will be overridden, even if the command
    being called here does not need further interaction (a.k.a asking
    the user for more info).

    :param bot: NoneBot instance
    :param ctx: message context
    :param name: command name
    :param current_arg: command current argument string
    :param args: command args
    :param check_perm: should check permission before running command
    :param disable_interaction: disable the command's further interaction
    :return: the command is successfully called
    """
    cmd = _find_command(name)
    if not cmd:
        # no command is registered under this name
        return False

    new_session = CommandSession(bot, ctx, cmd,
                                 current_arg=current_arg, args=args)
    ctx_id = context_id(new_session.ctx)
    return await _real_run_command(new_session, ctx_id,
                                   check_perm=check_perm,
                                   disable_interaction=disable_interaction)
2018-07-01 17:51:01 +08:00
async def _real_run_command(session: CommandSession,
                            ctx_id: str,
                            disable_interaction: bool = False,
                            **kwargs) -> bool:
    """
    Run the command of a session and translate the way it ends
    (pause, finish, switch, timeout or error) into a result.

    :param session: CommandSession object to run
    :param ctx_id: context id, the key of the session in _sessions
    :param disable_interaction: do not store the session in _sessions,
                                and view a pause as a failure
    :param kwargs: extra keyword arguments passed to Command.run()
    :return: the command is handled
    :raises SwitchException: the session asked to switch to a new
                             context message
    """
    if not disable_interaction:
        # override session only when interaction is not disabled
        _sessions[ctx_id] = session
    try:
        logger.debug(f'Running command {session.cmd.name}')
        session.running = True
        future = asyncio.ensure_future(session.cmd.run(session, **kwargs))
        # no timeout at all unless SESSION_RUN_TIMEOUT is configured
        timeout = None
        if session.bot.config.SESSION_RUN_TIMEOUT:
            timeout = session.bot.config.SESSION_RUN_TIMEOUT.total_seconds()

        try:
            await asyncio.wait_for(future, timeout)
            handled = future.result()
        except asyncio.TimeoutError:
            # a run that timed out is still viewed as handled
            handled = True
        except (_PauseException, _FinishException, SwitchException) as e:
            # control flow signals, to be dealt with by the outer handlers
            raise e
        except Exception as e:
            # any other error is logged, and the command is viewed as handled
            logger.error(f'An exception occurred while '
                         f'running command {session.cmd.name}:')
            logger.exception(e)
            handled = True
        # a run that ended without any signal goes through the same
        # cleanup as an explicit session.finish()
        raise _FinishException(handled)
    except _PauseException:
        session.running = False
        if disable_interaction:
            # if the command needs further interaction, we view it as failed
            return False
        logger.debug(f'Further interaction needed for '
                     f'command {session.cmd.name}')
        # return True because this step of the session is successful
        return True
    except (_FinishException, SwitchException) as e:
        session.running = False
        logger.debug(f'Session of command {session.cmd.name} finished')
        if not disable_interaction and ctx_id in _sessions:
            # the command is finished, remove the session,
            # but if interaction is disabled during this command call,
            # we leave the _sessions untouched.
            del _sessions[ctx_id]

        if isinstance(e, _FinishException):
            return e.result
        elif isinstance(e, SwitchException):
            # we are guaranteed that the session is not first run here,
            # which means interaction is definitely enabled,
            # so we can safely touch _sessions here.
            if ctx_id in _sessions:
                # make sure there is no session waiting
                del _sessions[ctx_id]
            logger.debug(f'Session of command {session.cmd.name} switching, '
                         f'new context message: {e.new_ctx_message}')
            raise e  # this is intended to be propagated to handle_message()
2018-12-27 20:33:28 +08:00
def kill_current_session(ctx: Context_T) -> None:
    """
    Forcibly remove the session kept for the given context (if any),
    no matter whether it is running or not.

    :param ctx: message context
    """
    _sessions.pop(context_id(ctx), None)
from nonebot.command.group import CommandGroup