🚧 update cqhttp bot

This commit is contained in:
yanyongyu 2020-12-09 17:51:24 +08:00
parent 2bc05b2576
commit 2c97902f4e
4 changed files with 51 additions and 58 deletions

View File

@ -14,13 +14,13 @@ from nonebot.message import handle_event
from nonebot.adapters import Bot as BaseBot
from nonebot.exception import RequestDenied
from .event import Event
from .utils import log
from .event import Reply, CQHTTPEvent, MessageEvent
from .message import Message, MessageSegment
from .exception import NetworkError, ApiNotAvailable, ActionFailed
from .utils import log
if TYPE_CHECKING:
from nonebot.drivers import BaseDriver as Driver, BaseWebSocket as WebSocket
from nonebot.drivers import Driver, WebSocket
def get_auth_bearer(access_token: Optional[str] = None) -> Optional[str]:
@ -32,7 +32,7 @@ def get_auth_bearer(access_token: Optional[str] = None) -> Optional[str]:
return param
async def _check_reply(bot: "Bot", event: "Event"):
async def _check_reply(bot: "Bot", event: "CQHTTPEvent"):
"""
:说明:
@ -41,9 +41,9 @@ async def _check_reply(bot: "Bot", event: "Event"):
:参数:
* ``bot: Bot``: Bot 对象
* ``event: Event``: Event 对象
* ``event: CQHTTPEvent``: CQHTTPEvent 对象
"""
if event.type != "message":
if not isinstance(event, MessageEvent):
return
try:
@ -52,9 +52,10 @@ async def _check_reply(bot: "Bot", event: "Event"):
except ValueError:
return
msg_seg = event.message[index]
event.reply = await bot.get_msg(message_id=msg_seg.data["id"])
event.reply = Reply.parse_obj(await
bot.get_msg(message_id=msg_seg.data["id"]))
# ensure string comparation
if str(event.reply["sender"]["user_id"]) == str(event.self_id):
if str(event.reply.sender.user_id) == str(event.self_id):
event.to_me = True
del event.message[index]
if len(event.message) > index and event.message[index].type == "at":
@ -68,7 +69,7 @@ async def _check_reply(bot: "Bot", event: "Event"):
event.message.append(MessageSegment.text(""))
def _check_at_me(bot: "Bot", event: "Event"):
def _check_at_me(bot: "Bot", event: "CQHTTPEvent"):
"""
:说明:
@ -77,12 +78,12 @@ def _check_at_me(bot: "Bot", event: "Event"):
:参数:
* ``bot: Bot``: Bot 对象
* ``event: Event``: Event 对象
* ``event: CQHTTPEvent``: CQHTTPEvent 对象
"""
if event.type != "message":
if not isinstance(event, MessageEvent):
return
if event.detail_type == "private":
if event.message_type == "private":
event.to_me = True
else:
at_me_seg = MessageSegment.at(event.self_id)
@ -122,7 +123,7 @@ def _check_at_me(bot: "Bot", event: "Event"):
event.message.append(MessageSegment.text(""))
def _check_nickname(bot: "Bot", event: "Event"):
def _check_nickname(bot: "Bot", event: "CQHTTPEvent"):
"""
:说明:
@ -131,9 +132,9 @@ def _check_nickname(bot: "Bot", event: "Event"):
:参数:
* ``bot: Bot``: Bot 对象
* ``event: Event``: Event 对象
* ``event: CQHTTPEvent``: CQHTTPEvent 对象
"""
if event.type != "message":
if not isinstance(event, MessageEvent):
return
first_msg_seg = event.message[0]
@ -286,7 +287,7 @@ class Bot(BaseBot):
"""
:说明:
调用 `_check_reply <#async-check-reply-bot-event>`_, `_check_at_me <#check-at-me-bot-event>`_, `_check_nickname <#check-nickname-bot-event>`_ 处理事件并转换为 `Event <#class-event>`_
调用 `_check_reply <#async-check-reply-bot-event>`_, `_check_at_me <#check-at-me-bot-event>`_, `_check_nickname <#check-nickname-bot-event>`_ 处理事件并转换为 `CQHTTPEvent <#class-event>`_
"""
if not message:
return
@ -296,7 +297,7 @@ class Bot(BaseBot):
return
try:
event = Event(message)
event = CQHTTPEvent.parse_obj(message)
# Check whether user is calling me
await _check_reply(self, event)
@ -379,7 +380,7 @@ class Bot(BaseBot):
@overrides(BaseBot)
async def send(self,
event: Event,
event: CQHTTPEvent,
message: Union[str, Message, MessageSegment],
at_sender: bool = False,
**kwargs) -> Any:
@ -390,7 +391,7 @@ class Bot(BaseBot):
:参数:
* ``event: Event``: Event 对象
* ``event: CQHTTPEvent``: CQHTTPEvent 对象
* ``message: Union[str, Message, MessageSegment]``: 要发送的消息
* ``at_sender: bool``: 是否 @ 事件主体
* ``**kwargs``: 覆盖默认参数

