init version 2

This commit is contained in:
yanyongyu 2020-05-02 20:03:36 +08:00
parent 685f441967
commit 06b7ef2a45
27 changed files with 201 additions and 2520 deletions

View File

@ -1,171 +0,0 @@
import asyncio
import logging
from typing import Any, Optional, Callable, Awaitable
import aiocqhttp
from aiocqhttp import CQHttp
from .log import logger
from .sched import Scheduler
if Scheduler:
scheduler = Scheduler()
else:
scheduler = None
class NoneBot(CQHttp):
def __init__(self, config_object: Optional[Any] = None):
if config_object is None:
from . import default_config as config_object
config_dict = {
k: v
for k, v in config_object.__dict__.items()
if k.isupper() and not k.startswith('_')
}
logger.debug(f'Loaded configurations: {config_dict}')
super().__init__(message_class=aiocqhttp.message.Message,
**{k.lower(): v for k, v in config_dict.items()})
self.config = config_object
self.asgi.debug = self.config.DEBUG
from .message import handle_message
from .notice_request import handle_notice_or_request
@self.on_message
async def _(event: aiocqhttp.Event):
asyncio.create_task(handle_message(self, event))
@self.on_notice
async def _(event: aiocqhttp.Event):
asyncio.create_task(handle_notice_or_request(self, event))
@self.on_request
async def _(event: aiocqhttp.Event):
asyncio.create_task(handle_notice_or_request(self, event))
def run(self,
host: Optional[str] = None,
port: Optional[int] = None,
*args,
**kwargs) -> None:
host = host or self.config.HOST
port = port or self.config.PORT
if 'debug' not in kwargs:
kwargs['debug'] = self.config.DEBUG
logger.info(f'Running on {host}:{port}')
super().run(host=host, port=port, *args, **kwargs)
_bot: Optional[NoneBot] = None
def init(config_object: Optional[Any] = None) -> None:
"""
Initialize NoneBot instance.
This function must be called at the very beginning of code,
otherwise the get_bot() function will return None and nothing
is gonna work properly.
:param config_object: configuration object
"""
global _bot
_bot = NoneBot(config_object)
if _bot.config.DEBUG:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
_bot.server_app.before_serving(_start_scheduler)
async def _start_scheduler():
if scheduler and not scheduler.running:
scheduler.configure(_bot.config.APSCHEDULER_CONFIG)
scheduler.start()
logger.info('Scheduler started')
def get_bot() -> NoneBot:
"""
Get the NoneBot instance.
The result is ensured to be not None, otherwise an exception will
be raised.
:raise ValueError: instance not initialized
"""
if _bot is None:
raise ValueError('NoneBot instance has not been initialized')
return _bot
def run(host: Optional[str] = None, port: Optional[int] = None, *args,
**kwargs) -> None:
"""Run the NoneBot instance."""
get_bot().run(host=host, port=port, *args, **kwargs)
def on_startup(func: Callable[[], Awaitable[None]]) \
-> Callable[[], Awaitable[None]]:
"""
Decorator to register a function as startup callback.
"""
return get_bot().server_app.before_serving(func)
def on_websocket_connect(func: Callable[[aiocqhttp.Event], Awaitable[None]]) \
-> Callable[[], Awaitable[None]]:
"""
Decorator to register a function as websocket connect callback.
Only work with CQHTTP v4.14+.
"""
return get_bot().on_meta_event('lifecycle.connect')(func)
from .exceptions import *
from .command import CommandSession, CommandGroup
from .plugin import (on_command, on_natural_language, on_notice, on_request,
load_plugin, load_plugins, load_builtin_plugins,
get_loaded_plugins)
from .message import message_preprocessor, Message, MessageSegment
from .natural_language import NLPSession, NLPResult, IntentCommand
from .notice_request import NoticeSession, RequestSession
from .helpers import context_id
__all__ = [
'NoneBot',
'scheduler',
'init',
'get_bot',
'run',
'on_startup',
'on_websocket_connect',
'CQHttpError',
'load_plugin',
'load_plugins',
'load_builtin_plugins',
'get_loaded_plugins',
'message_preprocessor',
'Message',
'MessageSegment',
'on_command',
'CommandSession',
'CommandGroup',
'on_natural_language',
'NLPSession',
'NLPResult',
'IntentCommand',
'on_notice',
'NoticeSession',
'on_request',
'RequestSession',
'context_id',
]

View File

@ -1,42 +0,0 @@
from argparse import *
from .command import CommandSession
class ParserExit(RuntimeError):
def __init__(self, status=0, message=None):
self.status = status
self.message = message
class ArgumentParser(ArgumentParser):
"""
An ArgumentParser wrapper that avoid printing messages to
standard I/O.
"""
def __init__(self, *args, **kwargs):
self.session = kwargs.pop('session', None)
super().__init__(*args, **kwargs)
def _session_finish(self, message):
if self.session and isinstance(self.session, CommandSession):
self.session.finish(message)
def _print_message(self, message, file=None):
# do nothing
pass
def exit(self, status=0, message=None):
raise ParserExit(status=status, message=message)
def parse_args(self, args=None, namespace=None):
try:
return super().parse_args(args=args, namespace=namespace)
except ParserExit as e:
if e.status == 0:
# --help
self._session_finish(self.usage or self.format_help())
else:
self._session_finish('参数不足或不正确,请使用 --help 参数查询使用帮助')

View File

