From 39b27e42be3dde17231360e21ba07b255a07c58d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 8 Oct 2024 16:04:19 +0200 Subject: [PATCH] Plug the deletion pipeline --- index-scheduler/src/batch.rs | 43 +++++++++++-------- .../update/new/indexer/document_deletion.rs | 2 +- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 28cb8a9e6..69eb28372 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1500,26 +1500,35 @@ impl IndexScheduler { } } - let config = IndexDocumentsConfig { - update_method: IndexDocumentsMethod::ReplaceDocuments, - ..Default::default() - }; + let rtxn = index.read_txn()?; + let mut fields_ids_map = index.fields_ids_map(&rtxn)?; - let must_stop_processing = self.must_stop_processing.clone(); - let mut builder = milli::update::IndexDocuments::new( - index_wtxn, - index, - self.index_mapper.indexer_config(), - config, - |indexing_step| tracing::debug!(update = ?indexing_step), - || must_stop_processing.get(), - )?; + let primary_key = + retrieve_or_guess_primary_key(&rtxn, index, &mut fields_ids_map, None)? + .unwrap(); - let (new_builder, _count) = - builder.remove_documents_from_db_no_batch(&to_delete)?; - builder = new_builder; + if !tasks.iter().all(|res| res.error.is_some()) { + /// TODO create a pool if needed + // let pool = indexer_config.thread_pool.unwrap(); + let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); - let _ = builder.execute()?; + let param = (index, &fields_ids_map, &primary_key); + let mut indexer = indexer::DocumentDeletion::new(); + indexer.delete_documents_by_docids(to_delete); + /// TODO remove this fields-ids-map, it's useless for the deletion pipeline (the &mut cloned one). 
+ let document_changes = + indexer.document_changes(&mut fields_ids_map.clone(), param)?; + /// TODO pass/write the FieldsIdsMap + indexer::index( + index_wtxn, + index, + fields_ids_map.clone(), + &pool, + document_changes, + )?; + + // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); + } Ok(tasks) } diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index 21d7635c9..9dbc4e52d 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -43,7 +43,7 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { |rtxn, docid| { let current = index.document(rtxn, docid)?; let external_document_id = primary_key - .document_id(&current, fields_ids_map)? + .document_id(current, fields_ids_map)? .map_err(|_| InternalError::DatabaseMissingEntry { db_name: EXTERNAL_DOCUMENTS_IDS, key: None,