🎨 change args into optional

This commit is contained in:
yanyongyu 2020-09-27 18:05:13 +08:00
parent 3dc95b904f
commit 2042097f83
3 changed files with 25 additions and 28 deletions

View File

@ -14,7 +14,7 @@
import asyncio import asyncio
from nonebot.utils import run_sync from nonebot.utils import run_sync
from nonebot.typing import Bot, Event, Union, NoReturn, Callable, Awaitable, PermissionChecker from nonebot.typing import Bot, Event, Union, NoReturn, Optional, Callable, Awaitable, PermissionChecker
class Permission: class Permission:
@ -53,10 +53,13 @@ class Permission:
def __and__(self, other) -> NoReturn: def __and__(self, other) -> NoReturn:
raise RuntimeError("And operation between Permissions is not allowed.") raise RuntimeError("And operation between Permissions is not allowed.")
def __or__(self, other: Union["Permission", def __or__(
PermissionChecker]) -> "Permission": self, other: Optional[Union["Permission",
PermissionChecker]]) -> "Permission":
checkers = self.checkers.copy() checkers = self.checkers.copy()
if isinstance(other, Permission): if other is None:
return self
elif isinstance(other, Permission):
checkers |= other.checkers checkers |= other.checkers
elif asyncio.iscoroutinefunction(other): elif asyncio.iscoroutinefunction(other):
checkers.add(other) # type: ignore checkers.add(other) # type: ignore

View File

