Update __init__.py

把格式改回来
This commit is contained in:
Mix 2020-02-22 18:09:21 +08:00 committed by GitHub
parent bdb64569e7
commit 22462c8ed6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,8 +4,10 @@ import shlex
import warnings import warnings
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from typing import (Tuple, Union, Callable, Iterable, Any, Optional, List, from typing import (
Dict, Awaitable) Tuple, Union, Callable, Iterable, Any, Optional, List, Dict,
Awaitable
)
from nonebot import NoneBot, permission as perm from nonebot import NoneBot, permission as perm
from nonebot.command.argfilter import ValidateError from nonebot.command.argfilter import ValidateError
@ -13,8 +15,10 @@ from nonebot.helpers import context_id, send, render_expression
from nonebot.log import logger from nonebot.log import logger
from nonebot.message import Message from nonebot.message import Message
from nonebot.session import BaseSession from nonebot.session import BaseSession
from nonebot.typing import (Context_T, CommandName_T, CommandArgs_T, Message_T, from nonebot.typing import (
State_T, Filter_T) Context_T, CommandName_T, CommandArgs_T, Message_T, State_T,
Filter_T
)
# key: one segment of command name # key: one segment of command name
# value: subtree or a leaf Command object # value: subtree or a leaf Command object
@ -32,11 +36,18 @@ CommandHandler_T = Callable[['CommandSession'], Any]
class Command: class Command:
__slots__ = ('name', 'func', 'permission', 'only_to_me', 'privileged', __slots__ = ('name', 'func',
'permission',
'only_to_me',
'privileged',
'args_parser_func') 'args_parser_func')
def __init__(self, *, name: CommandName_T, func: CommandHandler_T, def __init__(self, *,
permission: int, only_to_me: bool, privileged: bool): name: CommandName_T,
func: CommandHandler_T,
permission: int,
only_to_me: bool,
privileged: bool):
self.name = name self.name = name
self.func = func self.func = func
self.permission = permission self.permission = permission
@ -44,9 +55,7 @@ class Command:
self.privileged = privileged self.privileged = privileged
self.args_parser_func: Optional[CommandHandler_T] = None self.args_parser_func: Optional[CommandHandler_T] = None
async def run(self, async def run(self, session, *,
session,
*,
check_perm: bool = True, check_perm: bool = True,
dry: bool = False) -> bool: dry: bool = False) -> bool:
""" """
@ -84,16 +93,15 @@ class Command:
if session.state['__validation_failure_num'] >= \ if session.state['__validation_failure_num'] >= \
config.MAX_VALIDATION_FAILURES: config.MAX_VALIDATION_FAILURES:
# noinspection PyProtectedMember # noinspection PyProtectedMember
session.finish( session.finish(render_expression(
render_expression( config.TOO_MANY_VALIDATION_FAILURES_EXPRESSION
config. ), **session._current_send_kwargs)
TOO_MANY_VALIDATION_FAILURES_EXPRESSION
), **session._current_send_kwargs)
failure_message = e.message failure_message = e.message
if failure_message is None: if failure_message is None:
failure_message = render_expression( failure_message = render_expression(
config.DEFAULT_VALIDATION_FAILURE_EXPRESSION) config.DEFAULT_VALIDATION_FAILURE_EXPRESSION
)
# noinspection PyProtectedMember # noinspection PyProtectedMember
session.pause(failure_message, session.pause(failure_message,
**session._current_send_kwargs) **session._current_send_kwargs)
@ -150,8 +158,7 @@ class CommandFunc:
return parser_func return parser_func
def on_command(name: Union[str, CommandName_T], def on_command(name: Union[str, CommandName_T], *,
*,
aliases: Union[Iterable[str], str] = (), aliases: Union[Iterable[str], str] = (),
permission: int = perm.EVERYBODY, permission: int = perm.EVERYBODY,
only_to_me: bool = True, only_to_me: bool = True,
@ -167,21 +174,18 @@ def on_command(name: Union[str, CommandName_T],
:param privileged: can be run even when there is already a session :param privileged: can be run even when there is already a session
:param shell_like: use shell-like syntax to split arguments :param shell_like: use shell-like syntax to split arguments
""" """
def deco(func: CommandHandler_T) -> CommandHandler_T: def deco(func: CommandHandler_T) -> CommandHandler_T:
if not isinstance(name, (str, tuple)): if not isinstance(name, (str, tuple)):
raise TypeError('the name of a command must be a str or tuple') raise TypeError('the name of a command must be a str or tuple')
if not name: if not name:
raise ValueError('the name of a command must not be empty') raise ValueError('the name of a command must not be empty')
cmd_name = (name, ) if isinstance(name, str) else name cmd_name = (name,) if isinstance(name, str) else name
cmd = Command(name=cmd_name, cmd = Command(name=cmd_name, func=func, permission=permission,
func=func, only_to_me=only_to_me, privileged=privileged)
permission=permission,
only_to_me=only_to_me,
privileged=privileged)
if shell_like: if shell_like:
async def shell_like_args_parser(session): async def shell_like_args_parser(session):
session.args['argv'] = shlex.split(session.current_arg) session.args['argv'] = shlex.split(session.current_arg)
@ -201,7 +205,7 @@ def on_command(name: Union[str, CommandName_T],
nonlocal aliases nonlocal aliases
if isinstance(aliases, str): if isinstance(aliases, str):
aliases = (aliases, ) aliases = (aliases,)
for alias in aliases: for alias in aliases:
_aliases[alias] = cmd_name _aliases[alias] = cmd_name
@ -211,7 +215,7 @@ def on_command(name: Union[str, CommandName_T],
def _find_command(name: Union[str, CommandName_T]) -> Optional[Command]: def _find_command(name: Union[str, CommandName_T]) -> Optional[Command]:
cmd_name = (name, ) if isinstance(name, str) else name cmd_name = (name,) if isinstance(name, str) else name
if not cmd_name: if not cmd_name:
return None return None
@ -238,6 +242,7 @@ class _FinishException(Exception):
Raised by session.finish() indicating that the command session Raised by session.finish() indicating that the command session
should be stopped and removed. should be stopped and removed.
""" """
def __init__(self, result: bool = True): def __init__(self, result: bool = True):
""" """
:param result: succeeded to call the command :param result: succeeded to call the command
@ -255,6 +260,7 @@ class SwitchException(Exception):
again, the later function should be notified. So this exception again, the later function should be notified. So this exception
is designed to be propagated to handle_message(). is designed to be propagated to handle_message().
""" """
def __init__(self, new_ctx_message: Message): def __init__(self, new_ctx_message: Message):
""" """
:param new_ctx_message: new message which should be placed in context :param new_ctx_message: new message which should be placed in context
@ -263,18 +269,13 @@ class SwitchException(Exception):
class CommandSession(BaseSession): class CommandSession(BaseSession):
__slots__ = ('cmd', 'current_key', 'current_arg_filters', __slots__ = ('cmd',
'_current_send_kwargs', 'current_arg', '_current_arg_text', 'current_key', 'current_arg_filters', '_current_send_kwargs',
'_current_arg_images', '_state', '_last_interaction', 'current_arg', '_current_arg_text', '_current_arg_images',
'_running', '_run_future') '_state', '_last_interaction', '_running', '_run_future')
def __init__(self, def __init__(self, bot: NoneBot, ctx: Context_T, cmd: Command, *,
bot: NoneBot, current_arg: str = '', args: Optional[CommandArgs_T] = None):
ctx: Context_T,
cmd: Command,
*,
current_arg: str = '',
args: Optional[CommandArgs_T] = None):
super().__init__(bot, ctx) super().__init__(bot, ctx)
self.cmd = cmd # Command object self.cmd = cmd # Command object
@ -292,13 +293,12 @@ class CommandSession(BaseSession):
self._current_arg_images = None self._current_arg_images = None
self.refresh(ctx, current_arg=current_arg) # fill the above self.refresh(ctx, current_arg=current_arg) # fill the above
self._run_future = partial(asyncio.run_coroutine_threadsafe, loop=bot.loop)
self._state: State_T = {} self._state: State_T = {}
if args: if args:
self._state.update(args) self._state.update(args)
self._run_future = partial(asyncio.run_coroutine_threadsafe,
loop=bot.loop)
self._last_interaction = None # last interaction time of this session self._last_interaction = None # last interaction time of this session
self._running = False self._running = False
@ -384,9 +384,7 @@ class CommandSession(BaseSession):
self._current_arg_text = None self._current_arg_text = None
self._current_arg_images = None self._current_arg_images = None
def get(self, def get(self, key: str, *,
key: str,
*,
prompt: Optional[Message_T] = None, prompt: Optional[Message_T] = None,
arg_filters: Optional[List[Filter_T]] = None, arg_filters: Optional[List[Filter_T]] = None,
**kwargs) -> Any: **kwargs) -> Any:
@ -411,8 +409,7 @@ class CommandSession(BaseSession):
self._current_send_kwargs = kwargs self._current_send_kwargs = kwargs
self.pause(prompt, **kwargs) self.pause(prompt, **kwargs)
def get_optional(self, def get_optional(self, key: str,
key: str,
default: Optional[Any] = None) -> Optional[Any]: default: Optional[Any] = None) -> Optional[Any]:
""" """
Simply get a argument with given key. Simply get a argument with given key.
@ -513,7 +510,7 @@ def parse_command(bot: NoneBot,
cmd_name = curr_cmd_name cmd_name = curr_cmd_name
if not cmd_name: if not cmd_name:
cmd_name = (cmd_name_text, ) cmd_name = (cmd_name_text,)
logger.debug(f'Split command name: {cmd_name}') logger.debug(f'Split command name: {cmd_name}')
cmd = _find_command(cmd_name) cmd = _find_command(cmd_name)
@ -560,9 +557,10 @@ async def handle_command(bot: NoneBot, ctx: Context_T) -> bool:
if session.running: if session.running:
logger.warning(f'There is a session of command ' logger.warning(f'There is a session of command '
f'{session.cmd.name} running, notify the user') f'{session.cmd.name} running, notify the user')
asyncio.ensure_future( asyncio.ensure_future(send(
send(bot, ctx, bot, ctx,
render_expression(bot.config.SESSION_RUNNING_EXPRESSION))) render_expression(bot.config.SESSION_RUNNING_EXPRESSION)
))
# pretend we are successful, so that NLP won't handle it # pretend we are successful, so that NLP won't handle it
return True return True
@ -590,16 +588,12 @@ async def handle_command(bot: NoneBot, ctx: Context_T) -> bool:
session = CommandSession(bot, ctx, cmd, current_arg=current_arg) session = CommandSession(bot, ctx, cmd, current_arg=current_arg)
logger.debug(f'New session of command {session.cmd.name} created') logger.debug(f'New session of command {session.cmd.name} created')
return await _real_run_command(session, return await _real_run_command(session, ctx_id, check_perm=check_perm,
ctx_id,
check_perm=check_perm,
disable_interaction=disable_interaction) disable_interaction=disable_interaction)
async def call_command(bot: NoneBot, async def call_command(bot: NoneBot, ctx: Context_T,
ctx: Context_T, name: Union[str, CommandName_T], *,
name: Union[str, CommandName_T],
*,
current_arg: str = '', current_arg: str = '',
args: Optional[CommandArgs_T] = None, args: Optional[CommandArgs_T] = None,
check_perm: bool = True, check_perm: bool = True,
@ -628,8 +622,7 @@ async def call_command(bot: NoneBot,
if not cmd: if not cmd:
return False return False
session = CommandSession(bot, ctx, cmd, current_arg=current_arg, args=args) session = CommandSession(bot, ctx, cmd, current_arg=current_arg, args=args)
return await _real_run_command(session, return await _real_run_command(session, context_id(session.ctx),
context_id(session.ctx),
check_perm=check_perm, check_perm=check_perm,
disable_interaction=disable_interaction) disable_interaction=disable_interaction)