nonebot2/none/__init__.py

146 lines
4.1 KiB
Python
Raw Normal View History

2018-07-02 16:54:29 +08:00
import asyncio
2018-06-15 06:58:24 +08:00
import importlib
import logging
2018-07-02 16:54:29 +08:00
import os
2018-06-15 06:58:24 +08:00
import re
2018-07-04 16:21:01 +08:00
from typing import Any, Optional
2018-06-15 06:58:24 +08:00
from aiocqhttp import CQHttp
from aiocqhttp.message import Message
2018-07-04 09:28:31 +08:00
from . import default_config
2018-07-02 16:54:29 +08:00
from .log import logger
2018-06-15 06:58:24 +08:00
2018-07-04 09:28:31 +08:00
class NoneBot(CQHttp):
    """
    CQHttp bot that is configured from a config object.

    On construction it subscribes to message, notice and request events
    and hands each event context to NoneBot's own handler coroutines.
    """

    def __init__(self, config_object: Any = None):
        """
        Build the bot from a configuration object.

        :param config_object: object whose upper-case attributes are the
                              settings; ``default_config`` is used when
                              it is None
        """
        if config_object is None:
            config_object = default_config

        # Every upper-case setting that does not start with an underscore
        # is forwarded to CQHttp as a lower-case keyword argument.
        super_kwargs = {}
        for key, value in config_object.__dict__.items():
            if key.startswith('_') or not key.isupper():
                continue
            super_kwargs[key.lower()] = value
        super().__init__(message_class=Message, **super_kwargs)

        self.config = config_object
        self.asgi.debug = self.config.DEBUG

        # NOTE(review): imported here rather than at module level,
        # presumably to avoid a circular import -- confirm before moving.
        from .message import handle_message
        from .notice_request import handle_notice_or_request

        # Each handler only schedules the real work on the event loop
        # and returns without awaiting it.
        @self.on_message
        async def _(ctx):
            asyncio.ensure_future(handle_message(self, ctx))

        @self.on_notice
        async def _(ctx):
            asyncio.ensure_future(handle_notice_or_request(self, ctx))

        @self.on_request
        async def _(ctx):
            asyncio.ensure_future(handle_notice_or_request(self, ctx))

    def run(self, host=None, port=None, *args, **kwargs):
        """
        Run the bot via CQHttp.run() on the current asyncio event loop.

        :param host: passed through to CQHttp.run()
        :param port: passed through to CQHttp.run()
        """
        current_loop = asyncio.get_event_loop()
        super().run(host=host, port=port, loop=current_loop,
                    *args, **kwargs)

    def get_data_folder(self, *sub_folder: str) -> Optional[str]:
        """
        Return a folder below the configured DATA_FOLDER, creating it
        (mode 0o755) when it does not exist yet.

        :param sub_folder: path components below DATA_FOLDER; with none
                           given, DATA_FOLDER itself is used
        :return: the folder path, or None when DATA_FOLDER is not set
        """
        base = self.config.DATA_FOLDER
        if not base:
            return None

        target = os.path.join(base, *sub_folder) if sub_folder else base
        if not os.path.isdir(target):
            os.makedirs(target, 0o755, exist_ok=True)
        return target

    def get_data_file(self, path: str, *others: str) -> Optional[str]:
        """
        Return the path of a file below the configured DATA_FOLDER.

        The parent folder is created through get_data_folder(); the file
        itself is not touched.

        :param path: first component of the path relative to DATA_FOLDER
        :param others: further path components
        :return: the file path, or None when DATA_FOLDER is not set
        """
        sub_dir, file_name = os.path.split(os.path.join(path, *others))
        parent = self.get_data_folder(sub_dir)
        if not parent:
            return None
        return os.path.join(parent, file_name)
