From 06b7ef2a45634e0ca1f7b11c4d549d37bfe6395d Mon Sep 17 00:00:00 2001 From: yanyongyu Date: Sat, 2 May 2020 20:03:36 +0800 Subject: [PATCH] init version 2 --- nonebot/__init__.py | 171 ----- nonebot/argparse.py | 42 -- nonebot/command/__init__.py | 784 ----------------------- nonebot/command/argfilter/__init__.py | 9 - nonebot/command/argfilter/controllers.py | 34 - nonebot/command/argfilter/converters.py | 42 -- nonebot/command/argfilter/extractors.py | 33 - nonebot/command/argfilter/validators.py | 100 --- nonebot/command/group.py | 26 - nonebot/command/group.pyi | 24 - nonebot/default_config.py | 48 -- nonebot/exception.py | 8 + nonebot/exceptions.py | 1 - nonebot/helpers.py | 93 --- nonebot/log.py | 15 - nonebot/matcher.py | 135 ++++ nonebot/message.py | 143 ----- nonebot/natural_language.py | 217 ------- nonebot/notice_request.py | 93 --- nonebot/permission.py | 102 --- nonebot/plugin.py | 460 ------------- nonebot/plugins/__init__.py | 0 nonebot/plugins/base.py | 13 - nonebot/rule.py | 55 ++ nonebot/sched.py | 12 - nonebot/session.py | 49 -- nonebot/typing.py | 12 +- 27 files changed, 201 insertions(+), 2520 deletions(-) delete mode 100644 nonebot/__init__.py delete mode 100644 nonebot/argparse.py delete mode 100644 nonebot/command/__init__.py delete mode 100644 nonebot/command/argfilter/__init__.py delete mode 100644 nonebot/command/argfilter/controllers.py delete mode 100644 nonebot/command/argfilter/converters.py delete mode 100644 nonebot/command/argfilter/extractors.py delete mode 100644 nonebot/command/argfilter/validators.py delete mode 100644 nonebot/command/group.py delete mode 100644 nonebot/command/group.pyi delete mode 100644 nonebot/default_config.py create mode 100644 nonebot/exception.py delete mode 100644 nonebot/exceptions.py delete mode 100644 nonebot/helpers.py delete mode 100644 nonebot/log.py create mode 100644 nonebot/matcher.py delete mode 100644 nonebot/message.py delete mode 100644 nonebot/natural_language.py delete mode 100644 nonebot/notice_request.py delete mode 100644 nonebot/permission.py delete mode 100644 nonebot/plugin.py delete mode 100644 nonebot/plugins/__init__.py delete mode 100644 nonebot/plugins/base.py create mode 100644 nonebot/rule.py delete mode 100644 nonebot/sched.py delete mode 100644 nonebot/session.py diff --git a/nonebot/__init__.py b/nonebot/__init__.py deleted file mode 100644 index 2981a888..00000000 --- a/nonebot/__init__.py +++ /dev/null @@ -1,171 +0,0 @@ -import asyncio -import logging -from typing import Any, Optional, Callable, Awaitable - -import aiocqhttp -from aiocqhttp import CQHttp - -from .log import logger -from .sched import Scheduler - -if Scheduler: - scheduler = Scheduler() -else: - scheduler = None - - -class NoneBot(CQHttp): - - def __init__(self, config_object: Optional[Any] = None): - if config_object is None: - from . import default_config as config_object - - config_dict = { - k: v - for k, v in config_object.__dict__.items() - if k.isupper() and not k.startswith('_') - } - logger.debug(f'Loaded configurations: {config_dict}') - super().__init__(message_class=aiocqhttp.message.Message, - **{k.lower(): v for k, v in config_dict.items()}) - - self.config = config_object - self.asgi.debug = self.config.DEBUG - - from .message import handle_message - from .notice_request import handle_notice_or_request - - @self.on_message - async def _(event: aiocqhttp.Event): - asyncio.create_task(handle_message(self, event)) - - @self.on_notice - async def _(event: aiocqhttp.Event): - asyncio.create_task(handle_notice_or_request(self, event)) - - @self.on_request - async def _(event: aiocqhttp.Event): - asyncio.create_task(handle_notice_or_request(self, event)) - - def run(self, - host: Optional[str] = None, - port: Optional[int] = None, - *args, - **kwargs) -> None: - host = host or self.config.HOST - port = port or self.config.PORT - if 'debug' not in kwargs: - kwargs['debug'] = self.config.DEBUG - - logger.info(f'Running on {host}:{port}') - super().run(host=host, port=port, *args, **kwargs) - - -_bot: Optional[NoneBot] = None - - -def init(config_object: Optional[Any] = None) -> None: - """ - Initialize NoneBot instance. - - This function must be called at the very beginning of code, - otherwise the get_bot() function will return None and nothing - is gonna work properly. - - :param config_object: configuration object - """ - global _bot - _bot = NoneBot(config_object) - - if _bot.config.DEBUG: - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.INFO) - - _bot.server_app.before_serving(_start_scheduler) - - -async def _start_scheduler(): - if scheduler and not scheduler.running: - scheduler.configure(_bot.config.APSCHEDULER_CONFIG) - scheduler.start() - logger.info('Scheduler started') - - -def get_bot() -> NoneBot: - """ - Get the NoneBot instance. - - The result is ensured to be not None, otherwise an exception will - be raised. - - :raise ValueError: instance not initialized - """ - if _bot is None: - raise ValueError('NoneBot instance has not been initialized') - return _bot - - -def run(host: Optional[str] = None, port: Optional[int] = None, *args, - **kwargs) -> None: - """Run the NoneBot instance.""" - get_bot().run(host=host, port=port, *args, **kwargs) - - -def on_startup(func: Callable[[], Awaitable[None]]) \ - -> Callable[[], Awaitable[None]]: - """ - Decorator to register a function as startup callback. - """ - return get_bot().server_app.before_serving(func) - - -def on_websocket_connect(func: Callable[[aiocqhttp.Event], Awaitable[None]]) \ - -> Callable[[], Awaitable[None]]: - """ - Decorator to register a function as websocket connect callback. - - Only work with CQHTTP v4.14+. - """ - return get_bot().on_meta_event('lifecycle.connect')(func) - - -from .exceptions import * -from .command import CommandSession, CommandGroup -from .plugin import (on_command, on_natural_language, on_notice, on_request, - load_plugin, load_plugins, load_builtin_plugins, - get_loaded_plugins) -from .message import message_preprocessor, Message, MessageSegment -from .natural_language import NLPSession, NLPResult, IntentCommand -from .notice_request import NoticeSession, RequestSession -from .helpers import context_id - -__all__ = [ - 'NoneBot', - 'scheduler', - 'init', - 'get_bot', - 'run', - 'on_startup', - 'on_websocket_connect', - 'CQHttpError', - 'load_plugin', - 'load_plugins', - 'load_builtin_plugins', - 'get_loaded_plugins', - 'message_preprocessor', - 'Message', - 'MessageSegment', - 'on_command', - 'CommandSession', - 'CommandGroup', - 'on_natural_language', - 'NLPSession', - 'NLPResult', - 'IntentCommand', - 'on_notice', - 'NoticeSession', - 'on_request', - 'RequestSession', - 'context_id', -] diff --git a/nonebot/argparse.py b/nonebot/argparse.py deleted file mode 100644 index c3abd381..00000000 --- a/nonebot/argparse.py +++ /dev/null @@ -1,42 +0,0 @@ -from argparse import * - -from .command import CommandSession - - -class ParserExit(RuntimeError): - - def __init__(self, status=0, message=None): - self.status = status - self.message = message - - -class ArgumentParser(ArgumentParser): - """ - An ArgumentParser wrapper that avoid printing messages to - standard I/O. - """ - - def __init__(self, *args, **kwargs): - self.session = kwargs.pop('session', None) - super().__init__(*args, **kwargs) - - def _session_finish(self, message): - if self.session and isinstance(self.session, CommandSession): - self.session.finish(message) - - def _print_message(self, message, file=None): - # do nothing - pass - - def exit(self, status=0, message=None): - raise ParserExit(status=status, message=message) - - def parse_args(self, args=None, namespace=None): - try: - return super().parse_args(args=args, namespace=namespace) - except ParserExit as e: - if e.status == 0: - # --help - self._session_finish(self.usage or self.format_help()) - else: - self._session_finish('参数不足或不正确,请使用 --help 参数查询使用帮助') diff --git a/nonebot/command/__init__.py b/nonebot/command/__init__.py deleted file mode 100644 index b067a13f..00000000 --- a/nonebot/command/__init__.py +++ /dev/null @@ -1,784 +0,0 @@ -import re -import shlex -import asyncio -import warnings -from datetime import datetime -from functools import partial, update_wrapper -from typing import (Tuple, Union, Callable, Iterable, Any, Optional, List, Dict, - Awaitable) - -from aiocqhttp import Event as CQEvent -from aiocqhttp.message import Message - -from nonebot import NoneBot, permission as perm -from nonebot.command.argfilter import ValidateError -from nonebot.helpers import context_id, send, render_expression -from nonebot.log import logger -from nonebot.session import BaseSession -from nonebot.typing import (CommandName_T, CommandArgs_T, CommandHandler_T, - Message_T, State_T, Filter_T) - -# key: context id -# value: CommandSession object -_sessions = {} # type: Dict[str, "CommandSession"] - - -class Command: - __slots__ = ('name', 'func', 'permission', 'only_to_me', 'privileged', - 'args_parser_func') - - def __init__(self, *, name: CommandName_T, func: CommandHandler_T, - permission: int, only_to_me: bool, privileged: bool): - self.name = name - self.func = func - self.permission = permission - self.only_to_me = only_to_me - self.privileged = privileged - self.args_parser_func: Optional[CommandHandler_T] = None - - async def run(self, session, *, check_perm: bool = True, - dry: bool = False) -> bool: - """ - Run the command in a given session. - - :param session: CommandSession object - :param check_perm: should check permission before running - :param dry: just check any prerequisite, without actually running - :return: the command is finished (or can be run, given dry == True) - """ - has_perm = await self._check_perm(session) if check_perm else True - if self.func and has_perm: - if dry: - return True - - if session.current_arg_filters is not None and \ - session.current_key is not None: - # argument-level filters are given, use them - arg = session.current_arg - config = session.bot.config - for f in session.current_arg_filters: - try: - res = f(arg) - if isinstance(res, Awaitable): - res = await res - arg = res - except ValidateError as e: - # validation failed - if config.MAX_VALIDATION_FAILURES > 0: - # should check number of validation failures - session.state['__validation_failure_num'] = \ - session.state.get( - '__validation_failure_num', 0) + 1 - - if session.state['__validation_failure_num'] >= \ - config.MAX_VALIDATION_FAILURES: - # noinspection PyProtectedMember - session.finish( - render_expression( - config. - TOO_MANY_VALIDATION_FAILURES_EXPRESSION - ), **session._current_send_kwargs) - - failure_message = e.message - if failure_message is None: - failure_message = render_expression( - config.DEFAULT_VALIDATION_FAILURE_EXPRESSION) - # noinspection PyProtectedMember - session.pause(failure_message, - **session._current_send_kwargs) - - # passed all filters - session.state[session.current_key] = arg - else: - # fallback to command-level args_parser_func - if self.args_parser_func: - await self.args_parser_func(session) - if session.current_key is not None and \ - session.current_key not in session.state: - # args_parser_func didn't set state, here we set it - session.state[session.current_key] = session.current_arg - - await self.func(session) - return True - return False - - async def _check_perm(self, session) -> bool: - """ - Check if the session has sufficient permission to - call the command. - - :param session: CommandSession object - :return: the session has the permission - """ - return await perm.check_permission(session.bot, session.event, - self.permission) - - def args_parser(self, parser_func: CommandHandler_T) -> CommandHandler_T: - """ - Decorator to register a function as the arguments parser of - the corresponding command. - """ - self.args_parser_func = parser_func - return parser_func - - def __repr__(self): - return f'' - - def __str__(self): - return self.__repr__() - - -class CommandManager: - """Global Command Manager""" - _commands = {} # type: Dict[CommandName_T, Command] - _aliases = {} # type: Dict[str, Command] - _switches = {} # type: Dict[CommandName_T, bool] - - def __init__(self): - self.commands = CommandManager._commands.copy() - self.aliases = CommandManager._aliases.copy() - self.switches = CommandManager._switches.copy() - - @classmethod - def add_command(cls, cmd_name: CommandName_T, cmd: Command) -> None: - """Register a command - - Args: - cmd_name (CommandName_T): Command name - cmd (Command): Command object - """ - if cmd_name in cls._commands: - warnings.warn(f"Command {cmd_name} already exists") - return - cls._switches[cmd_name] = True - cls._commands[cmd_name] = cmd - - @classmethod - def reload_command(cls, cmd_name: CommandName_T, cmd: Command) -> None: - """Reload a command - - **Warning! Dangerous function** - - Args: - cmd_name (CommandName_T): Command name - cmd (Command): Command object - """ - if cmd_name not in cls._commands: - warnings.warn( - f"Command {cmd_name} does not exist. Please use add_command instead" - ) - return - cls._commands[cmd_name] = cmd - - @classmethod - def remove_command(cls, cmd_name: CommandName_T) -> bool: - """Remove a command - - **Warning! Dangerous function** - - Args: - cmd_name (CommandName_T): Command name to remove - - Returns: - bool: Success or not - """ - if cmd_name in cls._commands: - cmd = cls._commands[cmd_name] - for alias in list( - filter(lambda x: cls._aliases[x] == cmd, - cls._aliases.keys())): - del cls._aliases[alias] - del cls._commands[cmd_name] - if cmd_name in cls._switches: - del cls._switches[cmd_name] - return True - return False - - @classmethod - def switch_command_global(cls, - cmd_name: CommandName_T, - state: Optional[bool] = None): - """Change command state globally or simply switch it if `state` is None - - Args: - cmd_name (CommandName_T): Command name - state (Optional[bool]): State to change to. Defaults to None. - """ - cls._switches[cmd_name] = not cls._switches[ - cmd_name] if state is None else bool(state) - - @classmethod - def add_aliases(cls, aliases: Union[Iterable[str], str], cmd: Command): - """Register command alias(es) - - Args: - aliases (Union[Iterable[str], str]): Command aliases - cmd_name (Command): Command - """ - if isinstance(aliases, str): - aliases = (aliases,) - for alias in aliases: - if not isinstance(alias, str): - warnings.warn(f"Alias {alias} is not a string! Ignored") - return - elif alias in cls._aliases: - warnings.warn(f"Alias {alias} already exists") - return - cls._aliases[alias] = cmd - - def _add_command_to_tree(self, cmd_name: CommandName_T, cmd: Command, - tree: Dict[str, Union[Dict, Command]]) -> None: - """Add command to the target command tree. - - Args: - cmd_name (CommandName_T): Name of the command - cmd (Command): Command object - tree (Dict[str, Union[Dict, Command]): Target command tree - """ - current_parent = tree - for parent_key in cmd_name[:-1]: - current_parent[parent_key] = current_parent.get(parent_key) or {} - current_parent = current_parent[parent_key] - # TODO: 支持test test.sub子命令 - if not isinstance(current_parent, dict): - warnings.warn(f"{current_parent} is not a registry dict") - return - if cmd_name[-1] in current_parent: - warnings.warn(f"There is already a command named {cmd_name}") - return - current_parent[cmd_name[-1]] = cmd - - def _generate_command_tree(self, commands: Dict[CommandName_T, Command] - ) -> Dict[str, Union[Dict, Command]]: - """Generate command tree from commands dictionary. - - Args: - commands (Dict[CommandName_T, Command]): Dictionary of commands - - Returns: - Dict[str, Union[Dict, "Command"]]: Command tree - """ - cmd_tree = {} #type: Dict[str, Union[Dict, "Command"]] - for cmd_name, cmd in commands.items(): - self._add_command_to_tree(cmd_name, cmd, cmd_tree) - return cmd_tree - - def _find_command(self, - name: Union[str, CommandName_T]) -> Optional[Command]: - cmd_name = (name,) if isinstance(name, str) else name - if not cmd_name: - return None - - cmd_tree = self._generate_command_tree({ - name: cmd - for name, cmd in self.commands.items() - if self.switches.get(name, True) - }) - for part in cmd_name[:-1]: - if part not in cmd_tree or not isinstance( - cmd_tree[part], #type: ignore - dict): - return None - cmd_tree = cmd_tree[part] # type: ignore - - cmd = cmd_tree.get(cmd_name[-1]) # type: ignore - return cmd if isinstance(cmd, Command) else None - - def parse_command(self, bot: NoneBot, cmd_string: str - ) -> Tuple[Optional[Command], Optional[str]]: - logger.debug(f'Parsing command: {repr(cmd_string)}') - - matched_start = None - for start in bot.config.COMMAND_START: - # loop through COMMAND_START to find the longest matched start - curr_matched_start = None - if isinstance(start, type(re.compile(''))): - m = start.search(cmd_string) - if m and m.start(0) == 0: - curr_matched_start = m.group(0) - elif isinstance(start, str): - if cmd_string.startswith(start): - curr_matched_start = start - - if curr_matched_start is not None and \ - (matched_start is None or - len(curr_matched_start) > len(matched_start)): - # a longer start, use it - matched_start = curr_matched_start - - if matched_start is None: - # it's not a command - logger.debug('It\'s not a command') - return None, None - - logger.debug(f'Matched command start: ' - f'{matched_start}{"(empty)" if not matched_start else ""}') - full_command = cmd_string[len(matched_start):].lstrip() - - if not full_command: - # command is empty - return None, None - - cmd_name_text, *cmd_remained = full_command.split(maxsplit=1) - - cmd_name = None - for sep in bot.config.COMMAND_SEP: - # loop through COMMAND_SEP to find the most optimized split - curr_cmd_name = None - if isinstance(sep, type(re.compile(''))): - curr_cmd_name = tuple(sep.split(cmd_name_text)) - elif isinstance(sep, str): - curr_cmd_name = tuple(cmd_name_text.split(sep)) - - if curr_cmd_name is not None and \ - (not cmd_name or len(curr_cmd_name) > len(cmd_name)): - # a more optimized split, use it - cmd_name = curr_cmd_name - - if not cmd_name: - cmd_name = (cmd_name_text,) - logger.debug(f'Split command name: {cmd_name}') - - cmd = self._find_command(cmd_name) # type: ignore - if not cmd: - logger.debug(f'Command {cmd_name} not found. Try to match aliases') - cmd = self.aliases.get(cmd_name_text) - - if not cmd: - return None, None - - logger.debug(f'Command {cmd.name} found, function: {cmd.func}') - return cmd, ''.join(cmd_remained) - - def switch_command(self, - cmd_name: CommandName_T, - state: Optional[bool] = None): - """Change command state or simply switch it if `state` is None - - Args: - cmd_name (CommandName_T): Command name - state (Optional[bool]): State to change to. Defaults to None. - """ - self.switches[cmd_name] = not self.switches[ - cmd_name] if state is None else bool(state) - - -class _PauseException(Exception): - """ - Raised by session.pause() indicating that the command session - should be paused to ask the user for some arguments. - """ - pass - - -class _FinishException(Exception): - """ - Raised by session.finish() indicating that the command session - should be stopped and removed. - """ - - def __init__(self, result: bool = True): - """ - :param result: succeeded to call the command - """ - self.result = result - - -class SwitchException(Exception): - """ - Raised by session.switch() indicating that the command session - should be stopped and replaced with a new one (going through - handle_message() again). - - Since the new message will go through handle_message() again, - the later function should be notified. So this exception is - intended to be propagated to handle_message(). - """ - - def __init__(self, new_message: Message): - """ - :param new_message: new message which should be placed in event - """ - self.new_message = new_message - - -class CommandSession(BaseSession): - __slots__ = ('cmd', 'current_key', 'current_arg_filters', - '_current_send_kwargs', 'current_arg', '_current_arg_text', - '_current_arg_images', '_state', '_last_interaction', - '_running', '_run_future') - - def __init__(self, - bot: NoneBot, - event: CQEvent, - cmd: Command, - *, - current_arg: Optional[str] = '', - args: Optional[CommandArgs_T] = None): - super().__init__(bot, event) - self.cmd = cmd # Command object - - # unique key of the argument that is currently requesting (asking) - self.current_key: Optional[str] = None - - # initialize current argument filters - self.current_arg_filters: Optional[List[Filter_T]] = None - - self._current_send_kwargs: Dict[str, Any] = {} - - # initialize current argument - self.current_arg: Optional[str] = '' # with potential CQ codes - self._current_arg_text = None - self._current_arg_images = None - self.refresh(event, current_arg=current_arg) # fill the above - - self._run_future = partial(asyncio.run_coroutine_threadsafe, - loop=bot.loop) - - self._state: State_T = {} - if args: - self._state.update(args) - - self._last_interaction = None # last interaction time of this session - self._running = False - - @property - def state(self) -> State_T: - """ - State of the session. - - This contains all named arguments and - other session scope temporary values. - """ - return self._state - - @property - def args(self) -> CommandArgs_T: - """Deprecated. Use `session.state` instead.""" - return self.state - - @property - def running(self) -> bool: - return self._running - - @running.setter - def running(self, value) -> None: - if self._running is True and value is False: - # change status from running to not running, record the time - self._last_interaction = datetime.now() - self._running = value - - @property - def is_valid(self) -> bool: - """Check if the session is expired or not.""" - if self.bot.config.SESSION_EXPIRE_TIMEOUT and \ - self._last_interaction and \ - datetime.now() - self._last_interaction > \ - self.bot.config.SESSION_EXPIRE_TIMEOUT: - return False - return True - - @property - def is_first_run(self) -> bool: - return self._last_interaction is None - - @property - def current_arg_text(self) -> str: - """ - Plain text part in the current argument, without any CQ codes. - """ - if self._current_arg_text is None: - self._current_arg_text = Message( - self.current_arg).extract_plain_text() - return self._current_arg_text - - @property - def current_arg_images(self) -> List[str]: - """ - Images (as list of urls) in the current argument. - """ - if self._current_arg_images is None: - self._current_arg_images = [ - s.data['url'] - for s in Message(self.current_arg) - if s.type == 'image' and 'url' in s.data - ] - return self._current_arg_images - - @property - def argv(self) -> List[str]: - """ - Shell-like argument list, similar to sys.argv. - Only available while shell_like is True in on_command decorator. - """ - return self.state.get('argv', []) - - def refresh(self, event: CQEvent, *, - current_arg: Optional[str] = '') -> None: - """ - Refill the session with a new message event. - - :param event: new message event - :param current_arg: new command argument as a string - """ - self.event = event - self.current_arg = current_arg - self._current_arg_text = None - self._current_arg_images = None - - def get(self, - key: str, - *, - prompt: Optional[Message_T] = None, - arg_filters: Optional[List[Filter_T]] = None, - **kwargs) -> Any: - """ - Get an argument with a given key. - - If the argument does not exist in the current session, - a pause exception will be raised, and the caller of - the command will know it should keep the session for - further interaction with the user. - - :param key: argument key - :param prompt: prompt to ask the user - :param arg_filters: argument filters for the next user input - :return: the argument value - """ - if key in self.state: - return self.state[key] - - self.current_key = key - self.current_arg_filters = arg_filters - self._current_send_kwargs = kwargs - self.pause(prompt, **kwargs) - - def get_optional(self, key: str, - default: Optional[Any] = None) -> Optional[Any]: - """ - Simply get a argument with given key. - - Deprecated. Use `session.state.get()` instead. - """ - return self.state.get(key, default) - - def pause(self, message: Optional[Message_T] = None, **kwargs) -> None: - """Pause the session for further interaction.""" - if message: - self._run_future(self.send(message, **kwargs)) - raise _PauseException - - def finish(self, message: Optional[Message_T] = None, **kwargs) -> None: - """Finish the session.""" - if message: - self._run_future(self.send(message, **kwargs)) - raise _FinishException - - def switch(self, new_message: Message_T) -> None: - """ - Finish the session and switch to a new (fake) message event. - - The user may send another command (or another intention as natural - language) when interacting with the current session. In this case, - the session may not understand what the user is saying, so it - should call this method and pass in that message, then NoneBot will - handle the situation properly. - """ - if self.is_first_run: - # if calling this method during first run, - # we think the command is not handled - raise _FinishException(result=False) - - if not isinstance(new_message, Message): - new_message = Message(new_message) - raise SwitchException(new_message) - - -async def handle_command(bot: NoneBot, event: CQEvent, - manager: CommandManager) -> Optional[bool]: - """ - Handle a message as a command. - - This function is typically called by "handle_message". - - :param bot: NoneBot instance - :param event: message event - :param manager: command manager - :return: the message is handled as a command - """ - cmd, current_arg = manager.parse_command(bot, str(event.message).lstrip()) - is_privileged_cmd = cmd and cmd.privileged - if is_privileged_cmd and cmd.only_to_me and not event['to_me']: - is_privileged_cmd = False - disable_interaction = bool(is_privileged_cmd) - - if is_privileged_cmd: - logger.debug(f'Command {cmd.name} is a privileged command') - - ctx_id = context_id(event) - - if not is_privileged_cmd: - # wait for 1.5 seconds (at most) if the current session is running - retry = 5 - while retry > 0 and \ - _sessions.get(ctx_id) and _sessions[ctx_id].running: - retry -= 1 - await asyncio.sleep(0.3) - - check_perm = True - session = _sessions.get(ctx_id) if not is_privileged_cmd else None - if session: - if session.running: - logger.warning(f'There is a session of command ' - f'{session.cmd.name} running, notify the user') - asyncio.ensure_future( - send(bot, event, - render_expression(bot.config.SESSION_RUNNING_EXPRESSION))) - # pretend we are successful, so that NLP won't handle it - return True - - if session.is_valid: - logger.debug(f'Session of command {session.cmd.name} exists') - # since it's in a session, the user must be talking to me - event['to_me'] = True - session.refresh(event, current_arg=str(event['message'])) - # there is no need to check permission for existing session - check_perm = False - else: - # the session is expired, remove it - logger.debug(f'Session of command {session.cmd.name} is expired') - if ctx_id in _sessions: - del _sessions[ctx_id] - session = None - - if not session: - if not cmd: - logger.debug('Not a known command, ignored') - return False - if cmd.only_to_me and not event['to_me']: - logger.debug('Not to me, ignored') - return False - session = CommandSession(bot, event, cmd, current_arg=current_arg) - logger.debug(f'New session of command {session.cmd.name} created') - - return await _real_run_command(session, - ctx_id, - check_perm=check_perm, - disable_interaction=disable_interaction) - - -async def call_command(bot: NoneBot, - event: CQEvent, - name: Union[str, CommandName_T], - *, - current_arg: str = '', - args: Optional[CommandArgs_T] = None, - check_perm: bool = True, - disable_interaction: bool = False) -> Optional[bool]: - """ - Call a command internally. - - This function is typically called by some other commands - or "handle_natural_language" when handling NLPResult object. - - Note: If disable_interaction is not True, after calling this function, - any previous command session will be overridden, even if the command - being called here does not need further interaction (a.k.a asking - the user for more info). - - :param bot: NoneBot instance - :param event: message event - :param name: command name - :param current_arg: command current argument string - :param args: command args - :param check_perm: should check permission before running command - :param disable_interaction: disable the command's further interaction - :return: the command is successfully called - """ - cmd = CommandManager()._find_command(name) - if not cmd: - return False - session = CommandSession(bot, - event, - cmd, - current_arg=current_arg, - args=args) - return await _real_run_command(session, - context_id(session.event), - check_perm=check_perm, - disable_interaction=disable_interaction) - - -async def _real_run_command(session: CommandSession, - ctx_id: str, - disable_interaction: bool = False, - **kwargs) -> Optional[bool]: - if not disable_interaction: - # override session only when interaction is not disabled - _sessions[ctx_id] = session - try: - logger.debug(f'Running command {session.cmd.name}') - session.running = True - future = asyncio.ensure_future(session.cmd.run(session, **kwargs)) - timeout = None - if session.bot.config.SESSION_RUN_TIMEOUT: - timeout = session.bot.config.SESSION_RUN_TIMEOUT.total_seconds() - - try: - await asyncio.wait_for(future, timeout) - handled = future.result() - except asyncio.TimeoutError: - handled = True - except (_PauseException, _FinishException, SwitchException) as e: - raise e - except Exception as e: - logger.error(f'An exception occurred while ' - f'running command {session.cmd.name}:') - logger.exception(e) - handled = True - raise _FinishException(handled) - except _PauseException: - session.running = False - if disable_interaction: - # if the command needs further interaction, we view it as failed - return False - logger.debug(f'Further interaction needed for ' - f'command {session.cmd.name}') - # return True because this step of the session is successful - return True - except (_FinishException, SwitchException) as e: - session.running = False - logger.debug(f'Session of command {session.cmd.name} finished') - if not disable_interaction and ctx_id in _sessions: - # the command is finished, remove the session, - # but if interaction is disabled during this command call, - # we leave the _sessions untouched. - del _sessions[ctx_id] - - if isinstance(e, _FinishException): - return e.result - elif isinstance(e, SwitchException): - # we are guaranteed that the session is not first run here, - # which means interaction is definitely enabled, - # so we can safely touch _sessions here. - if ctx_id in _sessions: - # make sure there is no session waiting - del _sessions[ctx_id] - logger.debug(f'Session of command {session.cmd.name} switching, ' - f'new message: {e.new_message}') - raise e # this is intended to be propagated to handle_message() - - -def kill_current_session(event: CQEvent) -> None: - """ - Force kill current session of the given event context, - despite whether it is running or not. - - :param event: message event - """ - ctx_id = context_id(event) - if ctx_id in _sessions: - del _sessions[ctx_id] - - -from nonebot.command.group import CommandGroup diff --git a/nonebot/command/argfilter/__init__.py b/nonebot/command/argfilter/__init__.py deleted file mode 100644 index 64cbcadd..00000000 --- a/nonebot/command/argfilter/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Optional - -from nonebot.typing import Message_T - - -class ValidateError(ValueError): - - def __init__(self, message: Optional[Message_T] = None): - self.message = message diff --git a/nonebot/command/argfilter/controllers.py b/nonebot/command/argfilter/controllers.py deleted file mode 100644 index 32708c67..00000000 --- a/nonebot/command/argfilter/controllers.py +++ /dev/null @@ -1,34 +0,0 @@ -import re - -from nonebot import CommandSession -from nonebot.helpers import render_expression - - -def handle_cancellation(session: CommandSession): - """ - If the input is a string of cancellation word, finish the command session. - """ - - def control(value): - if _is_cancellation(value) is True: - session.finish( - render_expression(session.bot.config.SESSION_CANCEL_EXPRESSION)) - return value - - return control - - -def _is_cancellation(sentence: str) -> bool: - for kw in ('算', '别', '不', '停', '取消'): - if kw in sentence: - # a keyword matches - break - else: - # no keyword matches - return False - - if re.match(r'^那?[算别不停]\w{0,3}了?吧?$', sentence) or \ - re.match(r'^那?(?:[给帮]我)?取消了?吧?$', sentence): - return True - - return False diff --git a/nonebot/command/argfilter/converters.py b/nonebot/command/argfilter/converters.py deleted file mode 100644 index 37d9db97..00000000 --- a/nonebot/command/argfilter/converters.py +++ /dev/null @@ -1,42 +0,0 @@ -from typing import Optional, List - - -def _simple_chinese_to_bool(text: str) -> Optional[bool]: - """ - Convert a chinese text to boolean. - - Examples: - - 是的 -> True - 好的呀 -> True - 不要 -> False - 不用了 -> False - 你好呀 -> None - """ - text = text.strip().lower().replace(' ', '') \ - .rstrip(',.!?~,。!?~了的呢吧呀啊呗啦') - if text in { - '要', '用', '是', '好', '对', '嗯', '行', 'ok', 'okay', 'yeah', 'yep', - '当真', '当然', '必须', '可以', '肯定', '没错', '确定', '确认' - }: - return True - if text in { - '不', '不要', '不用', '不是', '否', '不好', '不对', '不行', '别', 'no', 'nono', - 'nonono', 'nope', '不ok', '不可以', '不能', '不可以' - }: - return False - return None - - -def _split_nonempty_lines(text: str) -> List[str]: - return list(filter(lambda x: x, text.splitlines())) - - -def _split_nonempty_stripped_lines(text: str) -> List[str]: - return list(filter(lambda x: x, map(lambda x: x.strip(), - text.splitlines()))) - - -simple_chinese_to_bool = _simple_chinese_to_bool -split_nonempty_lines = _split_nonempty_lines -split_nonempty_stripped_lines = _split_nonempty_stripped_lines diff --git a/nonebot/command/argfilter/extractors.py b/nonebot/command/argfilter/extractors.py deleted file mode 100644 index 8c8b325a..00000000 --- a/nonebot/command/argfilter/extractors.py +++ /dev/null @@ -1,33 +0,0 @@ -import re -from typing import List - -from aiocqhttp.message import Message - -from nonebot.typing import Message_T - - -def _extract_text(arg: Message_T) -> str: - """Extract all plain text segments from a message-like object.""" - arg_as_msg = Message(arg) - return arg_as_msg.extract_plain_text() - - -def _extract_image_urls(arg: Message_T) -> List[str]: - """Extract all image urls from a message-like object.""" - arg_as_msg = Message(arg) - return [ - s.data['url'] - for s in arg_as_msg - if s.type == 'image' and 'url' in s.data - ] - - -def _extract_numbers(arg: Message_T) -> List[float]: - """Extract all numbers (integers and floats) from a message-like object.""" - s = str(arg) - return list(map(float, re.findall(r'[+-]?(\d*\.?\d+|\d+\.?\d*)', s))) - - -extract_text = _extract_text -extract_image_urls = _extract_image_urls -extract_numbers = _extract_numbers diff --git a/nonebot/command/argfilter/validators.py b/nonebot/command/argfilter/validators.py deleted file mode 100644 index 463783c1..00000000 --- a/nonebot/command/argfilter/validators.py +++ /dev/null @@ -1,100 +0,0 @@ -import re -from typing import Callable, Any - -from nonebot.typing import Filter_T -from nonebot.command.argfilter import ValidateError - - -class BaseValidator: - - def __init__(self, message=None): - self.message = message - - def raise_failure(self): - raise ValidateError(self.message) - - -def _raise_failure(message): - raise ValidateError(message) - - -def not_empty(message=None) -> Filter_T: - """ - Validate any object to ensure it's not empty (is None or has no elements). - """ - - def validate(value): - if value is None: - _raise_failure(message) - if hasattr(value, '__len__') and value.__len__() == 0: - _raise_failure(message) - return value - - return validate - - -def fit_size(min_length: int = 0, max_length: int = None, - message=None) -> Filter_T: - """ - Validate any sized object to ensure the size/length - is in a given range [min_length, max_length]. - """ - - def validate(value): - length = len(value) if value is not None else 0 - if length < min_length or \ - (max_length is not None and length > max_length): - _raise_failure(message) - return value - - return validate - - -def match_regex(pattern: str, message=None, *, flags=0, - fullmatch: bool = False) -> Filter_T: - """ - Validate any string object to ensure it matches a given pattern. - """ - - pattern = re.compile(pattern, flags) - - def validate(value): - if fullmatch: - if not re.fullmatch(pattern, value): - _raise_failure(message) - else: - if not re.match(pattern, value): - _raise_failure(message) - return value - - return validate - - -def ensure_true(bool_func: Callable[[Any], bool], message=None) -> Filter_T: - """ - Validate any object to ensure the result of applying - a boolean function to it is True. - """ - - def validate(value): - if bool_func(value) is not True: - _raise_failure(message) - return value - - return validate - - -def between_inclusive(start=None, end=None, message=None) -> Filter_T: - """ - Validate any comparable object to ensure it's between - `start` and `end` inclusively. - """ - - def validate(value): - if start is not None and value < start: - _raise_failure(message) - if end is not None and end < value: - _raise_failure(message) - return value - - return validate diff --git a/nonebot/command/group.py b/nonebot/command/group.py deleted file mode 100644 index 77e5d7bb..00000000 --- a/nonebot/command/group.py +++ /dev/null @@ -1,26 +0,0 @@ -from typing import Union, Callable - -from nonebot.plugin import on_command -from nonebot.typing import CommandName_T - - -class CommandGroup: - """ - Group a set of commands with same name prefix. - """ - - __slots__ = ('basename', 'base_kwargs') - - def __init__(self, name: Union[str, CommandName_T], **kwargs): - self.basename = (name,) if isinstance(name, str) else name - if 'aliases' in kwargs: - del kwargs['aliases'] # ensure there is no aliases here - self.base_kwargs = kwargs - - def command(self, name: Union[str, CommandName_T], **kwargs) -> Callable: - sub_name = (name,) if isinstance(name, str) else name - name = self.basename + sub_name - - final_kwargs = self.base_kwargs.copy() - final_kwargs.update(kwargs) - return on_command(name, **final_kwargs) diff --git a/nonebot/command/group.pyi b/nonebot/command/group.pyi deleted file mode 100644 index 68eee38a..00000000 --- a/nonebot/command/group.pyi +++ /dev/null @@ -1,24 +0,0 @@ -from typing import Union, Callable, Iterable - -from nonebot.typing import CommandName_T - - -class CommandGroup: - """ - Group a set of commands with same name prefix. - """ - - __slots__ = ('basename', 'base_kwargs') - - def __init__(self, name: Union[str, CommandName_T], *, - permission: int = ..., - only_to_me: bool = ..., - privileged: bool = ..., - shell_like: bool = ...): ... - - def command(self, name: Union[str, CommandName_T], *, - aliases: Union[Iterable[str], str] = ..., - permission: int = ..., - only_to_me: bool = ..., - privileged: bool = ..., - shell_like: bool = ...) -> Callable: ... diff --git a/nonebot/default_config.py b/nonebot/default_config.py deleted file mode 100644 index b68ea738..00000000 --- a/nonebot/default_config.py +++ /dev/null @@ -1,48 +0,0 @@ -""" -Default configurations. - -Any derived configurations must import everything from this module -at the very beginning of their code, and then set their own value -to override the default one. - -For example: - ->>> from nonebot.default_config import * ->>> PORT = 9090 ->>> DEBUG = False ->>> SUPERUSERS.add(123456) ->>> NICKNAME = '小明' -""" - -from datetime import timedelta -from typing import Container, Union, Iterable, Pattern, Optional, Dict, Any - -from .typing import Expression_T - -API_ROOT: str = '' -ACCESS_TOKEN: str = '' -SECRET: str = '' -HOST: str = '127.0.0.1' -PORT: int = 8080 -DEBUG: bool = True - -SUPERUSERS: Container[int] = set() -NICKNAME: Union[str, Iterable[str]] = '' - -COMMAND_START: Iterable[Union[str, Pattern]] = {'/', '!', '/', '!'} -COMMAND_SEP: Iterable[Union[str, Pattern]] = {'/', '.'} - -SESSION_EXPIRE_TIMEOUT: Optional[timedelta] = timedelta(minutes=5) -SESSION_RUN_TIMEOUT: Optional[timedelta] = None -SESSION_RUNNING_EXPRESSION: Expression_T = '您有命令正在执行,请稍后再试' - -SHORT_MESSAGE_MAX_LENGTH: int = 50 - -DEFAULT_VALIDATION_FAILURE_EXPRESSION: Expression_T = '您的输入不符合要求,请重新输入' -MAX_VALIDATION_FAILURES: int = 3 -TOO_MANY_VALIDATION_FAILURES_EXPRESSION: Expression_T = \ - '您输入错误太多次啦,如需重试,请重新触发本功能' - -SESSION_CANCEL_EXPRESSION: Expression_T = '好的' - -APSCHEDULER_CONFIG: Dict[str, Any] = {'apscheduler.timezone': 'Asia/Shanghai'} diff --git a/nonebot/exception.py b/nonebot/exception.py new file mode 100644 index 00000000..aa4342c0 --- /dev/null +++ b/nonebot/exception.py @@ -0,0 +1,8 @@ +class BlockedException(Exception): + """Block a message from further handling""" + pass + + +class RejectedException(Exception): + """Reject a message and return current handler back""" + pass diff --git a/nonebot/exceptions.py b/nonebot/exceptions.py deleted file mode 100644 index cc67c95d..00000000 --- a/nonebot/exceptions.py +++ /dev/null @@ -1 +0,0 @@ -from aiocqhttp import Error as CQHttpError diff --git a/nonebot/helpers.py b/nonebot/helpers.py deleted file mode 100644 index 6d4ae344..00000000 --- a/nonebot/helpers.py +++ /dev/null @@ -1,93 +0,0 @@ -import hashlib -import random -from typing import Sequence, Callable, Any - -from aiocqhttp.message import escape -from aiocqhttp import Event as CQEvent - -from . import NoneBot -from .exceptions import CQHttpError -from .typing import Message_T, Expression_T - - -def context_id(event: CQEvent, *, mode: str = 'default', - use_hash: bool = False) -> str: - """ - Calculate a unique id representing the context of the given event. - - mode: - default: one id for one context - group: one id for one group or discuss - user: one id for one user - - :param event: the event object - :param mode: unique id mode: "default", "group", or "user" - :param use_hash: use md5 to hash the id or not - """ - ctx_id = '' - if mode == 'default': - if event.group_id: - ctx_id = f'/group/{event.group_id}' - elif event.discuss_id: - ctx_id = f'/discuss/{event.discuss_id}' - if event.user_id: - ctx_id += f'/user/{event.user_id}' - elif mode == 'group': - if event.group_id: - ctx_id = f'/group/{event.group_id}' - elif event.discuss_id: - ctx_id = f'/discuss/{event.discuss_id}' - elif event.user_id: - ctx_id = f'/user/{event.user_id}' - elif mode == 'user': - if event.user_id: - ctx_id = f'/user/{event.user_id}' - - if ctx_id and use_hash: - ctx_id = hashlib.md5(ctx_id.encode('ascii')).hexdigest() - return ctx_id - - -async def send(bot: NoneBot, - event: CQEvent, - message: Message_T, - *, - ensure_private: bool = False, - ignore_failure: bool = True, - **kwargs) -> Any: - """Send a message ignoring failure by default.""" - try: - if ensure_private: - event = event.copy() - event['message_type'] = 'private' - return await bot.send(event, message, **kwargs) - except CQHttpError: - if not ignore_failure: - raise - return None - - -def render_expression(expr: Expression_T, - *args, - escape_args: bool = True, - **kwargs) -> str: - """ - Render an expression to message string. - - :param expr: expression to render - :param escape_args: should escape arguments or not - :param args: positional arguments used in str.format() - :param kwargs: keyword arguments used in str.format() - :return: the rendered message - """ - if isinstance(expr, Callable): - expr = expr(*args, **kwargs) - elif isinstance(expr, Sequence) and not isinstance(expr, str): - expr = random.choice(expr) - if escape_args: - return expr.format( - *[escape(s) if isinstance(s, str) else s for s in args], **{ - k: escape(v) if isinstance(v, str) else v - for k, v in kwargs.items() - }) - return expr.format(*args, **kwargs) diff --git a/nonebot/log.py b/nonebot/log.py deleted file mode 100644 index 669539c8..00000000 --- a/nonebot/log.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -Provide logger object. - -Any other modules in "nonebot" should use "logger" from this module -to log messages. -""" - -import logging -import sys - -logger = logging.getLogger('nonebot') -default_handler = logging.StreamHandler(sys.stdout) -default_handler.setFormatter( - logging.Formatter('[%(asctime)s %(name)s] %(levelname)s: %(message)s')) -logger.addHandler(default_handler) diff --git a/nonebot/matcher.py b/nonebot/matcher.py new file mode 100644 index 00000000..ed003259 --- /dev/null +++ b/nonebot/matcher.py @@ -0,0 +1,135 @@ +import re +import copy +from functools import wraps +from typing import Union, Optional + +from .rule import Rule, startswith, regex, user +from .typing import Scope, Handler +from .exception import BlockedException, RejectedException + + +class Matcher: + + def __init__(self, + rule: Rule, + scope: Scope = "ALL", + permission: str = "ALL", + block: bool = True, + *, + handlers: list = [], + state: dict = {}, + temp: bool = False): + self.rule = rule + self.scope = scope + self.permission = permission + self.block = block + self.handlers = handlers + self.state = state + self.temp = temp + + def _default_parser(event: "Event", state: dict): + state[state.pop("_current_arg")] = event.message + + self._args_parser = _default_parser + + def __call__(self, func: Handler) -> Handler: + self.handlers.append(func) + + # TODO: export some functions + func.args_parser = self.args_parser + func.receive = self.receive + func.got = self.got + + return func + + def args_parser(self, func): + self._args_parser = func + return func + + def receive(self): + + def _decorator(func: Handler) -> Handler: + + @wraps(func) + def _handler(event: "Event", state: dict): + # TODO: add tmp matcher to matcher tree + matcher = Matcher(user(event.user_id) & self.rule, + scope=self.scope, + permission=self.permission, + block=self.block, + handlers=self.handlers, + state=state, + temp=True) + matcher.args_parser(self._args_parser) + raise BlockedException + + self.handlers.append(_handler) + + return func + + return _decorator + + def got(self, key, args_parser=None): + + def _decorator(func: Handler) -> Handler: + + @wraps(func) + def _handler(event: "Event", state: dict): + if key not in state: + state["_current_arg"] = key + + # TODO: add tmp matcher to matcher tree + matcher = copy.copy(self) + raise RejectedException + return func(event, state) + + self.handlers.append(_handler) + + return func + + return _decorator + + def finish(self): + # BlockedException用于阻止后续handler继续执行 + raise BlockedException + + def reject(self): + # RejectedException用于阻止后续handler继续执行并将当前handler放回队列 + raise RejectedException + + +def on_message(rule: Rule, + scope="ALL", + permission="ALL", + block=True, + *, + handlers=[], + state={}, + temp=False) -> Matcher: + # TODO: add matcher to matcher tree + return Matcher(rule, + scope, + permission, + block, + handlers=handlers, + state=state, + temp=temp) + + +def on_startswith(msg, + start: int = None, + end: int = None, + rule: Optional[Rule] = None, + **kwargs) -> Matcher: + return on_message(startswith(msg, start, end) & + rule, **kwargs) if rule else on_message( + startswith(msg, start, end), **kwargs) + + +def on_regex(pattern, + flags: Union[int, re.RegexFlag] = 0, + rule: Optional[Rule] = None, + **kwargs) -> Matcher: + return on_message(regex(pattern, flags) & + rule, **kwargs) if rule else on_message( + regex(pattern, flags), **kwargs) diff --git a/nonebot/message.py b/nonebot/message.py deleted file mode 100644 index dd7aaae8..00000000 --- a/nonebot/message.py +++ /dev/null @@ -1,143 +0,0 @@ -import re -import asyncio -from typing import Callable, Iterable - -from aiocqhttp import Event as CQEvent -from aiocqhttp.message import escape, unescape, Message, MessageSegment - -from . import NoneBot -from .log import logger -from .natural_language import handle_natural_language -from .command import handle_command, SwitchException -from .plugin import PluginManager - -_message_preprocessors = set() - - -def message_preprocessor(func: Callable) -> Callable: - _message_preprocessors.add(func) - return func - - -class CanceledException(Exception): - """ - Raised by message_preprocessor indicating that - the bot should ignore the message - """ - - def __init__(self, reason): - """ - :param reason: reason to ignore the message - """ - self.reason = reason - - -async def handle_message(bot: NoneBot, event: CQEvent) -> None: - _log_message(event) - - assert isinstance(event.message, Message) - if not event.message: - event.message.append(MessageSegment.text('')) # type: ignore - - raw_to_me = event.get('to_me', False) - _check_at_me(bot, event) - _check_calling_me_nickname(bot, event) - event['to_me'] = raw_to_me or event['to_me'] - - coros = [] - plugin_manager = PluginManager() - for preprocessor in _message_preprocessors: - coros.append(preprocessor(bot, event, plugin_manager)) - if coros: - try: - await asyncio.gather(*coros) - except CanceledException: - logger.info(f'Message {event["message_id"]} is ignored') - return - - while True: - try: - handled = await handle_command(bot, event, - plugin_manager.cmd_manager) - break - except SwitchException as e: - # we are sure that there is no session existing now - event['message'] = e.new_message - event['to_me'] = True - if handled: - logger.info(f'Message {event.message_id} is handled as a command') - return - - handled = await handle_natural_language(bot, event, - plugin_manager.nlp_manager) - if handled: - logger.info(f'Message {event.message_id} is handled ' - f'as natural language') - return - - -def _check_at_me(bot: NoneBot, event: CQEvent) -> None: - if event.detail_type == 'private': - event['to_me'] = True - else: - # group or discuss - event['to_me'] = False - at_me_seg = MessageSegment.at(event.self_id) - - # check the first segment - first_msg_seg = event.message[0] - if first_msg_seg == at_me_seg: - event['to_me'] = True - del event.message[0] - - if not event['to_me']: - # check the last segment - i = -1 - last_msg_seg = event.message[i] - if last_msg_seg.type == 'text' and \ - not last_msg_seg.data['text'].strip() and \ - len(event.message) >= 2: - i -= 1 - last_msg_seg = event.message[i] - - if last_msg_seg == at_me_seg: - event['to_me'] = True - del event.message[i:] - - if not event.message: - event.message.append(MessageSegment.text('')) - - -def _check_calling_me_nickname(bot: NoneBot, event: CQEvent) -> None: - first_msg_seg = event.message[0] - if first_msg_seg.type != 'text': - return - - first_text = first_msg_seg.data['text'] - - if bot.config.NICKNAME: - # check if the user is calling me with my nickname - if isinstance(bot.config.NICKNAME, str) or \ - not isinstance(bot.config.NICKNAME, Iterable): - nicknames = (bot.config.NICKNAME,) - else: - nicknames = filter(lambda n: n, bot.config.NICKNAME) - nickname_regex = '|'.join(nicknames) - m = re.search(rf'^({nickname_regex})([\s,,]*|$)', first_text, - re.IGNORECASE) - if m: - nickname = m.group(1) - logger.debug(f'User is calling me {nickname}') - event['to_me'] = True - first_msg_seg.data['text'] = first_text[m.end():] - - -def _log_message(event: CQEvent) -> None: - msg_from = str(event.user_id) - if event.detail_type == 'group': - msg_from += f'@[群:{event.group_id}]' - elif event.detail_type == 'discuss': - msg_from += f'@[讨论组:{event.discuss_id}]' - logger.info(f'Self: {event.self_id}, ' - f'Message {event.message_id} from {msg_from}: ' - f'{repr(str(event.message))}') diff --git a/nonebot/natural_language.py b/nonebot/natural_language.py deleted file mode 100644 index 554acd16..00000000 --- a/nonebot/natural_language.py +++ /dev/null @@ -1,217 +0,0 @@ -import asyncio -import warnings -from functools import update_wrapper -from typing import Set, Iterable, Optional, Callable, Union, NamedTuple - -from aiocqhttp import Event as CQEvent -from aiocqhttp.message import Message - -from .log import logger -from . import NoneBot, permission as perm -from .command import call_command -from .session import BaseSession -from .typing import CommandName_T, CommandArgs_T - - -class NLProcessor: - __slots__ = ('func', 'keywords', 'permission', 'only_to_me', - 'only_short_message', 'allow_empty_message') - - def __init__(self, *, func: Callable, keywords: Optional[Iterable], - permission: int, only_to_me: bool, only_short_message: bool, - allow_empty_message: bool): - self.func = func - self.keywords = keywords - self.permission = permission - self.only_to_me = only_to_me - self.only_short_message = only_short_message - self.allow_empty_message = allow_empty_message - - -class NLPManager: - _nl_processors: Set[NLProcessor] = set() - - def __init__(self): - self.nl_processors = NLPManager._nl_processors.copy() - - @classmethod - def add_nl_processor(cls, processor: NLProcessor) -> None: - """Register a natural language processor - - Args: - processor (NLProcessor): Processor object - """ - if processor in cls._nl_processors: - warnings.warn(f"NLProcessor {processor} already exists") - return - cls._nl_processors.add(processor) - - @classmethod - def remove_nl_processor(cls, processor: NLProcessor) -> bool: - """Remove a natural language processor globally - - Args: - processor (NLProcessor): Processor to remove - - Returns: - bool: Success or not - """ - if processor in cls._nl_processors: - cls._nl_processors.remove(processor) - return True - return False - - @classmethod - def switch_nlprocessor_global(cls, - processor: NLProcessor, - state: Optional[bool] = None - ) -> Optional[bool]: - """Remove or add a natural language processor globally - - Args: - processor (NLProcessor): Processor object - - Returns: - bool: True if removed, False if added - """ - if processor in cls._nl_processors and not state: - cls._nl_processors.remove(processor) - return True - elif processor not in cls._nl_processors and state != False: - cls._nl_processors.add(processor) - return False - - def switch_nlprocessor(self, - processor: NLProcessor, - state: Optional[bool] = None) -> Optional[bool]: - """Remove or add a natural language processor - - Args: - processor (NLProcessor): Processor to remove - - Returns: - bool: True if removed, False if added - """ - if processor in self.nl_processors and not state: - self.nl_processors.remove(processor) - return True - elif processor not in self.nl_processors and state != False: - self.nl_processors.add(processor) - return False - - -class NLPSession(BaseSession): - __slots__ = ('msg', 'msg_text', 'msg_images') - - def __init__(self, bot: NoneBot, event: CQEvent, msg: str): - super().__init__(bot, event) - self.msg = msg - tmp_msg = Message(msg) - self.msg_text = tmp_msg.extract_plain_text() - self.msg_images = [ - s.data['url'] - for s in tmp_msg - if s.type == 'image' and 'url' in s.data - ] - - -class NLPResult(NamedTuple): - """ - Deprecated. - Use class IntentCommand instead. - """ - confidence: float - cmd_name: Union[str, CommandName_T] - cmd_args: Optional[CommandArgs_T] = None - - def to_intent_command(self): - return IntentCommand(confidence=self.confidence, - name=self.cmd_name, - args=self.cmd_args) - - -class IntentCommand(NamedTuple): - """ - To represent a command that we think the user may be intended to call. - """ - confidence: float - name: Union[str, CommandName_T] - args: Optional[CommandArgs_T] = None - current_arg: str = '' - - -async def handle_natural_language(bot: NoneBot, event: CQEvent, - manager: NLPManager) -> bool: - """ - Handle a message as natural language. - - This function is typically called by "handle_message". - - :param bot: NoneBot instance - :param event: message event - :param manager: natural language processor manager - :return: the message is handled as natural language - """ - session = NLPSession(bot, event, str(event.message)) - - # use msg_text here because CQ code "share" may be very long, - # at the same time some plugins may want to handle it - msg_text_length = len(session.msg_text) - - futures = [] - for p in manager.nl_processors: - if not p.allow_empty_message and not session.msg: - # don't allow empty msg, but it is one, so skip to next - continue - - if p.only_short_message and \ - msg_text_length > bot.config.SHORT_MESSAGE_MAX_LENGTH: - continue - - if p.only_to_me and not event['to_me']: - continue - - should_run = await perm.check_permission(bot, event, p.permission) - if should_run and p.keywords: - for kw in p.keywords: - if kw in session.msg_text: - break - else: - # no keyword matches - should_run = False - - if should_run: - futures.append(asyncio.ensure_future(p.func(session))) - - if futures: - # wait for intent commands, and sort them by confidence - intent_commands = [] - for fut in futures: - try: - res = await fut - if isinstance(res, NLPResult): - intent_commands.append(res.to_intent_command()) - elif isinstance(res, IntentCommand): - intent_commands.append(res) - except Exception as e: - logger.error('An exception occurred while running ' - 'some natural language processor:') - logger.exception(e) - - intent_commands.sort(key=lambda ic: ic.confidence, reverse=True) - logger.debug(f'Intent commands: {intent_commands}') - - if intent_commands and intent_commands[0].confidence >= 60.0: - # choose the intent command with highest confidence - chosen_cmd = intent_commands[0] - logger.debug( - f'Intent command with highest confidence: {chosen_cmd}') - return await call_command(bot, - event, - chosen_cmd.name, - args=chosen_cmd.args, - current_arg=chosen_cmd.current_arg, - check_perm=False) # type: ignore - else: - logger.debug('No intent command has enough confidence') - return False diff --git a/nonebot/notice_request.py b/nonebot/notice_request.py deleted file mode 100644 index f455d6da..00000000 --- a/nonebot/notice_request.py +++ /dev/null @@ -1,93 +0,0 @@ -from functools import update_wrapper -from typing import List, Optional, Callable, Union - -from aiocqhttp import Event as CQEvent -from aiocqhttp.bus import EventBus - -from . import NoneBot -from .log import logger -from .exceptions import CQHttpError -from .session import BaseSession - -_bus = EventBus() - - -class EventHandler: - __slots__ = ('events', 'func') - - def __init__(self, events: List[str], func: Callable): - self.events = events - self.func = func - - -class NoticeSession(BaseSession): - __slots__ = () - - def __init__(self, bot: NoneBot, event: CQEvent): - super().__init__(bot, event) - - -class RequestSession(BaseSession): - __slots__ = () - - def __init__(self, bot: NoneBot, event: CQEvent): - super().__init__(bot, event) - - async def approve(self, remark: str = '') -> None: - """ - Approve the request. - - :param remark: remark of friend (only works in friend request) - """ - try: - await self.bot.call_action(action='.handle_quick_operation_async', - self_id=self.event.self_id, - context=self.event, - operation={ - 'approve': True, - 'remark': remark - }) - except CQHttpError: - pass - - async def reject(self, reason: str = '') -> None: - """ - Reject the request. - - :param reason: reason to reject (only works in group request) - """ - try: - await self.bot.call_action(action='.handle_quick_operation_async', - self_id=self.event.self_id, - context=self.event, - operation={ - 'approve': False, - 'reason': reason - }) - except CQHttpError: - pass - - -async def handle_notice_or_request(bot: NoneBot, event: CQEvent) -> None: - if event.type == 'notice': - _log_notice(event) - session = NoticeSession(bot, event) - else: # must be 'request' - _log_request(event) - session = RequestSession(bot, event) - - ev_name = event.name - logger.debug(f'Emitting event: {ev_name}') - try: - await _bus.emit(ev_name, session) - except Exception as e: - logger.error(f'An exception occurred while handling event {ev_name}:') - logger.exception(e) - - -def _log_notice(event: CQEvent) -> None: - logger.info(f'Notice: {event}') - - -def _log_request(event: CQEvent) -> None: - logger.info(f'Request: {event}') diff --git a/nonebot/permission.py b/nonebot/permission.py deleted file mode 100644 index 5976d932..00000000 --- a/nonebot/permission.py +++ /dev/null @@ -1,102 +0,0 @@ -from collections import namedtuple - -from aiocache import cached -from aiocqhttp import Event as CQEvent - -from . import NoneBot -from .exceptions import CQHttpError - -PRIVATE_FRIEND = 0x0001 -PRIVATE_GROUP = 0x0002 -PRIVATE_DISCUSS = 0x0004 -PRIVATE_OTHER = 0x0008 -PRIVATE = 0x000F -DISCUSS = 0x00F0 -GROUP_MEMBER = 0x0100 -GROUP_ADMIN = 0x0200 -GROUP_OWNER = 0x0400 -GROUP = 0x0F00 -SUPERUSER = 0xF000 -EVERYBODY = 0xFFFF - -IS_NOBODY = 0x0000 -IS_PRIVATE_FRIEND = PRIVATE_FRIEND -IS_PRIVATE_GROUP = PRIVATE_GROUP -IS_PRIVATE_DISCUSS = PRIVATE_DISCUSS -IS_PRIVATE_OTHER = PRIVATE_OTHER -IS_PRIVATE = PRIVATE -IS_DISCUSS = DISCUSS -IS_GROUP_MEMBER = GROUP_MEMBER -IS_GROUP_ADMIN = GROUP_MEMBER | GROUP_ADMIN -IS_GROUP_OWNER = GROUP_ADMIN | GROUP_OWNER -IS_GROUP = GROUP -IS_SUPERUSER = 0xFFFF - -_min_event_fields = ( - 'self_id', - 'message_type', - 'sub_type', - 'user_id', - 'discuss_id', - 'group_id', - 'anonymous', -) - -_MinEvent = namedtuple('MinEvent', _min_event_fields) - - -async def check_permission(bot: NoneBot, event: CQEvent, - permission_required: int) -> bool: - """ - Check if the event context has the permission required. - - :param bot: NoneBot instance - :param event: message event - :param permission_required: permission required - :return: the context has the permission - """ - min_event_kwargs = {} - for field in _min_event_fields: - if field in event: - min_event_kwargs[field] = event[field] - else: - min_event_kwargs[field] = None - min_event = _MinEvent(**min_event_kwargs) - return await _check(bot, min_event, permission_required) - - -@cached(ttl=2 * 60) # cache the result for 2 minute -async def _check(bot: NoneBot, min_event: _MinEvent, - permission_required: int) -> bool: - permission = 0 - if min_event.user_id in bot.config.SUPERUSERS: - permission |= IS_SUPERUSER - if min_event.message_type == 'private': - if min_event.sub_type == 'friend': - permission |= IS_PRIVATE_FRIEND - elif min_event.sub_type == 'group': - permission |= IS_PRIVATE_GROUP - elif min_event.sub_type == 'discuss': - permission |= IS_PRIVATE_DISCUSS - elif min_event.sub_type == 'other': - permission |= IS_PRIVATE_OTHER - elif min_event.message_type == 'group': - permission |= IS_GROUP_MEMBER - if not min_event.anonymous: - try: - member_info = await bot.get_group_member_info( - self_id=min_event.self_id, - group_id=min_event.group_id, - user_id=min_event.user_id, - no_cache=True) - if member_info: - if member_info['role'] == 'owner': - permission |= IS_GROUP_OWNER - elif member_info['role'] == 'admin': - permission |= IS_GROUP_ADMIN - except CQHttpError: - pass - elif min_event.message_type == 'discuss': - permission |= IS_DISCUSS - - return bool(permission & permission_required) diff --git a/nonebot/plugin.py b/nonebot/plugin.py deleted file mode 100644 index 9659d848..00000000 --- a/nonebot/plugin.py +++ /dev/null @@ -1,460 +0,0 @@ -import os -import re -import sys -import shlex -import warnings -import importlib -from types import ModuleType -from typing import Any, Set, Dict, Union, Optional, Iterable, Callable - -from .log import logger -from nonebot import permission as perm -from .command import Command, CommandManager -from .notice_request import _bus, EventHandler -from .natural_language import NLProcessor, NLPManager -from .typing import CommandName_T, CommandHandler_T - -_tmp_command: Set[Command] = set() -_tmp_nl_processor: Set[NLProcessor] = set() -_tmp_event_handler: Set[EventHandler] = set() - - -class Plugin: - __slots__ = ('module', 'name', 'usage', 'commands', 'nl_processors', - 'event_handlers') - - def __init__(self, - module: ModuleType, - name: Optional[str] = None, - usage: Optional[Any] = None, - commands: Set[Command] = set(), - nl_processors: Set[NLProcessor] = set(), - event_handlers: Set[EventHandler] = set()): - self.module = module - self.name = name - self.usage = usage - self.commands = commands - self.nl_processors = nl_processors - self.event_handlers = event_handlers - - -class PluginManager: - _plugins: Dict[str, Plugin] = {} - - def __init__(self): - self.cmd_manager = CommandManager() - self.nlp_manager = NLPManager() - - @classmethod - def add_plugin(cls, module_path: str, plugin: Plugin) -> None: - """Register a plugin - - Args: - module_path (str): module path - plugin (Plugin): Plugin object - """ - if module_path in cls._plugins: - warnings.warn(f"Plugin {module_path} already exists") - return - cls._plugins[module_path] = plugin - - @classmethod - def get_plugin(cls, module_path: str) -> Optional[Plugin]: - """Get plugin object by plugin module path - - Args: - module_path (str): Plugin module path - - Returns: - Optional[Plugin]: Plugin object - """ - return cls._plugins.get(module_path, None) - - @classmethod - def remove_plugin(cls, module_path: str) -> bool: - """Remove a plugin by plugin module path - - ** Warning: This function not remove plugin actually! ** - ** Just remove command, nlprocessor and event handlers ** - - Args: - module_path (str): Plugin module path - - Returns: - bool: Success or not - """ - plugin = cls.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not exists") - return False - for command in plugin.commands: - CommandManager.remove_command(command.name) - for nl_processor in plugin.nl_processors: - NLPManager.remove_nl_processor(nl_processor) - for event_handler in plugin.event_handlers: - for event in event_handler.events: - _bus.unsubscribe(event, event_handler.func) - del cls._plugins[module_path] - return True - - @classmethod - def switch_plugin_global(cls, - module_path: str, - state: Optional[bool] = None) -> None: - """Change plugin state globally or simply switch it if `state` is None - - Args: - module_path (str): Plugin module path - state (Optional[bool]): State to change to. Defaults to None. - """ - plugin = cls.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not found") - return - for command in plugin.commands: - CommandManager.switch_command_global(command.name, state) - for nl_processor in plugin.nl_processors: - NLPManager.switch_nlprocessor_global(nl_processor, state) - for event_handler in plugin.event_handlers: - for event in event_handler.events: - if event_handler.func in _bus._subscribers[event] and not state: - _bus.unsubscribe(event, event_handler.func) - elif event_handler.func not in _bus._subscribers[ - event] and state != False: - _bus.subscribe(event, event_handler.func) - - @classmethod - def switch_command_global(cls, - module_path: str, - state: Optional[bool] = None) -> None: - """Change plugin command state globally or simply switch it if `state` is None - - Args: - module_path (str): Plugin module path - state (Optional[bool]): State to change to. Defaults to None. - """ - plugin = cls.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not found") - return - for command in plugin.commands: - CommandManager.switch_command_global(command.name, state) - - @classmethod - def switch_nlprocessor_global(cls, - module_path: str, - state: Optional[bool] = None) -> None: - """Change plugin nlprocessor state globally or simply switch it if `state` is None - - Args: - module_path (str): Plugin module path - state (Optional[bool]): State to change to. Defaults to None. - """ - plugin = cls.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not found") - return - for processor in plugin.nl_processors: - NLPManager.switch_nlprocessor_global(processor, state) - - @classmethod - def switch_eventhandler_global(cls, - module_path: str, - state: Optional[bool] = None) -> None: - """Change plugin event handler state globally or simply switch it if `state` is None - - Args: - module_path (str): Plugin module path - state (Optional[bool]): State to change to. Defaults to None. - """ - plugin = cls.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not found") - return - for event_handler in plugin.event_handlers: - for event in event_handler.events: - if event_handler.func in _bus._subscribers[event] and not state: - _bus.unsubscribe(event, event_handler.func) - elif event_handler.func not in _bus._subscribers[ - event] and state != False: - _bus.subscribe(event, event_handler.func) - - def switch_plugin(self, module_path: str, - state: Optional[bool] = None) -> None: - """Change plugin state or simply switch it if `state` is None - - Tips: - This method will only change the state of the plugin's - commands and natural language processors since change - state of the event handler for message is meaningless. - - Args: - module_path (str): Plugin module path - state (Optional[bool]): State to change to. Defaults to None. - """ - plugin = self.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not found") - return - for command in plugin.commands: - self.cmd_manager.switch_command(command.name, state) - for nl_processor in plugin.nl_processors: - self.nlp_manager.switch_nlprocessor(nl_processor, state) - - def switch_command(self, module_path: str, - state: Optional[bool] = None) -> None: - """Change plugin command state or simply switch it if `state` is None - - Args: - module_path (str): Plugin module path - state (Optional[bool]): State to change to. Defaults to None. - """ - plugin = self.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not found") - return - for command in plugin.commands: - self.cmd_manager.switch_command(command.name, state) - - def switch_nlprocessor(self, module_path: str, - state: Optional[bool] = None) -> None: - """Change plugin nlprocessor state or simply switch it if `state` is None - - Args: - module_path (str): Plugin module path - state (Optional[bool]): State to change to. Defaults to None. - """ - plugin = self.get_plugin(module_path) - if not plugin: - warnings.warn(f"Plugin {module_path} not found") - return - for processor in plugin.nl_processors: - self.nlp_manager.switch_nlprocessor(processor, state) - - -def load_plugin(module_path: str) -> Optional[Plugin]: - """Load a module as a plugin - - Args: - module_path (str): path of module to import - - Returns: - Optional[Plugin]: Plugin object loaded - """ - # Make sure tmp is clean - _tmp_command.clear() - _tmp_nl_processor.clear() - _tmp_event_handler.clear() - try: - module = importlib.import_module(module_path) - name = getattr(module, '__plugin_name__', None) - usage = getattr(module, '__plugin_usage__', None) - commands = _tmp_command.copy() - nl_processors = _tmp_nl_processor.copy() - event_handlers = _tmp_event_handler.copy() - plugin = Plugin(module, name, usage, commands, nl_processors, - event_handlers) - PluginManager.add_plugin(module_path, plugin) - logger.info(f'Succeeded to import "{module_path}"') - return plugin - except Exception as e: - logger.error(f'Failed to import "{module_path}", error: {e}') - logger.exception(e) - return None - - -def reload_plugin(module_path: str) -> Optional[Plugin]: - result = PluginManager.remove_plugin(module_path) - if not result: - return None - - for module in list( - filter(lambda x: x.startswith(module_path), sys.modules.keys())): - del sys.modules[module] - - _tmp_command.clear() - _tmp_nl_processor.clear() - _tmp_event_handler.clear() - try: - module = importlib.import_module(module_path) - name = getattr(module, '__plugin_name__', None) - usage = getattr(module, '__plugin_usage__', None) - commands = _tmp_command.copy() - nl_processors = _tmp_nl_processor.copy() - event_handlers = _tmp_event_handler.copy() - plugin = Plugin(module, name, usage, commands, nl_processors, - event_handlers) - PluginManager.add_plugin(module_path, plugin) - logger.info(f'Succeeded to reload "{module_path}"') - return plugin - except Exception as e: - logger.error(f'Failed to reload "{module_path}", error: {e}') - logger.exception(e) - return None - - -def load_plugins(plugin_dir: str, module_prefix: str) -> Set[Plugin]: - """Find all non-hidden modules or packages in a given directory, - and import them with the given module prefix. - - Args: - plugin_dir (str): Plugin directory to search - module_prefix (str): Module prefix used while importing - - Returns: - Set[Plugin]: Set of plugin objects successfully loaded - """ - - count = set() - for name in os.listdir(plugin_dir): - path = os.path.join(plugin_dir, name) - if os.path.isfile(path) and \ - (name.startswith('_') or not name.endswith('.py')): - continue - if os.path.isdir(path) and \ - (name.startswith('_') or not os.path.exists( - os.path.join(path, '__init__.py'))): - continue - - m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name) - if not m: - continue - - result = load_plugin(f'{module_prefix}.{m.group(1)}') - if result: - count.add(result) - return count - - -def load_builtin_plugins() -> Set[Plugin]: - """ - Load built-in plugins distributed along with "nonebot" package. - """ - plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins') - return load_plugins(plugin_dir, 'nonebot.plugins') - - -def get_loaded_plugins() -> Set[Plugin]: - """ - Get all plugins loaded. - - :return: a set of Plugin objects - """ - return set(PluginManager._plugins.values()) - - -def on_command(name: Union[str, CommandName_T], - *, - aliases: Union[Iterable[str], str] = (), - permission: int = perm.EVERYBODY, - only_to_me: bool = True, - privileged: bool = False, - shell_like: bool = False) -> Callable: - """ - Decorator to register a function as a command. - - :param name: command name (e.g. 'echo' or ('random', 'number')) - :param aliases: aliases of command name, for convenient access - :param permission: permission required by the command - :param only_to_me: only handle messages to me - :param privileged: can be run even when there is already a session - :param shell_like: use shell-like syntax to split arguments - """ - - def deco(func: CommandHandler_T) -> CommandHandler_T: - if not isinstance(name, (str, tuple)): - raise TypeError('the name of a command must be a str or tuple') - if not name: - raise ValueError('the name of a command must not be empty') - - cmd_name = (name,) if isinstance(name, str) else name - - cmd = Command(name=cmd_name, - func=func, - permission=permission, - only_to_me=only_to_me, - privileged=privileged) - - if shell_like: - - async def shell_like_args_parser(session): - session.args['argv'] = shlex.split(session.current_arg) - - cmd.args_parser_func = shell_like_args_parser - - CommandManager.add_command(cmd_name, cmd) - CommandManager.add_aliases(aliases, cmd) - - _tmp_command.add(cmd) - func.args_parser = cmd.args_parser - - return func - - return deco - - -def on_natural_language( - keywords: Union[Optional[Iterable], str, Callable] = None, - *, - permission: int = perm.EVERYBODY, - only_to_me: bool = True, - only_short_message: bool = True, - allow_empty_message: bool = False) -> Callable: - """ - Decorator to register a function as a natural language processor. - - :param keywords: keywords to respond to, if None, respond to all messages - :param permission: permission required by the processor - :param only_to_me: only handle messages to me - :param only_short_message: only handle short messages - :param allow_empty_message: handle empty messages - """ - - def deco(func: Callable) -> Callable: - nl_processor = NLProcessor( - func=func, - keywords=keywords, # type: ignore - permission=permission, - only_to_me=only_to_me, - only_short_message=only_short_message, - allow_empty_message=allow_empty_message) - NLPManager.add_nl_processor(nl_processor) - _tmp_nl_processor.add(nl_processor) - return func - - if isinstance(keywords, Callable): - # here "keywords" is the function to be decorated - return on_natural_language()(keywords) - else: - if isinstance(keywords, str): - keywords = (keywords,) - return deco - - -def _make_event_deco(post_type: str) -> Callable: - - def deco_deco(arg: Optional[Union[str, Callable]] = None, - *events: str) -> Callable: - - def deco(func: Callable) -> Callable: - if isinstance(arg, str): - events_tmp = list( - map(lambda x: f"{post_type}.{x}", [arg] + list(events))) - for e in events_tmp: - _bus.subscribe(e, func) - handler = EventHandler(events_tmp, func) - else: - _bus.subscribe(post_type, func) - handler = EventHandler([post_type], func) - _tmp_event_handler.add(handler) - return func - - if isinstance(arg, Callable): - return deco(arg) # type: ignore - return deco - - return deco_deco - - -on_notice = _make_event_deco('notice') -on_request = _make_event_deco('request') diff --git a/nonebot/plugins/__init__.py b/nonebot/plugins/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/nonebot/plugins/base.py b/nonebot/plugins/base.py deleted file mode 100644 index 96cb5d2b..00000000 --- a/nonebot/plugins/base.py +++ /dev/null @@ -1,13 +0,0 @@ -from nonebot import on_command, CommandSession, permission as perm -from nonebot.message import unescape - - -@on_command('echo') -async def echo(session: CommandSession): - await session.send(session.state.get('message') or session.current_arg) - - -@on_command('say', permission=perm.SUPERUSER) -async def say(session: CommandSession): - await session.send( - unescape(session.state.get('message') or session.current_arg)) diff --git a/nonebot/rule.py b/nonebot/rule.py new file mode 100644 index 00000000..aa0361b8 --- /dev/null +++ b/nonebot/rule.py @@ -0,0 +1,55 @@ +import re +from typing import Union, Callable + + +class Rule: + + def __init__(self, checker: Callable[["Event"], bool]): + self.checker = checker + + def __call__(self, event): + return self.checker(event) + + def __and__(self, other): + return Rule(lambda event: self.checker(event) and other.checker(event)) + + def __or__(self, other): + return Rule(lambda event: self.checker(event) or other.checker(event)) + + def __neg__(self): + return Rule(lambda event: not self.checker(event)) + + +def user(*qq: int) -> Rule: + return Rule(lambda event: event.user_id in qq) + + +def private() -> Rule: + return Rule(lambda event: event.detail_type == "private") + + +def group(*group: int) -> Rule: + return Rule( + lambda event: event.detail_type == "group" and event.group_id in group) + + +def discuss(*discuss: int) -> Rule: + return Rule(lambda event: event.detail_type == "discuss" and event. + discuss_id in discuss) + + +def startswith(msg, start: int = None, end: int = None) -> Rule: + return Rule(lambda event: event.message.startswith(msg, start, end)) + + +def endswith(msg, start: int = None, end: int = None) -> Rule: + return Rule(lambda event: event.message.endswith(msg, start=None, end=None)) + + +def has(msg: str) -> Rule: + return Rule(lambda event: msg in event.message) + + +def regex(regex, flags: Union[int, re.RegexFlag] = 0) -> Rule: + pattern = re.compile(regex, flags) + return Rule(lambda event: bool(pattern.search(event.message))) diff --git a/nonebot/sched.py b/nonebot/sched.py deleted file mode 100644 index 70710cc4..00000000 --- a/nonebot/sched.py +++ /dev/null @@ -1,12 +0,0 @@ -try: - from apscheduler.schedulers.asyncio import AsyncIOScheduler -except ImportError: - # APScheduler is not installed - AsyncIOScheduler = None - -if AsyncIOScheduler: - - class Scheduler(AsyncIOScheduler): - pass -else: - Scheduler = None diff --git a/nonebot/session.py b/nonebot/session.py deleted file mode 100644 index f5b17b8d..00000000 --- a/nonebot/session.py +++ /dev/null @@ -1,49 +0,0 @@ -from aiocqhttp import Event as CQEvent - -from . import NoneBot -from .helpers import send -from .typing import Message_T - - -class BaseSession: - __slots__ = ('bot', 'event') - - def __init__(self, bot: NoneBot, event: CQEvent): - self.bot = bot - self.event = event - - @property - def ctx(self) -> CQEvent: - return self.event - - @ctx.setter - def ctx(self, val: CQEvent) -> None: - self.event = val - - @property - def self_id(self) -> int: - return self.event.self_id - - async def send(self, - message: Message_T, - *, - at_sender: bool = False, - ensure_private: bool = False, - ignore_failure: bool = True, - **kwargs) -> None: - """ - Send a message ignoring failure by default. - - :param message: message to send - :param at_sender: @ the sender if in group or discuss chat - :param ensure_private: ensure the message is sent to private chat - :param ignore_failure: if any CQHttpError raised, ignore it - :return: the result returned by CQHTTP - """ - return await send(self.bot, - self.event, - message, - at_sender=at_sender, - ensure_private=ensure_private, - ignore_failure=ignore_failure, - **kwargs) diff --git a/nonebot/typing.py b/nonebot/typing.py index d44e08b9..bb11c609 100644 --- a/nonebot/typing.py +++ b/nonebot/typing.py @@ -1,10 +1,4 @@ -from typing import Union, List, Dict, Any, Sequence, Callable, Tuple, Awaitable +from typing import Literal, Callable -Context_T = Dict[str, Any] -Message_T = Union[str, Dict[str, Any], List[Dict[str, Any]]] -Expression_T = Union[str, Sequence[str], Callable] -CommandName_T = Tuple[str, ...] -CommandArgs_T = Dict[str, Any] -CommandHandler_T = Callable[["CommandSession"], Any] -State_T = Dict[str, Any] -Filter_T = Callable[[Any], Union[Any, Awaitable[Any]]] +Scope = Literal["PRIVATE", "DISCUSS", "GROUP", "ALL"] +Handler = Callable[["Event", dict], None]