@ -1,784 +0,0 @@
import re
import shlex
import asyncio
import warnings
from datetime import datetime
from functools import partial, update_wrapper
from typing import (Tuple, Union, Callable, Iterable, Any, Optional, List, Dict,
Awaitable)
from aiocqhttp import Event as CQEvent
from aiocqhttp.message import Message
from nonebot import NoneBot, permission as perm
from nonebot.command.argfilter import ValidateError
from nonebot.helpers import context_id, send, render_expression
from nonebot.log import logger
from nonebot.session import BaseSession
from nonebot.typing import (CommandName_T, CommandArgs_T, CommandHandler_T,
Message_T, State_T, Filter_T)
# key: context id
# value: CommandSession object
_sessions = {} # type: Dict[str, "CommandSession"]
class Command:
__slots__ = ('name', 'func', 'permission', 'only_to_me', 'privileged',
'args_parser_func')
def __init__(self, *, name: CommandName_T, func: CommandHandler_T,
permission: int, only_to_me: bool, privileged: bool):
self.name = name
self.func = func
self.permission = permission
self.only_to_me = only_to_me
self.privileged = privileged
self.args_parser_func: Optional[CommandHandler_T] = None
async def run(self, session, *, check_perm: bool = True,
dry: bool = False) -> bool:
"""
Run the command in a given session.
:param session: CommandSession object
:param check_perm: should check permission before running
:param dry: just check any prerequisite, without actually running
:return: the command is finished (or can be run, given dry == True)
"""
has_perm = await self._check_perm(session) if check_perm else True
if self.func and has_perm:
if dry:
return True
if session.current_arg_filters is not None and \
session.current_key is not None:
# argument-level filters are given, use them
arg = session.current_arg
config = session.bot.config
for f in session.current_arg_filters:
try:
res = f(arg)
if isinstance(res, Awaitable):
res = await res
arg = res
except ValidateError as e:
# validation failed
if config.MAX_VALIDATION_FAILURES > 0:
# should check number of validation failures
session.state['__validation_failure_num'] = \
session.state.get(
'__validation_failure_num', 0) + 1
if session.state['__validation_failure_num'] >= \
config.MAX_VALIDATION_FAILURES:
# noinspection PyProtectedMember
session.finish(
render_expression(
config.
TOO_MANY_VALIDATION_FAILURES_EXPRESSION
), **session._current_send_kwargs)
failure_message = e.message
if failure_message is None:
failure_message = render_expression(
config.DEFAULT_VALIDATION_FAILURE_EXPRESSION)
# noinspection PyProtectedMember
session.pause(failure_message,
**session._current_send_kwargs)
# passed all filters
session.state[session.current_key] = arg
else:
# fallback to command-level args_parser_func
if self.args_parser_func:
await self.args_parser_func(session)
if session.current_key is not None and \
session.current_key not in session.state:
# args_parser_func didn't set state, here we set it
session.state[session.current_key] = session.current_arg
await self.func(session)
return True
return False
async def _check_perm(self, session) -> bool:
"""
Check if the session has sufficient permission to
call the command.
:param session: CommandSession object
:return: the session has the permission
"""
return await perm.check_permission(session.bot, session.event,
self.permission)
def args_parser(self, parser_func: CommandHandler_T) -> CommandHandler_T:
"""
Decorator to register a function as the arguments parser of
the corresponding command.
"""
self.args_parser_func = parser_func
return parser_func
def __repr__(self):
return f'<Command, name={self.name.__repr__()}>'
def __str__(self):
return self.__repr__()
class CommandManager:
"""Global Command Manager"""
_commands = {} # type: Dict[CommandName_T, Command]
_aliases = {} # type: Dict[str, Command]
_switches = {} # type: Dict[CommandName_T, bool]
def __init__(self):
self.commands = CommandManager._commands.copy()
self.aliases = CommandManager._aliases.copy()
self.switches = CommandManager._switches.copy()
@classmethod
def add_command(cls, cmd_name: CommandName_T, cmd: Command) -> None:
"""Register a command
Args:
cmd_name (CommandName_T): Command name
cmd (Command): Command object
"""
if cmd_name in cls._commands:
warnings.warn(f"Command {cmd_name} already exists")
return
cls._switches[cmd_name] = True
cls._commands[cmd_name] = cmd
@classmethod
def reload_command(cls, cmd_name: CommandName_T, cmd: Command) -> None:
"""Reload a command
**Warning! Dangerous function**
Args:
cmd_name (CommandName_T): Command name
cmd (Command): Command object
"""
if cmd_name not in cls._commands:
warnings.warn(
f"Command {cmd_name} does not exist. Please use add_command instead"
)
return
cls._commands[cmd_name] = cmd
@classmethod
def remove_command(cls, cmd_name: CommandName_T) -> bool:
"""Remove a command
**Warning! Dangerous function**
Args:
cmd_name (CommandName_T): Command name to remove
Returns:
bool: Success or not
"""
if cmd_name in cls._commands:
cmd = cls._commands[cmd_name]
for alias in list(
filter(lambda x: cls._aliases[x] == cmd,
cls._aliases.keys())):
del cls._aliases[alias]
del cls._commands[cmd_name]
if cmd_name in cls._switches:
del cls._switches[cmd_name]
return True
return False
@classmethod
def switch_command_global(cls,
cmd_name: CommandName_T,
state: Optional[bool] = None):
"""Change command state globally or simply switch it if `state` is None
Args:
cmd_name (CommandName_T): Command name
state (Optional[bool]): State to change to. Defaults to None.
"""
cls._switches[cmd_name] = not cls._switches[
cmd_name] if state is None else bool(state)
@classmethod
def add_aliases(cls, aliases: Union[Iterable[str], str], cmd: Command):
"""Register command alias(es)
Args:
aliases (Union[Iterable[str], str]): Command aliases
cmd_name (Command): Command
"""
if isinstance(aliases, str):
aliases = (aliases,)
for alias in aliases:
if not isinstance(alias, str):
warnings.warn(f"Alias {alias} is not a string! Ignored")
return
elif alias in cls._aliases:
warnings.warn(f"Alias {alias} already exists")
return
cls._aliases[alias] = cmd
def _add_command_to_tree(self, cmd_name: CommandName_T, cmd: Command,
tree: Dict[str, Union[Dict, Command]]) -> None:
"""Add command to the target command tree.
Args:
cmd_name (CommandName_T): Name of the command
cmd (Command): Command object
tree (Dict[str, Union[Dict, Command]): Target command tree
"""
current_parent = tree
for parent_key in cmd_name[:-1]:
current_parent[parent_key] = current_parent.get(parent_key) or {}
current_parent = current_parent[parent_key]
# TODO: 支持test test.sub子命令
if not isinstance(current_parent, dict):
warnings.warn(f"{current_parent} is not a registry dict")
return
if cmd_name[-1] in current_parent:
warnings.warn(f"There is already a command named {cmd_name}")
return
current_parent[cmd_name[-1]] = cmd
def _generate_command_tree(self, commands: Dict[CommandName_T, Command]
) -> Dict[str, Union[Dict, Command]]:
"""Generate command tree from commands dictionary.
Args:
commands (Dict[CommandName_T, Command]): Dictionary of commands
Returns:
Dict[str, Union[Dict, "Command"]]: Command tree
"""
cmd_tree = {} #type: Dict[str, Union[Dict, "Command"]]
for cmd_name, cmd in commands.items():
self._add_command_to_tree(cmd_name, cmd, cmd_tree)
return cmd_tree
def _find_command(self,
name: Union[str, CommandName_T]) -> Optional[Command]:
cmd_name = (name,) if isinstance(name, str) else name
if not cmd_name:
return None
cmd_tree = self._generate_command_tree({
name: cmd
for name, cmd in self.commands.items()
if self.switches.get(name, True)
})
for part in cmd_name[:-1]:
if part not in cmd_tree or not isinstance(
cmd_tree[part], #type: ignore
dict):
return None
cmd_tree = cmd_tree[part] # type: ignore
cmd = cmd_tree.get(cmd_name[-1]) # type: ignore
return cmd if isinstance(cmd, Command) else None
def parse_command(self, bot: NoneBot, cmd_string: str
) -> Tuple[Optional[Command], Optional[str]]:
logger.debug(f'Parsing command: {repr(cmd_string)}')
matched_start = None
for start in bot.config.COMMAND_START:
# loop through COMMAND_START to find the longest matched start
curr_matched_start = None
if isinstance(start, type(re.compile(''))):
m = start.search(cmd_string)
if m and m.start(0) == 0:
curr_matched_start = m.group(0)
elif isinstance(start, str):
if cmd_string.startswith(start):
curr_matched_start = start
if curr_matched_start is not None and \
(matched_start is None or
len(curr_matched_start) > len(matched_start)):
# a longer start, use it
matched_start = curr_matched_start
if matched_start is None:
# it's not a command
logger.debug('It\'s not a command')
return None, None
logger.debug(f'Matched command start: '
f'{matched_start}{"(empty)" if not matched_start else ""}')
full_command = cmd_string[len(matched_start):].lstrip()
if not full_command:
# command is empty
return None, None
cmd_name_text, *cmd_remained = full_command.split(maxsplit=1)
cmd_name = None
for sep in bot.config.COMMAND_SEP:
# loop through COMMAND_SEP to find the most optimized split
curr_cmd_name = None
if isinstance(sep, type(re.compile(''))):
curr_cmd_name = tuple(sep.split(cmd_name_text))
elif isinstance(sep, str):
curr_cmd_name = tuple(cmd_name_text.split(sep))
if curr_cmd_name is not None and \
(not cmd_name or len(curr_cmd_name) > len(cmd_name)):
# a more optimized split, use it
cmd_name = curr_cmd_name
if not cmd_name:
cmd_name = (cmd_name_text,)
logger.debug(f'Split command name: {cmd_name}')
cmd = self._find_command(cmd_name) # type: ignore
if not cmd:
logger.debug(f'Command {cmd_name} not found. Try to match aliases')
cmd = self.aliases.get(cmd_name_text)
if not cmd:
return None, None
logger.debug(f'Command {cmd.name} found, function: {cmd.func}')
return cmd, ''.join(cmd_remained)
def switch_command(self,
cmd_name: CommandName_T,
state: Optional[bool] = None):
"""Change command state or simply switch it if `state` is None
Args:
cmd_name (CommandName_T): Command name
state (Optional[bool]): State to change to. Defaults to None.
"""
self.switches[cmd_name] = not self.switches[
cmd_name] if state is None else bool(state)
class _PauseException(Exception):
"""
Raised by session.pause() indicating that the command session
should be paused to ask the user for some arguments.
"""
pass
class _FinishException(Exception):
"""
Raised by session.finish() indicating that the command session
should be stopped and removed.
"""
def __init__(self, result: bool = True):
"""
:param result: succeeded to call the command
"""
self.result = result
class SwitchException(Exception):
"""
Raised by session.switch() indicating that the command session
should be stopped and replaced with a new one (going through
handle_message() again).
Since the new message will go through handle_message() again,
the later function should be notified. So this exception is
intended to be propagated to handle_message().
"""
def __init__(self, new_message: Message):
"""
:param new_message: new message which should be placed in event
"""
self.new_message = new_message
class CommandSession(BaseSession):
__slots__ = ('cmd', 'current_key', 'current_arg_filters',
'_current_send_kwargs', 'current_arg', '_current_arg_text',
'_current_arg_images', '_state', '_last_interaction',
'_running', '_run_future')
def __init__(self,
bot: NoneBot,
event: CQEvent,
cmd: Command,
*,
current_arg: Optional[str] = '',
args: Optional[CommandArgs_T] = None):
super().__init__(bot, event)
self.cmd = cmd # Command object
# unique key of the argument that is currently requesting (asking)
self.current_key: Optional[str] = None
# initialize current argument filters
self.current_arg_filters: Optional[List[Filter_T]] = None
self._current_send_kwargs: Dict[str, Any] = {}
# initialize current argument
self.current_arg: Optional[str] = '' # with potential CQ codes
self._current_arg_text = None
self._current_arg_images = None
self.refresh(event, current_arg=current_arg) # fill the above
self._run_future = partial(asyncio.run_coroutine_threadsafe,
loop=bot.loop)
self._state: State_T = {}
if args:
self._state.update(args)
self._last_interaction = None # last interaction time of this session
self._running = False
@property
def state(self) -> State_T:
"""
State of the session.
This contains all named arguments and
other session scope temporary values.
"""
return self._state
@property
def args(self) -> CommandArgs_T:
"""Deprecated. Use `session.state` instead."""
return self.state
@property
def running(self) -> bool:
return self._running
@running.setter
def running(self, value) -> None:
if self._running is True and value is False:
# change status from running to not running, record the time
self._last_interaction = datetime.now()
self._running = value
@property
def is_valid(self) -> bool:
"""Check if the session is expired or not."""
if self.bot.config.SESSION_EXPIRE_TIMEOUT and \
self._last_interaction and \
datetime.now() - self._last_interaction > \
self.bot.config.SESSION_EXPIRE_TIMEOUT:
return False
return True
@property
def is_first_run(self) -> bool:
return self._last_interaction is None
@property
def current_arg_text(self) -> str:
"""
Plain text part in the current argument, without any CQ codes.
"""
if self._current_arg_text is None:
self._current_arg_text = Message(
self.current_arg).extract_plain_text()
return self._current_arg_text
@property
def current_arg_images(self) -> List[str]:
"""
Images (as list of urls) in the current argument.
"""
if self._current_arg_images is None:
self._current_arg_images = [
s.data['url']
for s in Message(self.current_arg)
if s.type == 'image' and 'url' in s.data
]
return self._current_arg_images
@property
def argv(self) -> List[str]:
"""
Shell-like argument list, similar to sys.argv.
Only available while shell_like is True in on_command decorator.
"""
return self.state.get('argv', [])
def refresh(self, event: CQEvent, *,
current_arg: Optional[str] = '') -> None:
"""
Refill the session with a new message event.
:param event: new message event
:param current_arg: new command argument as a string
"""
self.event = event
self.current_arg = current_arg
self._current_arg_text = None
self._current_arg_images = None
def get(self,
key: str,
*,
prompt: Optional[Message_T] = None,
arg_filters: Optional[List[Filter_T]] = None,
**kwargs) -> Any:
"""
Get an argument with a given key.
If the argument does not exist in the current session,
a pause exception will be raised, and the caller of
the command will know it should keep the session for
further interaction with the user.
:param key: argument key
:param prompt: prompt to ask the user
:param arg_filters: argument filters for the next user input
:return: the argument value
"""
if key in self.state:
return self.state[key]
self.current_key = key
self.current_arg_filters = arg_filters
self._current_send_kwargs = kwargs
self.pause(prompt, **kwargs)
def get_optional(self, key: str,
default: Optional[Any] = None) -> Optional[Any]:
"""
Simply get a argument with given key.
Deprecated. Use `session.state.get()` instead.
"""
return self.state.get(key, default)
def pause(self, message: Optional[Message_T] = None, **kwargs) -> None:
"""Pause the session for further interaction."""
if message:
self._run_future(self.send(message, **kwargs))
raise _PauseException
def finish(self, message: Optional[Message_T] = None, **kwargs) -> None:
"""Finish the session."""
if message:
self._run_future(self.send(message, **kwargs))
raise _FinishException
def switch(self, new_message: Message_T) -> None:
"""
Finish the session and switch to a new (fake) message event.
The user may send another command (or another intention as natural
language) when interacting with the current session. In this case,
the session may not understand what the user is saying, so it
should call this method and pass in that message, then NoneBot will
handle the situation properly.
"""
if self.is_first_run:
# if calling this method during first run,
# we think the command is not handled
raise _FinishException(result=False)
if not isinstance(new_message, Message):
new_message = Message(new_message)
raise SwitchException(new_message)
async def handle_command(bot: NoneBot, event: CQEvent,
manager: CommandManager) -> Optional[bool]:
"""
Handle a message as a command.
This function is typically called by "handle_message".
:param bot: NoneBot instance
:param event: message event
:param manager: command manager
:return: the message is handled as a command
"""
cmd, current_arg = manager.parse_command(bot, str(event.message).lstrip())
is_privileged_cmd = cmd and cmd.privileged
if is_privileged_cmd and cmd.only_to_me and not event['to_me']:
is_privileged_cmd = False
disable_interaction = bool(is_privileged_cmd)
if is_privileged_cmd:
logger.debug(f'Command {cmd.name} is a privileged command')
ctx_id = context_id(event)
if not is_privileged_cmd:
# wait for 1.5 seconds (at most) if the current session is running
retry = 5
while retry > 0 and \
_sessions.get(ctx_id) and _sessions[ctx_id].running:
retry -= 1
await asyncio.sleep(0.3)
check_perm = True
session = _sessions.get(ctx_id) if not is_privileged_cmd else None
if session:
if session.running:
logger.warning(f'There is a session of command '
f'{session.cmd.name} running, notify the user')
asyncio.ensure_future(
send(bot, event,
render_expression(bot.config.SESSION_RUNNING_EXPRESSION)))
# pretend we are successful, so that NLP won't handle it
return True
if session.is_valid:
logger.debug(f'Session of command {session.cmd.name} exists')
# since it's in a session, the user must be talking to me
event['to_me'] = True
session.refresh(event, current_arg=str(event['message']))
# there is no need to check permission for existing session
check_perm = False
else:
# the session is expired, remove it
logger.debug(f'Session of command {session.cmd.name} is expired')
if ctx_id in _sessions:
del _sessions[ctx_id]
session = None
if not session:
if not cmd:
logger.debug('Not a known command, ignored')
return False
if cmd.only_to_me and not event['to_me']:
logger.debug('Not to me, ignored')
return False
session = CommandSession(bot, event, cmd, current_arg=current_arg)
logger.debug(f'New session of command {session.cmd.name} created')
return await _real_run_command(session,
ctx_id,
check_perm=check_perm,
disable_interaction=disable_interaction)
async def call_command(bot: NoneBot,
event: CQEvent,
name: Union[str, CommandName_T],
*,
current_arg: str = '',
args: Optional[CommandArgs_T] = None,
check_perm: bool = True,
disable_interaction: bool = False) -> Optional[bool]:
"""
Call a command internally.
This function is typically called by some other commands
or "handle_natural_language" when handling NLPResult object.
Note: If disable_interaction is not True, after calling this function,
any previous command session will be overridden, even if the command
being called here does not need further interaction (a.k.a asking
the user for more info).
:param bot: NoneBot instance
:param event: message event
:param name: command name
:param current_arg: command current argument string
:param args: command args
:param check_perm: should check permission before running command
:param disable_interaction: disable the command's further interaction
:return: the command is successfully called
"""
cmd = CommandManager()._find_command(name)
if not cmd:
return False
session = CommandSession(bot,
event,
cmd,
current_arg=current_arg,
args=args)
return await _real_run_command(session,
context_id(session.event),
check_perm=check_perm,
disable_interaction=disable_interaction)
async def _real_run_command(session: CommandSession,
ctx_id: str,
disable_interaction: bool = False,
**kwargs) -> Optional[bool]:
if not disable_interaction:
# override session only when interaction is not disabled
_sessions[ctx_id] = session
try:
logger.debug(f'Running command {session.cmd.name}')
session.running = True
future = asyncio.ensure_future(session.cmd.run(session, **kwargs))
timeout = None
if session.bot.config.SESSION_RUN_TIMEOUT:
timeout = session.bot.config.SESSION_RUN_TIMEOUT.total_seconds()
try:
await asyncio.wait_for(future, timeout)
handled = future.result()
except asyncio.TimeoutError:
handled = True
except (_PauseException, _FinishException, SwitchException) as e:
raise e
except Exception as e:
logger.error(f'An exception occurred while '
f'running command {session.cmd.name}:')
logger.exception(e)
handled = True
raise _FinishException(handled)
except _PauseException:
session.running = False
if disable_interaction:
# if the command needs further interaction, we view it as failed
return False
logger.debug(f'Further interaction needed for '
f'command {session.cmd.name}')
# return True because this step of the session is successful
return True
except (_FinishException, SwitchException) as e:
session.running = False
logger.debug(f'Session of command {session.cmd.name} finished')
if not disable_interaction and ctx_id in _sessions:
# the command is finished, remove the session,
# but if interaction is disabled during this command call,
# we leave the _sessions untouched.
del _sessions[ctx_id]
if isinstance(e, _FinishException):
return e.result
elif isinstance(e, SwitchException):
# we are guaranteed that the session is not first run here,
# which means interaction is definitely enabled,
# so we can safely touch _sessions here.
if ctx_id in _sessions:
# make sure there is no session waiting
del _sessions[ctx_id]
logger.debug(f'Session of command {session.cmd.name} switching, '
f'new message: {e.new_message}')
raise e # this is intended to be propagated to handle_message()
def kill_current_session(event: CQEvent) -> None:
"""
Force kill current session of the given event context,
despite whether it is running or not.
:param event: message event
"""
ctx_id = context_id(event)
if ctx_id in _sessions:
del _sessions[ctx_id]
from nonebot.command.group import CommandGroup

