diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 0f533f5aa..e285ca9cb 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -109,55 +109,71 @@ where
         let rtxn = index.read_txn()?;
 
+        // document but we need to create a function that collects and compresses documents.
         let document_sender = extractor_sender.documents();
         let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
         let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-
-        extract(document_changes,
-            &document_extractor,
-            indexing_context,
-            &mut extractor_allocs,
-            &datastore,
-            Step::ExtractingDocuments,
-        )?;
-
-        for document_extractor_data in datastore {
-            let document_extractor_data = document_extractor_data.0.into_inner();
-            for (field, delta) in document_extractor_data.field_distribution_delta {
-                let current = field_distribution.entry(field).or_default();
-                // adding the delta should never cause a negative result, as we are removing fields that previously existed.
-                *current = current.saturating_add_signed(delta);
-            }
-            document_extractor_data.docids_delta.apply_to(document_ids);
+        {
+            let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
+            let _entered = span.enter();
+            extract(document_changes,
+                &document_extractor,
+                indexing_context,
+                &mut extractor_allocs,
+                &datastore,
+                Step::ExtractingDocuments,
+            )?;
+        }
+        {
+            let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
+            let _entered = span.enter();
+            for document_extractor_data in datastore {
+                let document_extractor_data = document_extractor_data.0.into_inner();
+                for (field, delta) in document_extractor_data.field_distribution_delta {
+                    let current = field_distribution.entry(field).or_default();
+                    // adding the delta should never cause a negative result, as we are removing fields that previously existed.
+                    *current = current.saturating_add_signed(delta);
+                }
+                document_extractor_data.docids_delta.apply_to(document_ids);
+            }
         }
 
-        field_distribution.retain(|_, v| *v != 0);
+            field_distribution.retain(|_, v| *v != 0);
+        }
 
         let facet_field_ids_delta;
 
         {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
-            let _entered = span.enter();
+            let caches = {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
+                let _entered = span.enter();
 
-            facet_field_ids_delta = merge_and_send_facet_docids(
                 FacetedDocidsExtractor::run_extraction(
-                grenad_parameters,
-                document_changes,
-                indexing_context,
-                &mut extractor_allocs,
-                &extractor_sender.field_id_docid_facet_sender(),
-                Step::ExtractingFacets
-            )?,
-            FacetDatabases::new(index),
-            index,
-            extractor_sender.facet_docids(),
-            )?;
+                    grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    &extractor_sender.field_id_docid_facet_sender(),
+                    Step::ExtractingFacets
+                )?
+            };
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
+                let _entered = span.enter();
+
+                facet_field_ids_delta = merge_and_send_facet_docids(
+                    caches,
+                    FacetDatabases::new(index),
+                    index,
+                    extractor_sender.facet_docids(),
+                )?;
+            }
         }
 
         {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
-            let _entered = span.enter();
+
+            let WordDocidsCaches {
@@ -166,15 +182,19 @@ where
                 exact_word_docids,
                 word_position_docids,
                 fid_word_count_docids,
-            } = WordDocidsExtractors::run_extraction(
-                grenad_parameters,
-                document_changes,
-                indexing_context,
-                &mut extractor_allocs,
-                Step::ExtractingWords
-            )?;
+            } = {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
+                let _entered = span.enter();
+
+                WordDocidsExtractors::run_extraction(
+                    grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    Step::ExtractingWords
+                )?
+            };
 
-        // TODO Word Docids Merger
         {
             let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
             let _entered = span.enter();
@@ -187,7 +207,6 @@ where
             )?;
         }
 
-        // Word Fid Docids Merging
         {
             let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
             let _entered = span.enter();
@@ -200,7 +219,6 @@ where
             )?;
         }
 
-        // Exact Word Docids Merging
         {
             let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
             let _entered = span.enter();
@@ -213,7 +231,6 @@ where
             )?;
         }
 
-        // Word Position Docids Merging
         {
             let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
             let _entered = span.enter();
@@ -226,7 +243,6 @@ where
             )?;
         }
 
-        // Fid Word Count Docids Merging
         {
             let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
            let _entered = span.enter();
@@ -244,30 +260,34 @@ where
         // this works only if the settings didn't change during this transaction.
         let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
         if proximity_precision == ProximityPrecision::ByWord {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
-            let _entered = span.enter();
+            let caches = {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
+                let _entered = span.enter();
+                <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
+                    grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    Step::ExtractingWordProximity,
+                )?
+            };
 
-            let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
-                grenad_parameters,
-                document_changes,
-                indexing_context,
-                &mut extractor_allocs,
-                Step::ExtractingWordProximity,
-            )?;
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
+                let _entered = span.enter();
 
-            merge_and_send_docids(
-                caches,
-                index.word_pair_proximity_docids.remap_types(),
-                index,
-                extractor_sender.docids::<WordPairProximityDocids>(),
-                &indexing_context.must_stop_processing,
-            )?;
+                merge_and_send_docids(
+                    caches,
+                    index.word_pair_proximity_docids.remap_types(),
+                    index,
+                    extractor_sender.docids::<WordPairProximityDocids>(),
+                    &indexing_context.must_stop_processing,
+                )?;
+            }
         }
 
         'vectors: {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
-            let _entered = span.enter();
 
             let mut index_embeddings = index.embedding_configs(&rtxn)?;
             if index_embeddings.is_empty() {
@@ -277,13 +297,22 @@ where
             let embedding_sender = extractor_sender.embeddings();
             let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
             let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?;
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
+                let _entered = span.enter();
 
-            for config in &mut index_embeddings {
-                'data: for data in datastore.iter_mut() {
-                    let data = &mut data.get_mut().0;
-                    let Some(deladd) = data.remove(&config.name) else { continue 'data; };
-                    deladd.apply_to(&mut config.user_provided);
+                extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?;
+            }
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
+                let _entered = span.enter();
+
+                for config in &mut index_embeddings {
+                    'data: for data in datastore.iter_mut() {
+                        let data = &mut data.get_mut().0;
+                        let Some(deladd) = data.remove(&config.name) else { continue 'data; };
+                        deladd.apply_to(&mut config.user_provided);
+                    }
                 }
             }
 
@@ -291,21 +320,24 @@ where
         }
 
         'geo: {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
-            let _entered = span.enter();
-
             let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
                 break 'geo;
             };
             let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            extract(
-                document_changes,
-                &extractor,
-                indexing_context,
-                &mut extractor_allocs,
-                &datastore,
-                Step::WritingGeoPoints
-            )?;
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
+                let _entered = span.enter();
+
+                extract(
+                    document_changes,
+                    &extractor,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    &datastore,
+                    Step::WritingGeoPoints
+                )?;
+            }
 
             merge_and_send_rtree(
                 datastore,
@@ -316,11 +348,7 @@ where
             )?;
         }
 
-        {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
-            let _entered = span.enter();
-            (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));
-        }
+        (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));
 
         Result::Ok(facet_field_ids_delta)
     })?;
@@ -352,90 +380,103 @@ where
         .collect();
     let mut arroy_writers = arroy_writers?;
-    for operation in writer_receiver {
-        match operation {
-            WriterOperation::DbOperation(db_operation) => {
-                let database = db_operation.database(index);
-                let database_name = db_operation.database_name();
-                match db_operation.entry() {
-                    EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) {
-                        Ok(false) => unreachable!("We tried to delete an unknown key"),
-                        Ok(_) => (),
-                        Err(error) => {
-                            return Err(Error::InternalError(InternalError::StoreDeletion {
-                                database_name,
-                                key: e.entry().to_owned(),
-                                error,
-                            }));
-                        }
-                    },
-                    EntryOperation::Write(e) => {
-                        if let Err(error) = database.put(wtxn, e.key(), e.value()) {
-                            return Err(Error::InternalError(InternalError::StorePut {
-                                database_name,
-                                key: e.key().to_owned(),
-                                value_length: e.value().len(),
-                                error,
-                            }));
+    {
+        let span = tracing::trace_span!(target: "indexing::write_db", "all");
+        let _entered = span.enter();
+
+        for operation in writer_receiver {
+            match operation {
+                WriterOperation::DbOperation(db_operation) => {
+                    let database = db_operation.database(index);
+                    let database_name = db_operation.database_name();
+                    match db_operation.entry() {
+                        EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) {
+                            Ok(false) => unreachable!("We tried to delete an unknown key"),
+                            Ok(_) => (),
+                            Err(error) => {
+                                return Err(Error::InternalError(
+                                    InternalError::StoreDeletion {
+                                        database_name,
+                                        key: e.entry().to_owned(),
+                                        error,
+                                    },
+                                ));
+                            }
+                        },
+                        EntryOperation::Write(e) => {
+                            if let Err(error) = database.put(wtxn, e.key(), e.value()) {
+                                return Err(Error::InternalError(InternalError::StorePut {
+                                    database_name,
+                                    key: e.key().to_owned(),
+                                    value_length: e.value().len(),
+                                    error,
+                                }));
+                            }
+                        }
+                    }
+                }
+                WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation {
+                    ArroyOperation::DeleteVectors { docid } => {
+                        for (
+                            _embedder_index,
+                            (_embedder_name, _embedder, writer, dimensions),
+                        ) in &mut arroy_writers
+                        {
+                            let dimensions = *dimensions;
+                            writer.del_items(wtxn, dimensions, docid)?;
+                        }
+                    }
+                    ArroyOperation::SetVectors {
+                        docid,
+                        embedder_id,
+                        embeddings: raw_embeddings,
+                    } => {
+                        let (_, _, writer, dimensions) = arroy_writers
+                            .get(&embedder_id)
+                            .expect("requested a missing embedder");
+
+                        let mut embeddings = Embeddings::new(*dimensions);
+                        for embedding in raw_embeddings {
+                            embeddings.append(embedding).unwrap();
+                        }
+
+                        writer.del_items(wtxn, *dimensions, docid)?;
+                        writer.add_items(wtxn, docid, &embeddings)?;
+                    }
+                    ArroyOperation::SetVector { docid, embedder_id, embedding } => {
+                        let (_, _, writer, dimensions) = arroy_writers
+                            .get(&embedder_id)
+                            .expect("requested a missing embedder");
+                        writer.del_items(wtxn, *dimensions, docid)?;
+                        writer.add_item(wtxn, docid, &embedding)?;
+                    }
+                    ArroyOperation::Finish { configs } => {
+                        let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
+                        let _entered = span.enter();
+
+                        (indexing_context.send_progress)(Progress::from_step(
+                            Step::WritingEmbeddingsToDatabase,
+                        ));
+
+                        for (
+                            _embedder_index,
+                            (_embedder_name, _embedder, writer, dimensions),
+                        ) in &mut arroy_writers
+                        {
+                            let dimensions = *dimensions;
+                            writer.build_and_quantize(
+                                wtxn,
+                                &mut rng,
+                                dimensions,
+                                false,
+                                &indexing_context.must_stop_processing,
+                            )?;
+                        }
+
+                        index.put_embedding_configs(wtxn, configs)?;
+                    }
+                },
             }
-            WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation {
-                ArroyOperation::DeleteVectors { docid } => {
-                    for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
-                        &mut arroy_writers
-                    {
-                        let dimensions = *dimensions;
-                        writer.del_items(wtxn, dimensions, docid)?;
-                    }
-                }
-                ArroyOperation::SetVectors {
-                    docid,
-                    embedder_id,
-                    embeddings: raw_embeddings,
-                } => {
-                    let (_, _, writer, dimensions) =
-                        arroy_writers.get(&embedder_id).expect("requested a missing embedder");
-                    // TODO: switch to Embeddings
-                    let mut embeddings = Embeddings::new(*dimensions);
-                    for embedding in raw_embeddings {
-                        embeddings.append(embedding).unwrap();
-                    }
-
-                    writer.del_items(wtxn, *dimensions, docid)?;
-                    writer.add_items(wtxn, docid, &embeddings)?;
-                }
-                ArroyOperation::SetVector { docid, embedder_id, embedding } => {
-                    let (_, _, writer, dimensions) =
-                        arroy_writers.get(&embedder_id).expect("requested a missing embedder");
-                    writer.del_items(wtxn, *dimensions, docid)?;
-                    writer.add_item(wtxn, docid, &embedding)?;
-                }
-                ArroyOperation::Finish { configs } => {
-                    let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
-                    let _entered = span.enter();
-
-                    (indexing_context.send_progress)(Progress::from_step(
-                        Step::WritingEmbeddingsToDatabase,
-                    ));
-
-                    for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
-                        &mut arroy_writers
-                    {
-                        let dimensions = *dimensions;
-                        writer.build_and_quantize(
-                            wtxn,
-                            &mut rng,
-                            dimensions,
-                            false,
-                            &indexing_context.must_stop_processing,
-                        )?;
-                    }
-
-                    index.put_embedding_configs(wtxn, configs)?;
-                }
-            },
         }
     }
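
The pattern this patch applies to every indexing step is the same: run extraction inside a block scoped under an "indexing::...::extract" span, then merge under a sibling "indexing::...::merge" span. The sketch below is a minimal illustration rather than code from the repository; run_extraction and merge_caches are hypothetical stand-ins for the real extractor and merger calls, and it assumes the tracing crate with a subscriber installed. Because the guard returned by Span::enter is dropped at the closing brace, each phase is recorded as its own span, and the block's tail expression hands the extracted caches to the merge phase.

use tracing::trace_span;

// Hypothetical stand-ins for the real extractor and merger calls.
fn run_extraction() -> std::io::Result<Vec<u32>> {
    Ok(vec![1, 2, 3])
}

fn merge_caches(_caches: Vec<u32>) -> std::io::Result<()> {
    Ok(())
}

fn extract_then_merge() -> std::io::Result<()> {
    // The extract phase runs in its own block: the `_entered` guard drops
    // at the closing brace, so the span covers extraction only, and the
    // block's tail expression passes the caches on to the merge phase.
    let caches = {
        let span = trace_span!(target: "indexing::documents::extract", "step");
        let _entered = span.enter();
        run_extraction()?
    };

    // A sibling span then covers the merge phase alone, so extract and
    // merge timings show up separately in a trace.
    {
        let span = trace_span!(target: "indexing::documents::merge", "step");
        let _entered = span.enter();
        merge_caches(caches)?;
    }

    Ok(())
}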