nonebot2/nonebot/rule.py

171 lines
4.8 KiB
Python
Raw Normal View History

2020-06-30 10:13:58 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2020-05-02 20:03:36 +08:00
import re
2020-08-14 17:41:24 +08:00
import asyncio
2020-08-17 16:09:41 +08:00
from itertools import product
2020-05-05 16:11:05 +08:00
2020-08-17 16:09:41 +08:00
from pygtrie import CharTrie
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
from nonebot import get_driver
from nonebot.log import logger
from nonebot.utils import run_sync
from nonebot.typing import Bot, Any, Dict, Event, Union, Tuple, NoReturn, RuleChecker
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
class Rule:
    """Conjunction of asynchronous checkers deciding whether an event matches.

    Every checker is called with ``(bot, event, state)`` and awaited; the rule
    passes only when all of the awaited results are truthy.
    """
    __slots__ = ("checkers",)

    def __init__(self, *checkers: RuleChecker) -> None:
        """Keep the given checkers, in order, in a fresh list."""
        self.checkers = [*checkers]

    async def __call__(self, bot: Bot, event: Event, state: dict) -> bool:
        """Await every checker via ``asyncio.gather`` and AND the results.

        A rule with no checkers passes, because ``all([])`` is ``True``.
        """
        pending = [check(bot, event, state) for check in self.checkers]
        outcomes = await asyncio.gather(*pending)
        return all(outcomes)

    def __and__(self, other: Union["Rule", RuleChecker]) -> "Rule":
        """Return a new rule holding this rule's checkers plus ``other``.

        ``other`` may be another ``Rule`` (its checkers are appended), a
        coroutine function (appended as is), or any other callable, which is
        passed through ``run_sync`` first.  Neither operand is mutated.
        """
        combined = list(self.checkers)
        if isinstance(other, Rule):
            combined += other.checkers
        else:
            # NOTE(review): run_sync presumably adapts a plain callable so it
            # can be awaited like the other checkers - confirm in nonebot.utils.
            is_async = asyncio.iscoroutinefunction(other)
            combined.append(other if is_async else run_sync(other))
        return Rule(*combined)

    def __or__(self, other) -> NoReturn:
        """Reject ``rule | other``; only conjunction is supported.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError("Or operation between rules is not allowed.")
2020-08-14 17:41:24 +08:00
2020-08-17 16:09:41 +08:00
class TrieRule:
    """Registry of prefix and suffix commands backed by two character tries.

    Both tries are class attributes, so every registration made through
    ``add_prefix`` / ``add_suffix`` is shared by all users of the class.
    """
    # Keys are the literal prefixes to look for at the start of a message.
    prefix: CharTrie = CharTrie()
    # Keys are stored REVERSED so that a suffix match can be done as a
    # longest-prefix lookup on the reversed message text.
    suffix: CharTrie = CharTrie()

    @classmethod
    def add_prefix(cls, prefix: str, value: Any):
        """Register ``prefix`` with ``value``.

        If the prefix is already registered, a warning is logged and the
        existing value is kept.
        """
        if prefix in cls.prefix:
            logger.warning(f'Duplicated prefix rule "{prefix}"')
            return
        cls.prefix[prefix] = value

    @classmethod
    def add_suffix(cls, suffix: str, value: Any):
        """Register ``suffix`` with ``value``; the key is stored reversed.

        If the suffix is already registered, a warning is logged and the
        existing value is kept.
        """
        reversed_suffix = suffix[::-1]
        if reversed_suffix in cls.suffix:
            logger.warning(f'Duplicated suffix rule "{suffix}"')
            return
        cls.suffix[reversed_suffix] = value

    @staticmethod
    def _describe(match: Any) -> Dict[str, Any]:
        """Turn a trie lookup result (or ``None``) into a fresh result dict."""
        if match:
            return {"raw_command": match.key, "command": match.value}
        return {"raw_command": None, "command": None}

    @classmethod
    def get_value(cls, bot: Bot, event: Event,
                  state: dict) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Look up the longest registered prefix/suffix for ``event``.

        The prefix is searched in the left-stripped text of the first message
        segment and the suffix in the right-stripped text of the last segment,
        in each case only when that segment's ``type`` is ``"text"``.

        The outcome is written to ``state["_prefix"]`` and
        ``state["_suffix"]`` and also returned as a ``(prefix, suffix)`` pair.
        Each entry is a dict with keys ``"raw_command"`` (the matched trie
        key) and ``"command"`` (the registered value); both are ``None`` when
        nothing matched.  Because suffix keys are stored reversed, the suffix
        ``"raw_command"`` is the reversed suffix text.

        Events whose ``type`` is not ``"message"``, and messages with no
        segments, produce the all-``None`` result.
        """
        prefix = None
        suffix = None

        # ``event.message`` is only read for message events.
        message = event.message if event.type == "message" else None
        # Guard against an empty message: indexing [0] / [-1] would raise
        # IndexError when there are no segments at all.
        if message:
            first = message[0]
            if first.type == "text":
                prefix = cls.prefix.longest_prefix(first.data["text"].lstrip())
            last = message[-1]
            if last.type == "text":
                suffix = cls.suffix.longest_prefix(
                    last.data["text"].rstrip()[::-1])

        state["_prefix"] = cls._describe(prefix)
        state["_suffix"] = cls._describe(suffix)
        # Build separate dicts for the return value so that callers mutating
        # the result do not alter what is stored in ``state``.
        return cls._describe(prefix), cls._describe(suffix)