View File

@ -1,9 +0,0 @@
from typing import Optional
from nonebot.typing import Message_T
class ValidateError(ValueError):
def __init__(self, message: Optional[Message_T] = None):
self.message = message

View File

@ -1,34 +0,0 @@
import re
from nonebot import CommandSession
from nonebot.helpers import render_expression
def handle_cancellation(session: CommandSession):
"""
If the input is a string of cancellation word, finish the command session.
"""
def control(value):
if _is_cancellation(value) is True:
session.finish(
render_expression(session.bot.config.SESSION_CANCEL_EXPRESSION))
return value
return control
def _is_cancellation(sentence: str) -> bool:
for kw in ('', '', '', '', '取消'):
if kw in sentence:
# a keyword matches
break
else:
# no keyword matches
return False
if re.match(r'^那?[算别不停]\w{0,3}了?吧?$', sentence) or \
re.match(r'^那?(?:[给帮]我)?取消了?吧?$', sentence):
return True
return False

View File

@ -1,42 +0,0 @@
from typing import Optional, List
def _simple_chinese_to_bool(text: str) -> Optional[bool]:
"""
Convert a chinese text to boolean.
Examples:
是的 -> True
好的呀 -> True
不要 -> False
不用了 -> False
你好呀 -> None
"""
text = text.strip().lower().replace(' ', '') \
.rstrip(',.!?~,。!?~了的呢吧呀啊呗啦')
if text in {
'', '', '', '', '', '', '', 'ok', 'okay', 'yeah', 'yep',
'当真', '当然', '必须', '可以', '肯定', '没错', '确定', '确认'
}:
return True
if text in {
'', '不要', '不用', '不是', '', '不好', '不对', '不行', '', 'no', 'nono',
'nonono', 'nope', '不ok', '不可以', '不能', '不可以'
}:
return False
return None
def _split_nonempty_lines(text: str) -> List[str]:
return list(filter(lambda x: x, text.splitlines()))
def _split_nonempty_stripped_lines(text: str) -> List[str]:
return list(filter(lambda x: x, map(lambda x: x.strip(),
text.splitlines())))
simple_chinese_to_bool = _simple_chinese_to_bool
split_nonempty_lines = _split_nonempty_lines
split_nonempty_stripped_lines = _split_nonempty_stripped_lines

