nonebot2/none/helpers.py

87 lines
2.8 KiB
Python
Raw Normal View History

2018-07-29 12:45:07 +00:00
import hashlib
2018-12-22 04:44:10 +00:00
import random
from typing import Sequence, Callable
2018-06-27 14:05:12 +00:00
2018-12-22 04:44:10 +00:00
from . import NoneBot
2018-10-15 17:03:50 +00:00
from .exceptions import CQHttpError
2018-12-22 04:44:10 +00:00
from .message import escape
2018-10-15 17:03:50 +00:00
from .typing import Context_T, Message_T, Expression_T
2018-06-14 22:58:24 +00:00
2018-10-15 17:03:50 +00:00
def context_id(ctx: Context_T, *,
2018-07-29 12:45:07 +00:00
mode: str = 'default', use_hash: bool = False) -> str:
"""
Calculate a unique id representing the current context.
mode:
default: one id for one context
group: one id for one group or discuss
user: one id for one user
:param ctx: the context dict
:param mode: unique id mode: "default", "group", or "user"
:param use_hash: use md5 to hash the id or not
"""
ctx_id = ''
if mode == 'default':
if ctx.get('group_id'):
ctx_id += f'/group/{ctx["group_id"]}'
elif ctx.get('discuss_id'):
ctx_id += f'/discuss/{ctx["discuss_id"]}'
if ctx.get('user_id'):
ctx_id += f'/user/{ctx["user_id"]}'
elif mode == 'group':
if ctx.get('group_id'):
ctx_id += f'/group/{ctx["group_id"]}'
elif ctx.get('discuss_id'):
ctx_id += f'/discuss/{ctx["discuss_id"]}'
elif mode == 'user':
if ctx.get('user_id'):
ctx_id += f'/user/{ctx["user_id"]}'
if ctx_id and use_hash:
ctx_id = hashlib.md5(ctx_id.encode('ascii')).hexdigest()
return ctx_id
2018-06-27 14:05:12 +00:00
2018-10-15 17:03:50 +00:00
async def send(bot: NoneBot, ctx: Context_T, message: Message_T, *,
ignore_failure: bool = True) -> None:
2018-07-04 11:50:42 +00:00
"""Send a message ignoring failure by default."""
2018-06-27 14:05:12 +00:00
try:
if ctx.get('post_type') == 'message':
await bot.send(ctx, message)
else:
ctx = ctx.copy()
if 'message' in ctx:
del ctx['message']
if 'group_id' in ctx:
await bot.send_group_msg(**ctx, message=message)
elif 'discuss_id' in ctx:
await bot.send_discuss_msg(**ctx, message=message)
elif 'user_id' in ctx:
await bot.send_private_msg(**ctx, message=message)
except CQHttpError:
if not ignore_failure:
raise
2018-12-22 04:44:10 +00:00
def render_expression(expr: Expression_T, *,
escape_args: bool = True, **kwargs) -> str:
"""
Render an expression to message string.
:param expr: expression to render
:param escape_args: should escape arguments or not
:param kwargs: keyword arguments used in str.format()
:return: the rendered message
"""
if isinstance(expr, Callable):
expr = expr(**kwargs)
elif isinstance(expr, Sequence) and not isinstance(expr, str):
expr = random.choice(expr)
if escape_args:
for k, v in kwargs.items():
if isinstance(v, str):
kwargs[k] = escape(v)
return expr.format(**kwargs)