From acb5e624c6e76bfedaf883df9c206a9cc494ee8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 12 Nov 2019 18:00:47 +0100 Subject: [PATCH] Add enqueued and processed datetimes --- Cargo.lock | 1 + meilidb-core/Cargo.toml | 1 + meilidb-core/src/database.rs | 18 +- meilidb-core/src/store/mod.rs | 14 +- meilidb-core/src/store/updates_results.rs | 3 +- meilidb-core/src/update/clear_all.rs | 2 +- meilidb-core/src/update/customs_update.rs | 2 +- meilidb-core/src/update/documents_addition.rs | 4 +- meilidb-core/src/update/documents_deletion.rs | 2 +- meilidb-core/src/update/mod.rs | 161 +++++++++++++----- meilidb-core/src/update/schema_update.rs | 2 +- .../src/update/stop_words_addition.rs | 2 +- .../src/update/stop_words_deletion.rs | 2 +- meilidb-core/src/update/synonyms_addition.rs | 2 +- meilidb-core/src/update/synonyms_deletion.rs | 2 +- meilidb-http/src/routes/index.rs | 18 +- 16 files changed, 160 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a755b1845..e15bea55c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -864,6 +864,7 @@ dependencies = [ "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "csv 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "deunicode 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index 5b974bb33..c246b5368 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" arc-swap = "0.4.3" bincode = "1.1.4" byteorder = "1.3.2" +chrono = { version = "0.4.9", features = ["serde"] } crossbeam-channel = "0.4.0" deunicode = "1.0.0" env_logger = "0.7.0" diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 790a74c7d..2ff16a461 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -323,7 +323,7 @@ mod tests { let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_none()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); } #[test] @@ -384,7 +384,7 @@ mod tests { let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_some()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some()); } #[test] @@ -434,7 +434,7 @@ mod tests { let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_none()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); } #[test] @@ -524,7 +524,7 @@ mod tests { // check if it has been accepted let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_none()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); reader.abort(); let mut additions = index.documents_addition(); @@ -558,7 +558,7 @@ mod tests { // check if it has been accepted let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_none()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); // even try to search for a document let results = index.query_builder().query(&reader, "21 ", 0..20).unwrap(); @@ -604,7 +604,7 @@ mod tests { // check if it has been accepted let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_some()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some()); } #[test] @@ -668,7 +668,7 @@ mod tests { let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_none()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); let document: Option = index.document(&reader, None, DocumentId(25)).unwrap(); assert!(document.is_none()); @@ -748,7 +748,7 @@ mod tests { let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_none()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); let document: Option = index.document(&reader, None, DocumentId(25)).unwrap(); assert!(document.is_none()); @@ -791,7 +791,7 @@ mod tests { let reader = env.read_txn().unwrap(); let result = index.update_status(&reader, update_id).unwrap(); - assert_matches!(result, UpdateStatus::Processed { content } if content.error.is_none()); + assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); let document: Option = index .document(&reader, None, DocumentId(7900334843754999545)) diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index 2c5bdc31e..f88d7b6d9 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -221,7 +221,7 @@ impl Index { &self, reader: &heed::RoTxn, update_id: u64, - ) -> MResult { + ) -> MResult> { update::update_status(reader, self.updates, self.updates_results, update_id) } @@ -234,17 +234,19 @@ impl Index { updates.reserve(last_id as usize); for id in 0..=last_id { - let update = self.update_status(reader, id)?; - updates.push(update); - last_update_result_id = id; + if let Some(update) = self.update_status(reader, id)? { + updates.push(update); + last_update_result_id = id; + } } } // retrieve all enqueued updates if let Some((last_id, _)) = self.updates.last_update_id(reader)? { for id in last_update_result_id + 1..last_id { - let update = self.update_status(reader, id)?; - updates.push(update); + if let Some(update) = self.update_status(reader, id)? { + updates.push(update); + } } } diff --git a/meilidb-core/src/store/updates_results.rs b/meilidb-core/src/store/updates_results.rs index d85a38f6d..1db58fc42 100644 --- a/meilidb-core/src/store/updates_results.rs +++ b/meilidb-core/src/store/updates_results.rs @@ -5,8 +5,7 @@ use heed::Result as ZResult; #[derive(Copy, Clone)] pub struct UpdatesResults { - pub(crate) updates_results: - heed::Database, SerdeJson>, + pub(crate) updates_results: heed::Database, SerdeJson>, } impl UpdatesResults { diff --git a/meilidb-core/src/update/clear_all.rs b/meilidb-core/src/update/clear_all.rs index 1612f11d2..d0910a26d 100644 --- a/meilidb-core/src/update/clear_all.rs +++ b/meilidb-core/src/update/clear_all.rs @@ -26,7 +26,7 @@ pub fn push_clear_all( updates_results_store: store::UpdatesResults, ) -> MResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::ClearAll; + let update = Update::clear_all(); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-core/src/update/customs_update.rs b/meilidb-core/src/update/customs_update.rs index 72ecfba71..09e15cc80 100644 --- a/meilidb-core/src/update/customs_update.rs +++ b/meilidb-core/src/update/customs_update.rs @@ -18,7 +18,7 @@ pub fn push_customs_update( ) -> ZResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::Customs(customs); + let update = Update::customs(customs); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 91683a931..834e06341 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -91,9 +91,9 @@ pub fn push_documents_addition( let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; let update = if is_partial { - Update::DocumentsPartial(values) + Update::documents_partial(values) } else { - Update::DocumentsAddition(values) + Update::documents_addition(values) }; updates_store.put_update(writer, last_update_id, &update)?; diff --git a/meilidb-core/src/update/documents_deletion.rs b/meilidb-core/src/update/documents_deletion.rs index fb0f16ea1..0f01e98c7 100644 --- a/meilidb-core/src/update/documents_deletion.rs +++ b/meilidb-core/src/update/documents_deletion.rs @@ -76,7 +76,7 @@ pub fn push_documents_deletion( ) -> MResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::DocumentsDeletion(deletion); + let update = Update::documents_deletion(deletion); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 6bb4317e0..29c9ea02a 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -24,6 +24,7 @@ use std::cmp; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::time::Instant; +use chrono::{DateTime, Utc}; use heed::Result as ZResult; use log::debug; use serde::{Deserialize, Serialize}; @@ -32,7 +33,85 @@ use crate::{store, DocumentId, MResult}; use meilidb_schema::Schema; #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum Update { +pub struct Update { + data: UpdateData, + enqueued_at: DateTime, +} + +impl Update { + fn clear_all() -> Update { + Update { + data: UpdateData::ClearAll, + enqueued_at: Utc::now(), + } + } + + fn schema(data: Schema) -> Update { + Update { + data: UpdateData::Schema(data), + enqueued_at: Utc::now(), + } + } + + fn customs(data: Vec) -> Update { + Update { + data: UpdateData::Customs(data), + enqueued_at: Utc::now(), + } + } + + fn documents_addition(data: Vec>) -> Update { + Update { + data: UpdateData::DocumentsAddition(data), + enqueued_at: Utc::now(), + } + } + + fn documents_partial(data: Vec>) -> Update { + Update { + data: UpdateData::DocumentsPartial(data), + enqueued_at: Utc::now(), + } + } + + fn documents_deletion(data: Vec) -> Update { + Update { + data: UpdateData::DocumentsDeletion(data), + enqueued_at: Utc::now(), + } + } + + fn synonyms_addition(data: BTreeMap>) -> Update { + Update { + data: UpdateData::SynonymsAddition(data), + enqueued_at: Utc::now(), + } + } + + fn synonyms_deletion(data: BTreeMap>>) -> Update { + Update { + data: UpdateData::SynonymsDeletion(data), + enqueued_at: Utc::now(), + } + } + + fn stop_words_addition(data: BTreeSet) -> Update { + Update { + data: UpdateData::StopWordsAddition(data), + enqueued_at: Utc::now(), + } + } + + fn stop_words_deletion(data: BTreeSet) -> Update { + Update { + data: UpdateData::StopWordsDeletion(data), + enqueued_at: Utc::now(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum UpdateData { ClearAll, Schema(Schema), Customs(Vec), @@ -45,31 +124,31 @@ pub enum Update { StopWordsDeletion(BTreeSet), } -impl Update { +impl UpdateData { pub fn update_type(&self) -> UpdateType { match self { - Update::ClearAll => UpdateType::ClearAll, - Update::Schema(_) => UpdateType::Schema, - Update::Customs(_) => UpdateType::Customs, - Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition { + UpdateData::ClearAll => UpdateType::ClearAll, + UpdateData::Schema(_) => UpdateType::Schema, + UpdateData::Customs(_) => UpdateType::Customs, + UpdateData::DocumentsAddition(addition) => UpdateType::DocumentsAddition { number: addition.len(), }, - Update::DocumentsPartial(addition) => UpdateType::DocumentsPartial { + UpdateData::DocumentsPartial(addition) => UpdateType::DocumentsPartial { number: addition.len(), }, - Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion { + UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion { number: deletion.len(), }, - Update::SynonymsAddition(addition) => UpdateType::SynonymsAddition { + UpdateData::SynonymsAddition(addition) => UpdateType::SynonymsAddition { number: addition.len(), }, - Update::SynonymsDeletion(deletion) => UpdateType::SynonymsDeletion { + UpdateData::SynonymsDeletion(deletion) => UpdateType::SynonymsDeletion { number: deletion.len(), }, - Update::StopWordsAddition(addition) => UpdateType::StopWordsAddition { + UpdateData::StopWordsAddition(addition) => UpdateType::StopWordsAddition { number: addition.len(), }, - Update::StopWordsDeletion(deletion) => UpdateType::StopWordsDeletion { + UpdateData::StopWordsDeletion(deletion) => UpdateType::StopWordsDeletion { number: deletion.len(), }, } @@ -99,26 +178,28 @@ pub struct ProcessedUpdateResult { #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, pub duration: f64, // in seconds + pub enqueued_at: DateTime, + pub processed_at: DateTime, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EnqueuedUpdateResult { pub update_id: u64, pub update_type: UpdateType, + pub enqueued_at: DateTime, } #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "status")] +#[serde(rename_all = "camelCase", tag = "status")] pub enum UpdateStatus { Enqueued { #[serde(flatten)] - content: EnqueuedUpdateResult + content: EnqueuedUpdateResult, }, Processed { #[serde(flatten)] - content: ProcessedUpdateResult + content: ProcessedUpdateResult, }, - Unknown, } pub fn update_status( @@ -126,19 +207,19 @@ pub fn update_status( updates_store: store::Updates, updates_results_store: store::UpdatesResults, update_id: u64, -) -> MResult { +) -> MResult> { match updates_results_store.update_result(reader, update_id)? { - Some(result) => Ok(UpdateStatus::Processed { content: result }), - None => { - if let Some(update) = updates_store.get(reader, update_id)? { - Ok(UpdateStatus::Enqueued { content: EnqueuedUpdateResult { + Some(result) => Ok(Some(UpdateStatus::Processed { content: result })), + None => match updates_store.get(reader, update_id)? { + Some(update) => Ok(Some(UpdateStatus::Enqueued { + content: EnqueuedUpdateResult { update_id, - update_type: update.update_type(), - }}) - } else { - Ok(UpdateStatus::Unknown) - } - } + update_type: update.data.update_type(), + enqueued_at: update.enqueued_at, + }, + })), + None => Ok(None), + }, } } @@ -167,8 +248,10 @@ pub fn update_task<'a, 'b>( ) -> MResult { debug!("Processing update number {}", update_id); - let (update_type, result, duration) = match update { - Update::ClearAll => { + let Update { enqueued_at, data } = update; + + let (update_type, result, duration) = match data { + UpdateData::ClearAll => { let start = Instant::now(); let update_type = UpdateType::ClearAll; @@ -183,7 +266,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::Schema(schema) => { + UpdateData::Schema(schema) => { let start = Instant::now(); let update_type = UpdateType::Schema; @@ -199,7 +282,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::Customs(customs) => { + UpdateData::Customs(customs) => { let start = Instant::now(); let update_type = UpdateType::Customs; @@ -207,7 +290,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::DocumentsAddition(documents) => { + UpdateData::DocumentsAddition(documents) => { let start = Instant::now(); let update_type = UpdateType::DocumentsAddition { @@ -226,7 +309,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::DocumentsPartial(documents) => { + UpdateData::DocumentsPartial(documents) => { let start = Instant::now(); let update_type = UpdateType::DocumentsPartial { @@ -245,7 +328,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::DocumentsDeletion(documents) => { + UpdateData::DocumentsDeletion(documents) => { let start = Instant::now(); let update_type = UpdateType::DocumentsDeletion { @@ -264,7 +347,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::SynonymsAddition(synonyms) => { + UpdateData::SynonymsAddition(synonyms) => { let start = Instant::now(); let update_type = UpdateType::SynonymsAddition { @@ -275,7 +358,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::SynonymsDeletion(synonyms) => { + UpdateData::SynonymsDeletion(synonyms) => { let start = Instant::now(); let update_type = UpdateType::SynonymsDeletion { @@ -286,7 +369,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::StopWordsAddition(stop_words) => { + UpdateData::StopWordsAddition(stop_words) => { let start = Instant::now(); let update_type = UpdateType::StopWordsAddition { @@ -298,7 +381,7 @@ pub fn update_task<'a, 'b>( (update_type, result, start.elapsed()) } - Update::StopWordsDeletion(stop_words) => { + UpdateData::StopWordsDeletion(stop_words) => { let start = Instant::now(); let update_type = UpdateType::StopWordsDeletion { @@ -329,6 +412,8 @@ pub fn update_task<'a, 'b>( update_type, error: result.map_err(|e| e.to_string()).err(), duration: duration.as_secs_f64(), + enqueued_at, + processed_at: Utc::now(), }; Ok(status) diff --git a/meilidb-core/src/update/schema_update.rs b/meilidb-core/src/update/schema_update.rs index c033df6cf..2087e8f29 100644 --- a/meilidb-core/src/update/schema_update.rs +++ b/meilidb-core/src/update/schema_update.rs @@ -68,7 +68,7 @@ pub fn push_schema_update( ) -> MResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::Schema(schema); + let update = Update::schema(schema); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-core/src/update/stop_words_addition.rs b/meilidb-core/src/update/stop_words_addition.rs index 12b00e373..53f890a7f 100644 --- a/meilidb-core/src/update/stop_words_addition.rs +++ b/meilidb-core/src/update/stop_words_addition.rs @@ -53,7 +53,7 @@ pub fn push_stop_words_addition( ) -> MResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::StopWordsAddition(addition); + let update = Update::stop_words_addition(addition); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-core/src/update/stop_words_deletion.rs b/meilidb-core/src/update/stop_words_deletion.rs index eb4d1700c..e502959e4 100644 --- a/meilidb-core/src/update/stop_words_deletion.rs +++ b/meilidb-core/src/update/stop_words_deletion.rs @@ -54,7 +54,7 @@ pub fn push_stop_words_deletion( ) -> MResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::StopWordsDeletion(deletion); + let update = Update::stop_words_deletion(deletion); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-core/src/update/synonyms_addition.rs b/meilidb-core/src/update/synonyms_addition.rs index b6a5ab2d4..4d9968c52 100644 --- a/meilidb-core/src/update/synonyms_addition.rs +++ b/meilidb-core/src/update/synonyms_addition.rs @@ -63,7 +63,7 @@ pub fn push_synonyms_addition( ) -> MResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::SynonymsAddition(addition); + let update = Update::synonyms_addition(addition); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-core/src/update/synonyms_deletion.rs b/meilidb-core/src/update/synonyms_deletion.rs index 6ec875bb6..a2ded59a1 100644 --- a/meilidb-core/src/update/synonyms_deletion.rs +++ b/meilidb-core/src/update/synonyms_deletion.rs @@ -70,7 +70,7 @@ pub fn push_synonyms_deletion( ) -> MResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; - let update = Update::SynonymsDeletion(deletion); + let update = Update::synonyms_deletion(deletion); updates_store.put_update(writer, last_update_id, &update)?; Ok(last_update_id) diff --git a/meilidb-http/src/routes/index.rs b/meilidb-http/src/routes/index.rs index 51adf3ba3..92c5dfa57 100644 --- a/meilidb-http/src/routes/index.rs +++ b/meilidb-http/src/routes/index.rs @@ -1,5 +1,5 @@ use http::StatusCode; -use meilidb_core::{ProcessedUpdateResult, UpdateStatus}; +use meilidb_core::ProcessedUpdateResult; use meilidb_schema::Schema; use serde_json::json; use tide::response::IntoResponse; @@ -150,16 +150,12 @@ pub async fn get_update_status(ctx: Context) -> SResult { .map_err(ResponseError::internal)?; let response = match status { - UpdateStatus::Unknown => { - tide::response::json(json!({ "message": "unknown update id" })) - .with_status(StatusCode::NOT_FOUND) - .into_response() - } - status => { - tide::response::json(status) - .with_status(StatusCode::OK) - .into_response() - } + Some(status) => tide::response::json(status) + .with_status(StatusCode::OK) + .into_response(), + None => tide::response::json(json!({ "message": "unknown update id" })) + .with_status(StatusCode::NOT_FOUND) + .into_response(), }; Ok(response)