View File

@ -1,33 +0,0 @@
import re
from typing import List
from aiocqhttp.message import Message
from nonebot.typing import Message_T
def _extract_text(arg: Message_T) -> str:
"""Extract all plain text segments from a message-like object."""
arg_as_msg = Message(arg)
return arg_as_msg.extract_plain_text()
def _extract_image_urls(arg: Message_T) -> List[str]:
"""Extract all image urls from a message-like object."""
arg_as_msg = Message(arg)
return [
s.data['url']
for s in arg_as_msg
if s.type == 'image' and 'url' in s.data
]
def _extract_numbers(arg: Message_T) -> List[float]:
"""Extract all numbers (integers and floats) from a message-like object."""
s = str(arg)
return list(map(float, re.findall(r'[+-]?(\d*\.?\d+|\d+\.?\d*)', s)))
extract_text = _extract_text
extract_image_urls = _extract_image_urls
extract_numbers = _extract_numbers

View File

@ -1,100 +0,0 @@
import re
from typing import Callable, Any
from nonebot.typing import Filter_T
from nonebot.command.argfilter import ValidateError
class BaseValidator:
def __init__(self, message=None):
self.message = message
def raise_failure(self):
raise ValidateError(self.message)
def _raise_failure(message):
raise ValidateError(message)
def not_empty(message=None) -> Filter_T:
"""
Validate any object to ensure it's not empty (is None or has no elements).
"""
def validate(value):
if value is None:
_raise_failure(message)
if hasattr(value, '__len__') and value.__len__() == 0:
_raise_failure(message)
return value
return validate
def fit_size(min_length: int = 0, max_length: int = None,
message=None) -> Filter_T:
"""
Validate any sized object to ensure the size/length
is in a given range [min_length, max_length].
"""
def validate(value):
length = len(value) if value is not None else 0
if length < min_length or \
(max_length is not None and length > max_length):
_raise_failure(message)
return value
return validate
def match_regex(pattern: str, message=None, *, flags=0,
fullmatch: bool = False) -> Filter_T:
"""
Validate any string object to ensure it matches a given pattern.
"""
pattern = re.compile(pattern, flags)
def validate(value):
if fullmatch:
if not re.fullmatch(pattern, value):
_raise_failure(message)
else:
if not re.match(pattern, value):
_raise_failure(message)
return value
return validate
def ensure_true(bool_func: Callable[[Any], bool], message=None) -> Filter_T:
"""
Validate any object to ensure the result of applying
a boolean function to it is True.
"""
def validate(value):
if bool_func(value) is not True:
_raise_failure(message)
return value
return validate
def between_inclusive(start=None, end=None, message=None) -> Filter_T:
"""
Validate any comparable object to ensure it's between
`start` and `end` inclusively.
"""
def validate(value):
if start is not None and value < start:
_raise_failure(message)
if end is not None and end < value:
_raise_failure(message)
return value
return validate

View File

@ -1,26 +0,0 @@
from typing import Union, Callable
from nonebot.plugin import on_command
from nonebot.typing import CommandName_T
class CommandGroup:
"""
Group a set of commands with same name prefix.
"""
__slots__ = ('basename', 'base_kwargs')
def __init__(self, name: Union[str, CommandName_T], **kwargs):
self.basename = (name,) if isinstance(name, str) else name
if 'aliases' in kwargs:
del kwargs['aliases'] # ensure there is no aliases here
self.base_kwargs = kwargs
def command(self, name: Union[str, CommandName_T], **kwargs) -> Callable:
sub_name = (name,) if isinstance(name, str) else name
name = self.basename + sub_name
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_command(name, **final_kwargs)

View File

@ -1,24 +0,0 @@
from typing import Union, Callable, Iterable
from nonebot.typing import CommandName_T
class CommandGroup:
"""
Group a set of commands with same name prefix.
"""
__slots__ = ('basename', 'base_kwargs')
def __init__(self, name: Union[str, CommandName_T], *,
permission: int = ...,
only_to_me: bool = ...,
privileged: bool = ...,
shell_like: bool = ...): ...
def command(self, name: Union[str, CommandName_T], *,
aliases: Union[Iterable[str], str] = ...,
permission: int = ...,
only_to_me: bool = ...,
privileged: bool = ...,
shell_like: bool = ...) -> Callable: ...

View File

@ -1,48 +0,0 @@
"""
Default configurations.
Any derived configurations must import everything from this module
at the very beginning of their code, and then set their own value
to override the default one.
For example:
>>> from nonebot.default_config import *
>>> PORT = 9090
>>> DEBUG = False
>>> SUPERUSERS.add(123456)
>>> NICKNAME = '小明'
"""
from datetime import timedelta
from typing import Container, Union, Iterable, Pattern, Optional, Dict, Any
from .typing import Expression_T
API_ROOT: str = ''
ACCESS_TOKEN: str = ''
SECRET: str = ''
HOST: str = '127.0.0.1'
PORT: int = 8080
DEBUG: bool = True
SUPERUSERS: Container[int] = set()
NICKNAME: Union[str, Iterable[str]] = ''
COMMAND_START: Iterable[Union[str, Pattern]] = {'/', '!', '', ''}
COMMAND_SEP: Iterable[Union[str, Pattern]] = {'/', '.'}
SESSION_EXPIRE_TIMEOUT: Optional[timedelta] = timedelta(minutes=5)
SESSION_RUN_TIMEOUT: Optional[timedelta] = None
SESSION_RUNNING_EXPRESSION: Expression_T = '您有命令正在执行,请稍后再试'
SHORT_MESSAGE_MAX_LENGTH: int = 50
DEFAULT_VALIDATION_FAILURE_EXPRESSION: Expression_T = '您的输入不符合要求,请重新输入'
MAX_VALIDATION_FAILURES: int = 3
TOO_MANY_VALIDATION_FAILURES_EXPRESSION: Expression_T = \
'您输入错误太多次啦,如需重试,请重新触发本功能'
SESSION_CANCEL_EXPRESSION: Expression_T = '好的'
APSCHEDULER_CONFIG: Dict[str, Any] = {'apscheduler.timezone': 'Asia/Shanghai'}

8
nonebot/exception.py Normal file
View File

@ -0,0 +1,8 @@
class BlockedException(Exception):
"""Block a message from further handling"""
pass
class RejectedException(Exception):
"""Reject a message and return current handler back"""
pass

View File

@ -1 +0,0 @@
from aiocqhttp import Error as CQHttpError

View File

