From 0b5b7b0bf10769221919dc50e1a33bdbe6ffe0f7 Mon Sep 17 00:00:00 2001 From: qdequele Date: Thu, 26 Sep 2019 14:54:28 +0200 Subject: [PATCH] feat: add a method to get the current processed update id & next updates in queue --- meilidb-data/src/database/index/mod.rs | 18 +++++++++ meilidb-data/tests/index.rs | 51 +++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs index 0194b5d51..071b60bab 100644 --- a/meilidb-data/src/database/index/mod.rs +++ b/meilidb-data/src/database/index/mod.rs @@ -359,6 +359,24 @@ impl Index { SynonymsDeletion::new(self) } + pub fn current_update_id(&self) -> Result, Error> { + if let Some((key, _)) = self.updates_index.iter()?.next() { + return Ok(Some(key.as_ref().try_into().map(u64::from_be_bytes).unwrap())) + } + Ok(None) + } + + pub fn enqueued_updates_ids(&self) -> Result, Error> { + let mut updates = Vec::new(); + + for (key, _) in self.updates_index.iter()? { + let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap(); + updates.push(update_id); + } + + Ok(updates) + } + pub fn update_status( &self, update_id: u64, diff --git a/meilidb-data/tests/index.rs b/meilidb-data/tests/index.rs index 4d7a7658b..a8aee6c5b 100644 --- a/meilidb-data/tests/index.rs +++ b/meilidb-data/tests/index.rs @@ -1,6 +1,6 @@ mod common; -use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed}; use std::sync::Arc; use serde_json::json; @@ -97,3 +97,52 @@ fn documents_ids() { let documents_ids_count = index.documents_ids().unwrap().count(); assert_eq!(documents_ids_count, 3); } + +#[test] +fn current_update_id() { + let index = common::simple_index(); + let update_id = Arc::new(AtomicU64::new(0)); + + let update_id_cloned = update_id.clone(); + let index_cloned = index.clone(); + index.set_update_callback(move |_| { + let current_update_id = index_cloned.current_update_id().unwrap().unwrap(); + assert_eq!(current_update_id, update_id_cloned.load(Relaxed)); + }); + + let doc1 = json!({ "objectId": 123, "title": "hello" }); + let mut addition = index.documents_addition(); + addition.update_document(&doc1); + update_id.store(addition.finalize().unwrap(), Relaxed); +} + +#[test] +fn nest_updates_in_queue() { + let index = common::simple_index(); + + index.set_update_callback(move |_| { + std::thread::sleep(std::time::Duration::from_secs(15)); + }); + + let doc1 = json!({ "objectId": 123, "title": "hello" }); + let doc2 = json!({ "objectId": 456, "title": "world" }); + let doc3 = json!({ "objectId": 789 }); + + let mut addition = index.documents_addition(); + addition.update_document(&doc1); + let _ = addition.finalize().unwrap(); + + let mut addition = index.documents_addition(); + addition.update_document(&doc2); + let _ = addition.finalize().unwrap(); + + let mut addition = index.documents_addition(); + addition.update_document(&doc3); + let _ = addition.finalize().unwrap(); + + let should_have_in_queue_updates = vec![1, 2, 3]; + + let in_queue_updates = index.enqueued_updates_ids().unwrap(); + assert_eq!(in_queue_updates, should_have_in_queue_updates); + +}