Guess and retrieve primary key correctly in batch

This commit is contained in:
Louis Dureuil 2024-10-16 09:27:18 +02:00
parent f9a6c624a7
commit 198238687f
No known key found for this signature in database

View File

@ -28,7 +28,7 @@ use bumpalo::Bump;
use dump::IndexMetadata;
use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::new::indexer::{
self, retrieve_or_guess_primary_key, UpdateByFunction,
@ -1253,7 +1253,6 @@ impl IndexScheduler {
mut tasks,
} => {
let started_processing_at = std::time::Instant::now();
let primary_key_has_been_set = false;
let must_stop_processing = self.must_stop_processing.clone();
let indexer_config = self.index_mapper.indexer_config();
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
@ -1261,7 +1260,6 @@ impl IndexScheduler {
// to a fresh thread.
/// TODO manage errors correctly
let rtxn = index.read_txn()?;
let first_addition_uuid = operations
.iter()
.find_map(|op| match op {
@ -1281,6 +1279,7 @@ impl IndexScheduler {
}
}
let rtxn = index.read_txn()?;
let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
let mut new_fields_ids_map = db_fields_ids_map.clone();
@ -1292,13 +1291,14 @@ impl IndexScheduler {
None => None,
};
let primary_key = retrieve_or_guess_primary_key(
let (primary_key, primary_key_has_been_set) = retrieve_or_guess_primary_key(
&rtxn,
index,
&mut new_fields_ids_map,
primary_key.as_deref(),
first_document.as_ref(),
)?
.unwrap();
.map_err(milli::Error::from)?;
let mut content_files_iter = content_files.iter();
let mut indexer = indexer::DocumentOperation::new(method);
@ -1373,6 +1373,7 @@ impl IndexScheduler {
index,
&db_fields_ids_map,
new_fields_ids_map,
primary_key_has_been_set.then_some(primary_key),
&pool,
&document_changes,
)?;
@ -1417,12 +1418,40 @@ impl IndexScheduler {
Some(Err(e)) => return Err(e.into()),
};
let (original_filter, context, function) = if let Some(Details::DocumentEdition {
original_filter,
context,
function,
..
}) = task.details
{
(original_filter, context, function)
} else {
// In the case of a `documentEdition` the details MUST be set
unreachable!();
};
if candidates.is_empty() {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentEdition {
original_filter,
context,
function,
deleted_documents: Some(0),
edited_documents: Some(0),
});
return Ok(vec![task]);
}
let rtxn = index.read_txn()?;
let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
let mut new_fields_ids_map = db_fields_ids_map.clone();
let primary_key =
retrieve_or_guess_primary_key(&rtxn, index, &mut new_fields_ids_map, None)?
.unwrap();
// candidates not empty => index not empty => a primary key is set
let primary_key = index.primary_key(&rtxn)?.unwrap();
let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
.map_err(milli::Error::from)?;
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
@ -1439,6 +1468,7 @@ impl IndexScheduler {
index,
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&pool,
&document_changes,
)?;
@ -1446,19 +1476,6 @@ impl IndexScheduler {
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
}
let (original_filter, context, function) = if let Some(Details::DocumentEdition {
original_filter,
context,
function,
..
}) = task.details
{
(original_filter, context, function)
} else {
// In the case of a `documentEdition` the details MUST be set
unreachable!();
};
match result_count {
Ok((deleted_documents, edited_documents)) => {
task.status = Status::Succeeded;
@ -1559,13 +1576,19 @@ impl IndexScheduler {
}
}
if to_delete.is_empty() {
return Ok(tasks);
}
let rtxn = index.read_txn()?;
let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
let mut new_fields_ids_map = db_fields_ids_map.clone();
let primary_key =
retrieve_or_guess_primary_key(&rtxn, index, &mut new_fields_ids_map, None)?
.unwrap();
// to_delete not empty => index not empty => primary key set
let primary_key = index.primary_key(&rtxn)?.unwrap();
let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
.map_err(milli::Error::from)?;
if !tasks.iter().all(|res| res.error.is_some()) {
/// TODO create a pool if needed
@ -1581,6 +1604,7 @@ impl IndexScheduler {
index,
&db_fields_ids_map,
new_fields_ids_map,
None, // document deletion never changes primary key
&pool,
&document_changes,
)?;