@ -1,93 +0,0 @@
import hashlib
import random
from typing import Sequence, Callable, Any
from aiocqhttp.message import escape
from aiocqhttp import Event as CQEvent
from . import NoneBot
from .exceptions import CQHttpError
from .typing import Message_T, Expression_T
def context_id(event: CQEvent, *, mode: str = 'default',
use_hash: bool = False) -> str:
"""
Calculate a unique id representing the context of the given event.
mode:
default: one id for one context
group: one id for one group or discuss
user: one id for one user
:param event: the event object
:param mode: unique id mode: "default", "group", or "user"
:param use_hash: use md5 to hash the id or not
"""
ctx_id = ''
if mode == 'default':
if event.group_id:
ctx_id = f'/group/{event.group_id}'
elif event.discuss_id:
ctx_id = f'/discuss/{event.discuss_id}'
if event.user_id:
ctx_id += f'/user/{event.user_id}'
elif mode == 'group':
if event.group_id:
ctx_id = f'/group/{event.group_id}'
elif event.discuss_id:
ctx_id = f'/discuss/{event.discuss_id}'
elif event.user_id:
ctx_id = f'/user/{event.user_id}'
elif mode == 'user':
if event.user_id:
ctx_id = f'/user/{event.user_id}'
if ctx_id and use_hash:
ctx_id = hashlib.md5(ctx_id.encode('ascii')).hexdigest()
return ctx_id
async def send(bot: NoneBot,
event: CQEvent,
message: Message_T,
*,
ensure_private: bool = False,
ignore_failure: bool = True,
**kwargs) -> Any:
"""Send a message ignoring failure by default."""
try:
if ensure_private:
event = event.copy()
event['message_type'] = 'private'
return await bot.send(event, message, **kwargs)
except CQHttpError:
if not ignore_failure:
raise
return None
def render_expression(expr: Expression_T,
*args,
escape_args: bool = True,
**kwargs) -> str:
"""
Render an expression to message string.
:param expr: expression to render
:param escape_args: should escape arguments or not
:param args: positional arguments used in str.format()
:param kwargs: keyword arguments used in str.format()
:return: the rendered message
"""
if isinstance(expr, Callable):
expr = expr(*args, **kwargs)
elif isinstance(expr, Sequence) and not isinstance(expr, str):
expr = random.choice(expr)
if escape_args:
return expr.format(
*[escape(s) if isinstance(s, str) else s for s in args], **{
k: escape(v) if isinstance(v, str) else v
for k, v in kwargs.items()
})
return expr.format(*args, **kwargs)

View File

@ -1,15 +0,0 @@
"""
Provide logger object.
Any other modules in "nonebot" should use "logger" from this module
to log messages.
"""
import logging
import sys
logger = logging.getLogger('nonebot')
default_handler = logging.StreamHandler(sys.stdout)
default_handler.setFormatter(
logging.Formatter('[%(asctime)s %(name)s] %(levelname)s: %(message)s'))
logger.addHandler(default_handler)

135
nonebot/matcher.py Normal file
View File

@ -0,0 +1,135 @@
import re
import copy
from functools import wraps
from typing import Union, Optional
from .rule import Rule, startswith, regex, user
from .typing import Scope, Handler
from .exception import BlockedException, RejectedException
class Matcher:
def __init__(self,
rule: Rule,
scope: Scope = "ALL",
permission: str = "ALL",
block: bool = True,
*,
handlers: list = [],
state: dict = {},
temp: bool = False):
self.rule = rule
self.scope = scope
self.permission = permission
self.block = block
self.handlers = handlers
self.state = state
self.temp = temp
def _default_parser(event: "Event", state: dict):
state[state.pop("_current_arg")] = event.message
self._args_parser = _default_parser
def __call__(self, func: Handler) -> Handler:
self.handlers.append(func)
# TODO: export some functions
func.args_parser = self.args_parser
func.receive = self.receive
func.got = self.got
return func
def args_parser(self, func):
self._args_parser = func
return func
def receive(self):
def _decorator(func: Handler) -> Handler:
@wraps(func)
def _handler(event: "Event", state: dict):
# TODO: add tmp matcher to matcher tree
matcher = Matcher(user(event.user_id) & self.rule,
scope=self.scope,
permission=self.permission,
block=self.block,
handlers=self.handlers,
state=state,
temp=True)
matcher.args_parser(self._args_parser)
raise BlockedException
self.handlers.append(_handler)
return func
return _decorator
def got(self, key, args_parser=None):
def _decorator(func: Handler) -> Handler:
@wraps(func)
def _handler(event: "Event", state: dict):
if key not in state:
state["_current_arg"] = key
# TODO: add tmp matcher to matcher tree
matcher = copy.copy(self)
raise RejectedException
return func(event, state)
self.handlers.append(_handler)
return func
return _decorator
def finish(self):
# BlockedException用于阻止后续handler继续执行
raise BlockedException
def reject(self):
# RejectedException用于阻止后续handler继续执行并将当前handler放回队列
raise RejectedException
def on_message(rule: Rule,
scope="ALL",
permission="ALL",
block=True,
*,
handlers=[],
state={},
temp=False) -> Matcher:
# TODO: add matcher to matcher tree
return Matcher(rule,
scope,
permission,
block,
handlers=handlers,
state=state,
temp=temp)
def on_startswith(msg,
start: int = None,
end: int = None,
rule: Optional[Rule] = None,
**kwargs) -> Matcher:
return on_message(startswith(msg, start, end) &
rule, **kwargs) if rule else on_message(
startswith(msg, start, end), **kwargs)
def on_regex(pattern,
flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = None,
**kwargs) -> Matcher:
return on_message(regex(pattern, flags) &
rule, **kwargs) if rule else on_message(
regex(pattern, flags), **kwargs)

View File

@ -1,143 +0,0 @@
import re
import asyncio
from typing import Callable, Iterable
from aiocqhttp import Event as CQEvent
from aiocqhttp.message import escape, unescape, Message, MessageSegment
from . import NoneBot
from .log import logger
from .natural_language import handle_natural_language
from .command import handle_command, SwitchException
from .plugin import PluginManager
_message_preprocessors = set()
def message_preprocessor(func: Callable) -> Callable:
_message_preprocessors.add(func)
return func
class CanceledException(Exception):
"""
Raised by message_preprocessor indicating that
the bot should ignore the message
"""
def __init__(self, reason):
"""
:param reason: reason to ignore the message
"""
self.reason = reason
async def handle_message(bot: NoneBot, event: CQEvent) -> None:
_log_message(event)
assert isinstance(event.message, Message)
if not event.message:
event.message.append(MessageSegment.text('')) # type: ignore
raw_to_me = event.get('to_me', False)
_check_at_me(bot, event)
_check_calling_me_nickname(bot, event)
event['to_me'] = raw_to_me or event['to_me']
coros = []
plugin_manager = PluginManager()
for preprocessor in _message_preprocessors:
coros.append(preprocessor(bot, event, plugin_manager))
if coros:
try:
await asyncio.gather(*coros)
except CanceledException:
logger.info(f'Message {event["message_id"]} is ignored')
return
while True:
try:
handled = await handle_command(bot, event,
plugin_manager.cmd_manager)
break
except SwitchException as e:
# we are sure that there is no session existing now
event['message'] = e.new_message
event['to_me'] = True
if handled:
logger.info(f'Message {event.message_id} is handled as a command')
return
handled = await handle_natural_language(bot, event,
plugin_manager.nlp_manager)
if handled:
logger.info(f'Message {event.message_id} is handled '
f'as natural language')
return
def _check_at_me(bot: NoneBot, event: CQEvent) -> None:
if event.detail_type == 'private':
event['to_me'] = True
else:
# group or discuss
event['to_me'] = False
at_me_seg = MessageSegment.at(event.self_id)
# check the first segment
first_msg_seg = event.message[0]
if first_msg_seg == at_me_seg:
event['to_me'] = True
del event.message[0]
if not event['to_me']:
# check the last segment
i = -1
last_msg_seg = event.message[i]
if last_msg_seg.type == 'text' and \
not last_msg_seg.data['text'].strip() and \
len(event.message) >= 2:
i -= 1
last_msg_seg = event.message[i]
if last_msg_seg == at_me_seg:
event['to_me'] = True
del event.message[i:]
if not event.message:
event.message.append(MessageSegment.text(''))
def _check_calling_me_nickname(bot: NoneBot, event: CQEvent) -> None:
first_msg_seg = event.message[0]
if first_msg_seg.type != 'text':
return
first_text = first_msg_seg.data['text']
if bot.config.NICKNAME:
# check if the user is calling me with my nickname
if isinstance(bot.config.NICKNAME, str) or \
not isinstance(bot.config.NICKNAME, Iterable):
nicknames = (bot.config.NICKNAME,)
else:
nicknames = filter(lambda n: n, bot.config.NICKNAME)
nickname_regex = '|'.join(nicknames)
m = re.search(rf'^({nickname_regex})([\s,]*|$)', first_text,
re.IGNORECASE)
if m:
nickname = m.group(1)
logger.debug(f'User is calling me {nickname}')
event['to_me'] = True
first_msg_seg.data['text'] = first_text[m.end():]
def _log_message(event: CQEvent) -> None:
msg_from = str(event.user_id)
if event.detail_type == 'group':
msg_from += f'@[群:{event.group_id}]'
elif event.detail_type == 'discuss':
msg_from += f'@[讨论组:{event.discuss_id}]'
logger.info(f'Self: {event.self_id}, '
f'Message {event.message_id} from {msg_from}: '
f'{repr(str(event.message))}')

