Basic command plugin structure

This commit is contained in:
Richard Chien 2018-06-15 10:40:53 +08:00
parent a3844eda69
commit 19c211b253
3 changed files with 57 additions and 37 deletions

View File

@ -1,43 +1,61 @@
import re import re
from typing import Tuple, Union, Callable, Iterable, Dict, Any from typing import Tuple, Union, Callable, Iterable, Dict, Any, Optional
from aiocqhttp import CQHttp from aiocqhttp import CQHttp
from . import permissions as perm, logger from . import permissions as perm
_command_tree = {} # Key: str (one segment of command name)
# Value: subtree or a leaf Command object
_registry = {}
# Key: str # Key: str
# Value: tuple that identifies a command # Value: tuple that identifies a command
_command_aliases = {} _aliases = {}
# Key: context source # Key: context source
# Value: Command object # Value: Command object
_command_sessions = {} _sessions = {}
# TODO: Command 类只用来表示注册的命令Session 类用来在运行时表示命令的参数等
class Command: class Command:
__slots__ = ('name', 'arg', 'images', 'data', 'last_interaction') __slots__ = ('name', 'func', 'permission')
def __init__(self, name: Tuple[str]): def __init__(self, name: Tuple[str], func: Callable, permission: int):
self.name = name self.name = name
self.func = func
self.permission = permission
async def __call__(self, bot: CQHttp, ctx: Dict[str, Any], async def run(self, bot, ctx, session,
*args, **kwargs) -> bool: *args, **kwargs) -> Any:
logger.info(repr(self.images))
cmd_tree = _command_tree
for part in self.name:
if part not in cmd_tree:
return False
cmd_tree = cmd_tree[part]
cmd = cmd_tree
if 'func' not in cmd or not isinstance(cmd['func'], Callable):
return False
# TODO: check permission # TODO: check permission
await cmd['func'](bot, ctx, self) if isinstance(self.func, Callable):
return True return await self.func(bot, ctx, session)
return None
def _find_command(name: Tuple[str]) -> Optional[Command]:
if not name:
return None
cmd_tree = _registry
for part in name[:-1]:
if part not in cmd_tree:
return False
cmd_tree = cmd_tree[part]
return cmd_tree.get(name[-1])
class Session:
__slots__ = ('cmd', 'arg', 'images', 'data', 'last_interaction')
def __init__(self, cmd: Command, arg: str = ''):
self.cmd = cmd
self.arg = arg
self.images = []
self.data = {}
self.last_interaction = None
async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool: async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool:
@ -63,7 +81,7 @@ async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool:
return False return False
cmd_name_text, *cmd_remained = full_command.split(maxsplit=1) cmd_name_text, *cmd_remained = full_command.split(maxsplit=1)
cmd_name = _command_aliases.get(cmd_name_text) cmd_name = _aliases.get(cmd_name_text)
if not cmd_name: if not cmd_name:
for sep in bot.config.COMMAND_SEP: for sep in bot.config.COMMAND_SEP:
@ -76,11 +94,15 @@ async def handle_command(bot: CQHttp, ctx: Dict[str, Any]) -> bool:
else: else:
cmd_name = (cmd_name_text,) cmd_name = (cmd_name_text,)
cmd = Command(cmd_name) cmd = _find_command(cmd_name)
cmd.arg = ''.join(cmd_remained) if not cmd:
cmd.images = [s.data['url'] for s in ctx['message'] return False
if s.type == 'image' and 'url' in s.data]
return await cmd(bot, ctx) session = Session(cmd, ''.join(cmd_remained))
session.images = [s.data['url'] for s in ctx['message']
if s.type == 'image' and 'url' in s.data]
await cmd.run(bot, ctx, session)
return True
def on_command(name: Union[str, Tuple[str]], aliases: Iterable = (), def on_command(name: Union[str, Tuple[str]], aliases: Iterable = (),
@ -92,17 +114,14 @@ def on_command(name: Union[str, Tuple[str]], aliases: Iterable = (),
raise ValueError('the name of a command must not be empty') raise ValueError('the name of a command must not be empty')
cmd_name = name if isinstance(name, tuple) else (name,) cmd_name = name if isinstance(name, tuple) else (name,)
current_parent = _command_tree current_parent = _registry
for parent_key in cmd_name[:-1]: for parent_key in cmd_name[:-1]:
current_parent[parent_key] = {} current_parent[parent_key] = {}
current_parent = current_parent[parent_key] current_parent = current_parent[parent_key]
current_parent[cmd_name[-1]] = { current_parent[cmd_name[-1]] = Command(
'name': cmd_name, name=cmd_name, func=func, permission=permission)
'func': func,
'permission': permission
}
for alias in aliases: for alias in aliases:
_command_aliases[alias] = cmd_name _aliases[alias] = cmd_name
return func return func
return deco return deco

View File

@ -1,5 +1,6 @@
API_ROOT = '' API_ROOT = ''
SECRET = '' SECRET = ''
ACCESS_TOKEN = ''
HOST = '127.0.0.1' HOST = '127.0.0.1'
PORT = 8080 PORT = 8080
DEBUG = True DEBUG = True

View File

@ -2,5 +2,5 @@ import none
@none.on_command('echo', aliases=('say',)) @none.on_command('echo', aliases=('say',))
async def _(bot, ctx, cmd): async def _(bot, ctx, session):
await bot.send(ctx, cmd.arg) await bot.send(ctx, session.arg)