diff --git a/dump/src/lib.rs b/dump/src/lib.rs index fd628426e..2bed7d12a 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -12,7 +12,7 @@ mod reader; mod writer; pub use error::Error; -pub use reader::open; +pub use reader::DumpReader; pub use writer::DumpWriter; const CURRENT_DUMP_VERSION: Version = Version::V6; @@ -193,27 +193,21 @@ pub(crate) mod test { use big_s::S; use maplit::btreeset; - use meilisearch_types::tasks::{Kind, Status}; + use meilisearch_types::keys::{Action, Key}; + use meilisearch_types::milli::{self, update::Setting}; + use meilisearch_types::tasks::Status; use meilisearch_types::{index_uid::IndexUid, star_or::StarOr}; - use meilisearch_types::{ - keys::{Action, Key}, - tasks::Task, - }; - use meilisearch_types::{ - milli::{self, update::Setting}, - tasks::KindWithContent, - }; use meilisearch_types::{ settings::{Checked, Settings}, tasks::Details, }; use serde_json::{json, Map, Value}; - use time::{macros::datetime, Duration}; + use time::macros::datetime; use uuid::Uuid; use crate::{ reader::{self, Document}, - DumpWriter, IndexMetadata, KindDump, TaskDump, Version, + DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version, }; pub fn create_test_instance_uid() -> Uuid { @@ -419,7 +413,7 @@ pub(crate) mod test { #[test] fn test_creating_and_read_dump() { let mut file = create_test_dump(); - let mut dump = reader::open(&mut file).unwrap(); + let mut dump = DumpReader::open(&mut file).unwrap(); // ==== checking the top level infos assert_eq!(dump.version(), Version::V6); diff --git a/dump/src/reader/compat/mod.rs b/dump/src/reader/compat/mod.rs index 08eb971e6..29836aa61 100644 --- a/dump/src/reader/compat/mod.rs +++ b/dump/src/reader/compat/mod.rs @@ -1,151 +1,4 @@ -// pub mod v2; -// pub mod v3; -// pub mod v4; -use crate::Result; - -use self::{ - v4_to_v5::CompatV4ToV5, - v5_to_v6::{CompatIndexV5ToV6, CompatV5ToV6}, -}; - -use super::{ - v5::V5Reader, - v6::{self, V6IndexReader, V6Reader}, - Document, UpdateFile, -}; - pub mod v2_to_v3; pub mod v3_to_v4; pub mod v4_to_v5; pub mod v5_to_v6; - -pub enum Compat { - Current(V6Reader), - Compat(CompatV5ToV6), -} - -impl Compat { - pub fn version(&self) -> crate::Version { - match self { - Compat::Current(current) => current.version(), - Compat::Compat(compat) => compat.version(), - } - } - - pub fn date(&self) -> Option { - match self { - Compat::Current(current) => current.date(), - Compat::Compat(compat) => compat.date(), - } - } - - pub fn instance_uid(&self) -> Result> { - match self { - Compat::Current(current) => current.instance_uid(), - Compat::Compat(compat) => compat.instance_uid(), - } - } - - pub fn indexes(&self) -> Result> + '_>> { - match self { - Compat::Current(current) => { - let indexes = Box::new(current.indexes()?.map(|res| res.map(CompatIndex::from))) - as Box> + '_>; - Ok(indexes) - } - Compat::Compat(compat) => { - let indexes = Box::new(compat.indexes()?.map(|res| res.map(CompatIndex::from))) - as Box> + '_>; - Ok(indexes) - } - } - } - - pub fn tasks( - &mut self, - ) -> Box>)>> + '_> { - match self { - Compat::Current(current) => current.tasks(), - Compat::Compat(compat) => compat.tasks(), - } - } - - pub fn keys(&mut self) -> Box> + '_> { - match self { - Compat::Current(current) => current.keys(), - Compat::Compat(compat) => compat.keys(), - } - } -} - -impl From for Compat { - fn from(value: V6Reader) -> Self { - Compat::Current(value) - } -} - -impl From for Compat { - fn from(value: CompatV5ToV6) -> Self { - Compat::Compat(value) - } -} - -impl From for Compat { - fn from(value: V5Reader) -> Self { - Compat::Compat(value.to_v6()) - } -} - -impl From for Compat { - fn from(value: CompatV4ToV5) -> Self { - Compat::Compat(value.to_v6()) - } -} - -pub enum CompatIndex { - Current(v6::V6IndexReader), - Compat(CompatIndexV5ToV6), -} - -impl CompatIndex { - pub fn new_v6(v6: v6::V6IndexReader) -> CompatIndex { - CompatIndex::Current(v6) - } - - pub fn metadata(&self) -> &crate::IndexMetadata { - match self { - CompatIndex::Current(v6) => v6.metadata(), - CompatIndex::Compat(compat) => compat.metadata(), - } - } - - pub fn documents(&mut self) -> Result> + '_>> { - match self { - CompatIndex::Current(v6) => v6 - .documents() - .map(|iter| Box::new(iter) as Box> + '_>), - CompatIndex::Compat(compat) => compat - .documents() - .map(|iter| Box::new(iter) as Box> + '_>), - } - } - - pub fn settings(&mut self) -> Result> { - match self { - CompatIndex::Current(v6) => v6.settings(), - CompatIndex::Compat(compat) => compat.settings(), - } - } -} - -impl From for CompatIndex { - fn from(value: V6IndexReader) -> Self { - CompatIndex::Current(value) - } -} - -impl From for CompatIndex { - fn from(value: CompatIndexV5ToV6) -> Self { - CompatIndex::Compat(value) - } -} diff --git a/dump/src/reader/mod.rs b/dump/src/reader/mod.rs index 6542786f2..daa7df1f9 100644 --- a/dump/src/reader/mod.rs +++ b/dump/src/reader/mod.rs @@ -1,15 +1,15 @@ use std::io::Read; use std::{fs::File, io::BufReader}; -use flate2::bufread::GzDecoder; - -use serde::Deserialize; - -use tempfile::TempDir; - +use self::compat::v4_to_v5::CompatV4ToV5; +use self::compat::v5_to_v6::{CompatIndexV5ToV6, CompatV5ToV6}; +use self::v5::V5Reader; +use self::v6::{V6IndexReader, V6Reader}; use crate::{Result, Version}; -use self::compat::Compat; +use flate2::bufread::GzDecoder; +use serde::Deserialize; +use tempfile::TempDir; mod compat; @@ -23,34 +23,165 @@ pub(self) mod v6; pub type Document = serde_json::Map; pub type UpdateFile = dyn Iterator>; -pub fn open(dump: impl Read) -> Result { - let path = TempDir::new()?; - let mut dump = BufReader::new(dump); - let gz = GzDecoder::new(&mut dump); - let mut archive = tar::Archive::new(gz); - archive.unpack(path.path())?; +pub enum DumpReader { + Current(V6Reader), + Compat(CompatV5ToV6), +} - #[derive(Deserialize)] - #[serde(rename_all = "camelCase")] - struct MetadataVersion { - pub dump_version: Version, +impl DumpReader { + pub fn open(dump: impl Read) -> Result { + let path = TempDir::new()?; + let mut dump = BufReader::new(dump); + let gz = GzDecoder::new(&mut dump); + let mut archive = tar::Archive::new(gz); + archive.unpack(path.path())?; + + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct MetadataVersion { + pub dump_version: Version, + } + let mut meta_file = File::open(path.path().join("metadata.json"))?; + let MetadataVersion { dump_version } = serde_json::from_reader(&mut meta_file)?; + + match dump_version { + // Version::V1 => Ok(Box::new(v1::Reader::open(path)?)), + Version::V1 => todo!(), + Version::V2 => Ok(v2::V2Reader::open(path)? + .to_v3() + .to_v4() + .to_v5() + .to_v6() + .into()), + Version::V3 => Ok(v3::V3Reader::open(path)?.to_v4().to_v5().to_v6().into()), + Version::V4 => Ok(v4::V4Reader::open(path)?.to_v5().to_v6().into()), + Version::V5 => Ok(v5::V5Reader::open(path)?.to_v6().into()), + Version::V6 => Ok(v6::V6Reader::open(path)?.into()), + } } - let mut meta_file = File::open(path.path().join("metadata.json"))?; - let MetadataVersion { dump_version } = serde_json::from_reader(&mut meta_file)?; - match dump_version { - // Version::V1 => Ok(Box::new(v1::Reader::open(path)?)), - Version::V1 => todo!(), - Version::V2 => Ok(v2::V2Reader::open(path)? - .to_v3() - .to_v4() - .to_v5() - .to_v6() - .into()), - Version::V3 => Ok(v3::V3Reader::open(path)?.to_v4().to_v5().to_v6().into()), - Version::V4 => Ok(v4::V4Reader::open(path)?.to_v5().to_v6().into()), - Version::V5 => Ok(v5::V5Reader::open(path)?.to_v6().into()), - Version::V6 => Ok(v6::V6Reader::open(path)?.into()), + pub fn version(&self) -> crate::Version { + match self { + DumpReader::Current(current) => current.version(), + DumpReader::Compat(compat) => compat.version(), + } + } + + pub fn date(&self) -> Option { + match self { + DumpReader::Current(current) => current.date(), + DumpReader::Compat(compat) => compat.date(), + } + } + + pub fn instance_uid(&self) -> Result> { + match self { + DumpReader::Current(current) => current.instance_uid(), + DumpReader::Compat(compat) => compat.instance_uid(), + } + } + + pub fn indexes(&self) -> Result> + '_>> { + match self { + DumpReader::Current(current) => { + let indexes = Box::new(current.indexes()?.map(|res| res.map(DumpIndexReader::from))) + as Box> + '_>; + Ok(indexes) + } + DumpReader::Compat(compat) => { + let indexes = Box::new(compat.indexes()?.map(|res| res.map(DumpIndexReader::from))) + as Box> + '_>; + Ok(indexes) + } + } + } + + pub fn tasks( + &mut self, + ) -> Box>)>> + '_> { + match self { + DumpReader::Current(current) => current.tasks(), + DumpReader::Compat(compat) => compat.tasks(), + } + } + + pub fn keys(&mut self) -> Box> + '_> { + match self { + DumpReader::Current(current) => current.keys(), + DumpReader::Compat(compat) => compat.keys(), + } + } +} + +impl From for DumpReader { + fn from(value: V6Reader) -> Self { + DumpReader::Current(value) + } +} + +impl From for DumpReader { + fn from(value: CompatV5ToV6) -> Self { + DumpReader::Compat(value) + } +} + +impl From for DumpReader { + fn from(value: V5Reader) -> Self { + DumpReader::Compat(value.to_v6()) + } +} + +impl From for DumpReader { + fn from(value: CompatV4ToV5) -> Self { + DumpReader::Compat(value.to_v6()) + } +} + +pub enum DumpIndexReader { + Current(v6::V6IndexReader), + Compat(CompatIndexV5ToV6), +} + +impl DumpIndexReader { + pub fn new_v6(v6: v6::V6IndexReader) -> DumpIndexReader { + DumpIndexReader::Current(v6) + } + + pub fn metadata(&self) -> &crate::IndexMetadata { + match self { + DumpIndexReader::Current(v6) => v6.metadata(), + DumpIndexReader::Compat(compat) => compat.metadata(), + } + } + + pub fn documents(&mut self) -> Result> + '_>> { + match self { + DumpIndexReader::Current(v6) => v6 + .documents() + .map(|iter| Box::new(iter) as Box> + '_>), + DumpIndexReader::Compat(compat) => compat + .documents() + .map(|iter| Box::new(iter) as Box> + '_>), + } + } + + pub fn settings(&mut self) -> Result> { + match self { + DumpIndexReader::Current(v6) => v6.settings(), + DumpIndexReader::Compat(compat) => compat.settings(), + } + } +} + +impl From for DumpIndexReader { + fn from(value: V6IndexReader) -> Self { + DumpIndexReader::Current(value) + } +} + +impl From for DumpIndexReader { + fn from(value: CompatIndexV5ToV6) -> Self { + DumpIndexReader::Compat(value) } } @@ -63,7 +194,7 @@ pub(crate) mod test { #[test] fn import_dump_v5() { let dump = File::open("tests/assets/v5.dump").unwrap(); - let mut dump = open(dump).unwrap(); + let mut dump = DumpReader::open(dump).unwrap(); // top level infos insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-04 15:55:10.344982459 +00:00:00"); @@ -153,7 +284,7 @@ pub(crate) mod test { #[test] fn import_dump_v4() { let dump = File::open("tests/assets/v4.dump").unwrap(); - let mut dump = open(dump).unwrap(); + let mut dump = DumpReader::open(dump).unwrap(); // top level infos insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-06 12:53:49.131989609 +00:00:00"); @@ -242,7 +373,7 @@ pub(crate) mod test { #[test] fn import_dump_v3() { let dump = File::open("tests/assets/v3.dump").unwrap(); - let mut dump = open(dump).unwrap(); + let mut dump = DumpReader::open(dump).unwrap(); // top level infos insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-07 11:39:03.709153554 +00:00:00"); @@ -351,7 +482,7 @@ pub(crate) mod test { #[test] fn import_dump_v2() { let dump = File::open("tests/assets/v2.dump").unwrap(); - let mut dump = open(dump).unwrap(); + let mut dump = DumpReader::open(dump).unwrap(); // top level infos insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-09 20:27:59.904096267 +00:00:00"); diff --git a/dump/src/reader/v6/mod.rs b/dump/src/reader/v6/mod.rs index 9752187e3..008a5ad27 100644 --- a/dump/src/reader/v6/mod.rs +++ b/dump/src/reader/v6/mod.rs @@ -11,8 +11,6 @@ use uuid::Uuid; use crate::{Error, IndexMetadata, Result, Version}; -mod tasks; - pub use meilisearch_types::milli; use super::Document; diff --git a/dump/src/reader/v6/tasks.rs b/dump/src/reader/v6/tasks.rs deleted file mode 100644 index 8b1378917..000000000 --- a/dump/src/reader/v6/tasks.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/dump/src/writer.rs b/dump/src/writer.rs index 92ba5fb7a..4f0d20754 100644 --- a/dump/src/writer.rs +++ b/dump/src/writer.rs @@ -8,7 +8,6 @@ use flate2::{write::GzEncoder, Compression}; use meilisearch_types::{ keys::Key, settings::{Checked, Settings}, - tasks::Task, }; use serde_json::{Map, Value}; use tempfile::TempDir; diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index b52da0f55..bb7a3613c 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -503,11 +503,12 @@ impl IndexScheduler { let mut tasks = dump.create_tasks_queue()?; for ret in self.all_tasks.iter(&rtxn)? { let (_, task) = ret?; - let mut dump_content_file = tasks.push_task((&task).into())?; + let content_file = task.content_uuid().map(|uuid| uuid.clone()); + let mut dump_content_file = tasks.push_task(&task.into())?; // 2.1. Dump the `content_file` associated with the task if there is one. - if let Some(content_file) = task.content_uuid() { - let content_file = self.file_store.get_update(*content_file)?; + if let Some(content_file) = content_file { + let content_file = self.file_store.get_update(content_file)?; let reader = DocumentsBatchReader::from_reader(content_file) .map_err(milli::Error::from)?; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index a2d182dd9..e4fd34fd1 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -220,6 +220,10 @@ impl IndexScheduler { Ok(this) } + pub fn import_dump(&self, dump_path: PathBuf) -> Result<()> { + todo!() + } + /// This function will execute in a different thread and must be called only once. fn run(&self) { let run = Self {