From 198238687fcbfbd6ce3a247f54d41a1b820180d7 Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Wed, 16 Oct 2024 09:27:18 +0200
Subject: [PATCH] Guess and retrieve primary key correctly in batch

---
 index-scheduler/src/batch.rs | 72 ++++++++++++++++++++++++------------
 1 file changed, 48 insertions(+), 24 deletions(-)

diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 328a5aed7..14bbcfe53 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -28,7 +28,7 @@ use bumpalo::Bump;
 use dump::IndexMetadata;
 use meilisearch_types::error::Code;
 use meilisearch_types::heed::{RoTxn, RwTxn};
-use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
+use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
 use meilisearch_types::milli::heed::CompactionOption;
 use meilisearch_types::milli::update::new::indexer::{
     self, retrieve_or_guess_primary_key, UpdateByFunction,
@@ -1253,7 +1253,6 @@ impl IndexScheduler {
                 mut tasks,
             } => {
                 let started_processing_at = std::time::Instant::now();
-                let primary_key_has_been_set = false;
                 let must_stop_processing = self.must_stop_processing.clone();
                 let indexer_config = self.index_mapper.indexer_config();
                 // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
@@ -1261,7 +1260,6 @@ impl IndexScheduler {
                 // to a fresh thread.
 
                 /// TODO manage errors correctly
-                let rtxn = index.read_txn()?;
                 let first_addition_uuid = operations
                     .iter()
                     .find_map(|op| match op {
@@ -1281,6 +1279,7 @@ impl IndexScheduler {
                     }
                 }
 
+                let rtxn = index.read_txn()?;
                 let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
                 let mut new_fields_ids_map = db_fields_ids_map.clone();
 
@@ -1292,13 +1291,14 @@ impl IndexScheduler {
                     None => None,
                 };
 
-                let primary_key = retrieve_or_guess_primary_key(
+                let (primary_key, primary_key_has_been_set) = retrieve_or_guess_primary_key(
                     &rtxn,
                     index,
                     &mut new_fields_ids_map,
+                    primary_key.as_deref(),
                     first_document.as_ref(),
                 )?
-                .unwrap();
+                .map_err(milli::Error::from)?;
 
                 let mut content_files_iter = content_files.iter();
                 let mut indexer = indexer::DocumentOperation::new(method);
@@ -1373,6 +1373,7 @@ impl IndexScheduler {
                     index,
                     &db_fields_ids_map,
                     new_fields_ids_map,
+                    primary_key_has_been_set.then_some(primary_key),
                     &pool,
                     &document_changes,
                 )?;
@@ -1417,12 +1418,40 @@ impl IndexScheduler {
                     Some(Err(e)) => return Err(e.into()),
                 };
 
+                let (original_filter, context, function) = if let Some(Details::DocumentEdition {
+                    original_filter,
+                    context,
+                    function,
+                    ..
+                }) = task.details
+                {
+                    (original_filter, context, function)
+                } else {
+                    // In the case of a `documentEdition` the details MUST be set
+                    unreachable!();
+                };
+
+                if candidates.is_empty() {
+                    task.status = Status::Succeeded;
+                    task.details = Some(Details::DocumentEdition {
+                        original_filter,
+                        context,
+                        function,
+                        deleted_documents: Some(0),
+                        edited_documents: Some(0),
+                    });
+
+                    return Ok(vec![task]);
+                }
+
                 let rtxn = index.read_txn()?;
                 let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
                 let mut new_fields_ids_map = db_fields_ids_map.clone();
-                let primary_key =
-                    retrieve_or_guess_primary_key(&rtxn, index, &mut new_fields_ids_map, None)?
-                        .unwrap();
+                // candidates not empty => index not empty => a primary key is set
+                let primary_key = index.primary_key(&rtxn)?.unwrap();
+
+                let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
+                    .map_err(milli::Error::from)?;
 
                 let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
 
@@ -1439,6 +1468,7 @@ impl IndexScheduler {
                     index,
                     &db_fields_ids_map,
                     new_fields_ids_map,
+                    None, // cannot change primary key in DocumentEdition
                     &pool,
                     &document_changes,
                 )?;
@@ -1446,19 +1476,6 @@ impl IndexScheduler {
                     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
                 }
 
-                let (original_filter, context, function) = if let Some(Details::DocumentEdition {
-                    original_filter,
-                    context,
-                    function,
-                    ..
-                }) = task.details
-                {
-                    (original_filter, context, function)
-                } else {
-                    // In the case of a `documentEdition` the details MUST be set
-                    unreachable!();
-                };
-
                 match result_count {
                     Ok((deleted_documents, edited_documents)) => {
                         task.status = Status::Succeeded;
@@ -1559,13 +1576,19 @@ impl IndexScheduler {
                     }
                 }
 
+                if to_delete.is_empty() {
+                    return Ok(tasks);
+                }
+
                 let rtxn = index.read_txn()?;
                 let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
                 let mut new_fields_ids_map = db_fields_ids_map.clone();
-                let primary_key =
-                    retrieve_or_guess_primary_key(&rtxn, index, &mut new_fields_ids_map, None)?
-                        .unwrap();
+                // to_delete not empty => index not empty => primary key set
+                let primary_key = index.primary_key(&rtxn)?.unwrap();
+
+                let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
+                    .map_err(milli::Error::from)?;
 
                 if !tasks.iter().all(|res| res.error.is_some()) {
                     /// TODO create a pool if needed
@@ -1581,6 +1604,7 @@ impl IndexScheduler {
                     index,
                     &db_fields_ids_map,
                     new_fields_ids_map,
+                    None, // document deletion never changes primary key
                     &pool,
                     &document_changes,
                 )?;