diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 00c0a4a5f..591c8d4cd 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -34,7 +34,7 @@ use crate::{FieldId, Result}; pub(crate) fn data_from_obkv_documents( obkv_chunks: impl Iterator>> + Send, indexer: GrenadParameters, - lmdb_writer_sx: Sender, + lmdb_writer_sx: Sender>, searchable_fields: Option>, faceted_fields: HashSet, stop_words: Option>, @@ -42,63 +42,14 @@ pub(crate) fn data_from_obkv_documents( let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = obkv_chunks .par_bridge() .map(|result| { - let documents_chunk = result.and_then(|c| unsafe { into_clonable_grenad(c) }).unwrap(); - - lmdb_writer_sx.send(TypedChunk::Documents(documents_chunk.clone())).unwrap(); - - let (docid_word_positions_chunk, docid_fid_facet_values_chunks): ( - Result<_>, - Result<_>, - ) = rayon::join( - || { - let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions( - documents_chunk.clone(), - indexer.clone(), - &searchable_fields, - stop_words.as_ref(), - )?; - - // send documents_ids to DB writer - lmdb_writer_sx.send(TypedChunk::NewDocumentsIds(documents_ids)).unwrap(); - - // send docid_word_positions_chunk to DB writer - let docid_word_positions_chunk = - unsafe { into_clonable_grenad(docid_word_positions_chunk)? }; - lmdb_writer_sx - .send(TypedChunk::DocidWordPositions(docid_word_positions_chunk.clone())) - .unwrap(); - Ok(docid_word_positions_chunk) - }, - || { - let (docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk) = - extract_fid_docid_facet_values( - documents_chunk.clone(), - indexer.clone(), - &faceted_fields, - )?; - - // send docid_fid_facet_numbers_chunk to DB writer - let docid_fid_facet_numbers_chunk = - unsafe { into_clonable_grenad(docid_fid_facet_numbers_chunk)? }; - lmdb_writer_sx - .send(TypedChunk::FieldIdDocidFacetNumbers( - docid_fid_facet_numbers_chunk.clone(), - )) - .unwrap(); - - // send docid_fid_facet_strings_chunk to DB writer - let docid_fid_facet_strings_chunk = - unsafe { into_clonable_grenad(docid_fid_facet_strings_chunk)? }; - lmdb_writer_sx - .send(TypedChunk::FieldIdDocidFacetStrings( - docid_fid_facet_strings_chunk.clone(), - )) - .unwrap(); - - Ok((docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk)) - }, - ); - Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?)) + extract_documents_data( + result, + indexer, + lmdb_writer_sx.clone(), + &searchable_fields, + &faceted_fields, + &stop_words, + ) }) .collect(); @@ -177,7 +128,7 @@ pub(crate) fn data_from_obkv_documents( fn spawn_extraction_task( chunks: Vec>, indexer: GrenadParameters, - lmdb_writer_sx: Sender, + lmdb_writer_sx: Sender>, extract_fn: FE, merge_fn: MergeFn, serialize_fn: FS, @@ -190,14 +141,89 @@ fn spawn_extraction_task( FS: Fn(grenad::Reader) -> TypedChunk + Sync + Send + 'static, { rayon::spawn(move || { - let chunks: Vec<_> = chunks - .into_par_iter() - .map(|chunk| extract_fn(chunk, indexer.clone()).unwrap()) - .collect(); - rayon::spawn(move || { - debug!("merge {} database", name); - let reader = merge_readers(chunks, merge_fn, indexer).unwrap(); - lmdb_writer_sx.send(serialize_fn(reader)).unwrap(); - }); + let chunks: Result> = + chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer.clone())).collect(); + rayon::spawn(move || match chunks { + Ok(chunks) => { + debug!("merge {} database", name); + let reader = merge_readers(chunks, merge_fn, indexer); + lmdb_writer_sx.send(reader.map(|r| serialize_fn(r))).unwrap(); + } + Err(e) => lmdb_writer_sx.send(Err(e)).unwrap(), + }) }); } + +/// Extract chuncked data and send it into lmdb_writer_sx sender: +/// - documents +/// - documents_ids +/// - docid_word_positions +/// - docid_fid_facet_numbers +/// - docid_fid_facet_strings +fn extract_documents_data( + documents_chunk: Result>, + indexer: GrenadParameters, + lmdb_writer_sx: Sender>, + searchable_fields: &Option>, + faceted_fields: &HashSet, + stop_words: &Option>, +) -> Result<( + grenad::Reader, + (grenad::Reader, grenad::Reader), +)> { + let documents_chunk = documents_chunk.and_then(|c| unsafe { into_clonable_grenad(c) })?; + + lmdb_writer_sx.send(Ok(TypedChunk::Documents(documents_chunk.clone()))).unwrap(); + + let (docid_word_positions_chunk, docid_fid_facet_values_chunks): (Result<_>, Result<_>) = + rayon::join( + || { + let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions( + documents_chunk.clone(), + indexer.clone(), + searchable_fields, + stop_words.as_ref(), + )?; + + // send documents_ids to DB writer + lmdb_writer_sx.send(Ok(TypedChunk::NewDocumentsIds(documents_ids))).unwrap(); + + // send docid_word_positions_chunk to DB writer + let docid_word_positions_chunk = + unsafe { into_clonable_grenad(docid_word_positions_chunk)? }; + lmdb_writer_sx + .send(Ok(TypedChunk::DocidWordPositions(docid_word_positions_chunk.clone()))) + .unwrap(); + Ok(docid_word_positions_chunk) + }, + || { + let (docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk) = + extract_fid_docid_facet_values( + documents_chunk.clone(), + indexer.clone(), + faceted_fields, + )?; + + // send docid_fid_facet_numbers_chunk to DB writer + let docid_fid_facet_numbers_chunk = + unsafe { into_clonable_grenad(docid_fid_facet_numbers_chunk)? }; + lmdb_writer_sx + .send(Ok(TypedChunk::FieldIdDocidFacetNumbers( + docid_fid_facet_numbers_chunk.clone(), + ))) + .unwrap(); + + // send docid_fid_facet_strings_chunk to DB writer + let docid_fid_facet_strings_chunk = + unsafe { into_clonable_grenad(docid_fid_facet_strings_chunk)? }; + lmdb_writer_sx + .send(Ok(TypedChunk::FieldIdDocidFacetStrings( + docid_fid_facet_strings_chunk.clone(), + ))) + .unwrap(); + + Ok((docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk)) + }, + ); + Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?)) +} diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index b7fa1492c..4cf7c83f1 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -222,8 +222,10 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { let documents_file = grenad::Reader::new(documents_file)?; // create LMDB writer channel - let (lmdb_writer_sx, lmdb_writer_rx): (Sender, Receiver) = - crossbeam_channel::unbounded(); + let (lmdb_writer_sx, lmdb_writer_rx): ( + Sender>, + Receiver>, + ) = crossbeam_channel::unbounded(); // get searchable fields for word databases let searchable_fields = @@ -244,23 +246,31 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { }; // split obkv file into several chuncks - let mut chunk_iter = grenad_obkv_into_chunks( + let chunk_iter = grenad_obkv_into_chunks( documents_file, params.clone(), self.log_every_n, Byte::from_bytes(self.documents_chunk_size.unwrap_or(1024 * 1024 * 128) as u64), // 128MiB - ) - .unwrap(); - // extract all databases from the chunked obkv douments - extract::data_from_obkv_documents( - &mut chunk_iter, - params, - lmdb_writer_sx, - searchable_fields, - faceted_fields, - stop_words, - ) - .unwrap(); + ); + + let result = chunk_iter.map(|chunk_iter| { + // extract all databases from the chunked obkv douments + extract::data_from_obkv_documents( + chunk_iter, + params, + lmdb_writer_sx.clone(), + searchable_fields, + faceted_fields, + stop_words, + ) + }); + + if let Err(e) = result { + lmdb_writer_sx.send(Err(e)).unwrap(); + } + + // needs to be droped to avoid channel waiting lock. + drop(lmdb_writer_sx) }); // We delete the documents that this document addition replaces. This way we are @@ -294,7 +304,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { for typed_chunk in lmdb_writer_rx { let (docids, is_merged_database) = - write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?; + write_typed_chunk_into_index(typed_chunk?, &self.index, self.wtxn, index_is_empty)?; if !docids.is_empty() { final_documents_ids |= docids; let documents_seen_count = final_documents_ids.len();