2024-03-01 23:24:36 +08:00
|
|
|
|
import json
|
2024-03-19 00:27:40 +08:00
|
|
|
|
import os
|
2024-03-01 00:07:49 +08:00
|
|
|
|
import sqlite3
|
2024-03-02 02:43:18 +08:00
|
|
|
|
import types
|
2024-03-01 23:24:36 +08:00
|
|
|
|
from abc import ABC
|
|
|
|
|
from collections.abc import Iterable
|
2024-03-19 00:27:40 +08:00
|
|
|
|
|
|
|
|
|
import nonebot
|
2024-03-01 00:07:49 +08:00
|
|
|
|
from pydantic import BaseModel
|
2024-03-23 19:55:12 +08:00
|
|
|
|
from typing import Any
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
2024-03-01 23:24:36 +08:00
|
|
|
|
BaseIterable = list | tuple | set | dict
|
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
class LiteModel(BaseModel):
|
2024-03-02 02:43:18 +08:00
|
|
|
|
"""轻量级模型基类
|
|
|
|
|
类型注解统一使用Python3.9的PEP585标准,如需使用泛型请使用typing模块的泛型类型
|
|
|
|
|
"""
|
2024-03-20 12:30:17 +08:00
|
|
|
|
id: int = None
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
|
2024-03-01 23:24:36 +08:00
|
|
|
|
class BaseORMAdapter(ABC):
|
|
|
|
|
def __init__(self):
|
2024-03-01 00:07:49 +08:00
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def auto_migrate(self, *args, **kwargs):
|
2024-03-01 23:24:36 +08:00
|
|
|
|
"""自动迁移
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
Returns:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
"""
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
2024-03-24 09:43:34 +08:00
|
|
|
|
def upsert(self, *args, **kwargs):
|
2024-03-01 23:24:36 +08:00
|
|
|
|
"""存储数据
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
2024-03-01 23:24:36 +08:00
|
|
|
|
def first(self, *args, **kwargs):
|
|
|
|
|
"""查询第一条数据
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
Returns:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
"""
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
2024-03-01 23:24:36 +08:00
|
|
|
|
def all(self, *args, **kwargs):
|
|
|
|
|
"""查询所有数据
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
Returns:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
"""
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
2024-03-01 23:24:36 +08:00
|
|
|
|
def delete(self, *args, **kwargs):
|
|
|
|
|
"""删除数据
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
Returns:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
"""
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
2024-03-01 23:24:36 +08:00
|
|
|
|
def update(self, *args, **kwargs):
|
|
|
|
|
"""更新数据
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
Returns:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
"""
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
|
2024-03-20 12:30:17 +08:00
|
|
|
|
class Database(BaseORMAdapter):
|
2024-03-18 18:21:56 +08:00
|
|
|
|
"""SQLiteORM适配器,严禁使用`FORIEGNID`和`JSON`作为主键前缀,严禁使用`$ID:`作为字符串值前缀
|
2024-03-02 02:43:18 +08:00
|
|
|
|
|
|
|
|
|
Attributes:
|
|
|
|
|
|
|
|
|
|
"""
|
2024-03-01 00:07:49 +08:00
|
|
|
|
type_map = {
|
2024-03-01 23:24:36 +08:00
|
|
|
|
# default: TEXT
|
|
|
|
|
str : 'TEXT',
|
|
|
|
|
int : 'INTEGER',
|
|
|
|
|
float: 'REAL',
|
|
|
|
|
bool : 'INTEGER',
|
|
|
|
|
list : 'TEXT'
|
2024-03-01 00:07:49 +08:00
|
|
|
|
}
|
2024-03-20 12:30:17 +08:00
|
|
|
|
|
2024-03-22 07:44:41 +08:00
|
|
|
|
DEFAULT_VALUE = {
|
2024-03-21 14:52:02 +08:00
|
|
|
|
'TEXT' : '',
|
|
|
|
|
'INTEGER': 0,
|
|
|
|
|
'REAL' : 0.0
|
2024-03-20 12:30:17 +08:00
|
|
|
|
}
|
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
FOREIGNID = 'FOREIGNID'
|
|
|
|
|
JSON = 'JSON'
|
2024-03-22 07:44:41 +08:00
|
|
|
|
LIST = 'LIST'
|
|
|
|
|
DICT = 'DICT'
|
2024-03-02 02:43:18 +08:00
|
|
|
|
ID = '$ID'
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
2024-03-01 23:24:36 +08:00
|
|
|
|
def __init__(self, db_name: str):
|
|
|
|
|
super().__init__()
|
2024-03-19 00:27:40 +08:00
|
|
|
|
if not os.path.exists(os.path.dirname(db_name)):
|
|
|
|
|
os.makedirs(os.path.dirname(db_name))
|
2024-03-01 23:24:36 +08:00
|
|
|
|
self.conn = sqlite3.connect(db_name)
|
2024-03-02 02:43:18 +08:00
|
|
|
|
self.conn.row_factory = sqlite3.Row
|
2024-03-01 23:24:36 +08:00
|
|
|
|
self.cursor = self.conn.cursor()
|
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
def auto_migrate(self, *args: type(LiteModel)):
|
2024-03-01 23:24:36 +08:00
|
|
|
|
"""自动迁移,检测新模型字段和原有表字段的差异,如有差异自动增删新字段
|
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
Args:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
*args: 模型类
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
Returns:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-01 00:07:49 +08:00
|
|
|
|
"""
|
2024-03-24 10:26:40 +08:00
|
|
|
|
nonebot.logger.warning("Database.auto_migrate is deprecated, use Database.migrate instead")
|
2024-03-21 14:52:02 +08:00
|
|
|
|
table_name = ''
|
2024-03-01 00:07:49 +08:00
|
|
|
|
for model in args:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
model: type(LiteModel)
|
|
|
|
|
# 检测并创建表,若模型未定义id字段则使用自增主键,有定义的话使用id字段,且id有可能为字符串
|
|
|
|
|
table_name = model.__name__
|
|
|
|
|
if 'id' in model.__annotations__ and model.__annotations__['id'] is not None:
|
|
|
|
|
# 如果模型定义了id字段,那么使用模型的id字段
|
|
|
|
|
id_type = self.type_map.get(model.__annotations__['id'], 'TEXT')
|
|
|
|
|
self.cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id {id_type} PRIMARY KEY)')
|
|
|
|
|
else:
|
|
|
|
|
# 如果模型未定义id字段,那么使用自增主键
|
|
|
|
|
self.cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)')
|
2024-03-01 23:24:36 +08:00
|
|
|
|
# 获取表字段
|
2024-03-02 02:43:18 +08:00
|
|
|
|
self.cursor.execute(f'PRAGMA table_info({table_name})')
|
2024-03-01 23:24:36 +08:00
|
|
|
|
table_fields = self.cursor.fetchall()
|
|
|
|
|
table_fields = [field[1] for field in table_fields]
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
2024-03-22 07:44:41 +08:00
|
|
|
|
raw_fields, raw_types = zip(*model.__annotations__.items())
|
2024-03-02 02:43:18 +08:00
|
|
|
|
# 获取模型字段,若有模型则添加FOREIGNID前缀,若为BaseIterable则添加JSON前缀,用多行if判断
|
|
|
|
|
model_fields = []
|
|
|
|
|
model_types = []
|
2024-03-22 07:44:41 +08:00
|
|
|
|
for field, r_type in zip(raw_fields, raw_types):
|
|
|
|
|
if isinstance(r_type, type(LiteModel)):
|
2024-03-02 02:43:18 +08:00
|
|
|
|
model_fields.append(f'{self.FOREIGNID}{field}')
|
|
|
|
|
model_types.append('TEXT')
|
2024-03-22 13:39:01 +08:00
|
|
|
|
elif r_type in [list[str], list[int], list[float], list[bool], list]:
|
2024-03-22 07:44:41 +08:00
|
|
|
|
model_fields.append(f'{self.LIST}{field}')
|
|
|
|
|
model_types.append('TEXT')
|
2024-03-22 13:39:01 +08:00
|
|
|
|
elif r_type in [dict[str, str], dict[str, int], dict[str, float], dict[str, bool], dict]:
|
2024-03-22 07:44:41 +08:00
|
|
|
|
model_fields.append(f'{self.DICT}{field}')
|
|
|
|
|
model_types.append('TEXT')
|
|
|
|
|
elif isinstance(r_type, types.GenericAlias):
|
2024-03-02 02:43:18 +08:00
|
|
|
|
model_fields.append(f'{self.JSON}{field}')
|
|
|
|
|
model_types.append('TEXT')
|
|
|
|
|
else:
|
|
|
|
|
model_fields.append(field)
|
2024-03-22 07:44:41 +08:00
|
|
|
|
model_types.append(self.type_map.get(r_type, 'TEXT'))
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
2024-03-22 07:44:41 +08:00
|
|
|
|
# 检测新字段或字段类型是否有变化,有则增删字段,已经加了前缀类型
|
2024-03-22 13:39:01 +08:00
|
|
|
|
for field_changed, type_, r_type in zip(model_fields, model_types, raw_types):
|
|
|
|
|
if field_changed not in table_fields:
|
|
|
|
|
nonebot.logger.debug(f'ALTER TABLE {table_name} ADD COLUMN {field_changed} {type_}')
|
|
|
|
|
self.cursor.execute(f'ALTER TABLE {table_name} ADD COLUMN {field_changed} {type_}')
|
2024-03-20 12:30:17 +08:00
|
|
|
|
# 在原有的行中添加新字段对应类型的默认值,从DEFAULT_TYPE中获取
|
2024-03-22 13:39:01 +08:00
|
|
|
|
self.cursor.execute(f'UPDATE {table_name} SET {field_changed} = ? WHERE {field_changed} IS NULL', (self.DEFAULT_VALUE.get(type_, ""),))
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
# 检测多余字段,除了id字段
|
2024-03-01 23:24:36 +08:00
|
|
|
|
for field in table_fields:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
if field not in model_fields and field != 'id':
|
2024-03-19 00:27:40 +08:00
|
|
|
|
nonebot.logger.debug(f'ALTER TABLE {table_name} DROP COLUMN {field}')
|
2024-03-02 02:43:18 +08:00
|
|
|
|
self.cursor.execute(f'ALTER TABLE {table_name} DROP COLUMN {field}')
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
|
|
|
|
self.conn.commit()
|
2024-03-22 07:44:41 +08:00
|
|
|
|
nonebot.logger.debug(f'Table {table_name} migrated successfully')
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-24 09:43:34 +08:00
|
|
|
|
def upsert(self, *models: LiteModel) -> int | tuple:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
"""存储数据,检查id字段,如果有id字段则更新,没有则插入
|
2024-03-01 00:07:49 +08:00
|
|
|
|
|
|
|
|
|
Args:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
models: 数据
|
|
|
|
|
|
|
|
|
|
Returns:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
id: 数据id,如果有多个数据则返回id元组
|
2024-03-01 00:07:49 +08:00
|
|
|
|
"""
|
2024-03-21 14:52:02 +08:00
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
ids = []
|
2024-03-01 23:24:36 +08:00
|
|
|
|
for model in models:
|
|
|
|
|
table_name = model.__class__.__name__
|
2024-03-21 14:52:02 +08:00
|
|
|
|
if not self._detect_for_table(table_name):
|
|
|
|
|
raise ValueError(f'表{table_name}不存在,请先迁移')
|
2024-03-01 23:24:36 +08:00
|
|
|
|
key_list = []
|
|
|
|
|
value_list = []
|
|
|
|
|
# 处理外键,添加前缀'$IDFieldName'
|
|
|
|
|
for field, value in model.__dict__.items():
|
|
|
|
|
if isinstance(value, LiteModel):
|
2024-03-02 02:43:18 +08:00
|
|
|
|
key_list.append(f'{self.FOREIGNID}{field}')
|
2024-03-24 09:43:34 +08:00
|
|
|
|
value_list.append(f'{self.ID}:{value.__class__.__name__}:{self.upsert(value)}')
|
2024-03-22 13:39:01 +08:00
|
|
|
|
elif isinstance(value, list):
|
|
|
|
|
key_list.append(f'{self.LIST}{field}')
|
|
|
|
|
value_list.append(self._flat(value))
|
|
|
|
|
elif isinstance(value, dict):
|
|
|
|
|
key_list.append(f'{self.DICT}{field}')
|
|
|
|
|
value_list.append(self._flat(value))
|
2024-03-02 02:43:18 +08:00
|
|
|
|
elif isinstance(value, BaseIterable):
|
|
|
|
|
key_list.append(f'{self.JSON}{field}')
|
2024-03-18 18:21:56 +08:00
|
|
|
|
value_list.append(self._flat(value))
|
2024-03-01 23:24:36 +08:00
|
|
|
|
else:
|
|
|
|
|
key_list.append(field)
|
|
|
|
|
value_list.append(value)
|
2024-03-02 02:43:18 +08:00
|
|
|
|
# 更新或插入数据,用?占位
|
2024-03-19 00:27:40 +08:00
|
|
|
|
nonebot.logger.debug(f'INSERT OR REPLACE INTO {table_name} ({",".join(key_list)}) VALUES ({",".join(["?" for _ in key_list])})')
|
2024-03-02 02:43:18 +08:00
|
|
|
|
self.cursor.execute(f'INSERT OR REPLACE INTO {table_name} ({",".join(key_list)}) VALUES ({",".join(["?" for _ in key_list])})', value_list)
|
|
|
|
|
|
|
|
|
|
ids.append(self.cursor.lastrowid)
|
|
|
|
|
self.conn.commit()
|
|
|
|
|
return ids[0] if len(ids) == 1 else tuple(ids)
|
|
|
|
|
|
2024-03-18 18:21:56 +08:00
|
|
|
|
def _flat(self, data: Iterable) -> str:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
"""扁平化数据,返回扁平化对象
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
data: 数据,可迭代对象
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
Returns: json字符串
|
|
|
|
|
"""
|
2024-03-01 23:24:36 +08:00
|
|
|
|
if isinstance(data, dict):
|
2024-03-02 02:43:18 +08:00
|
|
|
|
return_data = {}
|
2024-03-01 23:24:36 +08:00
|
|
|
|
for k, v in data.items():
|
2024-03-02 02:43:18 +08:00
|
|
|
|
if isinstance(v, LiteModel):
|
2024-03-24 09:43:34 +08:00
|
|
|
|
return_data[f'{self.FOREIGNID}{k}'] = f'{self.ID}:{v.__class__.__name__}:{self.upsert(v)}'
|
2024-03-22 13:39:01 +08:00
|
|
|
|
elif isinstance(v, list):
|
|
|
|
|
return_data[f'{self.LIST}{k}'] = self._flat(v)
|
|
|
|
|
elif isinstance(v, dict):
|
|
|
|
|
return_data[f'{self.DICT}{k}'] = self._flat(v)
|
2024-03-02 02:43:18 +08:00
|
|
|
|
elif isinstance(v, BaseIterable):
|
2024-03-18 18:21:56 +08:00
|
|
|
|
return_data[f'{self.JSON}{k}'] = self._flat(v)
|
2024-03-02 02:43:18 +08:00
|
|
|
|
else:
|
|
|
|
|
return_data[k] = v
|
|
|
|
|
|
|
|
|
|
elif isinstance(data, list | tuple | set):
|
|
|
|
|
return_data = []
|
|
|
|
|
for v in data:
|
|
|
|
|
if isinstance(v, LiteModel):
|
2024-03-24 09:43:34 +08:00
|
|
|
|
return_data.append(f'{self.ID}:{v.__class__.__name__}:{self.upsert(v)}')
|
2024-03-22 13:39:01 +08:00
|
|
|
|
elif isinstance(v, list):
|
|
|
|
|
return_data.append(self._flat(v))
|
|
|
|
|
elif isinstance(v, dict):
|
|
|
|
|
return_data.append(self._flat(v))
|
2024-03-02 02:43:18 +08:00
|
|
|
|
elif isinstance(v, BaseIterable):
|
2024-03-18 18:21:56 +08:00
|
|
|
|
return_data.append(self._flat(v))
|
2024-03-01 23:24:36 +08:00
|
|
|
|
else:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
return_data.append(v)
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError('数据类型错误')
|
|
|
|
|
|
|
|
|
|
return json.dumps(return_data)
|
|
|
|
|
|
2024-03-21 14:52:02 +08:00
|
|
|
|
def _detect_for_table(self, table_name: str) -> bool:
|
|
|
|
|
"""在进行增删查改前检测表是否存在
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
table_name: 表名
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
return self.cursor.execute(f'SELECT * FROM sqlite_master WHERE type = "table" AND name = ?', (table_name,)).fetchone()
|
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
def first(self, model: type(LiteModel), conditions, *args, default: Any = None) -> LiteModel | None:
|
|
|
|
|
"""查询第一条数据
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
model: 模型
|
|
|
|
|
conditions: 查询条件
|
|
|
|
|
*args: 参数化查询条件参数
|
|
|
|
|
default: 未查询到结果默认返回值
|
|
|
|
|
|
|
|
|
|
Returns: 数据
|
|
|
|
|
"""
|
|
|
|
|
table_name = model.__name__
|
2024-03-21 14:52:02 +08:00
|
|
|
|
|
|
|
|
|
if not self._detect_for_table(table_name):
|
|
|
|
|
return default
|
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
|
2024-03-20 12:30:17 +08:00
|
|
|
|
if row_data := self.cursor.fetchone():
|
|
|
|
|
data = dict(row_data)
|
2024-03-19 00:27:40 +08:00
|
|
|
|
return model(**self.convert_to_dict(data))
|
|
|
|
|
return default
|
2024-03-02 02:43:18 +08:00
|
|
|
|
|
2024-03-20 12:30:17 +08:00
|
|
|
|
def all(self, model: type(LiteModel), conditions=None, *args, default: Any = None) -> list[LiteModel] | None:
|
2024-03-02 02:43:18 +08:00
|
|
|
|
"""查询所有数据
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
model: 模型
|
|
|
|
|
conditions: 查询条件
|
|
|
|
|
*args: 参数化查询条件参数
|
|
|
|
|
default: 未查询到结果默认返回值
|
|
|
|
|
|
|
|
|
|
Returns: 数据
|
|
|
|
|
"""
|
|
|
|
|
table_name = model.__name__
|
2024-03-20 12:30:17 +08:00
|
|
|
|
|
2024-03-21 14:52:02 +08:00
|
|
|
|
if not self._detect_for_table(table_name):
|
|
|
|
|
return default
|
2024-03-20 12:30:17 +08:00
|
|
|
|
|
|
|
|
|
if conditions:
|
|
|
|
|
self.cursor.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args)
|
|
|
|
|
else:
|
|
|
|
|
self.cursor.execute(f'SELECT * FROM {table_name}')
|
|
|
|
|
if row_datas := self.cursor.fetchall():
|
|
|
|
|
datas = [dict(row_data) for row_data in row_datas]
|
|
|
|
|
return [model(**self.convert_to_dict(d)) for d in datas] if datas else default
|
|
|
|
|
return default
|
2024-03-02 02:43:18 +08:00
|
|
|
|
|
|
|
|
|
def delete(self, model: type(LiteModel), conditions, *args):
|
|
|
|
|
"""删除数据
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
model: 模型
|
|
|
|
|
conditions: 查询条件
|
|
|
|
|
*args: 参数化查询条件参数
|
|
|
|
|
|
|
|
|
|
Returns:
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
"""
|
|
|
|
|
table_name = model.__name__
|
2024-03-21 14:52:02 +08:00
|
|
|
|
|
|
|
|
|
if not self._detect_for_table(table_name):
|
|
|
|
|
return
|
2024-03-19 00:27:40 +08:00
|
|
|
|
nonebot.logger.debug(f'DELETE FROM {table_name} WHERE {conditions}')
|
2024-03-02 02:43:18 +08:00
|
|
|
|
self.cursor.execute(f'DELETE FROM {table_name} WHERE {conditions}', args)
|
|
|
|
|
self.conn.commit()
|
2024-03-01 23:24:36 +08:00
|
|
|
|
|
2024-03-02 02:43:18 +08:00
|
|
|
|
def convert_to_dict(self, data: dict) -> dict:
|
|
|
|
|
"""将json字符串转换为字典
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
data: json字符串
|
|
|
|
|
|
|
|
|
|
Returns: 字典
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def load(d: BaseIterable) -> BaseIterable:
|
|
|
|
|
"""递归加载数据,去除前缀"""
|
|
|
|
|
if isinstance(d, dict):
|
|
|
|
|
new_d = {}
|
|
|
|
|
for k, v in d.items():
|
|
|
|
|
if k.startswith(self.FOREIGNID):
|
2024-03-03 21:22:49 +08:00
|
|
|
|
new_d[k.replace(self.FOREIGNID, '')] = load(
|
2024-03-21 14:52:02 +08:00
|
|
|
|
dict(self.cursor.execute(f'SELECT * FROM {v.split(":", 2)[1]} WHERE id = ?', (v.split(":", 2)[2],)).fetchone()))
|
2024-03-22 13:39:01 +08:00
|
|
|
|
|
2024-03-22 07:44:41 +08:00
|
|
|
|
elif k.startswith(self.LIST):
|
|
|
|
|
if v == '': v = '[]'
|
|
|
|
|
new_d[k.replace(self.LIST, '')] = load(json.loads(v))
|
|
|
|
|
elif k.startswith(self.DICT):
|
|
|
|
|
if v == '': v = '{}'
|
|
|
|
|
new_d[k.replace(self.DICT, '')] = load(json.loads(v))
|
2024-03-22 13:39:01 +08:00
|
|
|
|
elif k.startswith(self.JSON):
|
|
|
|
|
if v == '': v = '[]'
|
|
|
|
|
new_d[k.replace(self.JSON, '')] = load(json.loads(v))
|
2024-03-02 02:43:18 +08:00
|
|
|
|
else:
|
|
|
|
|
new_d[k] = v
|
|
|
|
|
elif isinstance(d, list | tuple | set):
|
|
|
|
|
new_d = []
|
|
|
|
|
for i, v in enumerate(d):
|
|
|
|
|
if isinstance(v, str) and v.startswith(self.ID):
|
2024-03-21 14:52:02 +08:00
|
|
|
|
new_d.append(load(dict(self.cursor.execute(f'SELECT * FROM {v.split(":", 2)[1]} WHERE id = ?', (v.split(":", 2)[2],)).fetchone())))
|
2024-03-02 02:43:18 +08:00
|
|
|
|
elif isinstance(v, BaseIterable):
|
|
|
|
|
new_d.append(load(v))
|
|
|
|
|
else:
|
|
|
|
|
new_d = d
|
|
|
|
|
return new_d
|
|
|
|
|
|
2024-03-21 14:52:02 +08:00
|
|
|
|
return load(data)
|