nonebot2/nonebot/helpers.py

90 lines
2.8 KiB
Python
Raw Normal View History

2018-07-29 20:45:07 +08:00
import hashlib
2018-12-22 12:44:10 +08:00
import random
2019-01-05 21:36:57 +08:00
from typing import Sequence, Callable, Any
2018-06-27 22:05:12 +08:00
from aiocqhttp import Event as CQEvent
2018-12-22 12:44:10 +08:00
from . import NoneBot
2018-10-16 01:03:50 +08:00
from .exceptions import CQHttpError
2018-12-22 12:44:10 +08:00
from .message import escape
from .typing import Message_T, Expression_T
2018-06-15 06:58:24 +08:00
def context_id(event: CQEvent, *,
               mode: str = 'default', use_hash: bool = False) -> str:
    """
    Calculate a unique id representing the context of the given event.

    mode:
      default: one id for one context
      group: one id for one group or discuss
      user: one id for one user

    :param event: the event object
    :param mode: unique id mode: "default", "group", or "user"
    :param use_hash: use md5 to hash the id or not
    :return: the context id; an empty string if ``mode`` is not one of
             the values above or the event has none of the ids needed
    """
    ctx_id = ''
    if mode == 'default':
        # Group/discuss part first, then the user part is appended to it,
        # so the same user gets a different id in each group.
        if event.group_id:
            ctx_id = f'/group/{event.group_id}'
        elif event.discuss_id:
            ctx_id = f'/discuss/{event.discuss_id}'
        if event.user_id:
            ctx_id += f'/user/{event.user_id}'
    elif mode == 'group':
        # Falls back to the user id only when there is no group/discuss.
        if event.group_id:
            ctx_id = f'/group/{event.group_id}'
        elif event.discuss_id:
            ctx_id = f'/discuss/{event.discuss_id}'
        elif event.user_id:
            ctx_id = f'/user/{event.user_id}'
    elif mode == 'user':
        if event.user_id:
            ctx_id = f'/user/{event.user_id}'

    if ctx_id and use_hash:
        # UTF-8 yields the same bytes as ASCII for ASCII-only ids, but
        # does not fail if an id happens to contain other characters.
        ctx_id = hashlib.md5(ctx_id.encode('utf-8')).hexdigest()
    return ctx_id
2018-06-27 22:05:12 +08:00
async def send(bot: NoneBot, event: CQEvent,
               message: Message_T, *,
               ensure_private: bool = False,
               ignore_failure: bool = True,
               **kwargs) -> Any:
    """
    Send a message ignoring failure by default.

    :param bot: bot object whose ``send()`` coroutine is awaited
    :param event: the event object passed on to ``bot.send()``
    :param message: the message to send
    :param ensure_private: send with ``message_type`` set to "private"
    :param ignore_failure: return None instead of raising on CQHttpError
    :param kwargs: extra keyword arguments passed on to ``bot.send()``
    :return: the result of ``bot.send()``, or None if sending failed and
             the failure was ignored
    :raise CQHttpError: if sending failed and ``ignore_failure`` is False
    """
    if ensure_private:
        # Modify a copy so the caller's event object is left unchanged.
        event = event.copy()
        event['message_type'] = 'private'

    # Only the actual send is guarded; nothing above is expected to
    # raise CQHttpError.
    try:
        return await bot.send(event, message, **kwargs)
    except CQHttpError:
        if not ignore_failure:
            raise
        return None
2018-06-27 22:05:12 +08:00
def render_expression(expr: Expression_T, *args,
                      escape_args: bool = True, **kwargs) -> str:
    """
    Render an expression to message string.

    The expression may be a format string, a non-str sequence (one item
    is picked at random), or a callable (called with the given arguments,
    unescaped, to obtain the format string).

    :param expr: expression to render
    :param escape_args: should escape arguments or not
    :param args: positional arguments used in str.format()
    :param kwargs: keyword arguments used in str.format()
    :return: the rendered message
    """
    if callable(expr):
        expr = expr(*args, **kwargs)
    elif isinstance(expr, Sequence) and not isinstance(expr, str):
        expr = random.choice(expr)

    if escape_args:
        # Only str values are escaped; everything else is formatted as is.
        args = [escape(s) if isinstance(s, str) else s for s in args]
        kwargs = {k: escape(v) if isinstance(v, str) else v
                  for k, v in kwargs.items()}
    return expr.format(*args, **kwargs)