diff --git a/.gitignore b/.gitignore index e9a9653..c743115 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,8 @@ config.yml # pyc/pyo **/*.pyc -**/*.pyo \ No newline at end of file +**/*.pyo + +# data +/data/ +``` \ No newline at end of file diff --git a/main.py b/main.py index b990d8d..28e8d51 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,9 @@ from src.liteyuki import * +from pymongo import MongoClient + +a = MongoClient("mongodb://localhost:27017/") if __name__ == '__main__': liteyuki = Liteyuki() app = liteyuki.get_asgi() - liteyuki.run(app="main:app") \ No newline at end of file + liteyuki.run(app="main:app") diff --git a/requirements.txt b/requirements.txt index 60bf44e..32f13d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -nonebot2[fastapi] \ No newline at end of file +nonebot2[fastapi] +keyvalue-sqlite=1.0.7 +motor=3.3.2 \ No newline at end of file diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/api/data.py b/src/api/data.py index e69de29..e9994ec 100644 --- a/src/api/data.py +++ b/src/api/data.py @@ -0,0 +1,241 @@ +import sqlite3 +import uuid +from typing import Any + +from pymongo import MongoClient +from pydantic import BaseModel + + +class LiteModel(BaseModel): + pass + + +class BaseORM: + + def __init__(self, *args, **kwargs): + pass + + def auto_migrate(self, *args, **kwargs): + """自动迁移数据库 + Args: + *args: + **kwargs: + + Returns: + """ + raise NotImplementedError + + def save(self, *args, **kwargs): + """创建数据 + Args: + *args: + **kwargs: + + Returns: + """ + raise NotImplementedError + + def update(self, *args, **kwargs): + """更新数据 + Args: + *args: + **kwargs: + + Returns: + """ + raise NotImplementedError + + def delete(self, *args, **kwargs): + """删除数据 + Args: + *args: + **kwargs: + + Returns: + """ + raise NotImplementedError + + def first(self, *args, **kwargs): + """查询第一条数据 + Args: + *args: + **kwargs: + + Returns: + """ + raise NotImplementedError + + def where(self, *args, **kwargs): + """查询数据 + Args: + *args: + **kwargs: + + Returns: + """ + raise NotImplementedError + + def all(self, *args, **kwargs): + """查询所有数据 + Args: + *args: + **kwargs: + + Returns: + """ + raise NotImplementedError + + +class SqliteORM(BaseORM): + """同步sqlite数据库操作""" + type_map = { + int: 'INTEGER', + float: 'REAL', + str: 'TEXT', + bool: 'INTEGER', + } + + def __init__(self, db, *args, **kwargs): + super().__init__(*args, **kwargs) + self.db = sqlite3.connect(db) + + @staticmethod + def get_model_table_name(model: type(LiteModel) | LiteModel | str) -> str: + """获取模型对应的表名""" + if isinstance(model, str): + return model + elif isinstance(model, LiteModel): + return model.__class__.__name__ + elif isinstance(model, type(LiteModel)): + return model.__name__ + + def auto_migrate(self, *args: type(LiteModel) | LiteModel | str, **kwargs): + """自动迁移数据库 + Args: + *args: BaseModel + **kwargs: + delete_old_columns: bool = False # 是否删除旧字段 + add_new_columns: bool = True # 添加新字段 + Returns: + """ + for model in args: + # 获取模型对应的表名 + table_name = self.get_model_table_name(model) + + # 获取表中已有的字段 + existing_columns = set() + cursor = self.db.execute(f"PRAGMA table_info({table_name})") + for column_info in cursor.fetchall(): + existing_columns.add(column_info[1]) + + # 创建表,如果不存在的话 + self.db.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)') + + # 检测模型中的字段并添加新字段,按照类型添加 + for field_name, field_type in model.__annotations__.items(): + if field_name not in existing_columns: + self.db.execute(f'ALTER TABLE {table_name} ADD COLUMN {field_name} {self.type_map.get(field_type, "TEXT")}') + + # 提交事务 + self.db.commit() + + def save(self, model: LiteModel) -> int: + """保存或创建数据,对嵌套模型扁平化处理,加特殊前缀表示为模型,实际储存模型id,嵌套对象单独储存 + Args: + model: BaseModel + Returns: id主键 + """ + # 先检测表是否存在,不存在则创建 + table_name = self.get_model_table_name(model) + self.db.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)') + + # 构建插入语句 + column_list = [] + value_list = [] + for key, value in model.dict().items(): + if isinstance(value, LiteModel): + # 如果是嵌套模型,先保存嵌套模型 + nested_model_id = self.save(value) + # 保存嵌套模型的id, 以特殊前缀表示为模型 + column_list.append(f'$id_{key}') + value_list.append(f'{value.__class__.__name__}_{nested_model_id}') + elif isinstance(value, list): + # 如果是列表,先保存列表中的所有嵌套模型,有可能有多种类型的嵌套模型 + # 列表内存'ModelType_ModelId',以特殊前缀表示为模型类型 + nested_model_ids = [] + for nested_model in value: + nested_model_id = self.save(nested_model) + nested_model_ids.append(f'{nested_model.__class__.__name__}_{nested_model_id}') + column_list.append(f'$ids_{key}') + value_list.append(nested_model_ids) + + columns = ', '.join(column_list) + placeholders = ', '.join(['?' for _ in value_list]) + values = tuple(value_list) + print(model.dict()) + print(table_name, columns, placeholders, values) + + # 插入数据 + self.db.execute(f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})', values) + self.db.commit() + return self.db.execute(f'SELECT last_insert_rowid()').fetchone()[0] + + def where(self, model_type: type(LiteModel) | str, conditions: str, *args, objectify: bool = True) -> list[LiteModel]: + """查询数据 + Args: + objectify: bool: 是否将查询结果转换为模型 + model_type: BaseModel + conditions: str + *args: + Returns: + """ + table_name = self.get_model_table_name(model_type) + self.db.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)') + return [self._convert_to_model(model_type, item) for item in self.db.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args).fetchall()] + + def first(self, model_type: type(LiteModel) | str, conditions: str, *args, objectify: bool = True): + """查询第一条数据""" + table_name = self.get_model_table_name(model_type) + self.db.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)') + return self._convert_to_model(model_type, self.db.execute(f'SELECT * FROM {table_name} WHERE {conditions}', args).fetchone()) + + def all(self, model_type: type(LiteModel) | str, objectify: bool = True): + """查询所有数据""" + table_name = self.get_model_table_name(model_type) + self.db.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)') + return [self._convert_to_model(model_type, item) for item in self.db.execute(f'SELECT * FROM {table_name}').fetchall()] + + def update(self, model_type: type(LiteModel) | str, operation: str, conditions: str, *args): + """更新数据 + Args: + model_type: BaseModel + operation: str: 更新操作 + conditions: str: 查询条件 + *args: + Returns: + """ + table_name = self.get_model_table_name(model_type) + self.db.execute(f'CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)') + self.db.execute(f'UPDATE {table_name} SET {operation} WHERE {conditions}', args) + self.db.commit() + + def _convert_to_model(self, model_type: type(LiteModel), item: tuple) -> LiteModel: + """将查询结果转换为模型,处理嵌套模型""" + # 获取表中已有的字段,再用字段值构建字典 + table_name = self.get_model_table_name(model_type) + cursor = self.db.execute(f"PRAGMA table_info({table_name})") + columns = [column_info[1] for column_info in cursor.fetchall()] + item_dict = dict(zip(columns, item)) + # 遍历字典,处理嵌套模型 + new_item_dict = {} + for key, value in item_dict.items(): + if key.startswith('$id_'): + # 处理单个嵌套模型类型时从model_type中获取键 + new_item_dict[key.replace('$id_', '')] = self.first(model_type.__annotations__[key.replace('$id_', '')], 'id = ?', value.split('_')[-1]) + elif key.startswith('$ids_'): + # 处理多个嵌套模型类型使用eval获取数据库对应索引的键 + new_item_dict[key.replace('$ids_', '')] = [self.first(eval(type_id.split('_')[0]), 'id = ?', type_id.split('_')[-1]) for type_id in value] + else: + new_item_dict[key] = value + + return model_type(**new_item_dict) diff --git a/src/liteyuki.py b/src/liteyuki.py index 783ba76..f932cb7 100644 --- a/src/liteyuki.py +++ b/src/liteyuki.py @@ -1,9 +1,9 @@ from typing import Any, Optional -from nonebot import DOTENV_TYPE import nonebot -from nonebot import logger +from nonebot import DOTENV_TYPE from nonebot.adapters.onebot.v11 import Adapter as OnebotV11Adapter from nonebot.adapters.onebot.v12 import Adapter as OnebotV12Adapter +from sqlalchemy import create_engine from src.api.utils import load_config @@ -36,9 +36,9 @@ $$$$$$$$/ $$$$$$/ $$/ $$$$$$$$/ $$/ $$$$$$/ $$/ $$/ $$$$$$/ '' def run(self, *args, **kwargs): for adapter in adapters: self.driver.register_adapter(adapter) - self.nonebot.load_plugin('src.liteyuki_main') - self.nonebot.load_plugins('src/builtin') - self.nonebot.load_plugins('plugins') + self.nonebot.load_plugin('src.liteyuki_main') # Load main plugin + self.nonebot.load_plugins('src/builtin') # Load builtin plugins + self.nonebot.load_plugins('plugins') # Load custom plugins # Todo: load from database self.nonebot.run(*args, **kwargs)