Add get_loaded_plugins() function

This commit is contained in:
Richard Chien 2019-01-03 19:58:56 +08:00
parent 7dbf216b5c
commit a20996d83a
2 changed files with 85 additions and 60 deletions

View File

@ -1,8 +1,5 @@
import asyncio
import importlib
import logging
import os
import re
from typing import Any, Optional
import aiocqhttp.message
@ -104,66 +101,22 @@ def run(host: Optional[str] = None, port: Optional[int] = None,
get_bot().run(host=host, port=port, *args, **kwargs)
_plugins = set()
def load_plugin(module_name: str) -> bool:
"""
Load a module as a plugin.
:param module_name: name of module to import
:return: successful or not
"""
try:
_plugins.add(importlib.import_module(module_name))
logger.info(f'Succeeded to import "{module_name}"')
return True
except Exception as e:
logger.error(f'Failed to import "{module_name}", error: {e}')
logger.exception(e)
return False
def load_plugins(plugin_dir: str, module_prefix: str) -> int:
"""
Find all non-hidden modules or packages in a given directory,
and import them with the given module prefix.
:param plugin_dir: plugin directory to search
:param module_prefix: module prefix used while importing
:return: number of plugins successfully loaded
"""
count = 0
for name in os.listdir(plugin_dir):
path = os.path.join(plugin_dir, name)
if os.path.isfile(path) and \
(name.startswith('_') or not name.endswith('.py')):
continue
if os.path.isdir(path) and \
(name.startswith('_') or not os.path.exists(
os.path.join(path, '__init__.py'))):
continue
m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name)
if not m:
continue
if load_plugin(f'{module_prefix}.{m.group(1)}'):
count += 1
return count
def load_builtin_plugins() -> int:
"""
Load built-in plugins distributed along with "nonebot" package.
"""
plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins')
return load_plugins(plugin_dir, 'nonebot.plugins')
from .exceptions import *
from .plugin import (load_plugin, load_plugins, load_builtin_plugins,
get_loaded_plugins)
from .message import message_preprocessor, Message, MessageSegment
from .command import on_command, CommandSession, CommandGroup
from .natural_language import on_natural_language, NLPSession, NLPResult
from .notice_request import (on_notice, NoticeSession,
on_request, RequestSession)
__all__ = [
'NoneBot', 'scheduler', 'init', 'get_bot', 'run',
'CQHttpError',
'load_plugin', 'load_plugins', 'load_builtin_plugins',
'get_loaded_plugins',
'message_preprocessor', 'Message', 'MessageSegment',
'on_command', 'CommandSession', 'CommandGroup',
'on_natural_language', 'NLPSession', 'NLPResult',
'on_notice', 'NoticeSession', 'on_request', 'RequestSession',
]

72
nonebot/plugin.py Normal file
View File

@ -0,0 +1,72 @@
import importlib
import os
import re
from collections import namedtuple
from typing import Set
from .log import logger
Plugin = namedtuple('Plugin', ['module', 'name', 'usage'])
_plugins: Set[Plugin] = set()
def load_plugin(module_name: str) -> bool:
"""
Load a module as a plugin.
:param module_name: name of module to import
:return: successful or not
"""
try:
module = importlib.import_module(module_name)
name = getattr(module, '__plugin_name__', None)
usage = getattr(module, '__plugin_usage__', None)
_plugins.add(Plugin(module, name, usage))
logger.info(f'Succeeded to import "{module_name}"')
return True
except Exception as e:
logger.error(f'Failed to import "{module_name}", error: {e}')
logger.exception(e)
return False
def load_plugins(plugin_dir: str, module_prefix: str) -> int:
"""
Find all non-hidden modules or packages in a given directory,
and import them with the given module prefix.
:param plugin_dir: plugin directory to search
:param module_prefix: module prefix used while importing
:return: number of plugins successfully loaded
"""
count = 0
for name in os.listdir(plugin_dir):
path = os.path.join(plugin_dir, name)
if os.path.isfile(path) and \
(name.startswith('_') or not name.endswith('.py')):
continue
if os.path.isdir(path) and \
(name.startswith('_') or not os.path.exists(
os.path.join(path, '__init__.py'))):
continue
m = re.match(r'([_A-Z0-9a-z]+)(.py)?', name)
if not m:
continue
if load_plugin(f'{module_prefix}.{m.group(1)}'):
count += 1
return count
def load_builtin_plugins() -> int:
"""
Load built-in plugins distributed along with "nonebot" package.
"""
plugin_dir = os.path.join(os.path.dirname(__file__), 'plugins')
return load_plugins(plugin_dir, 'nonebot.plugins')
def get_loaded_plugins() -> Set[Plugin]:
return _plugins