# The global NoneBot instance; assigned by init() and returned by get_bot().
_bot: Optional[NoneBot] = None
2018-07-04 19:50:42 +08:00
def init(config_object: Any = None) -> None:
    """
    Create the global NoneBot instance and set the logger level.

    Call this at the very beginning of the program: until it has run,
    get_bot() raises ValueError and nothing else can work.

    :param config_object: configuration object
    """
    global _bot
    _bot = NoneBot(config_object)

    # The DEBUG setting of the configuration selects the log verbosity.
    level = logging.DEBUG if _bot.config.DEBUG else logging.INFO
    logger.setLevel(level)
2018-07-04 09:28:31 +08:00
2018-07-04 16:21:01 +08:00
def get_bot() -> NoneBot:
    """
    Return the global NoneBot instance created by init().

    :raise ValueError: instance not initialized
    """
    if _bot is not None:
        # noinspection PyTypeChecker
        return _bot
    raise ValueError('NoneBot instance has not been initialized')
2018-06-15 06:58:24 +08:00
2018-07-04 16:21:01 +08:00
def run(host: str = None, port: int = None, *args, **kwargs) -> None:
    """
    Run the global NoneBot instance.

    All arguments are passed through to NoneBot.run().

    :raise ValueError: instance not initialized
    """
    bot = get_bot()
    bot.run(host=host, port=port, *args, **kwargs)
2018-06-15 06:58:24 +08:00
# Modules successfully imported by load_plugins(); emptied by clear_plugins().
_plugins = set()
2018-07-04 16:21:01 +08:00
def clear_plugins() -> None:
    """
    Empty the set of plugin modules recorded by load_plugins().

    Only the set is cleared; nothing else is done to the modules.
    """
    _plugins.clear()
2018-07-01 20:01:05 +08:00
def load_plugins(plugin_dir: str, module_prefix: str) -> None:
    """
    Import every plugin found directly inside a directory.

    A plugin is either a ``.py`` file or a directory that contains an
    ``__init__.py``. Entries whose name starts with ``_`` are skipped, and
    so are entries whose module name contains anything other than ASCII
    letters, digits and underscores. The scan is not recursive.

    Each successfully imported module is added to the module-level plugin
    set; a failed import is logged as a warning and the scan continues.

    :param plugin_dir: directory to scan for plugins
    :param module_prefix: dotted name under which the modules in
                          plugin_dir are importable
    """
    for name in os.listdir(plugin_dir):
        if name.startswith('_'):
            continue

        path = os.path.join(plugin_dir, name)
        if os.path.isfile(path):
            if not name.endswith('.py'):
                continue
            stem = name[:-len('.py')]
        elif os.path.isdir(path):
            if not os.path.exists(os.path.join(path, '__init__.py')):
                continue
            stem = name
        else:
            # Neither a regular file nor a directory (e.g. a broken
            # symlink): there is nothing importable here.
            continue

        # The whole module name must be valid. Matching only a prefix
        # would turn "foo-bar.py" into an import of "<prefix>.foo".
        if not re.fullmatch(r'[_A-Z0-9a-z]+', stem):
            continue

        mod_name = f'{module_prefix}.{stem}'
        try:
            module = importlib.import_module(mod_name)
        except ImportError:
            logger.warning('Failed to import "{}"'.format(mod_name))
        else:
            _plugins.add(module)
            logger.info('Succeeded to import "{}"'.format(mod_name))
2018-06-26 10:25:11 +08:00
2018-07-01 20:01:05 +08:00
def load_builtin_plugins() -> None:
    """
    Load the plugins in the ``plugins`` folder next to this file,
    importing them under the ``none.plugins`` prefix.
    """
    package_dir = os.path.dirname(__file__)
    load_plugins(os.path.join(package_dir, 'plugins'), 'none.plugins')
2018-06-27 22:05:12 +08:00
2018-07-01 11:01:24 +08:00
from .command import on_command, CommandSession, CommandGroup
2018-07-01 17:51:01 +08:00
from .natural_language import on_natural_language, NLPSession, NLPResult
2018-06-27 22:50:01 +08:00
from .notice_request import (
on_notice, NoticeSession,
on_request, RequestSession,
)