diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 04cdb912f..bec1fedf5 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -1351,7 +1351,10 @@ impl IndexScheduler { let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + local_pool = ThreadPoolNoAbortBuilder::new() + .thread_name(|i| format!("indexing-thread-{i}")) + .build() + .unwrap(); &local_pool } }; @@ -1399,21 +1402,19 @@ impl IndexScheduler { } if tasks.iter().any(|res| res.error.is_none()) { - pool.install(|| { - indexer::index( - index_wtxn, - index, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| must_stop_processing.get(), - &send_progress, - ) - }) - .unwrap()?; + indexer::index( + index_wtxn, + index, + pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| must_stop_processing.get(), + &send_progress, + )?; tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } @@ -1489,34 +1490,34 @@ impl IndexScheduler { let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + local_pool = ThreadPoolNoAbortBuilder::new() + .thread_name(|i| format!("indexing-thread-{i}")) + .build() + .unwrap(); &local_pool } }; - pool.install(|| { - let indexer = - UpdateByFunction::new(candidates, context.clone(), code.clone()); - let document_changes = indexer.into_changes(&primary_key)?; - let embedders = index.embedding_configs(index_wtxn)?; - let embedders = self.embedders(embedders)?; + let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone()); + let document_changes = + pool.install(|| indexer.into_changes(&primary_key)).unwrap()?; - indexer::index( - index_wtxn, - index, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - None, // cannot change primary key in DocumentEdition - &document_changes, - embedders, - &|| must_stop_processing.get(), - &send_progress, - )?; + let embedders = index.embedding_configs(index_wtxn)?; + let embedders = self.embedders(embedders)?; - Result::Ok(()) - }) - .unwrap()?; + indexer::index( + index_wtxn, + index, + pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + None, // cannot change primary key in DocumentEdition + &document_changes, + embedders, + &|| must_stop_processing.get(), + &send_progress, + )?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } @@ -1641,7 +1642,10 @@ impl IndexScheduler { let pool = match &indexer_config.thread_pool { Some(pool) => pool, None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + local_pool = ThreadPoolNoAbortBuilder::new() + .thread_name(|i| format!("indexing-thread-{i}")) + .build() + .unwrap(); &local_pool } }; @@ -1652,21 +1656,19 @@ impl IndexScheduler { let embedders = index.embedding_configs(index_wtxn)?; let embedders = self.embedders(embedders)?; - pool.install(|| { - indexer::index( - index_wtxn, - index, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - None, // document deletion never changes primary key - &document_changes, - embedders, - &|| 
must_stop_processing.get(),
-                        &send_progress,
-                    )
-                })
-                .unwrap()?;
+            indexer::index(
+                index_wtxn,
+                index,
+                pool,
+                indexer_config.grenad_parameters(),
+                &db_fields_ids_map,
+                new_fields_ids_map,
+                None, // document deletion never changes primary key
+                &document_changes,
+                embedders,
+                &|| must_stop_processing.get(),
+                &send_progress,
+            )?;

            // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
        }
diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index fc05baa89..beba80ac8 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -55,6 +55,12 @@ pub fn extractor_writer_bbqueue(
     let producers = ThreadLocal::with_capacity(bbbuffers.len());
     let consumers = rayon::broadcast(|bi| {
+        eprintln!(
+            "bbqueue: registering producer on broadcast thread #{:?} (name: {:?}, id: {:?})",
+            bi.index(),
+            std::thread::current().name(),
+            std::thread::current().id(),
+        );
         let bbqueue = &bbbuffers[bi.index()];
         let (producer, consumer) = bbqueue.try_split_framed().unwrap();
         producers.get_or(|| FullySend(RefCell::new(producer)));
@@ -399,7 +405,15 @@ impl<'b> ExtractorBbqueueSender<'b> {
     fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
         let capacity = self.capacity;
-        let refcell = self.producers.get().unwrap();
+        let refcell = match self.producers.get() {
+            Some(refcell) => refcell,
+            None => panic!(
+                "no bbqueue producer registered for this thread (#{:?}, name: {:?}, id: {:?}); senders must run on the pool that called extractor_writer_bbqueue",
+                rayon::current_thread_index(),
+                std::thread::current().name(),
+                std::thread::current().id()
+            ),
+        };
         let mut producer = refcell.0.borrow_mut_or_yield();

         let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
@@ -438,7 +452,15 @@ impl<'b> ExtractorBbqueueSender<'b> {
         embedding: &[f32],
     ) -> crate::Result<()> {
         let capacity = self.capacity;
-        let refcell = self.producers.get().unwrap();
+        let refcell = match self.producers.get() {
+            Some(refcell) => refcell,
+            None => panic!(
+                "no bbqueue producer registered for this thread (#{:?}, name: {:?}, id: {:?}); senders must run on the pool that called extractor_writer_bbqueue",
+                rayon::current_thread_index(),
+                std::thread::current().name(),
+                std::thread::current().id()
+            ),
+        };
         let mut producer = refcell.0.borrow_mut_or_yield();

         let payload_header =
@@ -496,7 +518,15 @@ impl<'b> ExtractorBbqueueSender<'b> {
         F: FnOnce(&mut [u8]) -> crate::Result<()>,
     {
         let capacity = self.capacity;
-        let refcell = self.producers.get().unwrap();
+        let refcell = match self.producers.get() {
+            Some(refcell) => refcell,
+            None => panic!(
+                "no bbqueue producer registered for this thread (#{:?}, name: {:?}, id: {:?}); senders must run on the pool that called extractor_writer_bbqueue",
+                rayon::current_thread_index(),
+                std::thread::current().name(),
+                std::thread::current().id()
+            ),
+        };
         let mut producer = refcell.0.borrow_mut_or_yield();

         let operation = DbOperation { database, key_length: Some(key_length) };
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 89c1b850d..b7d5431b4 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -62,6 +62,7 @@ mod update_by_function;
 pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
     wtxn: &mut RwTxn,
     index: &'index Index,
+    pool: &ThreadPoolNoAbort,
     grenad_parameters: GrenadParameters,
     db_fields_ids_map: &'indexer FieldsIdsMap,
     new_fields_ids_map: FieldsIdsMap,
@@ -77,10 +78,15 @@ where
     SP: Fn(Progress) + Sync,
 {
     /// TODO restrict memory and remove this memory from the extractors bump allocators
-    let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
-        .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB by thread
-        .collect();
-    let (extractor_sender, 
mut writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000); + let bbbuffers: Vec<_> = pool + .install(|| { + (0..rayon::current_num_threads()) + .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB by thread + .collect() + }) + .unwrap(); + let (extractor_sender, mut writer_receiver) = + pool.install(|| extractor_writer_bbqueue(&bbbuffers, 1000)).unwrap(); let finished_extraction = AtomicBool::new(false); let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; @@ -112,253 +118,255 @@ where let field_distribution = &mut field_distribution; let document_ids = &mut document_ids; let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { - let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); - let _entered = span.enter(); - - let rtxn = index.read_txn()?; - - // document but we need to create a function that collects and compresses documents. - let document_sender = extractor_sender.documents(); - let document_extractor = DocumentsExtractor::new(document_sender, embedders); - let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - { - let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); + pool.install(move || { + let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); let _entered = span.enter(); - extract(document_changes, - &document_extractor, - indexing_context, - &mut extractor_allocs, - &datastore, - Step::ExtractingDocuments, - )?; - } - { - let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); - let _entered = span.enter(); - for document_extractor_data in datastore { - let document_extractor_data = document_extractor_data.0.into_inner(); - for (field, delta) in document_extractor_data.field_distribution_delta { - let current = field_distribution.entry(field).or_default(); - // adding the delta should never cause a negative result, as we are removing fields that previously existed. - *current = current.saturating_add_signed(delta); + + let rtxn = index.read_txn()?; + + // document but we need to create a function that collects and compresses documents. + let document_sender = extractor_sender.documents(); + let document_extractor = DocumentsExtractor::new(document_sender, embedders); + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + { + let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); + let _entered = span.enter(); + extract(document_changes, + &document_extractor, + indexing_context, + &mut extractor_allocs, + &datastore, + Step::ExtractingDocuments, + )?; + } + { + let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); + let _entered = span.enter(); + for document_extractor_data in datastore { + let document_extractor_data = document_extractor_data.0.into_inner(); + for (field, delta) in document_extractor_data.field_distribution_delta { + let current = field_distribution.entry(field).or_default(); + // adding the delta should never cause a negative result, as we are removing fields that previously existed. 
+                                *current = current.saturating_add_signed(delta);
+                            }
+                            document_extractor_data.docids_delta.apply_to(document_ids);
                 }
-                document_extractor_data.docids_delta.apply_to(document_ids);
+
+                        field_distribution.retain(|_, v| *v != 0);
             }

-            field_distribution.retain(|_, v| *v != 0);
-        }
+                        let facet_field_ids_delta;

-        let facet_field_ids_delta;
+                        {
+                            let caches = {
+                                let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
+                                let _entered = span.enter();

-        {
-            let caches = {
-                let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
-                let _entered = span.enter();
+                                FacetedDocidsExtractor::run_extraction(
+                                    grenad_parameters,
+                                    document_changes,
+                                    indexing_context,
+                                    &mut extractor_allocs,
+                                    &extractor_sender.field_id_docid_facet_sender(),
+                                    Step::ExtractingFacets
+                                )?
+                            };

-                FacetedDocidsExtractor::run_extraction(
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
+                                let _entered = span.enter();
+
+                                facet_field_ids_delta = merge_and_send_facet_docids(
+                                    caches,
+                                    FacetDatabases::new(index),
+                                    index,
+                                    extractor_sender.facet_docids(),
+                                )?;
+                            }
+                        }
+
+                        {
+                            let WordDocidsCaches {
+                                word_docids,
+                                word_fid_docids,
+                                exact_word_docids,
+                                word_position_docids,
+                                fid_word_count_docids,
+                            } = {
+                                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
+                                let _entered = span.enter();
+
+                                WordDocidsExtractors::run_extraction(
                     grenad_parameters,
                     document_changes,
                     indexing_context,
                     &mut extractor_allocs,
-                    &extractor_sender.field_id_docid_facet_sender(),
-                    Step::ExtractingFacets
+                                    Step::ExtractingWords
                 )?
-            };
-
-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
-                let _entered = span.enter();
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
+                                let _entered = span.enter();
+                                merge_and_send_docids(
+                                    word_docids,
+                                    index.word_docids.remap_types(),
+                                    index,
+                                    extractor_sender.docids::<WordDocids>(),
+                                    &indexing_context.must_stop_processing,
+                                )?;
+                            }

-                facet_field_ids_delta = merge_and_send_facet_docids(
-                    caches,
-                    FacetDatabases::new(index),
-                    index,
-                    extractor_sender.facet_docids(),
-                )?;
-            }
-        }
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
+                                let _entered = span.enter();
+                                merge_and_send_docids(
+                                    word_fid_docids,
+                                    index.word_fid_docids.remap_types(),
+                                    index,
+                                    extractor_sender.docids::<WordFidDocids>(),
+                                    &indexing_context.must_stop_processing,
+                                )?;
+                            }

-        {
-            let WordDocidsCaches {
-                word_docids,
-                word_fid_docids,
-                exact_word_docids,
-                word_position_docids,
-                fid_word_count_docids,
-            } = {
-                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
-                let _entered = span.enter();
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
+                                let _entered = span.enter();
+                                merge_and_send_docids(
+                                    exact_word_docids,
+                                    index.exact_word_docids.remap_types(),
+                                    index,
+                                    extractor_sender.docids::<ExactWordDocids>(),
+                                    &indexing_context.must_stop_processing,
+                                )?;
+                            }

-                WordDocidsExtractors::run_extraction(
-                    grenad_parameters,
-                    document_changes,
-                    indexing_context,
-                    &mut extractor_allocs,
-                    Step::ExtractingWords
-                )?
-            };
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
+                                let _entered = span.enter();
+                                merge_and_send_docids(
+                                    word_position_docids,
+                                    index.word_position_docids.remap_types(),
+                                    index,
+                                    extractor_sender.docids::<WordPositionDocids>(),
+                                    &indexing_context.must_stop_processing,
+                                )?;
+                            }

-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
-                let _entered = span.enter();
-                merge_and_send_docids(
-                    word_docids,
-                    index.word_docids.remap_types(),
-                    index,
-                    extractor_sender.docids::<WordDocids>(),
-                    &indexing_context.must_stop_processing,
-                )?;
-            }
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
+                                let _entered = span.enter();
+                                merge_and_send_docids(
+                                    fid_word_count_docids,
+                                    index.field_id_word_count_docids.remap_types(),
+                                    index,
+                                    extractor_sender.docids::<FidWordCountDocids>(),
+                                    &indexing_context.must_stop_processing,
+                                )?;
+                            }
+                        }

-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
-                let _entered = span.enter();
-                merge_and_send_docids(
-                    word_fid_docids,
-                    index.word_fid_docids.remap_types(),
-                    index,
-                    extractor_sender.docids::<WordFidDocids>(),
-                    &indexing_context.must_stop_processing,
-                )?;
-            }
+                        // run the proximity extraction only if the precision is by word
+                        // this works only if the settings didn't change during this transaction.
+                        let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
+                        if proximity_precision == ProximityPrecision::ByWord {
+                            let caches = {
+                                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
+                                let _entered = span.enter();
+
+                                <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
+                                    grenad_parameters,
+                                    document_changes,
+                                    indexing_context,
+                                    &mut extractor_allocs,
+                                    Step::ExtractingWordProximity,
+                                )?
+                            };
+
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
+                                let _entered = span.enter();
+
+                                merge_and_send_docids(
+                                    caches,
+                                    index.word_pair_proximity_docids.remap_types(),
+                                    index,
+                                    extractor_sender.docids::<WordPairProximityDocids>(),
+                                    &indexing_context.must_stop_processing,
+                                )?;
+                            }
+                        }

-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
-                let _entered = span.enter();
-                merge_and_send_docids(
-                    exact_word_docids,
-                    index.exact_word_docids.remap_types(),
-                    index,
-                    extractor_sender.docids::<ExactWordDocids>(),
-                    &indexing_context.must_stop_processing,
-                )?;
-            }
+                        'vectors: {
+                            if index_embeddings.is_empty() {
+                                break 'vectors;
+                            }

-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
-                let _entered = span.enter();
-                merge_and_send_docids(
-                    word_position_docids,
-                    index.word_position_docids.remap_types(),
-                    index,
-                    extractor_sender.docids::<WordPositionDocids>(),
-                    &indexing_context.must_stop_processing,
-                )?;
-            }
+                            let embedding_sender = extractor_sender.embeddings();
+                            let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads());
+                            let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
+                                let _entered = span.enter();

-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
-                let _entered = span.enter();
-                merge_and_send_docids(
-                    fid_word_count_docids,
-                    index.field_id_word_count_docids.remap_types(),
-                    index,
-                    extractor_sender.docids::<FidWordCountDocids>(),
-                    &indexing_context.must_stop_processing,
-                )?;
-            }
-        }
+                                extract(
+                                    document_changes,
+                                    &extractor,
+                                    indexing_context,
+                                    &mut extractor_allocs,
+                                    &datastore,
+                                    Step::ExtractingEmbeddings,
+                                )?;
+                            }
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
+                                let _entered = span.enter();

-        // run the proximity extraction only if the precision is by word
-        // this works only if the settings didn't change during this transaction.
-        let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
-        if proximity_precision == ProximityPrecision::ByWord {
-            let caches = {
-                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
-                let _entered = span.enter();
-
-                <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
-                    grenad_parameters,
-                    document_changes,
-                    indexing_context,
-                    &mut extractor_allocs,
-                    Step::ExtractingWordProximity,
-                )?
-            };
-
-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
-                let _entered = span.enter();
-
-                merge_and_send_docids(
-                    caches,
-                    index.word_pair_proximity_docids.remap_types(),
-                    index,
-                    extractor_sender.docids::<WordPairProximityDocids>(),
-                    &indexing_context.must_stop_processing,
-                )?;
-            }
-        }
-
-        'vectors: {
-            if index_embeddings.is_empty() {
-                break 'vectors;
-            }
-
-            let embedding_sender = extractor_sender.embeddings();
-            let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads());
-            let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            {
-                let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
-                let _entered = span.enter();
-
-                extract(
-                    document_changes,
-                    &extractor,
-                    indexing_context,
-                    &mut extractor_allocs,
-                    &datastore,
-                    Step::ExtractingEmbeddings,
-                )?;
-            }
-            {
-                let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
-                let _entered = span.enter();
-
-                for config in &mut index_embeddings {
-                    'data: for data in datastore.iter_mut() {
-                        let data = &mut data.get_mut().0;
-                        let Some(deladd) = data.remove(&config.name) else { continue 'data; };
-                        deladd.apply_to(&mut config.user_provided);
+                                for config in &mut index_embeddings {
+                                    'data: for data in datastore.iter_mut() {
+                                        let data = &mut data.get_mut().0;
+                                        let Some(deladd) = data.remove(&config.name) else { continue 'data; };
+                                        deladd.apply_to(&mut config.user_provided);
+                                    }
                     }
                 }
             }
-        }

-        'geo: {
-            let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
-                break 'geo;
-            };
-            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+                        'geo: {
+                            let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
+                                break 'geo;
+                            };
+                            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());

-            {
-                let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
-                let _entered = span.enter();
+                            {
+                                let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
+                                let _entered = span.enter();

-                extract(
-                    document_changes,
-                    &extractor,
-                    indexing_context,
-                    &mut extractor_allocs,
-                    &datastore,
-                    Step::WritingGeoPoints
+                                extract(
+                                    document_changes,
+                                    &extractor,
+                                    indexing_context,
+                                    &mut extractor_allocs,
+                                    &datastore,
+                                    Step::WritingGeoPoints
+                                )?;
+                            }

+                            merge_and_send_rtree(
+                                datastore,
+                                &rtxn,
+                                index,
+                                extractor_sender.geo(),
+                                &indexing_context.must_stop_processing,
                 )?;
             }

-            merge_and_send_rtree(
-                datastore,
-                &rtxn,
-                index,
-                extractor_sender.geo(),
-                &indexing_context.must_stop_processing,
-            )?;
-        }
+                        (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));

-        (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));
+                        finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);

-        finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
-
-        Result::Ok((facet_field_ids_delta, index_embeddings))
+                        Result::Ok((facet_field_ids_delta, index_embeddings))
+                    }).unwrap()
         })?;

         let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
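
Note on the thread-affinity invariant this patch enforces: `extractor_writer_bbqueue` registers one bbqueue producer per worker thread via `rayon::broadcast` into a `ThreadLocal`. A broadcast only reaches the threads of the pool it runs in, so every later `producers.get()` must happen on that same pool — which is why `indexer::index` now takes `pool` as a parameter and `install`s the buffer allocation, the channel setup, and the whole extractor closure on it. The sketch below is not Meilisearch code; it is a minimal, self-contained illustration of the pitfall under stated assumptions (hypothetical names, a plain `rayon::ThreadPool` instead of `ThreadPoolNoAbort`, a `usize` standing in for a bbqueue producer; assumes `rayon` >= 1.7 for `broadcast` and the `thread_local` crate):

```rust
use std::cell::RefCell;
use thread_local::ThreadLocal; // same crate milli uses for per-thread producers

fn main() {
    // Illustrative stand-in for the indexing pool built in batch.rs.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .thread_name(|i| format!("indexing-thread-{i}"))
        .build()
        .unwrap();

    // Stand-in for `producers`: one value registered per pool thread.
    let producers: ThreadLocal<RefCell<usize>> = ThreadLocal::new();

    // Like extractor_writer_bbqueue: rayon::broadcast runs on the *current*
    // pool, so it must be entered through `pool.install` to visit its threads.
    pool.install(|| {
        rayon::broadcast(|ctx| {
            producers.get_or(|| RefCell::new(ctx.index()));
        });
    });

    // Work installed on the same pool finds its per-thread producer...
    pool.install(|| assert!(producers.get().is_some()));

    // ...but any other thread (here: the main thread) gets `None`,
    // which is exactly the condition the panic! diagnostics above report.
    assert!(producers.get().is_none());
}
```

The `indexing-thread-{i}` names added in batch.rs serve the same diagnostic purpose: together with the thread name and id printed by the `eprintln!`/`panic!` calls, any call that reaches a sender from outside the broadcast pool is immediately attributable to a specific thread.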