View File

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Union, Optional
from nonebot.config import Config
from nonebot.adapters import Bot as BaseBot
from nonebot.drivers import BaseDriver as Driver, BaseWebSocket as WebSocket
from nonebot.drivers import Driver, WebSocket
from .event import Event
from .message import Message, MessageSegment

View File

@ -210,14 +210,13 @@ from .message import Message
class CQHTTPEvent(Event):
type: Literal["message", "notice", "request", "meta_event"]
time: int
self_id: int
post_type: str
post_type: Literal["message", "notice", "request", "meta_event"]
@overrides(Event)
def get_type(self) -> Literal["message", "notice", "request", "meta_event"]:
return self.type
return self.post_type
@overrides(Event)
def get_event_name(self) -> str:
@ -244,6 +243,18 @@ class Sender(BaseModel):
extra = "allow"
class Reply(BaseModel):
time: int
message_type: str
message_id: int
real_id: int
sender: Sender
message: Message
class Config:
extra = "allow"
class Anonymous(BaseModel):
id: int
name: str
@ -274,7 +285,16 @@ class Status(BaseModel):
# Message Events
class MessageEvent(CQHTTPEvent):
post_type: Literal["message"]
sub_type: str
user_id: int
message_type: str
message_id: int
message: Message
raw_message: str
font: int
sender: Sender
to_me: bool = False
reply: Optional[Reply] = None
@overrides(CQHTTPEvent)
def get_event_name(self) -> str:
@ -285,13 +305,6 @@ class MessageEvent(CQHTTPEvent):
class PrivateMessageEvent(MessageEvent):
message_type: Literal["private"]
sub_type: str
user_id: int
message_id: int
message: Message
raw_message: str
font: int
sender: Sender
@overrides(CQHTTPEvent)
def get_event_description(self) -> str:
@ -304,14 +317,7 @@ class PrivateMessageEvent(MessageEvent):
class GroupMessageEvent(MessageEvent):
message_type: Literal["group"]
sub_type: str
user_id: int
group_id: int
message_id: int
message: Message
raw_message: str
font: int
sender: Sender
anonymous: Anonymous
@overrides(CQHTTPEvent)

View File

@ -13,7 +13,7 @@ from nonebot.log import logger
from nonebot.rule import TrieRule
from nonebot.utils import escape_tag
from nonebot.matcher import matchers, Matcher
from nonebot.exception import IgnoredException, StopPropagation
from nonebot.exception import IgnoredException, StopPropagation, NoLogException
from nonebot.typing import State, EventPreProcessor, RunPreProcessor, EventPostProcessor, RunPostProcessor
if TYPE_CHECKING:
@ -208,24 +208,10 @@ async def handle_event(bot: "Bot", event: "Event"):
asyncio.create_task(handle_event(bot, event))
"""
show_log = True
log_msg = f"<m>{bot.type.upper()} </m>| {event.self_id} [{event.name}]: "
if event.type == "message":
log_msg += f"Message {event.id} from "
log_msg += str(event.user_id)
if event.detail_type == "group":
log_msg += f"@[群:{event.group_id}]:"
log_msg += ' "' + "".join(
map(
lambda x: escape_tag(str(x))
if x.type == "text" else f"<le>{escape_tag(str(x))}</le>",
event.message)) + '"' # type: ignore
elif event.type == "notice":
log_msg += f"Notice {event.raw_event}"
elif event.type == "request":
log_msg += f"Request {event.raw_event}"
elif event.type == "meta_event":
# log_msg += f"MetaEvent {event.detail_type}"
log_msg = f"<m>{bot.type.upper()} {bot.self_id}</m> | "
try:
log_msg += event.get_log_string()
except NoLogException:
show_log = False
if show_log:
logger.opt(colors=True).info(log_msg)
@ -237,8 +223,8 @@ async def handle_event(bot: "Bot", event: "Event"):
logger.debug("Running PreProcessors...")
await asyncio.gather(*coros)
except IgnoredException:
logger.opt(
colors=True).info(f"Event {event.name} is <b>ignored</b>")
logger.opt(colors=True).info(
f"Event {event.get_event_name()} is <b>ignored</b>")
return
except Exception as e:
logger.opt(colors=True, exception=e).error(