//! ```text //! . //! ├── indexes //! │   ├── 01d7dd17-8241-4f1f-a7d1-2d1cb255f5b0 //! │   │   ├── documents.jsonl //! │   │   └── meta.json //! │   ├── 78be64a3-cae1-449e-b7ed-13e77c9a8a0c //! │   │   ├── documents.jsonl //! │   │   └── meta.json //! │   ├── ba553439-18fe-4733-ba53-44eed898280c //! │   │   ├── documents.jsonl //! │   │   └── meta.json //! │   └── c408bc22-5859-49d1-8e9f-c88e2fa95cb0 //! │   ├── documents.jsonl //! │   └── meta.json //! ├── index_uuids //! │   └── data.jsonl //! ├── metadata.json //! └── updates //! ├── data.jsonl //! └── updates_files //! └── 66d3f12d-fcf3-4b53-88cb-407017373de7 //! ``` use std::fs::{self, File}; use std::io::{BufRead, BufReader}; use std::path::Path; use serde::{Deserialize, Serialize}; use tempfile::TempDir; use time::OffsetDateTime; pub mod errors; pub mod meta; pub mod settings; pub mod updates; use self::meta::{DumpMeta, IndexUuid}; use super::compat::v3_to_v4::CompatV3ToV4; use super::Document; use crate::{Error, IndexMetadata, Result, Version}; pub type Settings = settings::Settings; pub type Checked = settings::Checked; pub type Unchecked = settings::Unchecked; pub type Task = updates::UpdateEntry; // ===== Other types to clarify the code of the compat module // everything related to the tasks pub type Status = updates::UpdateStatus; pub type Kind = updates::Update; // everything related to the settings pub type Setting = settings::Setting; // everything related to the errors pub type Code = errors::Code; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Metadata { db_version: String, index_db_size: usize, update_db_size: usize, #[serde(with = "time::serde::rfc3339")] dump_date: OffsetDateTime, } pub struct V3Reader { dump: TempDir, metadata: Metadata, tasks: BufReader, index_uuid: Vec, } impl V3Reader { pub fn open(dump: TempDir) -> Result { let meta_file = fs::read(dump.path().join("metadata.json"))?; let metadata = serde_json::from_reader(&*meta_file)?; let index_uuid = File::open(dump.path().join("index_uuids/data.jsonl"))?; let index_uuid = BufReader::new(index_uuid); let index_uuid = index_uuid .lines() .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }) .collect::>>()?; Ok(V3Reader { metadata, tasks: BufReader::new(File::open(dump.path().join("updates").join("data.jsonl"))?), index_uuid, dump, }) } pub fn index_uuid(&self) -> Vec { self.index_uuid.clone() } pub fn to_v4(self) -> CompatV3ToV4 { CompatV3ToV4::new(self) } pub fn version(&self) -> Version { Version::V3 } pub fn date(&self) -> Option { Some(self.metadata.dump_date) } pub fn indexes(&self) -> Result> + '_> { Ok(self.index_uuid.iter().map(|index| -> Result<_> { V3IndexReader::new( &self.dump.path().join("indexes").join(index.uuid.to_string()), &index, BufReader::new( File::open(self.dump.path().join("updates").join("data.jsonl")).unwrap(), ), ) })) } pub fn tasks( &mut self, ) -> Box>)>> + '_> { Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { let task: Task = serde_json::from_str(&line?)?; if !task.is_finished() { if let Some(uuid) = task.get_content_uuid() { let update_file_path = self .dump .path() .join("updates") .join("updates_files") .join(uuid.to_string()); Ok(( task, Some( Box::new(UpdateFile::new(&update_file_path)?) as Box ), )) } else { Ok((task, None)) } } else { Ok((task, None)) } })) } } #[derive(Debug)] pub struct V3IndexReader { metadata: IndexMetadata, settings: Settings, documents: BufReader, } impl V3IndexReader { pub fn new(path: &Path, index_uuid: &IndexUuid, tasks: BufReader) -> Result { let meta = File::open(path.join("meta.json"))?; let meta: DumpMeta = serde_json::from_reader(meta)?; let mut created_at = None; let mut updated_at = None; for line in tasks.lines() { let task: Task = serde_json::from_str(&line?)?; if task.uuid != index_uuid.uuid || !task.is_finished() { continue; } let new_created_at = match task.update.meta() { Kind::DocumentAddition { .. } | Kind::Settings(_) => task.update.finished_at(), _ => None, }; let new_updated_at = task.update.finished_at(); if created_at.is_none() || created_at > new_created_at { created_at = new_created_at; } if updated_at.is_none() || updated_at < new_updated_at { updated_at = new_updated_at; } } let current_time = OffsetDateTime::now_utc(); let metadata = IndexMetadata { uid: index_uuid.uid.clone(), primary_key: meta.primary_key, created_at: created_at.unwrap_or(current_time), updated_at: updated_at.unwrap_or(current_time), }; let ret = V3IndexReader { metadata, settings: meta.settings.check(), documents: BufReader::new(File::open(path.join("documents.jsonl"))?), }; Ok(ret) } pub fn metadata(&self) -> &IndexMetadata { &self.metadata } pub fn documents(&mut self) -> Result> + '_> { Ok((&mut self.documents) .lines() .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) })) } pub fn settings(&mut self) -> Result> { Ok(self.settings.clone()) } } pub struct UpdateFile { reader: BufReader, } impl UpdateFile { fn new(path: &Path) -> Result { Ok(UpdateFile { reader: BufReader::new(File::open(path)?) }) } } impl Iterator for UpdateFile { type Item = Result; fn next(&mut self) -> Option { (&mut self.reader) .lines() .map(|line| { line.map_err(Error::from) .and_then(|line| serde_json::from_str(&line).map_err(Error::from)) }) .next() } } #[cfg(test)] pub(crate) mod test { use std::fs::File; use std::io::BufReader; use flate2::bufread::GzDecoder; use meili_snap::insta; use tempfile::TempDir; use super::*; #[test] fn read_dump_v3() { let dump = File::open("tests/assets/v3.dump").unwrap(); let dir = TempDir::new().unwrap(); let mut dump = BufReader::new(dump); let gz = GzDecoder::new(&mut dump); let mut archive = tar::Archive::new(gz); archive.unpack(dir.path()).unwrap(); let mut dump = V3Reader::open(dir).unwrap(); // top level infos insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-07 11:39:03.709153554 +00:00:00"); // tasks let tasks = dump.tasks().collect::>>().unwrap(); let (tasks, mut update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"63086d59c3f2074e4ab3fff7e8cc36c1"); assert_eq!(update_files.len(), 10); assert!(update_files[0].is_some()); // the enqueued document addition assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed let update_file = update_files.remove(0).unwrap().collect::>>().unwrap(); meili_snap::snapshot_hash!(meili_snap::json_string!(update_file), @"7b8889539b669c7b9ddba448bafa385d"); // indexes let mut indexes = dump.indexes().unwrap().collect::>>().unwrap(); // the index are not ordered in any way by default indexes.sort_by_key(|index| index.metadata().uid.to_string()); let mut products = indexes.pop().unwrap(); let mut movies2 = indexes.pop().unwrap(); let mut movies = indexes.pop().unwrap(); let mut spells = indexes.pop().unwrap(); assert!(indexes.is_empty()); // products insta::assert_json_snapshot!(products.metadata(), @r###" { "uid": "products", "primaryKey": "sku", "createdAt": "2022-10-07T11:38:54.74389899Z", "updatedAt": "2022-10-07T11:38:55.963185778Z" } "###); insta::assert_json_snapshot!(products.settings().unwrap()); let documents = products.documents().unwrap().collect::>>().unwrap(); assert_eq!(documents.len(), 10); meili_snap::snapshot_hash!(format!("{:#?}", documents), @"548284a84de510f71e88e6cdea495cf5"); // movies insta::assert_json_snapshot!(movies.metadata(), @r###" { "uid": "movies", "primaryKey": "id", "createdAt": "2022-10-07T11:38:54.026649575Z", "updatedAt": "2022-10-07T11:39:04.188852537Z" } "###); insta::assert_json_snapshot!(movies.settings().unwrap()); let documents = movies.documents().unwrap().collect::>>().unwrap(); assert_eq!(documents.len(), 110); meili_snap::snapshot_hash!(format!("{:#?}", documents), @"d153b5a81d8b3cdcbe1dec270b574022"); // movies2 insta::assert_json_snapshot!(movies2.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" { "uid": "movies_2", "primaryKey": null, "createdAt": "[now]", "updatedAt": "[now]" } "###); insta::assert_json_snapshot!(movies2.settings().unwrap()); let documents = movies2.documents().unwrap().collect::>>().unwrap(); assert_eq!(documents.len(), 0); meili_snap::snapshot_hash!(format!("{:#?}", documents), @"d751713988987e9331980363e24189ce"); // spells insta::assert_json_snapshot!(spells.metadata(), @r###" { "uid": "dnd_spells", "primaryKey": "index", "createdAt": "2022-10-07T11:38:56.265951133Z", "updatedAt": "2022-10-07T11:38:56.521004328Z" } "###); insta::assert_json_snapshot!(spells.settings().unwrap()); let documents = spells.documents().unwrap().collect::>>().unwrap(); assert_eq!(documents.len(), 10); meili_snap::snapshot_hash!(format!("{:#?}", documents), @"235016433dd04262c7f2da01d1e808ce"); } }