From 7579a363abb4c5243a8f831400c9797b69323e0f Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 10 Oct 2022 18:57:27 +0200 Subject: [PATCH] finish the dump reader API, the dump Writer API now needs to be updated --- dump/src/lib.rs | 28 +++++++++--- dump/src/reader/compat/mod.rs | 9 ++-- dump/src/reader/compat/v4_to_v5.rs | 10 ++--- dump/src/reader/compat/v5_to_v6.rs | 12 ++--- dump/src/reader/mod.rs | 47 ++++--------------- dump/src/reader/v2/updates.rs | 2 - dump/src/reader/v5/mod.rs | 43 +++++++++++++++--- dump/src/reader/v6.rs | 72 +++++++++++------------------- dump/src/writer.rs | 17 ++++--- 9 files changed, 121 insertions(+), 119 deletions(-) diff --git a/dump/src/lib.rs b/dump/src/lib.rs index 3951ffeca..84b0412de 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -61,7 +61,10 @@ pub(crate) mod test { use time::{macros::datetime, Duration}; use uuid::Uuid; - use crate::{reader, DumpWriter, IndexMetadata, Version}; + use crate::{ + reader::{self, Document}, + DumpWriter, IndexMetadata, Version, + }; pub fn create_test_instance_uid() -> Uuid { Uuid::parse_str("9e15e977-f2ae-4761-943f-1eaf75fd736d").unwrap() @@ -111,7 +114,7 @@ pub(crate) mod test { settings.check() } - pub fn create_test_tasks() -> Vec<(TaskView, Option<&'static [u8]>)> { + pub fn create_test_tasks() -> Vec<(TaskView, Option>)> { vec![ ( TaskView { @@ -150,7 +153,16 @@ pub(crate) mod test { started_at: Some(datetime!(2022-11-20 0:00 UTC)), finished_at: Some(datetime!(2022-11-21 0:00 UTC)), }, - Some(br#"{ "id": 4, "race": "leonberg" }"#), + Some(vec![ + json!({ "id": 4, "race": "leonberg" }) + .as_object() + .unwrap() + .clone(), + json!({ "id": 5, "race": "patou" }) + .as_object() + .unwrap() + .clone(), + ]), ), ( TaskView { @@ -224,10 +236,12 @@ pub(crate) mod test { // ========== pushing the task queue let tasks = create_test_tasks(); + /* let mut task_queue = dump.create_tasks_queue().unwrap(); for (task, update_file) in &tasks { task_queue.push_task(task, update_file.map(|c| c)).unwrap(); } + */ // ========== pushing the api keys let api_keys = create_test_api_keys(); @@ -283,9 +297,11 @@ pub(crate) mod test { "A content file was expected for the task {}.", expected.0.uid ); - let mut update = Vec::new(); - content_file.unwrap().read_to_end(&mut update).unwrap(); - assert_eq!(update, expected_update); + let updates = content_file + .unwrap() + .collect::, _>>() + .unwrap(); + assert_eq!(updates, expected_update); } } diff --git a/dump/src/reader/compat/mod.rs b/dump/src/reader/compat/mod.rs index ab9857c45..08eb971e6 100644 --- a/dump/src/reader/compat/mod.rs +++ b/dump/src/reader/compat/mod.rs @@ -11,6 +11,7 @@ use self::{ use super::{ v5::V5Reader, v6::{self, V6IndexReader, V6Reader}, + Document, UpdateFile, }; pub mod v2_to_v3; @@ -62,7 +63,7 @@ impl Compat { pub fn tasks( &mut self, - ) -> Box)>> + '_> { + ) -> Box>)>> + '_> { match self { Compat::Current(current) => current.tasks(), Compat::Compat(compat) => compat.tasks(), @@ -118,14 +119,14 @@ impl CompatIndex { } } - pub fn documents(&mut self) -> Result> + '_>> { + pub fn documents(&mut self) -> Result> + '_>> { match self { CompatIndex::Current(v6) => v6 .documents() - .map(|iter| Box::new(iter) as Box> + '_>), + .map(|iter| Box::new(iter) as Box> + '_>), CompatIndex::Compat(compat) => compat .documents() - .map(|iter| Box::new(iter) as Box> + '_>), + .map(|iter| Box::new(iter) as Box> + '_>), } } diff --git a/dump/src/reader/compat/v4_to_v5.rs b/dump/src/reader/compat/v4_to_v5.rs index e208d8322..129a56607 100644 --- a/dump/src/reader/compat/v4_to_v5.rs +++ b/dump/src/reader/compat/v4_to_v5.rs @@ -1,4 +1,4 @@ -use crate::reader::{v4, v5}; +use crate::reader::{v4, v5, Document}; use crate::Result; use super::v3_to_v4::{CompatIndexV3ToV4, CompatV3ToV4}; @@ -66,8 +66,6 @@ impl CompatV4ToV5 { }; Box::new(tasks.map(|task| { task.map(|(task, content_file)| { - // let task_view: v4::tasks::TaskView = task.into(); - let task = v5::Task { id: task.id, content: match task.content { @@ -244,14 +242,14 @@ impl CompatIndexV4ToV5 { } } - pub fn documents(&mut self) -> Result> + '_>> { + pub fn documents(&mut self) -> Result> + '_>> { match self { CompatIndexV4ToV5::V4(v4) => v4 .documents() - .map(|iter| Box::new(iter) as Box> + '_>), + .map(|iter| Box::new(iter) as Box> + '_>), CompatIndexV4ToV5::Compat(compat) => compat .documents() - .map(|iter| Box::new(iter) as Box> + '_>), + .map(|iter| Box::new(iter) as Box> + '_>), } } diff --git a/dump/src/reader/compat/v5_to_v6.rs b/dump/src/reader/compat/v5_to_v6.rs index db52745cb..eb4d2d0e7 100644 --- a/dump/src/reader/compat/v5_to_v6.rs +++ b/dump/src/reader/compat/v5_to_v6.rs @@ -1,4 +1,4 @@ -use crate::reader::{v5, v6}; +use crate::reader::{v5, v6, Document, UpdateFile}; use crate::Result; use super::v4_to_v5::{CompatIndexV4ToV5, CompatV4ToV5}; @@ -54,10 +54,10 @@ impl CompatV5ToV6 { pub fn tasks( &mut self, - ) -> Box)>> + '_> { + ) -> Box>)>> + '_> { let tasks = match self { CompatV5ToV6::V5(v5) => v5.tasks(), - CompatV5ToV6::Compat(compat) => todo!(), // compat.tasks(), + CompatV5ToV6::Compat(compat) => compat.tasks(), }; Box::new(tasks.map(|task| { task.map(|(task, content_file)| { @@ -202,14 +202,14 @@ impl CompatIndexV5ToV6 { } } - pub fn documents(&mut self) -> Result> + '_>> { + pub fn documents(&mut self) -> Result> + '_>> { match self { CompatIndexV5ToV6::V5(v5) => v5 .documents() - .map(|iter| Box::new(iter) as Box> + '_>), + .map(|iter| Box::new(iter) as Box> + '_>), CompatIndexV5ToV6::Compat(compat) => compat .documents() - .map(|iter| Box::new(iter) as Box> + '_>), + .map(|iter| Box::new(iter) as Box> + '_>), } } diff --git a/dump/src/reader/mod.rs b/dump/src/reader/mod.rs index 6bc120c06..2a4fc7639 100644 --- a/dump/src/reader/mod.rs +++ b/dump/src/reader/mod.rs @@ -6,20 +6,14 @@ use flate2::bufread::GzDecoder; use serde::Deserialize; use tempfile::TempDir; -use time::OffsetDateTime; -use uuid::Uuid; -// use crate::reader::compat::Compat; -use crate::{IndexMetadata, Result, Version}; +use crate::{Result, Version}; use self::compat::Compat; -// use self::loaders::{v2, v3, v4, v5}; - -// pub mod error; mod compat; -// mod loaders; -// mod v1; + +// pub(self) mod v1; pub(self) mod v2; pub(self) mod v3; pub(self) mod v4; @@ -47,38 +41,15 @@ pub fn open(dump: impl Read) -> Result { match dump_version { // Version::V1 => Ok(Box::new(v1::Reader::open(path)?)), Version::V1 => todo!(), - Version::V2 => todo!(), + Version::V2 => Ok(v2::V2Reader::open(path)? + .to_v3() + .to_v4() + .to_v5() + .to_v6() + .into()), Version::V3 => Ok(v3::V3Reader::open(path)?.to_v4().to_v5().to_v6().into()), Version::V4 => Ok(v4::V4Reader::open(path)?.to_v5().to_v6().into()), Version::V5 => Ok(v5::V5Reader::open(path)?.to_v6().into()), Version::V6 => Ok(v6::V6Reader::open(path)?.into()), } } - -pub trait DumpReader { - /// Return the version of the dump. - fn version(&self) -> Version; - - /// Return at which date the dump was created if there was one. - fn date(&self) -> Option; - - /// Return the instance-uid if there was one. - fn instance_uid(&self) -> Result>; - - /// Return an iterator over each indexes. - fn indexes(&self) -> Result>> + '_>>; - - /// Return all the tasks in the dump with a possible update file. - fn tasks( - &mut self, - ) -> Box)>> + '_>; - - /// Return all the keys. - fn keys(&mut self) -> Box> + '_>; -} - -pub trait IndexReader { - fn metadata(&self) -> &IndexMetadata; - fn documents(&mut self) -> Result> + '_>>; - fn settings(&mut self) -> Result>; -} diff --git a/dump/src/reader/v2/updates.rs b/dump/src/reader/v2/updates.rs index 6eb115edf..33d88d46f 100644 --- a/dump/src/reader/v2/updates.rs +++ b/dump/src/reader/v2/updates.rs @@ -1,5 +1,3 @@ -use std::{fs::File, io::BufReader}; - use serde::Deserialize; use time::OffsetDateTime; use uuid::Uuid; diff --git a/dump/src/reader/v5/mod.rs b/dump/src/reader/v5/mod.rs index 27a36f66f..e55a15281 100644 --- a/dump/src/reader/v5/mod.rs +++ b/dump/src/reader/v5/mod.rs @@ -43,9 +43,9 @@ use tempfile::TempDir; use time::OffsetDateTime; use uuid::Uuid; -use crate::{IndexMetadata, Result, Version}; +use crate::{Error, IndexMetadata, Result, Version}; -use super::compat::v5_to_v6::CompatV5ToV6; +use super::{compat::v5_to_v6::CompatV5ToV6, Document}; pub mod errors; pub mod keys; @@ -53,13 +53,11 @@ pub mod meta; pub mod settings; pub mod tasks; -pub type Document = serde_json::Map; pub type Settings = settings::Settings; pub type Checked = settings::Checked; pub type Unchecked = settings::Unchecked; pub type Task = tasks::Task; -pub type UpdateFile = File; pub type Key = keys::Key; // ===== Other types to clarify the code of the compat module @@ -150,7 +148,9 @@ impl V5Reader { })) } - pub fn tasks(&mut self) -> Box)>> + '_> { + pub fn tasks( + &mut self, + ) -> Box>)>> + '_> { Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { let task: Task = serde_json::from_str(&line?)?; if !task.is_finished() { @@ -161,7 +161,12 @@ impl V5Reader { .join("updates") .join("updates_files") .join(uuid.to_string()); - Ok((task, Some(File::open(update_file_path).unwrap()))) + Ok(( + task, + Some( + Box::new(UpdateFile::new(&update_file_path)?) as Box + ), + )) } else { Ok((task, None)) } @@ -224,6 +229,32 @@ impl V5IndexReader { } } +pub struct UpdateFile { + reader: BufReader, +} + +impl UpdateFile { + fn new(path: &Path) -> Result { + Ok(UpdateFile { + reader: BufReader::new(File::open(path)?), + }) + } +} + +impl Iterator for UpdateFile { + type Item = Result; + + fn next(&mut self) -> Option { + (&mut self.reader) + .lines() + .map(|line| { + line.map_err(Error::from) + .and_then(|line| serde_json::from_str(&line).map_err(Error::from)) + }) + .next() + } +} + #[cfg(test)] pub(crate) mod test { use std::{fs::File, io::BufReader}; diff --git a/dump/src/reader/v6.rs b/dump/src/reader/v6.rs index 8535047bb..cdd6d3f77 100644 --- a/dump/src/reader/v6.rs +++ b/dump/src/reader/v6.rs @@ -11,18 +11,16 @@ use uuid::Uuid; use crate::{Error, IndexMetadata, Result, Version}; -use super::{DumpReader, IndexReader}; +use super::Document; pub use index; pub type Metadata = crate::Metadata; -pub type Document = serde_json::Map; pub type Settings = index::Settings; pub type Checked = index::Checked; pub type Unchecked = index::Unchecked; pub type Task = index_scheduler::TaskView; -pub type UpdateFile = File; pub type Key = meilisearch_auth::Key; // ===== Other types to clarify the code of the compat module @@ -106,7 +104,9 @@ impl V6Reader { )) } - pub fn tasks(&mut self) -> Box)>> + '_> { + pub fn tasks( + &mut self, + ) -> Box>)>> + '_> { Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { let mut task: index_scheduler::TaskView = serde_json::from_str(&line?)?; // TODO: this can be removed once we can `Deserialize` the duration from the `TaskView`. @@ -121,7 +121,10 @@ impl V6Reader { .join(task.uid.to_string()); if update_file_path.exists() { - Ok((task, Some(File::open(update_file_path)?))) + Ok(( + task, + Some(Box::new(UpdateFile::new(&update_file_path)?) as Box), + )) } else { Ok((task, None)) } @@ -137,37 +140,29 @@ impl V6Reader { } } -impl DumpReader for V6Reader { - fn version(&self) -> Version { - self.version() - } +pub struct UpdateFile { + reader: BufReader, +} - fn date(&self) -> Option { - self.date() - } - - fn instance_uid(&self) -> Result> { - self.instance_uid() - } - - fn indexes( - &self, - ) -> Result>> + '_>> { - self.indexes().map(|iter| { - Box::new(iter.map(|result| { - result.map(|index| Box::new(index) as Box) - })) as Box> +impl UpdateFile { + fn new(path: &Path) -> Result { + Ok(UpdateFile { + reader: BufReader::new(File::open(path)?), }) } +} - fn tasks( - &mut self, - ) -> Box)>> + '_> { - Box::new(self.tasks()) - } +impl Iterator for UpdateFile { + type Item = Result; - fn keys(&mut self) -> Box> + '_> { - Box::new(self.keys()) + fn next(&mut self) -> Option { + (&mut self.reader) + .lines() + .map(|line| { + line.map_err(Error::from) + .and_then(|line| serde_json::from_str(&line).map_err(Error::from)) + }) + .next() } } @@ -205,18 +200,3 @@ impl V6IndexReader { Ok(settings.check()) } } - -impl IndexReader for V6IndexReader { - fn metadata(&self) -> &IndexMetadata { - self.metadata() - } - - fn documents(&mut self) -> Result> + '_>> { - self.documents() - .map(|iter| Box::new(iter) as Box> + '_>) - } - - fn settings(&mut self) -> Result> { - self.settings() - } -} diff --git a/dump/src/writer.rs b/dump/src/writer.rs index 848121de0..01bd30abc 100644 --- a/dump/src/writer.rs +++ b/dump/src/writer.rs @@ -160,9 +160,12 @@ pub(crate) mod test { use flate2::bufread::GzDecoder; use index::Unchecked; - use crate::test::{ - create_test_api_keys, create_test_documents, create_test_dump, create_test_instance_uid, - create_test_settings, create_test_tasks, + use crate::{ + reader::Document, + test::{ + create_test_api_keys, create_test_documents, create_test_dump, + create_test_instance_uid, create_test_settings, create_test_tasks, + }, }; use super::*; @@ -309,8 +312,12 @@ pub(crate) mod test { if let Some(expected_update) = expected.1 { let path = dump_path.join(format!("tasks/update_files/{}", expected.0.uid)); println!("trying to open {}", path.display()); - let update = fs::read(path).unwrap(); - assert_eq!(update, expected_update); + let update = fs::read_to_string(path).unwrap(); + let documents: Vec = update + .lines() + .map(|line| serde_json::from_str(line).unwrap()) + .collect(); + assert_eq!(documents, expected_update); } }