@ -28,7 +28,7 @@ class Plugin(object):
def on(rule: Optional[Union[Rule, RuleChecker]] = None, def on(rule: Optional[Union[Rule, RuleChecker]] = None,
permission: Permission = Permission(), permission: Optional[Permission] = None,
*, *,
handlers: Optional[List[Handler]] = None, handlers: Optional[List[Handler]] = None,
temp: bool = False, temp: bool = False,
@ -37,7 +37,7 @@ def on(rule: Optional[Union[Rule, RuleChecker]] = None,
state: Optional[dict] = None) -> Type[Matcher]: state: Optional[dict] = None) -> Type[Matcher]:
matcher = Matcher.new("", matcher = Matcher.new("",
Rule() & rule, Rule() & rule,
permission, permission or Permission(),
temp=temp, temp=temp,
priority=priority, priority=priority,
block=block, block=block,
@ -67,7 +67,7 @@ def on_metaevent(rule: Optional[Union[Rule, RuleChecker]] = None,
def on_message(rule: Optional[Union[Rule, RuleChecker]] = None, def on_message(rule: Optional[Union[Rule, RuleChecker]] = None,
permission: Permission = Permission(), permission: Optional[Permission] = None,
*, *,
handlers: Optional[List[Handler]] = None, handlers: Optional[List[Handler]] = None,
temp: bool = False, temp: bool = False,
@ -76,7 +76,7 @@ def on_message(rule: Optional[Union[Rule, RuleChecker]] = None,
state: Optional[dict] = None) -> Type[Matcher]: state: Optional[dict] = None) -> Type[Matcher]:
matcher = Matcher.new("message", matcher = Matcher.new("message",
Rule() & rule, Rule() & rule,
permission, permission or Permission(),
temp=temp, temp=temp,
priority=priority, priority=priority,
block=block, block=block,
@ -126,26 +126,21 @@ def on_request(rule: Optional[Union[Rule, RuleChecker]] = None,
def on_startswith(msg: str, def on_startswith(msg: str,
rule: Optional[Optional[Union[Rule, RuleChecker]]] = None, rule: Optional[Optional[Union[Rule, RuleChecker]]] = None,
permission: Permission = Permission(),
**kwargs) -> Type[Matcher]: **kwargs) -> Type[Matcher]:
return on_message(startswith(msg) & return on_message(startswith(msg) & rule, **kwargs) if rule else on_message(
rule, permission, **kwargs) if rule else on_message( startswith(msg), **kwargs)
startswith(msg), permission, **kwargs)
def on_endswith(msg: str, def on_endswith(msg: str,
rule: Optional[Optional[Union[Rule, RuleChecker]]] = None, rule: Optional[Optional[Union[Rule, RuleChecker]]] = None,
permission: Permission = Permission(),
**kwargs) -> Type[Matcher]: **kwargs) -> Type[Matcher]:
return on_message(endswith(msg) & return on_message(endswith(msg) & rule, **kwargs) if rule else on_message(
rule, permission, **kwargs) if rule else on_message( startswith(msg), **kwargs)
startswith(msg), permission, **kwargs)
def on_command(cmd: Union[str, Tuple[str, ...]], def on_command(cmd: Union[str, Tuple[str, ...]],
alias: Set[Union[str, Tuple[str, ...]]] = None, alias: Set[Union[str, Tuple[str, ...]]] = None,
rule: Optional[Union[Rule, RuleChecker]] = None, rule: Optional[Union[Rule, RuleChecker]] = None,
permission: Permission = Permission(),
**kwargs) -> Union[Type[Matcher], MatcherGroup]: **kwargs) -> Union[Type[Matcher], MatcherGroup]:
if isinstance(cmd, str): if isinstance(cmd, str):
cmd = (cmd,) cmd = (cmd,)
@ -162,27 +157,24 @@ def on_command(cmd: Union[str, Tuple[str, ...]],
alias = set(map(lambda x: (x,) if isinstance(x, str) else x, alias)) alias = set(map(lambda x: (x,) if isinstance(x, str) else x, alias))
group = MatcherGroup("message", group = MatcherGroup("message",
Rule() & rule, Rule() & rule,
permission,
handlers=handlers, handlers=handlers,
**kwargs) **kwargs)
for cmd_ in [cmd, *alias]: for cmd_ in [cmd, *alias]:
group.new(rule=command(cmd_)) group.new(rule=command(cmd_))
return group return group
else: else:
return on_message( return on_message(command(cmd) & rule, handlers=handlers, **
command(cmd) & rule, permission, handlers=handlers, ** kwargs) if rule else on_message(
kwargs) if rule else on_message( command(cmd), handlers=handlers, **kwargs)
command(cmd), permission, handlers=handlers, **kwargs)
def on_regex(pattern: str, def on_regex(pattern: str,
flags: Union[int, re.RegexFlag] = 0, flags: Union[int, re.RegexFlag] = 0,
rule: Optional[Rule] = None, rule: Optional[Rule] = None,
permission: Permission = Permission(),
**kwargs) -> Type[Matcher]: **kwargs) -> Type[Matcher]:
return on_message(regex(pattern, flags) & return on_message(regex(pattern, flags) &
rule, permission, **kwargs) if rule else on_message( rule, **kwargs) if rule else on_message(
regex(pattern, flags), permission, **kwargs) regex(pattern, flags), **kwargs)
def load_plugin(module_path: str) -> Optional[Plugin]: def load_plugin(module_path: str) -> Optional[Plugin]:

View File

@ -20,7 +20,7 @@ from pygtrie import CharTrie
from nonebot import get_driver from nonebot import get_driver
from nonebot.log import logger from nonebot.log import logger
from nonebot.utils import run_sync from nonebot.utils import run_sync
from nonebot.typing import Bot, Any, Dict, Event, Union, Tuple, NoReturn, Callable, Awaitable, RuleChecker from nonebot.typing import Bot, Any, Dict, Event, Union, Tuple, NoReturn, Optional, Callable, Awaitable, RuleChecker
class Rule: class Rule:
@ -68,9 +68,11 @@ class Rule:
*map(lambda c: c(bot, event, state), self.checkers)) *map(lambda c: c(bot, event, state), self.checkers))
return all(results) return all(results)
def __and__(self, other: Union["Rule", RuleChecker]) -> "Rule": def __and__(self, other: Optional[Union["Rule", RuleChecker]]) -> "Rule":
checkers = self.checkers.copy() checkers = self.checkers.copy()
if isinstance(other, Rule): if other is None:
return self
elif isinstance(other, Rule):
checkers |= other.checkers checkers |= other.checkers
elif asyncio.iscoroutinefunction(other): elif asyncio.iscoroutinefunction(other):
checkers.add(other) # type: ignore checkers.add(other) # type: ignore