use std::{ fs::{self, File}, io::{BufWriter, Write}, path::PathBuf, }; use flate2::{write::GzEncoder, Compression}; use meilisearch_types::{ keys::Key, settings::{Checked, Settings}, }; use serde_json::{Map, Value}; use tempfile::TempDir; use time::OffsetDateTime; use uuid::Uuid; use crate::{reader::Document, IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION}; pub struct DumpWriter { dir: TempDir, } impl DumpWriter { pub fn new(instance_uuid: Option) -> Result { let dir = TempDir::new()?; if let Some(instance_uuid) = instance_uuid { fs::write( dir.path().join("instance_uid.uuid"), &instance_uuid.as_hyphenated().to_string(), )?; } let metadata = Metadata { dump_version: CURRENT_DUMP_VERSION, db_version: env!("CARGO_PKG_VERSION").to_string(), dump_date: OffsetDateTime::now_utc(), }; fs::write( dir.path().join("metadata.json"), serde_json::to_string(&metadata)?, )?; std::fs::create_dir(&dir.path().join("indexes"))?; Ok(DumpWriter { dir }) } pub fn create_index(&self, index_name: &str, metadata: &IndexMetadata) -> Result { IndexWriter::new(self.dir.path().join("indexes").join(index_name), metadata) } pub fn create_keys(&self) -> Result { KeyWriter::new(self.dir.path().to_path_buf()) } pub fn create_tasks_queue(&self) -> Result { TaskWriter::new(self.dir.path().join("tasks")) } pub fn persist_to(self, mut writer: impl Write) -> Result<()> { let gz_encoder = GzEncoder::new(&mut writer, Compression::default()); let mut tar_encoder = tar::Builder::new(gz_encoder); tar_encoder.append_dir_all(".", self.dir.path())?; let gz_encoder = tar_encoder.into_inner()?; gz_encoder.finish()?; writer.flush()?; Ok(()) } } pub struct KeyWriter { keys: BufWriter, } impl KeyWriter { pub(crate) fn new(path: PathBuf) -> Result { let keys = File::create(path.join("keys.jsonl"))?; Ok(KeyWriter { keys: BufWriter::new(keys), }) } pub fn push_key(&mut self, key: &Key) -> Result<()> { self.keys.write_all(&serde_json::to_vec(key)?)?; self.keys.write_all(b"\n")?; Ok(()) } } pub struct TaskWriter { queue: BufWriter, update_files: PathBuf, } impl TaskWriter { pub(crate) fn new(path: PathBuf) -> Result { std::fs::create_dir(&path)?; let queue = File::create(path.join("queue.jsonl"))?; let update_files = path.join("update_files"); std::fs::create_dir(&update_files)?; Ok(TaskWriter { queue: BufWriter::new(queue), update_files, }) } /// Pushes tasks in the dump. /// If the tasks has an associated `update_file` it'll use the `task_id` as its name. pub fn push_task(&mut self, task: &TaskDump) -> Result { self.queue.write_all(&serde_json::to_vec(task)?)?; self.queue.write_all(b"\n")?; self.queue.flush()?; Ok(UpdateFile::new( self.update_files.join(format!("{}.jsonl", task.uid)), )) } } pub struct UpdateFile { path: PathBuf, writer: Option>, } impl UpdateFile { pub(crate) fn new(path: PathBuf) -> UpdateFile { UpdateFile { path, writer: None } } pub fn push_document(&mut self, document: &Document) -> Result<()> { if let Some(writer) = self.writer.as_mut() { writer.write_all(&serde_json::to_vec(document)?)?; writer.write_all(b"\n")?; writer.flush()?; } else { let file = File::create(&self.path).unwrap(); self.writer = Some(BufWriter::new(file)); self.push_document(document)?; } Ok(()) } } pub struct IndexWriter { documents: BufWriter, settings: File, } impl IndexWriter { pub(self) fn new(path: PathBuf, metadata: &IndexMetadata) -> Result { std::fs::create_dir(&path)?; let metadata_file = File::create(path.join("metadata.json"))?; serde_json::to_writer(metadata_file, metadata)?; let documents = File::create(path.join("documents.jsonl"))?; let settings = File::create(path.join("settings.json"))?; Ok(IndexWriter { documents: BufWriter::new(documents), settings, }) } pub fn push_document(&mut self, document: &Map) -> Result<()> { self.documents.write_all(&serde_json::to_vec(document)?)?; self.documents.write_all(b"\n")?; self.documents.flush()?; Ok(()) } pub fn settings(mut self, settings: &Settings) -> Result<()> { self.settings.write_all(&serde_json::to_vec(&settings)?)?; Ok(()) } } #[cfg(test)] pub(crate) mod test { use std::{fmt::Write, io::BufReader, path::Path, str::FromStr}; use flate2::bufread::GzDecoder; use meilisearch_types::settings::Unchecked; use crate::{ reader::Document, test::{ create_test_api_keys, create_test_documents, create_test_dump, create_test_instance_uid, create_test_settings, create_test_tasks, }, }; use super::*; fn create_directory_hierarchy(dir: &Path) -> String { let mut ret = String::new(); writeln!(ret, ".").unwrap(); ret.push_str(&_create_directory_hierarchy(dir, 0)); ret } fn _create_directory_hierarchy(dir: &Path, depth: usize) -> String { let mut ret = String::new(); // the entries are not guarenteed to be returned in the same order thus we need to sort them. let mut entries = fs::read_dir(dir) .unwrap() .collect::, _>>() .unwrap(); // I want the directories first and then sort by name. entries.sort_by(|a, b| { let (aft, bft) = (a.file_type().unwrap(), b.file_type().unwrap()); if aft.is_dir() && bft.is_dir() { a.file_name().cmp(&b.file_name()) } else if aft.is_file() { std::cmp::Ordering::Greater } else if bft.is_file() { std::cmp::Ordering::Less } else { a.file_name().cmp(&b.file_name()) } }); for (idx, entry) in entries.iter().enumerate() { let mut ident = String::new(); for _ in 0..depth { ident.push_str(&"│"); ident.push_str(&" ".repeat(4)); } if idx == entries.len() - 1 { ident.push_str(&"└"); } else { ident.push_str(&"├"); } ident.push_str(&"-".repeat(4)); let name = entry.file_name().into_string().unwrap(); let file_type = entry.file_type().unwrap(); let is_dir = file_type.is_dir().then_some("/").unwrap_or(""); assert!(!file_type.is_symlink()); writeln!(ret, "{ident} {name}{is_dir}").unwrap(); if file_type.is_dir() { ret.push_str(&_create_directory_hierarchy(&entry.path(), depth + 1)); } } ret } #[test] fn test_creating_dump() { let file = create_test_dump(); let mut file = BufReader::new(file); // ============ ensuring we wrote everything in the correct place. let dump = tempfile::tempdir().unwrap(); let gz = GzDecoder::new(&mut file); let mut tar = tar::Archive::new(gz); tar.unpack(dump.path()).unwrap(); let dump_path = dump.path(); // ==== checking global file hierarchy (we want to be sure there isn't too many files or too few) insta::assert_display_snapshot!(create_directory_hierarchy(dump_path), @r###" . ├---- indexes/ │ └---- doggos/ │ │ ├---- settings.json │ │ ├---- documents.jsonl │ │ └---- metadata.json ├---- tasks/ │ ├---- update_files/ │ │ └---- 1.jsonl │ └---- queue.jsonl ├---- keys.jsonl ├---- metadata.json └---- instance_uid.uuid "###); // ==== checking the top level infos let metadata = fs::read_to_string(dump_path.join("metadata.json")).unwrap(); let metadata: Metadata = serde_json::from_str(&metadata).unwrap(); insta::assert_json_snapshot!(metadata, { ".dumpDate" => "[date]" }, @r###" { "dumpVersion": "V6", "dbVersion": "0.29.0", "dumpDate": "[date]" } "###); let instance_uid = fs::read_to_string(dump_path.join("instance_uid.uuid")).unwrap(); assert_eq!( Uuid::from_str(&instance_uid).unwrap(), create_test_instance_uid() ); // ==== checking the index let docs = fs::read_to_string(dump_path.join("indexes/doggos/documents.jsonl")).unwrap(); for (document, expected) in docs.lines().zip(create_test_documents()) { assert_eq!( serde_json::from_str::>(document).unwrap(), expected ); } let test_settings = fs::read_to_string(dump_path.join("indexes/doggos/settings.json")).unwrap(); assert_eq!( serde_json::from_str::>(&test_settings).unwrap(), create_test_settings().into_unchecked() ); let metadata = fs::read_to_string(dump_path.join("indexes/doggos/metadata.json")).unwrap(); let metadata: IndexMetadata = serde_json::from_str(&metadata).unwrap(); insta::assert_json_snapshot!(metadata, { ".createdAt" => "[date]", ".updatedAt" => "[date]" }, @r###" { "uid": "doggo", "primaryKey": null, "createdAt": "[date]", "updatedAt": "[date]" } "###); // ==== checking the task queue let tasks_queue = fs::read_to_string(dump_path.join("tasks/queue.jsonl")).unwrap(); for (task, mut expected) in tasks_queue.lines().zip(create_test_tasks()) { // TODO: uncomment this one once the we write the dump integration in the index-scheduler // assert_eq!(serde_json::from_str::(task).unwrap(), expected.0); if let Some(expected_update) = expected.1 { let path = dump_path.join(format!("tasks/update_files/{}.jsonl", expected.0.uid)); println!("trying to open {}", path.display()); let update = fs::read_to_string(path).unwrap(); let documents: Vec = update .lines() .map(|line| serde_json::from_str(line).unwrap()) .collect(); assert_eq!(documents, expected_update); } } // ==== checking the keys let keys = fs::read_to_string(dump_path.join("keys.jsonl")).unwrap(); for (key, expected) in keys.lines().zip(create_test_api_keys()) { assert_eq!(serde_json::from_str::(key).unwrap(), expected); } } }