diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 81596f3fe..e7f0cc825 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -170,42 +170,41 @@ where // TODO manage the errors correctly let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.in_place_scope(|_s| { - let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); - let _entered = span.enter(); + let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); + let _entered = span.enter(); - let rtxn = index.read_txn()?; + let rtxn = index.read_txn()?; - // document but we need to create a function that collects and compresses documents. - let document_sender = extractor_sender.documents(); - let document_extractor = DocumentsExtractor::new(&document_sender, embedders); - let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); - let (finished_steps, step_name) = steps::extract_documents(); - extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; + // document but we need to create a function that collects and compresses documents. + let document_sender = extractor_sender.documents(); + let document_extractor = DocumentsExtractor::new(&document_sender, embedders); + let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); + let (finished_steps, step_name) = steps::extract_documents(); + extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; - - for document_extractor_data in datastore { - let document_extractor_data = document_extractor_data.0.into_inner(); - for (field, delta) in document_extractor_data.field_distribution_delta { - let current = field_distribution.entry(field).or_default(); - // adding the delta should never cause a negative result, as we are removing fields that previously existed. - *current = current.saturating_add_signed(delta); - } - document_extractor_data.docids_delta.apply_to(document_ids); + for document_extractor_data in datastore { + let document_extractor_data = document_extractor_data.0.into_inner(); + for (field, delta) in document_extractor_data.field_distribution_delta { + let current = field_distribution.entry(field).or_default(); + // adding the delta should never cause a negative result, as we are removing fields that previously existed. + *current = current.saturating_add_signed(delta); } + document_extractor_data.docids_delta.apply_to(document_ids); + } - field_distribution.retain(|_, v| *v == 0); + field_distribution.retain(|_, v| *v == 0); - const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; - let current_num_threads = rayon::current_num_threads(); - let max_memory = TEN_GIB / current_num_threads; - eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads"); + const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; + let current_num_threads = rayon::current_num_threads(); + let max_memory = TEN_GIB / current_num_threads; + eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads"); - let grenad_parameters = GrenadParameters { - max_memory: Some(max_memory), - ..GrenadParameters::default() - }; + let grenad_parameters = GrenadParameters { + max_memory: Some(max_memory), + ..GrenadParameters::default() + }; - let facet_field_ids_delta; + let facet_field_ids_delta; { let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); @@ -324,65 +323,65 @@ where )?; } - 'vectors: { - let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); - let _entered = span.enter(); + 'vectors: { + let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); + let _entered = span.enter(); - let index_embeddings = index.embedding_configs(&rtxn)?; - if index_embeddings.is_empty() { - break 'vectors; - } + let index_embeddings = index.embedding_configs(&rtxn)?; + if index_embeddings.is_empty() { + break 'vectors; + } - let embedding_sender = extractor_sender.embeddings(); - let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); - let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); - let (finished_steps, step_name) = steps::extract_embeddings(); + let embedding_sender = extractor_sender.embeddings(); + let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); + let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); + let (finished_steps, step_name) = steps::extract_embeddings(); - extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; + extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; - let mut user_provided = HashMap::new(); - for data in datastore { - let data = data.into_inner().0; - for (embedder, deladd) in data.into_iter() { - let user_provided = user_provided.entry(embedder).or_insert(Default::default()); - if let Some(del) = deladd.del { - *user_provided -= del; - } - if let Some(add) = deladd.add { - *user_provided |= add; - } + let mut user_provided = HashMap::new(); + for data in datastore { + let data = data.into_inner().0; + for (embedder, deladd) in data.into_iter() { + let user_provided = user_provided.entry(embedder).or_insert(Default::default()); + if let Some(del) = deladd.del { + *user_provided -= del; + } + if let Some(add) = deladd.add { + *user_provided |= add; } } - - embedding_sender.finish(user_provided).unwrap(); } - { - let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); - let _entered = span.enter(); - let (finished_steps, step_name) = steps::write_db(); - (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None }); - } + embedding_sender.finish(user_provided).unwrap(); + } - // TODO THIS IS TOO MUCH - // - [ ] Extract fieldid docid facet number - // - [ ] Extract fieldid docid facet string - // - [ ] Extract facetid string fst - // - [ ] Extract facetid normalized string strings + { + let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); + let _entered = span.enter(); + let (finished_steps, step_name) = steps::write_db(); + (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None }); + } - // TODO Inverted Indexes again - // - [x] Extract fieldid facet isempty docids - // - [x] Extract fieldid facet isnull docids - // - [x] Extract fieldid facet exists docids + // TODO THIS IS TOO MUCH + // - [ ] Extract fieldid docid facet number + // - [ ] Extract fieldid docid facet string + // - [ ] Extract facetid string fst + // - [ ] Extract facetid normalized string strings - // TODO This is the normal system - // - [x] Extract fieldid facet number docids - // - [x] Extract fieldid facet string docids + // TODO Inverted Indexes again + // - [x] Extract fieldid facet isempty docids + // - [x] Extract fieldid facet isnull docids + // - [x] Extract fieldid facet exists docids - Result::Ok(facet_field_ids_delta) - }) + // TODO This is the normal system + // - [x] Extract fieldid facet number docids + // - [x] Extract fieldid facet string docids + + Result::Ok(facet_field_ids_delta) + }) })?; let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);