View File

@ -1,217 +0,0 @@
import asyncio
import warnings
from functools import update_wrapper
from typing import Set, Iterable, Optional, Callable, Union, NamedTuple
from aiocqhttp import Event as CQEvent
from aiocqhttp.message import Message
from .log import logger
from . import NoneBot, permission as perm
from .command import call_command
from .session import BaseSession
from .typing import CommandName_T, CommandArgs_T
class NLProcessor:
__slots__ = ('func', 'keywords', 'permission', 'only_to_me',
'only_short_message', 'allow_empty_message')
def __init__(self, *, func: Callable, keywords: Optional[Iterable],
permission: int, only_to_me: bool, only_short_message: bool,
allow_empty_message: bool):
self.func = func
self.keywords = keywords
self.permission = permission
self.only_to_me = only_to_me
self.only_short_message = only_short_message
self.allow_empty_message = allow_empty_message
class NLPManager:
_nl_processors: Set[NLProcessor] = set()
def __init__(self):
self.nl_processors = NLPManager._nl_processors.copy()
@classmethod
def add_nl_processor(cls, processor: NLProcessor) -> None:
"""Register a natural language processor
Args:
processor (NLProcessor): Processor object
"""
if processor in cls._nl_processors:
warnings.warn(f"NLProcessor {processor} already exists")
return
cls._nl_processors.add(processor)
@classmethod
def remove_nl_processor(cls, processor: NLProcessor) -> bool:
"""Remove a natural language processor globally
Args:
processor (NLProcessor): Processor to remove
Returns:
bool: Success or not
"""
if processor in cls._nl_processors:
cls._nl_processors.remove(processor)
return True
return False
@classmethod
def switch_nlprocessor_global(cls,
processor: NLProcessor,
state: Optional[bool] = None
) -> Optional[bool]:
"""Remove or add a natural language processor globally
Args:
processor (NLProcessor): Processor object
Returns:
bool: True if removed, False if added
"""
if processor in cls._nl_processors and not state:
cls._nl_processors.remove(processor)
return True
elif processor not in cls._nl_processors and state != False:
cls._nl_processors.add(processor)
return False
def switch_nlprocessor(self,
processor: NLProcessor,
state: Optional[bool] = None) -> Optional[bool]:
"""Remove or add a natural language processor
Args:
processor (NLProcessor): Processor to remove
Returns:
bool: True if removed, False if added
"""
if processor in self.nl_processors and not state:
self.nl_processors.remove(processor)
return True
elif processor not in self.nl_processors and state != False:
self.nl_processors.add(processor)
return False
class NLPSession(BaseSession):
__slots__ = ('msg', 'msg_text', 'msg_images')
def __init__(self, bot: NoneBot, event: CQEvent, msg: str):
super().__init__(bot, event)
self.msg = msg
tmp_msg = Message(msg)
self.msg_text = tmp_msg.extract_plain_text()
self.msg_images = [
s.data['url']
for s in tmp_msg
if s.type == 'image' and 'url' in s.data
]
class NLPResult(NamedTuple):
"""
Deprecated.
Use class IntentCommand instead.
"""
confidence: float
cmd_name: Union[str, CommandName_T]
cmd_args: Optional[CommandArgs_T] = None
def to_intent_command(self):
return IntentCommand(confidence=self.confidence,
name=self.cmd_name,
args=self.cmd_args)
class IntentCommand(NamedTuple):
"""
To represent a command that we think the user may be intended to call.
"""
confidence: float
name: Union[str, CommandName_T]
args: Optional[CommandArgs_T] = None
current_arg: str = ''
async def handle_natural_language(bot: NoneBot, event: CQEvent,
manager: NLPManager) -> bool:
"""
Handle a message as natural language.
This function is typically called by "handle_message".
:param bot: NoneBot instance
:param event: message event
:param manager: natural language processor manager
:return: the message is handled as natural language
"""
session = NLPSession(bot, event, str(event.message))
# use msg_text here because CQ code "share" may be very long,
# at the same time some plugins may want to handle it
msg_text_length = len(session.msg_text)
futures = []
for p in manager.nl_processors:
if not p.allow_empty_message and not session.msg:
# don't allow empty msg, but it is one, so skip to next
continue
if p.only_short_message and \
msg_text_length > bot.config.SHORT_MESSAGE_MAX_LENGTH:
continue
if p.only_to_me and not event['to_me']:
continue
should_run = await perm.check_permission(bot, event, p.permission)
if should_run and p.keywords:
for kw in p.keywords:
if kw in session.msg_text:
break
else:
# no keyword matches
should_run = False
if should_run:
futures.append(asyncio.ensure_future(p.func(session)))
if futures:
# wait for intent commands, and sort them by confidence
intent_commands = []
for fut in futures:
try:
res = await fut
if isinstance(res, NLPResult):
intent_commands.append(res.to_intent_command())
elif isinstance(res, IntentCommand):
intent_commands.append(res)
except Exception as e:
logger.error('An exception occurred while running '
'some natural language processor:')
logger.exception(e)
intent_commands.sort(key=lambda ic: ic.confidence, reverse=True)
logger.debug(f'Intent commands: {intent_commands}')
if intent_commands and intent_commands[0].confidence >= 60.0:
# choose the intent command with highest confidence
chosen_cmd = intent_commands[0]
logger.debug(
f'Intent command with highest confidence: {chosen_cmd}')
return await call_command(bot,
event,
chosen_cmd.name,
args=chosen_cmd.args,
current_arg=chosen_cmd.current_arg,
check_perm=False) # type: ignore
else:
logger.debug('No intent command has enough confidence')
return False

View File

@ -1,93 +0,0 @@
from functools import update_wrapper
from typing import List, Optional, Callable, Union
from aiocqhttp import Event as CQEvent
from aiocqhttp.bus import EventBus
from . import NoneBot
from .log import logger
from .exceptions import CQHttpError
from .session import BaseSession
_bus = EventBus()
class EventHandler:
__slots__ = ('events', 'func')
def __init__(self, events: List[str], func: Callable):
self.events = events
self.func = func
class NoticeSession(BaseSession):
__slots__ = ()
def __init__(self, bot: NoneBot, event: CQEvent):
super().__init__(bot, event)
class RequestSession(BaseSession):
__slots__ = ()
def __init__(self, bot: NoneBot, event: CQEvent):
super().__init__(bot, event)
async def approve(self, remark: str = '') -> None:
"""
Approve the request.
:param remark: remark of friend (only works in friend request)
"""
try:
await self.bot.call_action(action='.handle_quick_operation_async',
self_id=self.event.self_id,
context=self.event,
operation={
'approve': True,
'remark': remark
})
except CQHttpError:
pass
async def reject(self, reason: str = '') -> None:
"""
Reject the request.
:param reason: reason to reject (only works in group request)
"""
try:
await self.bot.call_action(action='.handle_quick_operation_async',
self_id=self.event.self_id,
context=self.event,
operation={
'approve': False,
'reason': reason
})
except CQHttpError:
pass
async def handle_notice_or_request(bot: NoneBot, event: CQEvent) -> None:
if event.type == 'notice':
_log_notice(event)
session = NoticeSession(bot, event)
else: # must be 'request'
_log_request(event)
session = RequestSession(bot, event)
ev_name = event.name
logger.debug(f'Emitting event: {ev_name}')
try:
await _bus.emit(ev_name, session)
except Exception as e:
logger.error(f'An exception occurred while handling event {ev_name}:')
logger.exception(e)
def _log_notice(event: CQEvent) -> None:
logger.info(f'Notice: {event}')
def _log_request(event: CQEvent) -> None:
logger.info(f'Request: {event}')

View File

