From f40b373f9f28ea160754a6dfbb426396b1284f38 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= <clement@meilisearch.com>
Date: Mon, 26 Aug 2019 18:21:16 +0200
Subject: [PATCH] feat: Introduce the UpdateStatus type

---
 meilidb-data/src/database/index/mod.rs | 88 +++++++++++++++++++-------
 meilidb-data/tests/updates.rs          |  8 +--
 2 files changed, 68 insertions(+), 28 deletions(-)

diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs
index bf5f3575c..9661de629 100644
--- a/meilidb-data/src/database/index/mod.rs
+++ b/meilidb-data/src/database/index/mod.rs
@@ -1,6 +1,8 @@
 use std::collections::{HashSet, BTreeMap};
+use std::convert::TryInto;
 use std::sync::Arc;
 use std::thread;
+use std::time::{Duration, Instant};
 
 use arc_swap::{ArcSwap, Guard};
 use meilidb_core::criterion::Criteria;
@@ -58,41 +60,79 @@ enum Update {
     SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
 }
 
+#[derive(Serialize, Deserialize)]
+pub enum UpdateType {
+    DocumentsAddition { number: usize },
+    DocumentsDeletion { number: usize },
+    SynonymsAddition { number: usize },
+    SynonymsDeletion { number: usize },
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct DetailedDuration {
+    main: Duration,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct UpdateStatus {
+    pub update_id: u64,
+    pub update_type: UpdateType,
+    pub result: Result<(), String>,
+    pub detailed_duration: DetailedDuration,
+}
+
 fn spawn_update_system(index: Index) -> thread::JoinHandle<()> {
     thread::spawn(move || {
         loop {
             let subscription = index.updates_index.watch_prefix(vec![]);
             while let Some(result) = index.updates_index.iter().next() {
                 let (key, _) = result.unwrap();
+                let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap();
 
                 let updates = &index.updates_index;
                 let results = &index.updates_results_index;
+
                 (updates, results).transaction(|(updates, results)| {
                     let update = updates.remove(&key)?.unwrap();
 
-                    // this is an emulation of the try block (#31436)
-                    let result: Result<(), Error> = (|| {
-                        match rmp_serde::from_read_ref(&update)? {
-                            UpdateOwned::DocumentsAddition(documents) => {
-                                let ranked_map = index.cache.load().ranked_map.clone();
-                                apply_documents_addition(&index, ranked_map, documents)?;
-                            },
-                            UpdateOwned::DocumentsDeletion(documents) => {
-                                let ranked_map = index.cache.load().ranked_map.clone();
-                                apply_documents_deletion(&index, ranked_map, documents)?;
-                            },
-                            UpdateOwned::SynonymsAddition(synonyms) => {
-                                apply_synonyms_addition(&index, synonyms)?;
-                            },
-                            UpdateOwned::SynonymsDeletion(synonyms) => {
-                                apply_synonyms_deletion(&index, synonyms)?;
-                            },
-                        }
-                        Ok(())
-                    })();
+                    let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() {
+                        UpdateOwned::DocumentsAddition(documents) => {
+                            let update_type = UpdateType::DocumentsAddition { number: documents.len() };
+                            let ranked_map = index.cache.load().ranked_map.clone();
+                            let start = Instant::now();
+                            let result = apply_documents_addition(&index, ranked_map, documents);
+                            (update_type, result, start.elapsed())
+                        },
+                        UpdateOwned::DocumentsDeletion(documents) => {
+                            let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
+                            let ranked_map = index.cache.load().ranked_map.clone();
+                            let start = Instant::now();
+                            let result = apply_documents_deletion(&index, ranked_map, documents);
+                            (update_type, result, start.elapsed())
+                        },
+                        UpdateOwned::SynonymsAddition(synonyms) => {
+                            let update_type = UpdateType::SynonymsAddition { number: synonyms.len() };
+                            let start = Instant::now();
+                            let result = apply_synonyms_addition(&index, synonyms);
+                            (update_type, result, start.elapsed())
+                        },
+                        UpdateOwned::SynonymsDeletion(synonyms) => {
+                            let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() };
+                            let start = Instant::now();
+                            let result = apply_synonyms_deletion(&index, synonyms);
+                            (update_type, result, start.elapsed())
+                        },
+                    };
 
-                    let result = result.map_err(|e| e.to_string());
-                    let value = bincode::serialize(&result).unwrap();
+                    let detailed_duration = DetailedDuration { main: duration };
+                    let status = UpdateStatus {
+                        update_id,
+                        update_type,
+                        result: result.map_err(|e| e.to_string()),
+                        detailed_duration,
+                    };
+
+                    let value = bincode::serialize(&status).unwrap();
                     results.insert(&key, value)
                 })
                 .unwrap();
@@ -267,7 +307,7 @@ impl Index {
     pub fn update_status(
         &self,
         update_id: u64,
-    ) -> Result<Option<Result<(), String>>, Error>
+    ) -> Result<Option<UpdateStatus>, Error>
     {
         let update_id = update_id.to_be_bytes();
         match self.updates_results_index.get(update_id)? {
@@ -282,7 +322,7 @@ impl Index {
     pub fn update_status_blocking(
         &self,
         update_id: u64,
-    ) -> Result<Result<(), String>, Error>
+    ) -> Result<UpdateStatus, Error>
     {
         let update_id_bytes = update_id.to_be_bytes().to_vec();
         let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes);
diff --git a/meilidb-data/tests/updates.rs b/meilidb-data/tests/updates.rs
index 783664410..3a292c082 100644
--- a/meilidb-data/tests/updates.rs
+++ b/meilidb-data/tests/updates.rs
@@ -23,7 +23,7 @@ fn insert_delete_document() {
     addition.update_document(&doc1);
     let update_id = addition.finalize().unwrap();
     let status = index.update_status_blocking(update_id).unwrap();
-    assert_eq!(status, Ok(()));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 1);
@@ -33,7 +33,7 @@ fn insert_delete_document() {
     deletion.delete_document(&doc1).unwrap();
     let update_id = deletion.finalize().unwrap();
     let status = index.update_status_blocking(update_id).unwrap();
-    assert_eq!(status, Ok(()));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 0);
@@ -54,7 +54,7 @@ fn replace_document() {
     addition.update_document(&doc1);
     let update_id = addition.finalize().unwrap();
     let status = index.update_status_blocking(update_id).unwrap();
-    assert_eq!(status, Ok(()));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 1);
@@ -64,7 +64,7 @@ fn replace_document() {
     deletion.update_document(&doc2);
     let update_id = deletion.finalize().unwrap();
     let status = index.update_status_blocking(update_id).unwrap();
-    assert_eq!(status, Ok(()));
+    assert!(status.result.is_ok());
 
     let docs = index.query_builder().query("hello", 0..10).unwrap();
     assert_eq!(docs.len(), 0);