nonebot2/none/natural_language.py
2018-12-24 11:59:28 +08:00

151 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import re
from typing import Iterable, Optional, Callable, Union, NamedTuple
from . import NoneBot, permission as perm
from .command import call_command
from .log import logger
from .message import Message
from .session import BaseSession
from .typing import Context_T, CommandName_T, CommandArgs_T
_nl_processors = set()
class NLProcessor:
__slots__ = ('func', 'keywords', 'permission',
'only_to_me', 'only_short_message',
'allow_empty_message')
def __init__(self, *, func: Callable, keywords: Optional[Iterable],
permission: int, only_to_me: bool, only_short_message: bool,
allow_empty_message: bool):
self.func = func
self.keywords = keywords
self.permission = permission
self.only_to_me = only_to_me
self.only_short_message = only_short_message
self.allow_empty_message = allow_empty_message
def on_natural_language(keywords: Union[Optional[Iterable], Callable] = None,
*, permission: int = perm.EVERYBODY,
only_to_me: bool = True,
only_short_message: bool = True,
allow_empty_message: bool = False) -> Callable:
"""
Decorator to register a function as a natural language processor.
:param keywords: keywords to respond to, if None, respond to all messages
:param permission: permission required by the processor
:param only_to_me: only handle messages to me
:param only_short_message: only handle short messages
:param allow_empty_message: handle empty messages
"""
def deco(func: Callable) -> Callable:
nl_processor = NLProcessor(func=func, keywords=keywords,
permission=permission,
only_to_me=only_to_me,
only_short_message=only_short_message,
allow_empty_message=allow_empty_message)
_nl_processors.add(nl_processor)
return func
if isinstance(keywords, Callable):
# here "keywords" is the function to be decorated
return on_natural_language()(keywords)
else:
return deco
class NLPSession(BaseSession):
__slots__ = ('msg', 'msg_text', 'msg_images')
def __init__(self, bot: NoneBot, ctx: Context_T, msg: str):
super().__init__(bot, ctx)
self.msg = msg
tmp_msg = Message(msg)
self.msg_text = tmp_msg.extract_plain_text()
self.msg_images = [s.data['url'] for s in tmp_msg
if s.type == 'image' and 'url' in s.data]
class NLPResult(NamedTuple):
confidence: float
cmd_name: Union[str, CommandName_T]
cmd_args: Optional[CommandArgs_T] = None
async def handle_natural_language(bot: NoneBot, ctx: Context_T) -> bool:
"""
Handle a message as natural language.
This function is typically called by "handle_message".
:param bot: NoneBot instance
:param ctx: message context
:return: the message is handled as natural language
"""
msg = str(ctx['message'])
if bot.config.NICKNAME:
# check if the user is calling me with my nickname
if isinstance(bot.config.NICKNAME, str) or \
not isinstance(bot.config.NICKNAME, Iterable):
nicknames = (bot.config.NICKNAME,)
else:
nicknames = filter(lambda n: n, bot.config.NICKNAME)
nickname_regex = '|'.join(nicknames)
m = re.search(rf'^({nickname_regex})([\s,]|$)', msg, re.IGNORECASE)
if m:
nickname = m.group(1)
logger.debug(f'User is calling me {nickname}')
ctx['to_me'] = True
msg = msg[m.end():]
session = NLPSession(bot, ctx, msg)
# use msg_text here because CQ code "share" may be very long,
# at the same time some plugins may want to handle it
msg_text_length = len(session.msg_text)
coros = []
for p in _nl_processors:
if not p.allow_empty_message and not session.msg:
# don't allow empty msg, but it is one, so skip to next
continue
if p.only_short_message and \
msg_text_length > bot.config.SHORT_MESSAGE_MAX_LENGTH:
continue
if p.only_to_me and not ctx['to_me']:
continue
should_run = await perm.check_permission(bot, ctx, p.permission)
if should_run and p.keywords:
for kw in p.keywords:
if kw in session.msg_text:
break
else:
# no keyword matches
should_run = False
if should_run:
coros.append(p.func(session))
if coros:
# wait for possible results, and sort them by confidence
results = sorted(filter(lambda r: r, await asyncio.gather(*coros)),
key=lambda r: r.confidence, reverse=True)
logger.debug(f'NLP results: {results}')
if results and results[0].confidence >= 60.0:
# choose the result with highest confidence
logger.debug(f'NLP result with highest confidence: {results[0]}')
return await call_command(bot, ctx, results[0].cmd_name,
args=results[0].cmd_args,
check_perm=False)
else:
logger.debug('No NLP result having enough confidence')
return False