@ -1,102 +0,0 @@
from collections import namedtuple
from aiocache import cached
from aiocqhttp import Event as CQEvent
from . import NoneBot
from .exceptions import CQHttpError
PRIVATE_FRIEND = 0x0001
PRIVATE_GROUP = 0x0002
PRIVATE_DISCUSS = 0x0004
PRIVATE_OTHER = 0x0008
PRIVATE = 0x000F
DISCUSS = 0x00F0
GROUP_MEMBER = 0x0100
GROUP_ADMIN = 0x0200
GROUP_OWNER = 0x0400
GROUP = 0x0F00
SUPERUSER = 0xF000
EVERYBODY = 0xFFFF
IS_NOBODY = 0x0000
IS_PRIVATE_FRIEND = PRIVATE_FRIEND
IS_PRIVATE_GROUP = PRIVATE_GROUP
IS_PRIVATE_DISCUSS = PRIVATE_DISCUSS
IS_PRIVATE_OTHER = PRIVATE_OTHER
IS_PRIVATE = PRIVATE
IS_DISCUSS = DISCUSS
IS_GROUP_MEMBER = GROUP_MEMBER
IS_GROUP_ADMIN = GROUP_MEMBER | GROUP_ADMIN
IS_GROUP_OWNER = GROUP_ADMIN | GROUP_OWNER
IS_GROUP = GROUP
IS_SUPERUSER = 0xFFFF
_min_event_fields = (
'self_id',
'message_type',
'sub_type',
'user_id',
'discuss_id',
'group_id',
'anonymous',
)
_MinEvent = namedtuple('MinEvent', _min_event_fields)
async def check_permission(bot: NoneBot, event: CQEvent,
permission_required: int) -> bool:
"""
Check if the event context has the permission required.
:param bot: NoneBot instance
:param event: message event
:param permission_required: permission required
:return: the context has the permission
"""
min_event_kwargs = {}
for field in _min_event_fields:
if field in event:
min_event_kwargs[field] = event[field]
else:
min_event_kwargs[field] = None
min_event = _MinEvent(**min_event_kwargs)
return await _check(bot, min_event, permission_required)
@cached(ttl=2 * 60) # cache the result for 2 minute
async def _check(bot: NoneBot, min_event: _MinEvent,
permission_required: int) -> bool:
permission = 0
if min_event.user_id in bot.config.SUPERUSERS:
permission |= IS_SUPERUSER
if min_event.message_type == 'private':
if min_event.sub_type == 'friend':
permission |= IS_PRIVATE_FRIEND
elif min_event.sub_type == 'group':
permission |= IS_PRIVATE_GROUP
elif min_event.sub_type == 'discuss':
permission |= IS_PRIVATE_DISCUSS
elif min_event.sub_type == 'other':
permission |= IS_PRIVATE_OTHER
elif min_event.message_type == 'group':
permission |= IS_GROUP_MEMBER
if not min_event.anonymous:
try:
member_info = await bot.get_group_member_info(
self_id=min_event.self_id,
group_id=min_event.group_id,
user_id=min_event.user_id,
no_cache=True)
if member_info:
if member_info['role'] == 'owner':
permission |= IS_GROUP_OWNER
elif member_info['role'] == 'admin':
permission |= IS_GROUP_ADMIN
except CQHttpError:
pass
elif min_event.message_type == 'discuss':
permission |= IS_DISCUSS
return bool(permission & permission_required)

View File

@ -1,460 +0,0 @@
import os
import re
import sys
import shlex
import warnings
import importlib
from types import ModuleType
from typing import Any, Set, Dict, Union, Optional, Iterable, Callable
from .log import logger
from nonebot import permission as perm
from .command import Command, CommandManager
from .notice_request import _bus, EventHandler
from .natural_language import NLProcessor, NLPManager
from .typing import CommandName_T, CommandHandler_T
_tmp_command: Set[Command] = set()
_tmp_nl_processor: Set[NLProcessor] = set()
_tmp_event_handler: Set[EventHandler] = set()
class Plugin:
__slots__ = ('module', 'name', 'usage', 'commands', 'nl_processors',
'event_handlers')
def __init__(self,
module: ModuleType,
name: Optional[str] = None,
usage: Optional[Any] = None,
commands: Set[Command] = set(),
nl_processors: Set[NLProcessor] = set(),
event_handlers: Set[EventHandler] = set()):
self.module = module
self.name = name
self.usage = usage
self.commands = commands
self.nl_processors = nl_processors
self.event_handlers = event_handlers
class PluginManager:
_plugins: Dict[str, Plugin] = {}
def __init__(self):
self.cmd_manager = CommandManager()
self.nlp_manager = NLPManager()
@classmethod
def add_plugin(cls, module_path: str, plugin: Plugin) -> None:
"""Register a plugin
Args:
module_path (str): module path
plugin (Plugin): Plugin object
"""
if module_path in cls._plugins:
warnings.warn(f"Plugin {module_path} already exists")
return
cls._plugins[module_path] = plugin
@classmethod
def get_plugin(cls, module_path: str) -> Optional[Plugin]:
"""Get plugin object by plugin module path
Args:
module_path (str): Plugin module path
Returns:
Optional[Plugin]: Plugin object
"""
return cls._plugins.get(module_path, None)
@classmethod
def remove_plugin(cls, module_path: str) -> bool:
"""Remove a plugin by plugin module path
** Warning: This function not remove plugin actually! **
** Just remove command, nlprocessor and event handlers **
Args:
module_path (str): Plugin module path
Returns:
bool: Success or not
"""
plugin = cls.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not exists")
return False
for command in plugin.commands:
CommandManager.remove_command(command.name)
for nl_processor in plugin.nl_processors:
NLPManager.remove_nl_processor(nl_processor)
for event_handler in plugin.event_handlers:
for event in event_handler.events:
_bus.unsubscribe(event, event_handler.func)
del cls._plugins[module_path]
return True
@classmethod
def switch_plugin_global(cls,
module_path: str,
state: Optional[bool] = None) -> None:
"""Change plugin state globally or simply switch it if `state` is None
Args:
module_path (str): Plugin module path
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not found")
return
for command in plugin.commands:
CommandManager.switch_command_global(command.name, state)
for nl_processor in plugin.nl_processors:
NLPManager.switch_nlprocessor_global(nl_processor, state)
for event_handler in plugin.event_handlers:
for event in event_handler.events:
if event_handler.func in _bus._subscribers[event] and not state:
_bus.unsubscribe(event, event_handler.func)
elif event_handler.func not in _bus._subscribers[
event] and state != False:
_bus.subscribe(event, event_handler.func)
@classmethod
def switch_command_global(cls,
module_path: str,
state: Optional[bool] = None) -> None:
"""Change plugin command state globally or simply switch it if `state` is None
Args:
module_path (str): Plugin module path
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not found")
return
for command in plugin.commands:
CommandManager.switch_command_global(command.name, state)
@classmethod
def switch_nlprocessor_global(cls,
module_path: str,
state: Optional[bool] = None) -> None:
"""Change plugin nlprocessor state globally or simply switch it if `state` is None
Args:
module_path (str): Plugin module path
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not found")
return
for processor in plugin.nl_processors:
NLPManager.switch_nlprocessor_global(processor, state)
@classmethod
def switch_eventhandler_global(cls,
module_path: str,
state: Optional[bool] = None) -> None:
"""Change plugin event handler state globally or simply switch it if `state` is None
Args:
module_path (str): Plugin module path
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = cls.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not found")
return
for event_handler in plugin.event_handlers:
for event in event_handler.events:
if event_handler.func in _bus._subscribers[event] and not state:
_bus.unsubscribe(event, event_handler.func)
elif event_handler.func not in _bus._subscribers[
event] and state != False:
_bus.subscribe(event, event_handler.func)
def switch_plugin(self, module_path: str,
state: Optional[bool] = None) -> None:
"""Change plugin state or simply switch it if `state` is None
Tips:
This method will only change the state of the plugin's
commands and natural language processors since change
state of the event handler for message is meaningless.
Args:
module_path (str): Plugin module path
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = self.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not found")
return
for command in plugin.commands:
self.cmd_manager.switch_command(command.name, state)
for nl_processor in plugin.nl_processors:
self.nlp_manager.switch_nlprocessor(nl_processor, state)
def switch_command(self, module_path: str,
state: Optional[bool] = None) -> None:
"""Change plugin command state or simply switch it if `state` is None
Args:
module_path (str): Plugin module path
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = self.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not found")
return
for command in plugin.commands:
self.cmd_manager.switch_command(command.name, state)
def switch_nlprocessor(self, module_path: str,
state: Optional[bool] = None) -> None:
"""Change plugin nlprocessor state or simply switch it if `state` is None
Args:
module_path (str): Plugin module path
state (Optional[bool]): State to change to. Defaults to None.
"""
plugin = self.get_plugin(module_path)
if not plugin:
warnings.warn(f"Plugin {module_path} not found")
return
for processor in plugin.nl_processors:
self.nlp_manager.switch_nlprocessor(processor, state)
def load_plugin(module_path: str) -> Optional[Plugin]:
"""Load a module as a plugin
Args:
module_path (str): path of module to import
Returns:
Optional[Plugin]: Plugin object loaded
"""
# Make sure tmp is clean
_tmp_command.clear()
_tmp_nl_processor.clear()
_tmp_event_handler.clear()
try:
module = importlib.import_module(module_path)
name = getattr(module, '__plugin_name__', None)
usage = getattr(module, '__plugin_usage__', None)
commands = _tmp_command.copy()
nl_processors = _tmp_nl_processor.copy()
event_handlers = _tmp_event_handler.copy()
plugin = Plugin(module, name, usage, commands, nl_processors,
event_handlers)
PluginManager.add_plugin(module_path, plugin)
logger.info(f'Succeeded to import "{module_path}"')
return plugin
except Exception as e:
logger.error(f'Failed to import "{module_path}", error: {e}')
logger.exception(e)
return None
def reload_plugin(module_path: str) -> Optional[Plugin]:
result = PluginManager.remove_plugin(module_path)
if not result:
return None
for module in list(
filter(lambda x: x.startswith(module_path), sys.modules.keys())):
del sys.modules[module]
_tmp_command.clear()
_tmp_nl_processor.clear()
_tmp_event_handler.clear()
try:
module = importlib.import_module(module_path)
name = getattr(module, '__plugin_name__', None)
usage = getattr(module, '__plugin_usage__', None)
commands = _tmp_command.copy()
nl_processors = _tmp_nl_processor.copy()
event_handlers = _tmp_event_handler.copy()
plugin = Plugin(module, name, usage, commands, nl_processors,
event_handlers)
PluginManager.add_plugin(module_path, plugin)
logger.info(f'Succeeded to reload "{module_path}"')
return plugin
except Exception as e:
logger.error(f'Failed to reload "{module_path}", error: {e}')
logger.exception(e)
return None
def load_plugins(plugin_dir: str, module_prefix: str) -> Set[Plugin]:
"""Find all non-hidden modules or packages in a given directory,
and import them with the given module prefix.
Args:
plugin_dir (str): Plugin directory to search
module_prefix (str): Module prefix used while importing
Returns:
Set[Plugin]: Set of plugin objects successfully loaded
"""
count = set()
for name in os.listdir(plugin_dir):
path = os.path.join(plugin_dir, name)
if os.path.isfile(path) and \
(name.startswith('_') or not name.endswith('.py')):
continue
if os.path.isdir(path) and \
(name.startswith('_') or not os.path.exists(
os.path.join(path, '__init__.py'))):
continue
m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name)
if not m:
continue
result = load_plugin(f'{module_prefix}.{m.group(1)}')
if result:
count.add(result)
return count
def load_builtin_plugins() -> Set[Plugin]:
"""
Load built-in plugins distributed along with "nonebot" package.
"""
plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins')
return load_plugins(plugin_dir, 'nonebot.plugins')
def get_loaded_plugins() -> Set[Plugin]:
"""
Get all plugins loaded.
:return: a set of Plugin objects
"""
return set(PluginManager._plugins.values())
def on_command(name: Union[str, CommandName_T],
*,
aliases: Union[Iterable[str], str] = (),
permission: int = perm.EVERYBODY,
only_to_me: bool = True,
privileged: bool = False,
shell_like: bool = False) -> Callable:
"""
Decorator to register a function as a command.
:param name: command name (e.g. 'echo' or ('random', 'number'))
:param aliases: aliases of command name, for convenient access
:param permission: permission required by the command
:param only_to_me: only handle messages to me
:param privileged: can be run even when there is already a session
:param shell_like: use shell-like syntax to split arguments
"""
def deco(func: CommandHandler_T) -> CommandHandler_T:
if not isinstance(name, (str, tuple)):
raise TypeError('the name of a command must be a str or tuple')
if not name:
raise ValueError('the name of a command must not be empty')
cmd_name = (name,) if isinstance(name, str) else name
cmd = Command(name=cmd_name,
func=func,
permission=permission,
only_to_me=only_to_me,
privileged=privileged)
if shell_like:
async def shell_like_args_parser(session):
session.args['argv'] = shlex.split(session.current_arg)
cmd.args_parser_func = shell_like_args_parser
CommandManager.add_command(cmd_name, cmd)
CommandManager.add_aliases(aliases, cmd)
_tmp_command.add(cmd)
func.args_parser = cmd.args_parser
return func
return deco
def on_natural_language(
keywords: Union[Optional[Iterable], str, Callable] = None,
*,
permission: int = perm.EVERYBODY,
only_to_me: bool = True,
only_short_message: bool = True,
allow_empty_message: bool = False) -> Callable:
"""
Decorator to register a function as a natural language processor.
:param keywords: keywords to respond to, if None, respond to all messages
:param permission: permission required by the processor
:param only_to_me: only handle messages to me
:param only_short_message: only handle short messages
:param allow_empty_message: handle empty messages
"""
def deco(func: Callable) -> Callable:
nl_processor = NLProcessor(
func=func,
keywords=keywords, # type: ignore
permission=permission,
only_to_me=only_to_me,
only_short_message=only_short_message,
allow_empty_message=allow_empty_message)
NLPManager.add_nl_processor(nl_processor)
_tmp_nl_processor.add(nl_processor)
return func
if isinstance(keywords, Callable):
# here "keywords" is the function to be decorated
return on_natural_language()(keywords)
else:
if isinstance(keywords, str):
keywords = (keywords,)
return deco
def _make_event_deco(post_type: str) -> Callable:
def deco_deco(arg: Optional[Union[str, Callable]] = None,
*events: str) -> Callable:
def deco(func: Callable) -> Callable:
if isinstance(arg, str):
events_tmp = list(
map(lambda x: f"{post_type}.{x}", [arg] + list(events)))
for e in events_tmp:
_bus.subscribe(e, func)
handler = EventHandler(events_tmp, func)
else:
_bus.subscribe(post_type, func)
handler = EventHandler([post_type], func)
_tmp_event_handler.add(handler)
return func
if isinstance(arg, Callable):
return deco(arg) # type: ignore
return deco
return deco_deco
on_notice = _make_event_deco('notice')
on_request = _make_event_deco('request')

