mirror of
https://github.com/nonebot/nonebot2.git
synced 2024-11-24 00:55:07 +08:00
134 lines
4.9 KiB
Python
134 lines
4.9 KiB
Python
import hashlib
|
|
import random
|
|
from datetime import datetime
|
|
|
|
from little_shit import get_message_sources
|
|
|
|
# Registry of adapter classes, keyed by the message source name ('via').
# Entries are added by the as_adapter decorator.
_adapter_classes = {}  # one message source, one adapter class
|
|
# Cache of adapter instances created by get_adapter, so each login account
# reuses one instance. Also holds the shared instance stored under 'default'.
_adapter_instances = {}  # one login account, one adapter instance
|
|
|
|
|
|
def as_adapter(via):
    """
    Build a class decorator that registers the decorated class as the adapter
    class for the message source named by 'via'.

    :param via: key under which the class is stored in the adapter class registry
    :return: a decorator that records the class and returns it unchanged
    """

    def register(adapter_cls):
        # A later registration for the same 'via' replaces the earlier one.
        _adapter_classes.update({via: adapter_cls})
        return adapter_cls

    return register
|
|
|
|
|
|
class Adapter(object):
    """
    Base adapter for a message source.

    The base class implements message dispatching and the universal
    'get_target' / 'get_source' helpers. Subclasses may provide any of
    'send_group_message', 'send_discuss_message' and 'send_private_message'
    (each taking 'target' and 'content'); 'send_message' only dispatches to
    the ones that exist.
    """

    def __init__(self, config: dict):
        """
        :param config: configuration dict; only 'login_id' and 'superuser_id' are read here,
                       both default to None when absent
        """
        self.login_id = config.get('login_id')
        self.superuser_id = config.get('superuser_id')

    def unitize_context(self, ctx_msg: dict):
        """
        Hook for subclasses; the base implementation ignores 'ctx_msg'.
        NOTE(review): presumably converts a raw message context into the unified format - confirm
        against the concrete adapters.

        :return: None in the base implementation
        """
        return None

    def send_message(self, target: dict, content):
        """
        Send 'content' to 'target' through the sender method matching the target's message type.

        Nothing is sent when 'target' or 'content' is None, or when this adapter
        does not define a sender method for the message type.

        :param target: dict describing the receiver; 'msg_type' selects the sender method
                       and defaults to 'private'
        :param content: the message content, passed through to the sender method untouched
        """
        if target is None or content is None:
            return

        msg_type = target.get('msg_type', 'private')
        if msg_type == 'group' and hasattr(self, 'send_group_message'):
            self.send_group_message(target, content)
        elif msg_type == 'discuss' and hasattr(self, 'send_discuss_message'):
            self.send_discuss_message(target, content)
        elif msg_type == 'private' and hasattr(self, 'send_private_message'):
            # Compatible with ctx_msg: a context message carries 'sender_*' instead of 'user_*',
            # so the missing 'user_*' keys are filled in temporarily.
            added_keys = []
            for user_key, sender_key in (('user_id', 'sender_id'), ('user_tid', 'sender_tid')):
                if user_key not in target and sender_key in target:
                    target[user_key] = target[sender_key]
                    added_keys.append(user_key)

            try:
                self.send_private_message(target, content)
            finally:
                # Remove only the keys added above (never ones the caller supplied),
                # and do so even if sending raised.
                for user_key in added_keys:
                    target.pop(user_key, None)

    def get_login_info(self):
        """
        Hook for subclasses.

        :return: an empty dict in the base implementation
        """
        return {}

    def is_sender_superuser(self, ctx_msg: dict):
        """
        Check whether the sender of 'ctx_msg' is the configured superuser.

        :return: True only if a superuser is configured and equals the message's 'sender_id'
        """
        # Without this guard a message lacking 'sender_id' would match an
        # unconfigured (None) superuser.
        if self.superuser_id is None:
            return False
        return ctx_msg.get('sender_id') == self.superuser_id

    def get_sender_group_role(self, ctx_msg: dict):
        """
        Hook for subclasses; the base implementation ignores 'ctx_msg'.

        :return: 'member' in the base implementation
        """
        return 'member'

    @staticmethod
    def get_target(ctx_msg: dict):
        """
        Target is used to distinguish the records in database.
        Note: This value will not change after restarting the bot.

        IDs are converted with str(), so non-string IDs do not raise TypeError.

        :return: a unique string (account id with some flags) representing a target,
                 or None if there is no persistent unique value
        """
        msg_type = ctx_msg.get('msg_type')
        if msg_type == 'group' and ctx_msg.get('group_id'):
            return 'g#' + str(ctx_msg.get('group_id'))
        elif msg_type == 'discuss' and ctx_msg.get('discuss_id'):
            return 'd#' + str(ctx_msg.get('discuss_id'))
        elif msg_type == 'private' and ctx_msg.get('sender_id'):
            return 'p#' + str(ctx_msg.get('sender_id'))
        return None

    @staticmethod
    def get_source(ctx_msg: dict):
        """
        Source is used to distinguish the interactive sessions.
        Note: This value may change after restarting the bot.

        IDs are converted with str(), so non-string IDs do not raise TypeError.

        :return: a 32 character unique string (md5) representing a source,
                 or a random value if something strange happened
        """
        msg_type = ctx_msg.get('msg_type')
        sender_tid = ctx_msg.get('sender_tid')
        source = None
        if msg_type == 'group' and ctx_msg.get('group_tid') and sender_tid:
            source = 'g#' + str(ctx_msg.get('group_tid')) + '#p#' + str(sender_tid)
        elif msg_type == 'discuss' and ctx_msg.get('discuss_tid') and sender_tid:
            source = 'd#' + str(ctx_msg.get('discuss_tid')) + '#p#' + str(sender_tid)
        elif msg_type == 'private' and sender_tid:
            source = 'p#' + str(sender_tid)
        if not source:
            # No usable ids: fall back to the current timestamp plus a random 3-digit number.
            source = str(int(datetime.now().timestamp())) + str(random.randint(100, 999))
        # md5 is used only to get a fixed-length identifier, not for security.
        return hashlib.md5(source.encode('utf-8')).hexdigest()