2020-07-25 12:28:30 +08:00
2020-08-17 16:09:41 +08:00
def startswith(msg: str) -> Rule:
    """Build a rule that passes when the message text begins with ``msg``.

    ``msg`` is registered in ``TrieRule``'s prefix trie with the value
    ``(msg,)``.  The returned checker reads ``state["_prefix"]``, which
    ``TrieRule.get_value`` fills in, so that must have run for the event
    before the rule is evaluated.
    """
    TrieRule.add_prefix(msg, (msg,))

    async def _startswith(bot: Bot, event: Event, state: dict) -> bool:
        # state["_prefix"] is a dict with the keys "raw_command"/"command";
        # testing ``msg in state["_prefix"]`` would compare msg against those
        # key names instead of the matched text.
        matched = state["_prefix"]["raw_command"]
        # The trie reports the LONGEST registered prefix, which can be longer
        # than msg when another rule registered an extension of it, so test
        # with startswith rather than equality.
        return matched is not None and matched.startswith(msg)

    return Rule(_startswith)
2020-07-25 12:28:30 +08:00
2020-08-17 16:09:41 +08:00
def endswith(msg: str) -> Rule:
    """Build a rule that passes when the message text ends with ``msg``.

    ``msg`` is registered in ``TrieRule``'s suffix trie with the value
    ``(msg,)``.  The returned checker reads ``state["_suffix"]``, which
    ``TrieRule.get_value`` fills in, so that must have run for the event
    before the rule is evaluated.
    """
    TrieRule.add_suffix(msg, (msg,))
    # Suffix keys are stored reversed in the trie, and the reversed key is
    # what ends up in state["_suffix"]["raw_command"].
    reversed_msg = msg[::-1]

    async def _endswith(bot: Bot, event: Event, state: dict) -> bool:
        # state["_suffix"] is a dict with the keys "raw_command"/"command";
        # testing ``msg in state["_suffix"]`` would compare msg against those
        # key names instead of the matched text.
        matched = state["_suffix"]["raw_command"]
        # The trie reports the LONGEST registered (reversed) suffix, which can
        # extend msg, so test with startswith on the reversed form.
        return matched is not None and matched.startswith(reversed_msg)

    return Rule(_endswith)
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
def keyword(msg: str) -> Rule:
    """Build a rule that passes when ``msg`` occurs in ``event.plain_text``.

    The rule fails when ``event.plain_text`` is empty or otherwise falsy.
    """

    async def _keyword(bot: Bot, event: Event, state: dict) -> bool:
        if not event.plain_text:
            return False
        return msg in event.plain_text

    return Rule(_keyword)
2020-05-02 20:03:36 +08:00
2020-08-23 20:01:58 +08:00
def command(command: Tuple[str, ...]) -> Rule:
    """Build a rule that passes when the matched prefix command is ``command``.

    Every spelling of the command is registered in ``TrieRule``'s prefix trie
    with ``command`` as its value.  A one-part command is registered once per
    configured ``command_start``.  Otherwise the parts are joined with each
    configured ``command_sep`` and registered for every start/separator
    combination.  The checker compares ``command`` with
    ``state["_prefix"]["command"]``.
    """
    driver_config = get_driver().config
    starts = driver_config.command_start
    seps = driver_config.command_sep

    if len(command) == 1:
        # A single part needs no separator, so it does not depend on seps.
        spellings = (f"{start}{command[0]}" for start in starts)
    else:
        spellings = (f"{start}{sep.join(command)}"
                     for start, sep in product(starts, seps))
    for spelling in spellings:
        TrieRule.add_prefix(spelling, command)

    async def _command(bot: Bot, event: Event, state: dict) -> bool:
        matched = state["_prefix"]["command"]
        return command == matched

    return Rule(_command)
2020-05-02 20:03:36 +08:00
2020-08-17 16:09:41 +08:00
def regex(regex: str, flags: Union[int, re.RegexFlag] = 0) -> Rule:
    """Build a rule that passes when ``regex`` is found in the message.

    The pattern is compiled once with ``flags`` and searched (not anchored)
    against ``str(event.message)``.
    """
    compiled = re.compile(regex, flags)

    async def _regex(bot: Bot, event: Event, state: dict) -> bool:
        text = str(event.message)
        # search() yields a match object or None; match objects are truthy.
        return compiled.search(text) is not None

    return Rule(_regex)
2020-08-23 10:45:26 +08:00
def to_me() -> Rule:
    """Build a rule that passes when ``event.to_me`` is truthy."""

    async def _to_me(bot: Bot, event: Event, state: dict) -> bool:
        return True if event.to_me else False

    return Rule(_to_me)