This commit is contained in:
Richard Chien 2018-06-26 08:49:08 +08:00
parent fd49a272ae
commit c9291e1cba
4 changed files with 44 additions and 18 deletions

View File

@ -28,9 +28,7 @@ _sessions = {}
class Command: class Command:
__slots__ = ('name', 'func', 'permission', 'args_parser_func') __slots__ = ('name', 'func', 'permission', 'args_parser_func')
def __init__(self, name: Tuple[str], def __init__(self, name: Tuple[str], func: Callable, permission: int):
func: Callable,
permission: int):
self.name = name self.name = name
self.func = func self.func = func
self.permission = permission self.permission = permission
@ -38,7 +36,7 @@ class Command:
async def run(self, session, *, permission: int = None) -> bool: async def run(self, session, *, permission: int = None) -> bool:
if permission is None: if permission is None:
permission = await calculate_permission(session.bot, session.ctx) permission = await _calculate_permission(session.bot, session.ctx)
if self.func and permission & self.permission: if self.func and permission & self.permission:
if self.args_parser_func: if self.args_parser_func:
await self.args_parser_func(session) await self.args_parser_func(session)
@ -47,7 +45,7 @@ class Command:
return False return False
async def calculate_permission(bot: CQHttp, ctx: Dict[str, Any]) -> int: async def _calculate_permission(bot: CQHttp, ctx: Dict[str, Any]) -> int:
permission = 0 permission = 0
if ctx['user_id'] in bot.config.SUPERUSERS: if ctx['user_id'] in bot.config.SUPERUSERS:
permission |= perm.IS_SUPERUSER permission |= perm.IS_SUPERUSER
@ -110,12 +108,14 @@ class CommandGroup:
__slots__ = ('basename', 'permission') __slots__ = ('basename', 'permission')
def __init__(self, name: Union[str, Tuple[str]], permission: int = None): def __init__(self, name: Union[str, Tuple[str]], permission: int = None):
self.basename = name if isinstance(name, tuple) else (name,) self.basename = (name,) if isinstance(name, str) else name
self.permission = permission self.permission = permission
def command(self, name: Union[str, Tuple[str]], *, def command(self, name: Union[str, Tuple[str]], *,
aliases: Iterable = None, permission: int = None) -> Callable: aliases: Iterable = None, permission: int = None) -> Callable:
name = self.basename + (name if isinstance(name, tuple) else (name,)) sub_name = (name,) if isinstance(name, str) else name
name = self.basename + sub_name
kwargs = {} kwargs = {}
if aliases is not None: if aliases is not None:
kwargs['aliases'] = aliases kwargs['aliases'] = aliases
@ -125,7 +125,7 @@ class CommandGroup:
def _find_command(name: Union[str, Tuple[str]]) -> Optional[Command]: def _find_command(name: Union[str, Tuple[str]]) -> Optional[Command]:
cmd_name = name if isinstance(name, tuple) else (name,) cmd_name = (name,) if isinstance(name, str) else name
if not cmd_name: if not cmd_name:
return None return None
@ -139,7 +139,7 @@ def _find_command(name: Union[str, Tuple[str]]) -> Optional[Command]:
return cmd_tree.get(cmd_name[-1]) return cmd_tree.get(cmd_name[-1])
class FurtherInteractionNeeded(Exception): class _FurtherInteractionNeeded(Exception):
""" """
Raised by session.require_arg() indicating Raised by session.require_arg() indicating
that the command should enter interactive mode that the command should enter interactive mode
@ -166,7 +166,13 @@ class Session:
self.args = args or {} self.args = args or {}
self.last_interaction = None self.last_interaction = None
def refresh(self, ctx: Dict[str, Any], *, current_arg: str = ''): def refresh(self, ctx: Dict[str, Any], *, current_arg: str = '') -> None:
"""
Refill the session with a new message context.
:param ctx: new message context
:param current_arg: new command argument as a string
"""
self.ctx = ctx self.ctx = ctx
self.current_arg = current_arg self.current_arg = current_arg
self.current_arg_text = Message(current_arg).extract_plain_text() self.current_arg_text = Message(current_arg).extract_plain_text()
@ -174,7 +180,7 @@ class Session:
if s.type == 'image' and 'url' in s.data] if s.type == 'image' and 'url' in s.data]
@property @property
def is_valid(self): def is_valid(self) -> bool:
if self.last_interaction and \ if self.last_interaction and \
datetime.now() - self.last_interaction > \ datetime.now() - self.last_interaction > \
self.bot.config.SESSION_EXPIRE_TIMEOUT: self.bot.config.SESSION_EXPIRE_TIMEOUT:
@ -210,7 +216,7 @@ class Session:
if prompt_expr is not None: if prompt_expr is not None:
prompt = render(prompt_expr, key=key) prompt = render(prompt_expr, key=key)
asyncio.ensure_future(self.send(prompt)) asyncio.ensure_future(self.send(prompt))
raise FurtherInteractionNeeded raise _FurtherInteractionNeeded
async def send(self, async def send(self,
message: Union[str, Dict[str, Any], List[Dict[str, Any]]], message: Union[str, Dict[str, Any], List[Dict[str, Any]]],
@ -229,6 +235,18 @@ class Session:
def _new_command_session(bot: CQHttp, def _new_command_session(bot: CQHttp,
ctx: Dict[str, Any]) -> Optional[Session]: ctx: Dict[str, Any]) -> Optional[Session]:
"""
Create a new session for a command.
This will firstly attempt to parse the current message as
a command, and if succeeded, it then create a session for
the command and return. If the message is not a valid command,
None will be returned.
:param bot: CQHttp instance
:param ctx: message context
:return: Session object or None
"""
msg_text = str(ctx['message']).lstrip() msg_text = str(ctx['message']).lstrip()
for start in bot.config.COMMAND_START: for start in bot.config.COMMAND_START:
@ -271,6 +289,15 @@ def _new_command_session(bot: CQHttp,
async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool: async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool:
"""
Handle a message as a command.
This function is typically called by "handle_message".
:param bot: CQHttp instance
:param ctx: message context
:return: the message is handled as a command
"""
src = context_source(ctx) src = context_source(ctx)
session = None session = None
if _sessions.get(src): if _sessions.get(src):
@ -292,7 +319,7 @@ async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool:
# the command is finished, remove the session # the command is finished, remove the session
del _sessions[src] del _sessions[src]
return res return res
except FurtherInteractionNeeded: except _FurtherInteractionNeeded:
session.last_interaction = datetime.now() session.last_interaction = datetime.now()
# return True because this step of the session is successful # return True because this step of the session is successful
return True return True

View File

@ -2,11 +2,10 @@ import random
from typing import Union, Sequence, Callable from typing import Union, Sequence, Callable
from aiocqhttp import message from aiocqhttp import message
from aiocqhttp.message import Message
def render(expr: Union[str, Sequence[str], Callable], *, escape_args=True, def render(expr: Union[str, Sequence[str], Callable], *, escape_args=True,
**kwargs) -> Message: **kwargs) -> str:
if isinstance(expr, Callable): if isinstance(expr, Callable):
expr = expr() expr = expr()
elif isinstance(expr, Sequence): elif isinstance(expr, Sequence):

View File

@ -1,6 +1,6 @@
import none
from none.command import Session, CommandGroup from none.command import Session, CommandGroup
from none.expressions import weather as expr
from . import expressions as expr
w = CommandGroup('weather') w = CommandGroup('weather')
@ -14,7 +14,7 @@ async def weather(session: Session):
@weather.args_parser @weather.args_parser
async def _(session: Session): async def _(session: Session):
if session.current_key: if session.current_key:
session.args[session.current_key] = session.current_arg.strip() session.args[session.current_key] = session.current_arg_text.strip()
@w.command('suggestion', aliases=('生活指数', '生活建议', '生活提示')) @w.command('suggestion', aliases=('生活指数', '生活建议', '生活提示'))