💡QQ群话痨排行可视化

This commit is contained in:
ShiXui 2022-09-11 14:00:49 +08:00
parent 89a6c811c5
commit 316c2044f0
3 changed files with 46 additions and 9 deletions

View File

@ -1,5 +1,6 @@
import re import re
import time import time
import asyncio
from typing import List, Tuple, Union from typing import List, Tuple, Union
from datetime import datetime, timedelta from datetime import datetime, timedelta
@ -184,6 +185,7 @@ async def handle_message(
guild_id = event.guild_id guild_id = event.guild_id
msg = await get_guild_message_records(guild_id=str(guild_id),bot=bot) msg = await get_guild_message_records(guild_id=str(guild_id),bot=bot)
msg += plugin_config.dialectlist_string_suffix_format.format(timecost=time.time()-st) await rankings.send(msg)
await rankings.finish(msg) await asyncio.sleep(1) #让图片先发出来
await rankings.finish(plugin_config.dialectlist_string_suffix_format.format(timecost=time.time()-st-1))

View File

@ -9,10 +9,13 @@ class Config(BaseModel, extra=Extra.ignore):
timezone: Optional[str] timezone: Optional[str]
dialectlist_string_format: str = '{index}名:\n{nickname},{chatdatanum}条消息\n' dialectlist_string_format: str = '{index}名:\n{nickname},{chatdatanum}条消息\n'
dialectlist_string_suffix_format: str = '\n你们的职业是水群吗————MYX\n计算花费时间:{timecost}' dialectlist_string_suffix_format: str = '你们的职业是水群吗————MYX\n计算花费时间:{timecost}'
dialectlist_path:str = os.path.dirname(__file__) dialectlist_path:str = os.path.dirname(__file__)
dialectlist_image_path: Path = Path(dialectlist_path)/'image.png'
dialectlist_imageSvg_path: Path = Path(dialectlist_path)/'image.svg'
dialectlist_json_path:Path = Path(dialectlist_path)/'qqguild.json' dialectlist_json_path:Path = Path(dialectlist_path)/'qqguild.json'
dialectlist_get_num:int = 10 dialectlist_get_num:int = 10
dialectlist_visualization:bool = True
global_config = get_driver().config global_config = get_driver().config
plugin_config = Config.parse_obj(global_config) plugin_config = Config.parse_obj(global_config)

View File

@ -1,13 +1,18 @@
import time import time
import pygal
import unicodedata
from datetime import datetime from datetime import datetime
from sqlmodel import select, or_ from sqlmodel import select, or_
from typing_extensions import Literal from typing_extensions import Literal
from typing import Iterable, List, Optional, Dict from typing import Iterable, List, Optional, Dict
from pygal.style import Style
style=Style(font_family="SimHei",)
from nonebot.log import logger from nonebot.log import logger
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import Message from nonebot.adapters.onebot.v11 import Message,MessageSegment
from nonebot.adapters.onebot.v11.exception import ActionFailed from nonebot.adapters.onebot.v11.exception import ActionFailed
from nonebot_plugin_datastore import create_session from nonebot_plugin_datastore import create_session
@ -16,6 +21,17 @@ from nonebot_plugin_chatrecorder.model import MessageRecord
from .config import plugin_config from .config import plugin_config
def remove_control_characters(string:str) -> str:
"""将字符串中的控制符去除
Args:
string (str): 需要去除的字符串
Returns:
(str): 经过处理的字符串
"""
return "".join(ch for ch in string if unicodedata.category(ch)[0]!="C")
async def get_message_records( async def get_message_records(
user_ids: Optional[Iterable[str]] = None, user_ids: Optional[Iterable[str]] = None,
group_ids: Optional[Iterable[str]] = None, group_ids: Optional[Iterable[str]] = None,
@ -86,7 +102,7 @@ async def msg_counter(
got_num:int=10, got_num:int=10,
)->Message: )->Message:
''' '''
计算出结果并返回可以直接发送的字符串 计算出结果并返回可以直接发送的字符串和图片
''' '''
st = time.time() st = time.time()
@ -111,7 +127,7 @@ async def msg_counter(
try: try:
maxkey = max(lst, key=lst.get) # type: ignore maxkey = max(lst, key=lst.get) # type: ignore
except ValueError: except ValueError:
ranking.append(("null",0)) ranking.append(["null",0])
continue continue
logger.debug('searching number {} from group {}'.format(str(maxkey),str(gid))) logger.debug('searching number {} from group {}'.format(str(maxkey),str(gid)))
@ -125,7 +141,7 @@ async def msg_counter(
no_cache=True no_cache=True
) )
nickname:str = member_info['nickname']if not member_info['card'] else member_info['card'] nickname:str = member_info['nickname']if not member_info['card'] else member_info['card']
ranking.append([nickname.strip(),lst.pop(maxkey)]) ranking.append([remove_control_characters(nickname).strip(),lst.pop(maxkey)])
except ActionFailed as e: except ActionFailed as e:
@ -135,14 +151,30 @@ async def msg_counter(
logger.debug('loaded list:\n{}'.format(ranking)) logger.debug('loaded list:\n{}'.format(ranking))
if plugin_config.dialectlist_visualization:
view = pygal.Pie(inner_radius=0.6,style=style)
view.title = '消息圆环图'
for i in ranking:
view.add(str(i[0]),int(i[1]))
try:
png: bytes = view.render_to_png()# type: ignore
process_msg = Message(MessageSegment.image(png))
except OSError:
logger.error('GTK+(GIMP Toolkit) is not installed, the svg can not be transformed to png')
plugin_config.dialectlist_visualization = False
process_msg = Message('无法发送可视化图片请检查是否安装GTK+详细安装教程可见github\nhttps://github.com/tschoonj/GTK-for-Windows-Runtime-Environment-Installer \n若不想安装这个软件,再次使用指令会转换为发送字符串而不是发送图片')
else:
process_msg = ''
out:str = '' out:str = ''
for i in range(got_num): for i in range(got_num):
index = i+1 index = i+1
nickname,chatdatanum = str(ranking[i]) nickname,chatdatanum = ranking[i]
str_example = plugin_config.dialectlist_string_format.format(index=index,nickname=nickname,chatdatanum=chatdatanum) str_example = plugin_config.dialectlist_string_format.format(index=index,nickname=nickname,chatdatanum=chatdatanum)
out = out + str_example out = out + str_example
logger.debug(out) logger.debug(out)
logger.info('spent {} seconds to count from {} msg'.format(time.time()-st,msg_len)) logger.info('spent {} seconds to count from {} msg'.format(time.time()-st,msg_len))
return Message(out) return Message(out)+process_msg