View File

@ -1,13 +0,0 @@
from nonebot import on_command, CommandSession, permission as perm
from nonebot.message import unescape
@on_command('echo')
async def echo(session: CommandSession):
await session.send(session.state.get('message') or session.current_arg)
@on_command('say', permission=perm.SUPERUSER)
async def say(session: CommandSession):
await session.send(
unescape(session.state.get('message') or session.current_arg))

55
nonebot/rule.py Normal file
View File

@ -0,0 +1,55 @@
import re
from typing import Union, Callable
class Rule:
def __init__(self, checker: Callable[["Event"], bool]):
self.checker = checker
def __call__(self, event):
return self.checker(event)
def __and__(self, other):
return Rule(lambda event: self.checker(event) and other.checker(event))
def __or__(self, other):
return Rule(lambda event: self.checker(event) or other.checker(event))
def __neg__(self):
return Rule(lambda event: not self.checker(event))
def user(*qq: int) -> Rule:
return Rule(lambda event: event.user_id in qq)
def private() -> Rule:
return Rule(lambda event: event.detail_type == "private")
def group(*group: int) -> Rule:
return Rule(
lambda event: event.detail_type == "group" and event.group_id in group)
def discuss(*discuss: int) -> Rule:
return Rule(lambda event: event.detail_type == "discuss" and event.
discuss_id in discuss)
def startswith(msg, start: int = None, end: int = None) -> Rule:
return Rule(lambda event: event.message.startswith(msg, start, end))
def endswith(msg, start: int = None, end: int = None) -> Rule:
return Rule(lambda event: event.message.endswith(msg, start=None, end=None))
def has(msg: str) -> Rule:
return Rule(lambda event: msg in event.message)
def regex(regex, flags: Union[int, re.RegexFlag] = 0) -> Rule:
pattern = re.compile(regex, flags)
return Rule(lambda event: bool(pattern.search(event.message)))

View File

@ -1,12 +0,0 @@
try:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
except ImportError:
# APScheduler is not installed
AsyncIOScheduler = None
if AsyncIOScheduler:
class Scheduler(AsyncIOScheduler):
pass
else:
Scheduler = None

View File

@ -1,49 +0,0 @@
from aiocqhttp import Event as CQEvent
from . import NoneBot
from .helpers import send
from .typing import Message_T
class BaseSession:
__slots__ = ('bot', 'event')
def __init__(self, bot: NoneBot, event: CQEvent):
self.bot = bot
self.event = event
@property
def ctx(self) -> CQEvent:
return self.event
@ctx.setter
def ctx(self, val: CQEvent) -> None:
self.event = val
@property
def self_id(self) -> int:
return self.event.self_id
async def send(self,
message: Message_T,
*,
at_sender: bool = False,
ensure_private: bool = False,
ignore_failure: bool = True,
**kwargs) -> None:
"""
Send a message ignoring failure by default.
:param message: message to send
:param at_sender: @ the sender if in group or discuss chat
:param ensure_private: ensure the message is sent to private chat
:param ignore_failure: if any CQHttpError raised, ignore it
:return: the result returned by CQHTTP
"""
return await send(self.bot,
self.event,
message,
at_sender=at_sender,
ensure_private=ensure_private,
ignore_failure=ignore_failure,
**kwargs)

View File

@ -1,10 +1,4 @@
from typing import Union, List, Dict, Any, Sequence, Callable, Tuple, Awaitable from typing import Literal, Callable
Context_T = Dict[str, Any] Scope = Literal["PRIVATE", "DISCUSS", "GROUP", "ALL"]
Message_T = Union[str, Dict[str, Any], List[Dict[str, Any]]] Handler = Callable[["Event", dict], None]
Expression_T = Union[str, Sequence[str], Callable]
CommandName_T = Tuple[str, ...]
CommandArgs_T = Dict[str, Any]
CommandHandler_T = Callable[["CommandSession"], Any]
State_T = Dict[str, Any]
Filter_T = Callable[[Any], Union[Any, Awaitable[Any]]]