nonebot2/packages/nonebot-adapter-feishu/nonebot/adapters/feishu/event.py

220 lines
5.1 KiB
Python
Raw Normal View History

2021-07-04 14:19:10 +08:00
import inspect
import json
2021-07-06 21:12:49 +08:00
from typing import Any, List, Literal, Optional, Type
2021-07-04 14:19:10 +08:00
from pygtrie import StringTrie
from pydantic import BaseModel, root_validator, Field
2021-07-03 13:58:26 +08:00
2021-07-01 07:59:50 +08:00
from nonebot.adapters import Event as BaseEvent
2021-07-03 13:58:26 +08:00
from nonebot.typing import overrides
2021-07-01 07:59:50 +08:00
2021-07-06 21:12:49 +08:00
from .message import Message, MessageDeserializer, MessageSerializer
2021-07-01 07:59:50 +08:00
2021-07-04 14:19:10 +08:00
class EventHeader(BaseModel):
event_id: str
event_type: str
create_time: str
token: str
app_id: str
tenant_key: str
2021-07-01 07:59:50 +08:00
class Event(BaseEvent):
2021-07-03 13:58:26 +08:00
"""
飞书协议事件各事件字段参考 `飞书文档`_
.. _飞书事件列表文档:
https://open.feishu.cn/document/ukTMukTMukTM/uYDNxYjL2QTM24iN0EjN/event-list
"""
2021-07-04 14:19:10 +08:00
__event__ = ""
schema_: str = Field("", alias='schema')
header: EventHeader
event: Any
2021-07-01 07:59:50 +08:00
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def get_type(self) -> str:
2021-07-04 14:19:10 +08:00
return self.header.event_type
2021-07-01 07:59:50 +08:00
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def get_event_name(self) -> str:
2021-07-04 14:19:10 +08:00
return self.header.event_type
2021-07-01 07:59:50 +08:00
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def get_event_description(self) -> str:
return str(self.dict())
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def get_message(self) -> Message:
2021-07-03 13:58:26 +08:00
raise ValueError("Event has no message!")
2021-07-01 07:59:50 +08:00
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def get_plaintext(self) -> str:
2021-07-03 13:58:26 +08:00
raise ValueError("Event has no plaintext!")
2021-07-01 07:59:50 +08:00
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def get_user_id(self) -> str:
2021-07-03 13:58:26 +08:00
raise ValueError("Event has no user_id!")
2021-07-01 07:59:50 +08:00
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def get_session_id(self) -> str:
2021-07-03 13:58:26 +08:00
raise ValueError("Event has no session_id!")
2021-07-01 07:59:50 +08:00
2021-07-03 13:58:26 +08:00
@overrides(BaseEvent)
2021-07-01 07:59:50 +08:00
def is_tome(self) -> bool:
return False
2021-07-03 13:58:26 +08:00
class UserId(BaseModel):
union_id: str
user_id: str
open_id: str
class Sender(BaseModel):
sender_id: UserId
sender_type: str
tenant_key: str
class Mention(BaseModel):
key: str
id: UserId
name: str
tenant_key: str
2021-07-04 14:19:10 +08:00
class EventMessage(BaseModel):
2021-07-03 13:58:26 +08:00
message_id: str
2021-07-04 14:19:10 +08:00
root_id: Optional[str]
parent_id: Optional[str]
2021-07-03 13:58:26 +08:00
create_time: str
chat_id: str
chat_type: str
message_type: str
content: Message
2021-07-04 14:19:10 +08:00
mentions: Optional[List[Mention]]
2021-07-03 13:58:26 +08:00
@root_validator(pre=True)
2021-07-04 14:19:10 +08:00
def parse_message(cls, values: dict):
values["content"] = MessageDeserializer(
data=json.loads(values["content"]),
type=values["message_type"]).deserialize()
2021-07-03 13:58:26 +08:00
return values
2021-07-04 14:19:10 +08:00
class GroupEventMessage(EventMessage):
chat_type: Literal["group"]
class PrivateEventMessage(EventMessage):
chat_type: Literal["p2p"]
2021-07-04 14:19:10 +08:00
class MessageEventDetail(BaseModel):
sender: Sender
message: EventMessage
2021-07-03 13:58:26 +08:00
class GroupMessageEventDetail(MessageEventDetail):
message: GroupEventMessage
class PrivateMessageEventDetail(MessageEventDetail):
message: PrivateEventMessage
2021-07-03 13:58:26 +08:00
class MessageEvent(Event):
2021-07-04 14:19:10 +08:00
__event__ = "im.message.receive_v1"
event: MessageEventDetail
2021-07-03 13:58:26 +08:00
@overrides(Event)
def get_type(self) -> Literal["message", "notice", "meta_event"]:
return "message"
@overrides(Event)
def get_event_name(self) -> str:
return f"{self.get_type()}.{self.event.message.chat_type}"
2021-07-03 13:58:26 +08:00
@overrides(Event)
def get_event_description(self) -> str:
#TODO:换成GroupId
2021-07-03 13:58:26 +08:00
return (
f"Message[{super().get_type()}]"
f" {self.event.message.message_id} from {self.get_user_id()}"
f"@[{self.event.message.chat_type}:{self.event.message.chat_id}]"
2021-07-06 21:12:49 +08:00
f" {str(self.get_message()) and MessageSerializer(self.get_message()).serialize()}")
2021-07-03 13:58:26 +08:00
@overrides(Event)
def get_message(self) -> Message:
2021-07-04 14:19:10 +08:00
return self.event.message.content
2021-07-03 13:58:26 +08:00
@overrides(Event)
def get_plaintext(self) -> str:
2021-07-04 14:19:10 +08:00
return str(self.event.message.content)
2021-07-03 13:58:26 +08:00
@overrides(Event)
def get_user_id(self) -> str:
2021-07-06 21:12:49 +08:00
return self.event.sender.sender_id.user_id
2021-07-03 13:58:26 +08:00
@overrides(Event)
def get_session_id(self) -> str:
return f"{self.event.message.chat_type}_{self.event.message.chat_id}_{self.get_user_id()}"
class GroupMessageEvent(MessageEvent):
__event__ = "im.message.receive_v1.group"
event: GroupMessageEventDetail
class PrivateMessageEvent(MessageEvent):
2021-07-06 14:58:38 +08:00
__event__ = "im.message.receive_v1.p2p"
event: PrivateMessageEventDetail
2021-07-03 13:58:26 +08:00
2021-07-04 14:19:10 +08:00
class MessageReader(BaseModel):
reader_id: UserId
read_time: str
tenant_key: str
2021-07-03 13:58:26 +08:00
2021-07-04 14:19:10 +08:00
class MessageReadEventDetail(BaseModel):
reader: MessageReader
message_id_list: List[str]
class MessageReadEvent(Event):
__event__ = "im.message.message_read_v1"
event: MessageReadEventDetail
2021-07-03 13:58:26 +08:00
class NoticeEvent(Event):
...
class MetaEvent(Event):
...
2021-07-04 14:19:10 +08:00
_t = StringTrie(separator=".")
# define `model` first to avoid globals changing while `for`
model = None
for model in globals().values():
if not inspect.isclass(model) or not issubclass(model, Event):
continue
_t["." + model.__event__] = model
def get_event_model(event_name) -> List[Type[Event]]:
"""
:说明:
根据事件名获取对应 ``Event Model`` ``FallBack Event Model`` 列表
:返回:
- ``List[Type[Event]]``
"""
return [model.value for model in _t.prefixes("." + event_name)][::-1]