From 36e5efde0d1715ad0872359bdf578bb1cf32540b Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 4 Oct 2022 18:50:18 +0200 Subject: [PATCH] Update the tasks statuses --- index-scheduler/src/batch.rs | 120 ++++++++++++------ index-scheduler/src/index_mapper.rs | 2 +- index-scheduler/src/lib.rs | 3 +- index-scheduler/src/task.rs | 41 +++++- .../src/routes/indexes/documents.rs | 2 +- 5 files changed, 126 insertions(+), 42 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index fc406bd1e..a8179ac5c 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -39,6 +39,7 @@ pub(crate) enum IndexOperation { index_uid: String, primary_key: Option, method: IndexDocumentsMethod, + documents_counts: Vec, content_files: Vec, tasks: Vec, }, @@ -70,6 +71,7 @@ pub(crate) enum IndexOperation { primary_key: Option, method: IndexDocumentsMethod, + documents_counts: Vec, content_files: Vec, document_import_tasks: Vec, @@ -130,19 +132,27 @@ impl IndexScheduler { KindWithContent::DocumentImport { primary_key, .. } => primary_key.clone(), _ => unreachable!(), }; - let content_files = tasks - .iter() - .map(|task| match task.kind { - KindWithContent::DocumentImport { content_file, .. } => content_file, - _ => unreachable!(), - }) - .collect(); + + let mut documents_counts = Vec::new(); + let mut content_files = Vec::new(); + for task in &tasks { + if let KindWithContent::DocumentImport { + content_file, + documents_count, + .. + } = task.kind + { + documents_counts.push(documents_count); + content_files.push(content_file); + } + } Ok(Some(Batch::IndexOperation( IndexOperation::DocumentImport { index_uid, primary_key, method, + documents_counts, content_files, tasks, }, @@ -249,6 +259,7 @@ impl IndexScheduler { ( Some(Batch::IndexOperation(IndexOperation::DocumentImport { primary_key, + documents_counts, content_files, tasks: document_import_tasks, .. 
@@ -263,6 +274,7 @@ impl IndexScheduler { index_uid, primary_key, method, + documents_counts, content_files, document_import_tasks, settings, @@ -409,7 +421,7 @@ impl IndexScheduler { task, } => { let mut wtxn = self.env.write_txn()?; - let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; + self.index_mapper.create_index(&mut wtxn, &index_uid)?; wtxn.commit()?; self.process_batch(Batch::IndexUpdate { @@ -421,12 +433,12 @@ impl IndexScheduler { Batch::IndexUpdate { index_uid, primary_key, - task, + mut task, } => { let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; - if let Some(primary_key) = primary_key { + if let Some(primary_key) = primary_key.clone() { let mut index_wtxn = index.write_txn()?; let mut builder = milli::update::Settings::new( &mut index_wtxn, @@ -438,14 +450,28 @@ impl IndexScheduler { index_wtxn.commit()?; } + task.status = Status::Succeeded; + task.details = Some(Details::IndexInfo { primary_key }); + Ok(vec![task]) } - Batch::IndexDeletion { index_uid, tasks } => { + Batch::IndexDeletion { + index_uid, + mut tasks, + } => { let wtxn = self.env.write_txn()?; - // The write transaction is directly owned and commited here. - let index = self.index_mapper.delete_index(wtxn, &index_uid)?; + // The write transaction is directly owned and committed inside. + self.index_mapper.delete_index(wtxn, &index_uid)?; - todo!("update the tasks and mark them as succeeded"); + // We set all the tasks details to the default value. + for task in &mut tasks { + task.status = Status::Succeeded; + // TODO should we put a details = None, here? 
+ // TODO we are putting Details::IndexInfo with a primary_key = None, this is not cool bro' + task.details = task.kind.default_details(); + } + + Ok(tasks) } } } @@ -457,28 +483,30 @@ impl IndexScheduler { operation: IndexOperation, ) -> Result> { match operation { - IndexOperation::DocumentClear { - index_uid, - mut tasks, - } => { + IndexOperation::DocumentClear { mut tasks, .. } => { let result = milli::update::ClearDocuments::new(index_wtxn, index).execute(); for task in &mut tasks { match result { Ok(deleted_documents) => { + task.status = Status::Succeeded; task.details = Some(Details::ClearAll { deleted_documents: Some(deleted_documents), - }) + }); + } + Err(ref error) => { + task.status = Status::Failed; + task.error = Some(MilliError(error).into()) } - Err(ref error) => task.error = Some(MilliError(error).into()), } } Ok(tasks) } IndexOperation::DocumentImport { - index_uid, + index_uid: _, primary_key, method, + documents_counts, content_files, mut tasks, } => { @@ -515,13 +543,10 @@ impl IndexScheduler { builder = new_builder; let user_result = match user_result { - Ok(count) => { - let addition = DocumentAdditionResult { - indexed_documents: count, - number_of_documents: count, - }; - Ok(addition) - } + Ok(count) => Ok(DocumentAdditionResult { + indexed_documents: count, + number_of_documents: count, + }), Err(e) => Err(IndexError::from(e)), }; @@ -533,25 +558,36 @@ impl IndexScheduler { info!("document addition done: {:?}", addition); } - for (task, ret) in tasks.iter_mut().zip(results) { + for (task, (ret, count)) in tasks + .iter_mut() + .zip(results.into_iter().zip(documents_counts)) + { match ret { Ok(DocumentAdditionResult { indexed_documents, number_of_documents, }) => { + task.status = Status::Succeeded; task.details = Some(Details::DocumentAddition { received_documents: number_of_documents, indexed_documents, - }) + }); + } + Err(error) => { + task.status = Status::Failed; + task.details = Some(Details::DocumentAddition { + 
received_documents: count, + indexed_documents: count, + }); + task.error = Some(error.into()) } - Err(error) => task.error = Some(error.into()), } } Ok(tasks) } IndexOperation::DocumentDeletion { - index_uid, + index_uid: _, documents, mut tasks, } => { @@ -559,27 +595,31 @@ impl IndexScheduler { documents.iter().for_each(|id| { builder.delete_external_id(id); }); - let result = builder.execute(); + let result = builder.execute(); for (task, documents) in tasks.iter_mut().zip(documents) { match result { Ok(DocumentDeletionResult { deleted_documents, remaining_documents: _, }) => { + task.status = Status::Succeeded; task.details = Some(Details::DocumentDeletion { received_document_ids: documents.len(), deleted_documents: Some(deleted_documents), }); } - Err(ref error) => task.error = Some(MilliError(error).into()), + Err(ref error) => { + task.status = Status::Failed; + task.error = Some(MilliError(error).into()); + } } } Ok(tasks) } IndexOperation::Settings { - index_uid, + index_uid: _, settings, mut tasks, } => { @@ -596,8 +636,12 @@ impl IndexScheduler { debug!("update: {:?}", indexing_step); }); - if let Err(ref error) = result { - task.error = Some(MilliError(error).into()); + match result { + Ok(_) => task.status = Status::Succeeded, + Err(ref error) => { + task.status = Status::Failed; + task.error = Some(MilliError(error).into()); + } } } @@ -607,6 +651,7 @@ impl IndexScheduler { index_uid, primary_key, method, + documents_counts, content_files, document_import_tasks, settings, @@ -629,6 +674,7 @@ impl IndexScheduler { index_uid, primary_key, method, + documents_counts, content_files, tasks: document_import_tasks, }, diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 00335609c..063688f9f 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -1,7 +1,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::path::PathBuf; -use std::sync::{Arc, RwLock, 
RwLockWriteGuard}; +use std::sync::{Arc, RwLock}; use std::{fs, thread}; use log::error; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 080b39eb9..d324af9d7 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -400,7 +400,6 @@ impl IndexScheduler { for mut task in tasks { task.started_at = Some(started_at); task.finished_at = Some(finished_at); - task.status = Status::Succeeded; // the info field should've been set by the process_batch function self.update_task(&mut wtxn, &task)?; @@ -616,7 +615,7 @@ mod tests { let (uuid, mut file) = index_scheduler.create_update_file().unwrap(); let documents_count = - document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64; index_scheduler .register(KindWithContent::DocumentImport { index_uid: S("doggos"), diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 28432b7e2..bf9855896 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -131,7 +131,7 @@ pub enum KindWithContent { primary_key: Option, method: IndexDocumentsMethod, content_file: Uuid, - documents_count: usize, + documents_count: u64, allow_index_creation: bool, }, DocumentDeletion { @@ -255,6 +255,45 @@ impl KindWithContent { IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), } } + + /// Returns the default `Details` that correspond to this `KindWithContent`, + /// `None` if it cannot be generated. + pub fn default_details(&self) -> Option
{ + match self { + KindWithContent::DocumentImport { + documents_count, .. + } => Some(Details::DocumentAddition { + received_documents: *documents_count, + indexed_documents: 0, + }), + KindWithContent::DocumentDeletion { + index_uid: _, + documents_ids, + } => Some(Details::DocumentDeletion { + received_document_ids: documents_ids.len(), + deleted_documents: None, + }), + KindWithContent::DocumentClear { .. } => Some(Details::ClearAll { + deleted_documents: None, + }), + KindWithContent::Settings { new_settings, .. } => Some(Details::Settings { + settings: new_settings.clone(), + }), + KindWithContent::IndexDeletion { .. } => Some(Details::IndexInfo { primary_key: None }), + KindWithContent::IndexCreation { primary_key, .. } + | KindWithContent::IndexUpdate { primary_key, .. } => Some(Details::IndexInfo { + primary_key: primary_key.clone(), + }), + KindWithContent::IndexSwap { .. } => { + todo!() + } + KindWithContent::CancelTask { .. } => { + todo!() + } + KindWithContent::DumpExport { .. } => None, + KindWithContent::Snapshot => None, + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 5f461693b..1f68245c0 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -274,7 +274,7 @@ async fn document_addition( .await; let documents_count = match documents_count { - Ok(Ok(documents_count)) => documents_count, + Ok(Ok(documents_count)) => documents_count as u64, Ok(Err(e)) => { index_scheduler.delete_update_file(uuid)?; return Err(e);