diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index a2682f55d..5bc0acb2b 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -3,8 +3,13 @@ use crate::{ task::{Details, Kind, KindWithContent, Status, Task}, Error, IndexScheduler, Result, TaskId, }; +use index::apply_settings_to_builder; +use index::error::{IndexError, MilliError}; use index::{Settings, Unchecked}; -use milli::heed::RoTxn; +use log::{debug, info}; +use milli::documents::DocumentsBatchReader; +use milli::heed::{RoTxn, RwTxn}; +use milli::update::IndexDocumentsConfig; use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod}; use uuid::Uuid; @@ -12,6 +17,24 @@ pub(crate) enum Batch { Cancel(Task), Snapshot(Vec), Dump(Vec), + IndexOperation(IndexOperation), + IndexCreation { + index_uid: String, + primary_key: Option, + task: Task, + }, + IndexUpdate { + index_uid: String, + primary_key: Option, + task: Task, + }, + IndexDeletion { + index_uid: String, + tasks: Vec, + }, +} + +pub(crate) enum IndexOperation { DocumentImport { index_uid: String, primary_key: Option, @@ -54,20 +77,6 @@ pub(crate) enum Batch { settings: Vec<(bool, Settings)>, settings_tasks: Vec, }, - IndexCreation { - index_uid: String, - primary_key: Option, - task: Task, - }, - IndexUpdate { - index_uid: String, - primary_key: Option, - task: Task, - }, - IndexDeletion { - index_uid: String, - tasks: Vec, - }, } impl Batch { @@ -76,23 +85,27 @@ impl Batch { Batch::Cancel(task) | Batch::IndexCreation { task, .. } | Batch::IndexUpdate { task, .. } => vec![task.uid], - Batch::Snapshot(tasks) - | Batch::Dump(tasks) - | Batch::DocumentImport { tasks, .. } - | Batch::DocumentDeletion { tasks, .. } - | Batch::Settings { tasks, .. } - | Batch::DocumentClear { tasks, .. } - | Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(), - Batch::SettingsAndDocumentImport { - document_import_tasks: tasks, - settings_tasks: other, - .. + Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::IndexDeletion { tasks, .. } => { + tasks.iter().map(|task| task.uid).collect() } - | Batch::DocumentClearAndSetting { - cleared_tasks: tasks, - settings_tasks: other, - .. - } => tasks.iter().chain(other).map(|task| task.uid).collect(), + Batch::IndexOperation(operation) => match operation { + IndexOperation::DocumentImport { tasks, .. } + | IndexOperation::DocumentDeletion { tasks, .. } + | IndexOperation::Settings { tasks, .. } + | IndexOperation::DocumentClear { tasks, .. } => { + tasks.iter().map(|task| task.uid).collect() + } + IndexOperation::SettingsAndDocumentImport { + document_import_tasks: tasks, + settings_tasks: other, + .. + } + | IndexOperation::DocumentClearAndSetting { + cleared_tasks: tasks, + settings_tasks: other, + .. + } => tasks.iter().chain(other).map(|task| task.uid).collect(), + }, } } } @@ -105,10 +118,12 @@ impl IndexScheduler { batch: BatchKind, ) -> Result> { match batch { - BatchKind::DocumentClear { ids } => Ok(Some(Batch::DocumentClear { - tasks: self.get_existing_tasks(rtxn, ids)?, - index_uid, - })), + BatchKind::DocumentClear { ids } => { + Ok(Some(Batch::IndexOperation(IndexOperation::DocumentClear { + tasks: self.get_existing_tasks(rtxn, ids)?, + index_uid, + }))) + } BatchKind::DocumentImport { method, import_ids } => { let tasks = self.get_existing_tasks(rtxn, import_ids)?; let primary_key = match &tasks[0].kind { @@ -123,13 +138,15 @@ impl IndexScheduler { }) .collect(); - Ok(Some(Batch::DocumentImport { - index_uid, - primary_key, - method, - content_files, - tasks, - })) + Ok(Some(Batch::IndexOperation( + IndexOperation::DocumentImport { + index_uid, + primary_key, + method, + content_files, + tasks, + }, + ))) } BatchKind::DocumentDeletion { deletion_ids } => { let tasks = self.get_existing_tasks(rtxn, deletion_ids)?; @@ -144,11 +161,13 @@ impl IndexScheduler { } } - Ok(Some(Batch::DocumentDeletion { - index_uid, - documents, - tasks, - })) + Ok(Some(Batch::IndexOperation( + IndexOperation::DocumentDeletion { + index_uid, + documents, + tasks, + }, + ))) } BatchKind::Settings { settings_ids } => { let tasks = self.get_existing_tasks(rtxn, settings_ids)?; @@ -165,11 +184,11 @@ impl IndexScheduler { } } - Ok(Some(Batch::Settings { + Ok(Some(Batch::IndexOperation(IndexOperation::Settings { index_uid, settings, tasks, - })) + }))) } BatchKind::ClearAndSettings { other, @@ -179,11 +198,11 @@ impl IndexScheduler { .create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })? .unwrap() { - Batch::Settings { + Batch::IndexOperation(IndexOperation::Settings { index_uid, settings, tasks, - } => (index_uid, settings, tasks), + }) => (index_uid, settings, tasks), _ => unreachable!(), }; let (index_uid, cleared_tasks) = match self @@ -194,16 +213,20 @@ impl IndexScheduler { )? .unwrap() { - Batch::DocumentClear { index_uid, tasks } => (index_uid, tasks), + Batch::IndexOperation(IndexOperation::DocumentClear { index_uid, tasks }) => { + (index_uid, tasks) + } _ => unreachable!(), }; - Ok(Some(Batch::DocumentClearAndSetting { - index_uid, - cleared_tasks, - settings, - settings_tasks, - })) + Ok(Some(Batch::IndexOperation( + IndexOperation::DocumentClearAndSetting { + index_uid, + cleared_tasks, + settings, + settings_tasks, + }, + ))) } BatchKind::SettingsAndDocumentImport { settings_ids, @@ -224,26 +247,28 @@ impl IndexScheduler { match (document_import, settings) { ( - Some(Batch::DocumentImport { + Some(Batch::IndexOperation(IndexOperation::DocumentImport { primary_key, content_files, tasks: document_import_tasks, .. - }), - Some(Batch::Settings { + })), + Some(Batch::IndexOperation(IndexOperation::Settings { settings, tasks: settings_tasks, .. - }), - ) => Ok(Some(Batch::SettingsAndDocumentImport { - index_uid, - primary_key, - method, - content_files, - document_import_tasks, - settings, - settings_tasks, - })), + })), + ) => Ok(Some(Batch::IndexOperation( + IndexOperation::SettingsAndDocumentImport { + index_uid, + primary_key, + method, + content_files, + document_import_tasks, + settings, + settings_tasks, + }, + ))), _ => unreachable!(), } } @@ -351,206 +376,30 @@ impl IndexScheduler { Batch::Cancel(_) => todo!(), Batch::Snapshot(_) => todo!(), Batch::Dump(_) => todo!(), - Batch::DocumentClear { - index_uid, - mut tasks, - } => { - let rtxn = self.env.read_txn()?; - let index = self.index_mapper.index(&rtxn, &index_uid)?; - rtxn.abort()?; - - let ret = index.clear_documents(); - for task in &mut tasks { - task.details = Some(Details::ClearAll { - // TODO where can I find this information of how many documents did we delete? - deleted_documents: None, - }); - if let Err(ref error) = ret { - task.error = Some(error.into()); + Batch::IndexOperation(operation) => { + let index = match operation { + IndexOperation::DocumentDeletion { ref index_uid, .. } + | IndexOperation::DocumentClear { ref index_uid, .. } => { + // only get the index, don't create it + let rtxn = self.env.read_txn()?; + self.index_mapper.index(&rtxn, &index_uid)? } - } - - Ok(tasks) - } - Batch::DocumentImport { - index_uid, - primary_key, - method, - content_files, - mut tasks, - } => { - // we NEED a write transaction for the index creation. - // To avoid blocking the whole process we're going to commit asap. - let mut wtxn = self.env.write_txn()?; - let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; - wtxn.commit()?; - - let ret = index.update_documents( - method, - primary_key, - self.file_store.clone(), - content_files, - )?; - - for (task, ret) in tasks.iter_mut().zip(ret) { - match ret { - Ok(DocumentAdditionResult { - indexed_documents, - number_of_documents, - }) => { - task.details = Some(Details::DocumentAddition { - received_documents: number_of_documents, - indexed_documents, - }); - } - Err(error) => task.error = Some(error.into()), + IndexOperation::DocumentImport { ref index_uid, .. } + | IndexOperation::Settings { ref index_uid, .. } + | IndexOperation::DocumentClearAndSetting { ref index_uid, .. } + | IndexOperation::SettingsAndDocumentImport { ref index_uid, .. } => { + // create the index if it doesn't already exist + let mut wtxn = self.env.write_txn()?; + let index = self.index_mapper.index(&mut wtxn, index_uid)?; + wtxn.commit()?; + index } - } + }; - Ok(tasks) - } - Batch::SettingsAndDocumentImport { - index_uid, - primary_key, - method, - content_files, - mut document_import_tasks, - settings, - mut settings_tasks, - } => { - // we NEED a write transaction for the index creation. - // To avoid blocking the whole process we're going to commit asap. - let mut wtxn = self.env.write_txn()?; - let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; - wtxn.commit()?; + let mut index_wtxn = index.write_txn()?; + let tasks = self.apply_index_operation(&mut index_wtxn, &index, operation)?; + index_wtxn.commit()?; - // TODO merge the settings to only do a reindexation once. - for (task, (_, settings)) in settings_tasks.iter_mut().zip(settings) { - let checked_settings = settings.clone().check(); - task.details = Some(Details::Settings { settings }); - if let Err(error) = index.update_settings(&checked_settings) { - task.error = Some(error.into()); - } - } - - // TODO we must use the same write transaction, here! - - let ret = index.update_documents( - method, - primary_key, - self.file_store.clone(), - content_files, - )?; - - for (task, ret) in document_import_tasks.iter_mut().zip(ret) { - match ret { - Ok(DocumentAdditionResult { - indexed_documents, - number_of_documents, - }) => { - task.details = Some(Details::DocumentAddition { - received_documents: number_of_documents, - indexed_documents, - }); - } - Err(error) => task.error = Some(error.into()), - } - } - - let mut tasks = settings_tasks; - tasks.append(&mut document_import_tasks); - - Ok(tasks) - } - Batch::DocumentDeletion { - index_uid, - documents, - mut tasks, - } => { - let rtxn = self.env.read_txn()?; - let index = self.index_mapper.index(&rtxn, &index_uid)?; - - let ret = index.delete_documents(&documents); - for task in &mut tasks { - match ret { - Ok(DocumentDeletionResult { - deleted_documents, - remaining_documents: _, - }) => { - // TODO we are assigning the same amount of documents to - // all the tasks that are in the same batch. That's wrong! - task.details = Some(Details::DocumentDeletion { - received_document_ids: documents.len(), - deleted_documents: Some(deleted_documents), - }); - } - Err(ref error) => task.error = Some(error.into()), - } - } - - Ok(tasks) - } - Batch::Settings { - index_uid, - settings, - mut tasks, - } => { - // we NEED a write transaction for the index creation. - // To avoid blocking the whole process we're going to commit asap. - let mut wtxn = self.env.write_txn()?; - let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; - wtxn.commit()?; - - // TODO merge the settings to only do a reindexation once. - for (task, (_, settings)) in tasks.iter_mut().zip(settings) { - let checked_settings = settings.clone().check(); - task.details = Some(Details::Settings { settings }); - if let Err(error) = index.update_settings(&checked_settings) { - task.error = Some(error.into()); - } - } - - Ok(tasks) - } - Batch::DocumentClearAndSetting { - index_uid, - mut cleared_tasks, - settings, - mut settings_tasks, - } => { - // If the settings were given before the document clear - // we must create the index first. - // we NEED a write transaction for the index creation. - // To avoid blocking the whole process we're going to commit asap. - let mut wtxn = self.env.write_txn()?; - let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; - wtxn.commit()?; - - // TODO We must use the same write transaction to commit - // the clear AND the settings in one transaction. - - let ret = index.clear_documents(); - for task in &mut cleared_tasks { - task.details = Some(Details::ClearAll { - // TODO where can I find this information of how many documents did we delete? - deleted_documents: None, - }); - if let Err(ref error) = ret { - task.error = Some(error.into()); - } - } - - // TODO merge the settings to only do a reindexation once. - for (task, (_, settings)) in settings_tasks.iter_mut().zip(settings) { - let checked_settings = settings.clone().check(); - task.details = Some(Details::Settings { settings }); - if let Err(error) = index.update_settings(&checked_settings) { - task.error = Some(error.into()); - } - } - - let mut tasks = cleared_tasks; - tasks.append(&mut settings_tasks); Ok(tasks) } Batch::IndexCreation { @@ -566,4 +415,223 @@ impl IndexScheduler { Batch::IndexDeletion { index_uid, tasks } => todo!(), } } + + fn apply_index_operation<'txn, 'i>( + &self, + index_wtxn: &'txn mut RwTxn<'i, '_>, + index: &'i milli::Index, + operation: IndexOperation, + ) -> Result> { + match operation { + IndexOperation::DocumentClear { + index_uid, + mut tasks, + } => { + let result = milli::update::ClearDocuments::new(index_wtxn, index).execute(); + for task in &mut tasks { + match result { + Ok(deleted_documents) => { + task.details = Some(Details::ClearAll { + deleted_documents: Some(deleted_documents), + }) + } + Err(ref error) => task.error = Some(MilliError(error).into()), + } + } + + Ok(tasks) + } + IndexOperation::DocumentImport { + index_uid, + primary_key, + method, + content_files, + mut tasks, + } => { + let indexer_config = self.index_mapper.indexer_config(); + if let Some(primary_key) = primary_key { + if index.primary_key(index_wtxn)?.is_none() { + let mut builder = + milli::update::Settings::new(index_wtxn, index, indexer_config); + builder.set_primary_key(primary_key); + builder.execute(|_| ())?; + } + } + + let config = IndexDocumentsConfig { + update_method: method, + ..Default::default() + }; + + let mut builder = milli::update::IndexDocuments::new( + index_wtxn, + index, + indexer_config, + config, + |indexing_step| debug!("update: {:?}", indexing_step), + )?; + + let mut results = Vec::new(); + for content_uuid in content_files.into_iter() { + let content_file = self.file_store.get_update(content_uuid)?; + let reader = DocumentsBatchReader::from_reader(content_file) + .map_err(IndexError::from)?; + let (new_builder, user_result) = builder.add_documents(reader)?; + builder = new_builder; + + let user_result = match user_result { + Ok(count) => { + let addition = DocumentAdditionResult { + indexed_documents: count, + number_of_documents: count, + }; + Ok(addition) + } + Err(e) => Err(IndexError::from(e)), + }; + + results.push(user_result); + } + + if results.iter().any(|res| res.is_ok()) { + let addition = builder.execute()?; + info!("document addition done: {:?}", addition); + } + + for (task, ret) in tasks.iter_mut().zip(results) { + match ret { + Ok(DocumentAdditionResult { + indexed_documents, + number_of_documents, + }) => { + task.details = Some(Details::DocumentAddition { + received_documents: number_of_documents, + indexed_documents, + }) + } + Err(error) => task.error = Some(error.into()), + } + } + + Ok(tasks) + } + IndexOperation::DocumentDeletion { + index_uid, + documents, + mut tasks, + } => { + let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?; + documents.iter().for_each(|id| { + builder.delete_external_id(id); + }); + let result = builder.execute(); + + for (task, documents) in tasks.iter_mut().zip(documents) { + match result { + Ok(DocumentDeletionResult { + deleted_documents, + remaining_documents: _, + }) => { + task.details = Some(Details::DocumentDeletion { + received_document_ids: documents.len(), + deleted_documents: Some(deleted_documents), + }); + } + Err(ref error) => task.error = Some(MilliError(error).into()), + } + } + + Ok(tasks) + } + IndexOperation::Settings { + index_uid, + settings, + mut tasks, + } => { + let indexer_config = self.index_mapper.indexer_config(); + // TODO merge the settings to only do *one* reindexation. + for (task, (_, settings)) in tasks.iter_mut().zip(settings) { + let checked_settings = settings.clone().check(); + task.details = Some(Details::Settings { settings }); + + let mut builder = + milli::update::Settings::new(index_wtxn, index, indexer_config); + apply_settings_to_builder(&checked_settings, &mut builder); + let result = builder.execute(|indexing_step| { + debug!("update: {:?}", indexing_step); + }); + + if let Err(ref error) = result { + task.error = Some(MilliError(error).into()); + } + } + + Ok(tasks) + } + IndexOperation::SettingsAndDocumentImport { + index_uid, + primary_key, + method, + content_files, + document_import_tasks, + settings, + settings_tasks, + } => { + let settings_tasks = self.apply_index_operation( + index_wtxn, + index, + IndexOperation::Settings { + index_uid: index_uid.clone(), + settings, + tasks: settings_tasks, + }, + )?; + + let mut import_tasks = self.apply_index_operation( + index_wtxn, + index, + IndexOperation::DocumentImport { + index_uid, + primary_key, + method, + content_files, + tasks: document_import_tasks, + }, + )?; + + let mut tasks = settings_tasks; + tasks.append(&mut import_tasks); + Ok(tasks) + } + IndexOperation::DocumentClearAndSetting { + index_uid, + cleared_tasks, + settings, + settings_tasks, + } => { + let mut import_tasks = self.apply_index_operation( + index_wtxn, + index, + IndexOperation::DocumentClear { + index_uid: index_uid.clone(), + tasks: cleared_tasks, + }, + )?; + + let settings_tasks = self.apply_index_operation( + index_wtxn, + index, + IndexOperation::Settings { + index_uid, + settings, + tasks: settings_tasks, + }, + )?; + + let mut tasks = settings_tasks; + tasks.append(&mut import_tasks); + Ok(tasks) + } + } + } } diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 767402fc0..0849527dd 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -133,4 +133,8 @@ impl IndexMapper { Ok(()) } + + pub fn indexer_config(&self) -> &IndexerConfig { + &self.indexer_config + } }