From a97281af083b88cffd9e1009ac80f6a608a9aca5 Mon Sep 17 00:00:00 2001 From: Francis Murillo Date: Fri, 13 Jan 2023 22:45:45 +0800 Subject: [PATCH 1/3] Extract createdAt and updatedAt from v3 dump --- dump/src/reader/v3/mod.rs | 62 ++++++++++++++++++++++++++--------- dump/src/reader/v3/updates.rs | 20 +++++++++++ 2 files changed, 67 insertions(+), 15 deletions(-) diff --git a/dump/src/reader/v3/mod.rs b/dump/src/reader/v3/mod.rs index 5288fb61c..9b1b335df 100644 --- a/dump/src/reader/v3/mod.rs +++ b/dump/src/reader/v3/mod.rs @@ -114,6 +114,10 @@ impl V3Reader { V3IndexReader::new( index.uid.clone(), &self.dump.path().join("indexes").join(index.uuid.to_string()), + &index, + BufReader::new( + File::open(self.dump.path().join("updates").join("data.jsonl")).unwrap(), + ), ) })) } @@ -147,6 +151,7 @@ impl V3Reader { } } +#[derive(Debug)] pub struct V3IndexReader { metadata: IndexMetadata, settings: Settings, @@ -155,16 +160,43 @@ pub struct V3IndexReader { } impl V3IndexReader { - pub fn new(name: String, path: &Path) -> Result { + pub fn new( + name: String, + path: &Path, + index_uuid: &IndexUuid, + tasks: BufReader, + ) -> Result { let meta = File::open(path.join("meta.json"))?; let meta: DumpMeta = serde_json::from_reader(meta)?; + let mut created_entry = (u64::MAX, None); + let mut updated_entry = (u64::MIN, None); + + for line in tasks.lines() { + let task: Task = serde_json::from_str(&line?)?; + + if &task.uuid == &index_uuid.uuid { + let update_id = task.update.id(); + + if update_id <= created_entry.0 { + created_entry.0 = update_id; + created_entry.1 = task.update.created_at(); + } + + if update_id >= updated_entry.0 { + updated_entry.0 = update_id; + updated_entry.1 = task.update.processed_at(); + } + } + } + + let current_time = OffsetDateTime::now_utc(); + let metadata = IndexMetadata { uid: name, primary_key: meta.primary_key, - // FIXME: Iterate over the whole task queue to find the creation and last update date. - created_at: OffsetDateTime::now_utc(), - updated_at: OffsetDateTime::now_utc(), + created_at: created_entry.1.unwrap_or(current_time), + updated_at: updated_entry.1.unwrap_or(current_time), }; let ret = V3IndexReader { @@ -263,12 +295,12 @@ pub(crate) mod test { assert!(indexes.is_empty()); // products - insta::assert_json_snapshot!(products.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" + insta::assert_json_snapshot!(products.metadata(), @r###" { "uid": "products", "primaryKey": "sku", - "createdAt": "[now]", - "updatedAt": "[now]" + "createdAt": "2022-10-07T11:38:54.734594617Z", + "updatedAt": "2022-10-07T11:38:55.963185778Z" } "###); @@ -278,12 +310,12 @@ pub(crate) mod test { meili_snap::snapshot_hash!(format!("{:#?}", documents), @"548284a84de510f71e88e6cdea495cf5"); // movies - insta::assert_json_snapshot!(movies.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" + insta::assert_json_snapshot!(movies.metadata(), @r###" { "uid": "movies", "primaryKey": "id", - "createdAt": "[now]", - "updatedAt": "[now]" + "createdAt": "2022-10-07T11:38:54.004402239Z", + "updatedAt": "2022-10-07T11:39:04.188852537Z" } "###); @@ -293,11 +325,11 @@ pub(crate) mod test { meili_snap::snapshot_hash!(format!("{:#?}", documents), @"d153b5a81d8b3cdcbe1dec270b574022"); // movies2 - insta::assert_json_snapshot!(movies2.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" + insta::assert_json_snapshot!(movies2.metadata(), { ".updatedAt" => "[now]" }, @r###" { "uid": "movies_2", "primaryKey": null, - "createdAt": "[now]", + "createdAt": "2022-10-07T11:39:03.703667164Z", "updatedAt": "[now]" } "###); @@ -308,12 +340,12 @@ pub(crate) mod test { meili_snap::snapshot_hash!(format!("{:#?}", documents), @"d751713988987e9331980363e24189ce"); // spells - insta::assert_json_snapshot!(spells.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" + insta::assert_json_snapshot!(spells.metadata(), @r###" { "uid": "dnd_spells", "primaryKey": "index", - "createdAt": "[now]", - "updatedAt": "[now]" + "createdAt": "2022-10-07T11:38:56.263041061Z", + "updatedAt": "2022-10-07T11:38:56.521004328Z" } "###); diff --git a/dump/src/reader/v3/updates.rs b/dump/src/reader/v3/updates.rs index 2f9e49c1a..a485eaafb 100644 --- a/dump/src/reader/v3/updates.rs +++ b/dump/src/reader/v3/updates.rs @@ -74,6 +74,26 @@ impl UpdateStatus { _ => None, } } + + pub fn created_at(&self) -> Option { + match self { + UpdateStatus::Processing(u) => Some(u.from.enqueued_at.clone()), + UpdateStatus::Enqueued(u) => Some(u.enqueued_at.clone()), + UpdateStatus::Processed(u) => Some(u.from.from.enqueued_at.clone()), + UpdateStatus::Aborted(u) => Some(u.from.enqueued_at.clone()), + UpdateStatus::Failed(u) => Some(u.from.from.enqueued_at.clone()), + } + } + + pub fn processed_at(&self) -> Option { + match self { + UpdateStatus::Processing(u) => Some(u.started_processing_at.clone()), + UpdateStatus::Enqueued(_) => None, + UpdateStatus::Processed(u) => Some(u.processed_at.clone()), + UpdateStatus::Aborted(_) => None, + UpdateStatus::Failed(u) => Some(u.failed_at.clone()), + } + } } #[derive(Debug, Deserialize, Clone)] From 6993924f32a9517835adacf56a28f5bcaccc3b34 Mon Sep 17 00:00:00 2001 From: Francis Murillo Date: Tue, 17 Jan 2023 23:11:49 +0800 Subject: [PATCH 2/3] Use finished_at for v3 dumps instead --- dump/src/reader/v3/mod.rs | 52 +++++++++++++++++------------------ dump/src/reader/v3/updates.rs | 6 ++-- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/dump/src/reader/v3/mod.rs b/dump/src/reader/v3/mod.rs index 9b1b335df..59be1924b 100644 --- a/dump/src/reader/v3/mod.rs +++ b/dump/src/reader/v3/mod.rs @@ -112,7 +112,6 @@ impl V3Reader { pub fn indexes(&self) -> Result> + '_> { Ok(self.index_uuid.iter().map(|index| -> Result<_> { V3IndexReader::new( - index.uid.clone(), &self.dump.path().join("indexes").join(index.uuid.to_string()), &index, BufReader::new( @@ -160,43 +159,42 @@ pub struct V3IndexReader { } impl V3IndexReader { - pub fn new( - name: String, - path: &Path, - index_uuid: &IndexUuid, - tasks: BufReader, - ) -> Result { + pub fn new(path: &Path, index_uuid: &IndexUuid, tasks: BufReader) -> Result { let meta = File::open(path.join("meta.json"))?; let meta: DumpMeta = serde_json::from_reader(meta)?; - let mut created_entry = (u64::MAX, None); - let mut updated_entry = (u64::MIN, None); + let mut created_at = None; + let mut updated_at = None; for line in tasks.lines() { let task: Task = serde_json::from_str(&line?)?; - if &task.uuid == &index_uuid.uuid { - let update_id = task.update.id(); + if task.uuid != index_uuid.uuid || !task.is_finished() { + continue; + } - if update_id <= created_entry.0 { - created_entry.0 = update_id; - created_entry.1 = task.update.created_at(); - } + let new_created_at = match task.update.meta() { + Kind::DocumentAddition { .. } | Kind::Settings(_) => task.update.finished_at(), + _ => None, + }; + let new_updated_at = task.update.finished_at(); - if update_id >= updated_entry.0 { - updated_entry.0 = update_id; - updated_entry.1 = task.update.processed_at(); - } + if created_at.is_none() || created_at > new_created_at { + created_at = new_created_at; + } + + if updated_at.is_none() || updated_at < new_updated_at { + updated_at = new_updated_at; } } let current_time = OffsetDateTime::now_utc(); let metadata = IndexMetadata { - uid: name, + uid: index_uuid.uid.clone(), primary_key: meta.primary_key, - created_at: created_entry.1.unwrap_or(current_time), - updated_at: updated_entry.1.unwrap_or(current_time), + created_at: created_at.unwrap_or(current_time), + updated_at: updated_at.unwrap_or(current_time), }; let ret = V3IndexReader { @@ -299,7 +297,7 @@ pub(crate) mod test { { "uid": "products", "primaryKey": "sku", - "createdAt": "2022-10-07T11:38:54.734594617Z", + "createdAt": "2022-10-07T11:38:54.74389899Z", "updatedAt": "2022-10-07T11:38:55.963185778Z" } "###); @@ -314,7 +312,7 @@ pub(crate) mod test { { "uid": "movies", "primaryKey": "id", - "createdAt": "2022-10-07T11:38:54.004402239Z", + "createdAt": "2022-10-07T11:38:54.026649575Z", "updatedAt": "2022-10-07T11:39:04.188852537Z" } "###); @@ -325,11 +323,11 @@ pub(crate) mod test { meili_snap::snapshot_hash!(format!("{:#?}", documents), @"d153b5a81d8b3cdcbe1dec270b574022"); // movies2 - insta::assert_json_snapshot!(movies2.metadata(), { ".updatedAt" => "[now]" }, @r###" + insta::assert_json_snapshot!(movies2.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###" { "uid": "movies_2", "primaryKey": null, - "createdAt": "2022-10-07T11:39:03.703667164Z", + "createdAt": "[now]", "updatedAt": "[now]" } "###); @@ -344,7 +342,7 @@ pub(crate) mod test { { "uid": "dnd_spells", "primaryKey": "index", - "createdAt": "2022-10-07T11:38:56.263041061Z", + "createdAt": "2022-10-07T11:38:56.265951133Z", "updatedAt": "2022-10-07T11:38:56.521004328Z" } "###); diff --git a/dump/src/reader/v3/updates.rs b/dump/src/reader/v3/updates.rs index a485eaafb..89d0b86bb 100644 --- a/dump/src/reader/v3/updates.rs +++ b/dump/src/reader/v3/updates.rs @@ -75,7 +75,7 @@ impl UpdateStatus { } } - pub fn created_at(&self) -> Option { + pub fn enqueued_at(&self) -> Option { match self { UpdateStatus::Processing(u) => Some(u.from.enqueued_at.clone()), UpdateStatus::Enqueued(u) => Some(u.enqueued_at.clone()), @@ -85,9 +85,9 @@ impl UpdateStatus { } } - pub fn processed_at(&self) -> Option { + pub fn finished_at(&self) -> Option { match self { - UpdateStatus::Processing(u) => Some(u.started_processing_at.clone()), + UpdateStatus::Processing(_) => None, UpdateStatus::Enqueued(_) => None, UpdateStatus::Processed(u) => Some(u.processed_at.clone()), UpdateStatus::Aborted(_) => None, From 798aa4ee9204ce20ab90054cad3e7a2243544aa3 Mon Sep 17 00:00:00 2001 From: Francis Murillo Date: Thu, 19 Jan 2023 19:38:20 +0800 Subject: [PATCH 3/3] Fix clippy issues --- dump/src/reader/v3/mod.rs | 5 ++--- dump/src/reader/v3/updates.rs | 14 +++++++------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/dump/src/reader/v3/mod.rs b/dump/src/reader/v3/mod.rs index 59be1924b..560a901cd 100644 --- a/dump/src/reader/v3/mod.rs +++ b/dump/src/reader/v3/mod.rs @@ -113,7 +113,7 @@ impl V3Reader { Ok(self.index_uuid.iter().map(|index| -> Result<_> { V3IndexReader::new( &self.dump.path().join("indexes").join(index.uuid.to_string()), - &index, + index, BufReader::new( File::open(self.dump.path().join("updates").join("data.jsonl")).unwrap(), ), @@ -150,7 +150,6 @@ impl V3Reader { } } -#[derive(Debug)] pub struct V3IndexReader { metadata: IndexMetadata, settings: Settings, @@ -169,7 +168,7 @@ impl V3IndexReader { for line in tasks.lines() { let task: Task = serde_json::from_str(&line?)?; - if task.uuid != index_uuid.uuid || !task.is_finished() { + if !(task.uuid == index_uuid.uuid && task.is_finished()) { continue; } diff --git a/dump/src/reader/v3/updates.rs b/dump/src/reader/v3/updates.rs index 89d0b86bb..0d8325d47 100644 --- a/dump/src/reader/v3/updates.rs +++ b/dump/src/reader/v3/updates.rs @@ -77,11 +77,11 @@ impl UpdateStatus { pub fn enqueued_at(&self) -> Option { match self { - UpdateStatus::Processing(u) => Some(u.from.enqueued_at.clone()), - UpdateStatus::Enqueued(u) => Some(u.enqueued_at.clone()), - UpdateStatus::Processed(u) => Some(u.from.from.enqueued_at.clone()), - UpdateStatus::Aborted(u) => Some(u.from.enqueued_at.clone()), - UpdateStatus::Failed(u) => Some(u.from.from.enqueued_at.clone()), + UpdateStatus::Processing(u) => Some(u.from.enqueued_at), + UpdateStatus::Enqueued(u) => Some(u.enqueued_at), + UpdateStatus::Processed(u) => Some(u.from.from.enqueued_at), + UpdateStatus::Aborted(u) => Some(u.from.enqueued_at), + UpdateStatus::Failed(u) => Some(u.from.from.enqueued_at), } } @@ -89,9 +89,9 @@ impl UpdateStatus { match self { UpdateStatus::Processing(_) => None, UpdateStatus::Enqueued(_) => None, - UpdateStatus::Processed(u) => Some(u.processed_at.clone()), + UpdateStatus::Processed(u) => Some(u.processed_at), UpdateStatus::Aborted(_) => None, - UpdateStatus::Failed(u) => Some(u.failed_at.clone()), + UpdateStatus::Failed(u) => Some(u.failed_at), } } }