|
|
|
|
|
|
class ConfigurationError(KeyError):
    """
    Error type for configuration problems.

    It adds nothing to KeyError, so handlers catching KeyError also catch it.
    """
|
|
|
|
|
|
def get_adapter_by_ctx(ctx_msg: dict):
    """
    Look up the adapter for a context message, using its 'via' and 'login_id' values.

    :param ctx_msg: the context message; may be None or empty
    :return: whatever get_adapter returns for the message's 'via' and 'login_id',
             or None if 'ctx_msg' is None or empty
    """
    if not ctx_msg:
        return None
    return get_adapter(ctx_msg.get('via'), ctx_msg.get('login_id'))
|
|
|
|
|
|
def get_adapter(via: str, login_id: str):
    """
    Return the adapter instance for the given message source and login account,
    creating and caching it on first use.

    :param via: name of the message source; the special value 'default' yields a shared
                base Adapter built from an empty config
    :param login_id: login account of the message source (ignored when 'via' is 'default')
    :return: the cached or newly created adapter instance, or None if 'via' or 'login_id'
             is empty, no configured message source matches both values, or no adapter
             class is registered for 'via'
    """
    if via == 'default':
        # For the situations where 'via' does not matter, e.g. when we just want 'get_target' (which is universal)
        if 'default' not in _adapter_instances:
            _adapter_instances['default'] = Adapter({})
        return _adapter_instances['default']

    if not (via and login_id):
        return None

    # A tuple key keeps the two parts separate. Hashing the concatenation
    # via + login_id would make ('a', 'bc') and ('ab', 'c') share one cache slot.
    key = (via, login_id)
    if key in _adapter_instances:
        return _adapter_instances[key]

    # Only the first matching message source is used. '.get' tolerates
    # sources that lack one of the keys.
    msg_src = next(
        (
            src for src in get_message_sources()
            if src.get('via') == via and src.get('login_id') == login_id
        ),
        None
    )
    if msg_src is None or via not in _adapter_classes:
        return None

    _adapter_instances[key] = _adapter_classes[via](msg_src)
    return _adapter_instances[key]
|