Remove unwraps by sending errors through the channel

Author: many
Date: 2021-08-24 13:01:31 +02:00
parent 5c962c03dd
commit a2f59a28f7
2 changed files with 120 additions and 84 deletions
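
The heart of the change: extraction threads used to unwrap both their own results and every channel send, so a single indexing error crashed a worker thread. The channel now carries Result<TypedChunk>, letting workers forward errors to the one LMDB writer thread, which propagates them with the ? operator. Below is a minimal sketch of that pattern; it uses the real crossbeam-channel API, but Chunk and Error are hypothetical stand-ins for milli's TypedChunk and error type.

use crossbeam_channel::{unbounded, Receiver, Sender};
use std::thread;

// Hypothetical stand-ins for milli's TypedChunk and error type.
#[derive(Debug)]
struct Chunk(u32);
#[derive(Debug)]
struct Error(String);
type Result<T> = std::result::Result<T, Error>;

fn extract(n: u32) -> Result<Chunk> {
    if n == 3 { Err(Error(format!("chunk {} failed", n))) } else { Ok(Chunk(n)) }
}

fn main() -> Result<()> {
    // The channel carries Result<Chunk>, not Chunk, so workers never unwrap.
    let (tx, rx): (Sender<Result<Chunk>>, Receiver<Result<Chunk>>) = unbounded();

    thread::spawn(move || {
        for n in 0..5 {
            // Forward the error instead of panicking in the worker.
            if tx.send(extract(n)).is_err() {
                break; // the writer hung up after seeing an error
            }
        }
        // Dropping the last sender closes the channel and ends the writer loop.
        drop(tx);
    });

    // The single writer thread surfaces the first error with the ? operator.
    for chunk in rx {
        println!("writing {:?}", chunk?);
    }
    Ok(())
}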


@@ -34,7 +34,7 @@ use crate::{FieldId, Result};
 pub(crate) fn data_from_obkv_documents(
     obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
     indexer: GrenadParameters,
-    lmdb_writer_sx: Sender<TypedChunk>,
+    lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: Option<HashSet<FieldId>>,
     faceted_fields: HashSet<FieldId>,
     stop_words: Option<fst::Set<&[u8]>>,
@@ -42,63 +42,14 @@ pub(crate) fn data_from_obkv_documents(
     let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = obkv_chunks
         .par_bridge()
         .map(|result| {
-            let documents_chunk = result.and_then(|c| unsafe { into_clonable_grenad(c) }).unwrap();
-            lmdb_writer_sx.send(TypedChunk::Documents(documents_chunk.clone())).unwrap();
-            let (docid_word_positions_chunk, docid_fid_facet_values_chunks): (
-                Result<_>,
-                Result<_>,
-            ) = rayon::join(
-                || {
-                    let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions(
-                        documents_chunk.clone(),
-                        indexer.clone(),
-                        &searchable_fields,
-                        stop_words.as_ref(),
-                    )?;
-                    // send documents_ids to DB writer
-                    lmdb_writer_sx.send(TypedChunk::NewDocumentsIds(documents_ids)).unwrap();
-                    // send docid_word_positions_chunk to DB writer
-                    let docid_word_positions_chunk =
-                        unsafe { into_clonable_grenad(docid_word_positions_chunk)? };
-                    lmdb_writer_sx
-                        .send(TypedChunk::DocidWordPositions(docid_word_positions_chunk.clone()))
-                        .unwrap();
-                    Ok(docid_word_positions_chunk)
-                },
-                || {
-                    let (docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk) =
-                        extract_fid_docid_facet_values(
-                            documents_chunk.clone(),
-                            indexer.clone(),
-                            &faceted_fields,
-                        )?;
-                    // send docid_fid_facet_numbers_chunk to DB writer
-                    let docid_fid_facet_numbers_chunk =
-                        unsafe { into_clonable_grenad(docid_fid_facet_numbers_chunk)? };
-                    lmdb_writer_sx
-                        .send(TypedChunk::FieldIdDocidFacetNumbers(
-                            docid_fid_facet_numbers_chunk.clone(),
-                        ))
-                        .unwrap();
-                    // send docid_fid_facet_strings_chunk to DB writer
-                    let docid_fid_facet_strings_chunk =
-                        unsafe { into_clonable_grenad(docid_fid_facet_strings_chunk)? };
-                    lmdb_writer_sx
-                        .send(TypedChunk::FieldIdDocidFacetStrings(
-                            docid_fid_facet_strings_chunk.clone(),
-                        ))
-                        .unwrap();
-                    Ok((docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk))
-                },
-            );
-            Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?))
+            extract_documents_data(
+                result,
+                indexer,
+                lmdb_writer_sx.clone(),
+                &searchable_fields,
+                &faceted_fields,
+                &stop_words,
+            )
         })
         .collect();
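
With the per-chunk work moved into the new extract_documents_data helper, the parallel map above no longer unwraps anything: rayon can collect an iterator of Result items straight into a Result<Vec<_>>, producing either all values or one of the errors. A self-contained illustration of that collect behavior, with toy values in place of the grenad readers:

use rayon::prelude::*;

fn main() {
    // Collecting Result items into Result<Vec<_>> stops at an error,
    // so no per-item unwrap is needed inside the parallel map.
    let ok: Result<Vec<i32>, String> = (0..5).into_par_iter().map(|n| Ok(n * 2)).collect();
    assert_eq!(ok, Ok(vec![0, 2, 4, 6, 8]));

    let err: Result<Vec<i32>, String> = (0..5)
        .into_par_iter()
        .map(|n| if n == 3 { Err(format!("chunk {} failed", n)) } else { Ok(n * 2) })
        .collect();
    assert!(err.is_err());
}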
@@ -177,7 +128,7 @@ pub(crate) fn data_from_obkv_documents(
 fn spawn_extraction_task<FE, FS>(
     chunks: Vec<grenad::Reader<CursorClonableMmap>>,
     indexer: GrenadParameters,
-    lmdb_writer_sx: Sender<TypedChunk>,
+    lmdb_writer_sx: Sender<Result<TypedChunk>>,
     extract_fn: FE,
     merge_fn: MergeFn,
     serialize_fn: FS,
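
The body of spawn_extraction_task, rewritten in the next hunk, keeps a single send for both outcomes by calling reader.map(|r| serialize_fn(r)): an Ok reader is serialized into a TypedChunk, while an Err passes through the channel untouched. A tiny sketch of that Result::map-then-send move, with a hypothetical merge result and serializer standing in for merge_readers and serialize_fn:

use crossbeam_channel::unbounded;

fn main() {
    let (tx, rx) = unbounded::<Result<String, String>>();

    // Hypothetical stand-ins for the merge_readers output and serialize_fn.
    let reader: Result<Vec<u8>, String> = Ok(vec![1, 2, 3]);
    let serialize = |bytes: Vec<u8>| format!("serialized {} bytes", bytes.len());

    // Result::map serializes the Ok case and forwards the Err case as-is,
    // so a single send covers both outcomes.
    tx.send(reader.map(serialize)).unwrap();
    drop(tx);

    assert_eq!(rx.recv().unwrap(), Ok("serialized 3 bytes".to_string()));
}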
@@ -190,14 +141,89 @@ fn spawn_extraction_task<FE, FS>(
     FS: Fn(grenad::Reader<File>) -> TypedChunk + Sync + Send + 'static,
 {
     rayon::spawn(move || {
-        let chunks: Vec<_> = chunks
-            .into_par_iter()
-            .map(|chunk| extract_fn(chunk, indexer.clone()).unwrap())
-            .collect();
-        rayon::spawn(move || {
-            debug!("merge {} database", name);
-            let reader = merge_readers(chunks, merge_fn, indexer).unwrap();
-            lmdb_writer_sx.send(serialize_fn(reader)).unwrap();
-        });
+        let chunks: Result<Vec<_>> =
+            chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer.clone())).collect();
+        rayon::spawn(move || match chunks {
+            Ok(chunks) => {
+                debug!("merge {} database", name);
+                let reader = merge_readers(chunks, merge_fn, indexer);
+                lmdb_writer_sx.send(reader.map(|r| serialize_fn(r))).unwrap();
+            }
+            Err(e) => lmdb_writer_sx.send(Err(e)).unwrap(),
+        })
     });
 }
+
+/// Extract chunked data and send it into the lmdb_writer_sx sender:
+/// - documents
+/// - documents_ids
+/// - docid_word_positions
+/// - docid_fid_facet_numbers
+/// - docid_fid_facet_strings
+fn extract_documents_data(
+    documents_chunk: Result<grenad::Reader<File>>,
+    indexer: GrenadParameters,
+    lmdb_writer_sx: Sender<Result<TypedChunk>>,
+    searchable_fields: &Option<HashSet<FieldId>>,
+    faceted_fields: &HashSet<FieldId>,
+    stop_words: &Option<fst::Set<&[u8]>>,
+) -> Result<(
+    grenad::Reader<CursorClonableMmap>,
+    (grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>),
+)> {
+    let documents_chunk = documents_chunk.and_then(|c| unsafe { into_clonable_grenad(c) })?;
+    lmdb_writer_sx.send(Ok(TypedChunk::Documents(documents_chunk.clone()))).unwrap();
+    let (docid_word_positions_chunk, docid_fid_facet_values_chunks): (Result<_>, Result<_>) =
+        rayon::join(
+            || {
+                let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions(
+                    documents_chunk.clone(),
+                    indexer.clone(),
+                    searchable_fields,
+                    stop_words.as_ref(),
+                )?;
+                // send documents_ids to DB writer
+                lmdb_writer_sx.send(Ok(TypedChunk::NewDocumentsIds(documents_ids))).unwrap();
+                // send docid_word_positions_chunk to DB writer
+                let docid_word_positions_chunk =
+                    unsafe { into_clonable_grenad(docid_word_positions_chunk)? };
+                lmdb_writer_sx
+                    .send(Ok(TypedChunk::DocidWordPositions(docid_word_positions_chunk.clone())))
+                    .unwrap();
+                Ok(docid_word_positions_chunk)
+            },
+            || {
+                let (docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk) =
+                    extract_fid_docid_facet_values(
+                        documents_chunk.clone(),
+                        indexer.clone(),
+                        faceted_fields,
+                    )?;
+                // send docid_fid_facet_numbers_chunk to DB writer
+                let docid_fid_facet_numbers_chunk =
+                    unsafe { into_clonable_grenad(docid_fid_facet_numbers_chunk)? };
+                lmdb_writer_sx
+                    .send(Ok(TypedChunk::FieldIdDocidFacetNumbers(
+                        docid_fid_facet_numbers_chunk.clone(),
+                    )))
+                    .unwrap();
+                // send docid_fid_facet_strings_chunk to DB writer
+                let docid_fid_facet_strings_chunk =
+                    unsafe { into_clonable_grenad(docid_fid_facet_strings_chunk)? };
+                lmdb_writer_sx
+                    .send(Ok(TypedChunk::FieldIdDocidFacetStrings(
+                        docid_fid_facet_strings_chunk.clone(),
+                    )))
+                    .unwrap();
+                Ok((docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk))
+            },
+        );
+    Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?))
+}
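
Note that extract_documents_data still runs its two extraction branches in parallel through rayon::join: each closure returns a Result, and ? is applied only after both branches have finished, so one branch's failure never interrupts the other mid-send. A toy sketch of that shape, with made-up data but rayon::join's real signature:

use rayon::join;

fn main() -> Result<(), String> {
    // Run two fallible extraction steps in parallel. join returns both
    // results; applying ? to each propagates whichever branch failed.
    let (words, facets): (Result<Vec<u32>, String>, Result<Vec<u32>, String>) = join(
        || Ok((0..3).map(|n| n * 2).collect()),
        || Ok((0..3).map(|n| n + 100).collect()),
    );
    let (words, facets) = (words?, facets?);
    println!("words: {:?}, facets: {:?}", words, facets);
    Ok(())
}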


@@ -222,8 +222,10 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         let documents_file = grenad::Reader::new(documents_file)?;

         // create LMDB writer channel
-        let (lmdb_writer_sx, lmdb_writer_rx): (Sender<TypedChunk>, Receiver<TypedChunk>) =
-            crossbeam_channel::unbounded();
+        let (lmdb_writer_sx, lmdb_writer_rx): (
+            Sender<Result<TypedChunk>>,
+            Receiver<Result<TypedChunk>>,
+        ) = crossbeam_channel::unbounded();

         // get searchable fields for word databases
         let searchable_fields =
@@ -244,23 +246,31 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         };

         // split the obkv file into several chunks
-        let mut chunk_iter = grenad_obkv_into_chunks(
+        let chunk_iter = grenad_obkv_into_chunks(
             documents_file,
             params.clone(),
             self.log_every_n,
             Byte::from_bytes(self.documents_chunk_size.unwrap_or(1024 * 1024 * 128) as u64), // 128MiB
-        )
-        .unwrap();
+        );

+        let result = chunk_iter.map(|chunk_iter| {
             // extract all databases from the chunked obkv documents
             extract::data_from_obkv_documents(
-                &mut chunk_iter,
+                chunk_iter,
                 params,
-                lmdb_writer_sx,
+                lmdb_writer_sx.clone(),
                 searchable_fields,
                 faceted_fields,
                 stop_words,
             )
-            .unwrap();
+        });
+
+        if let Err(e) = result {
+            lmdb_writer_sx.send(Err(e)).unwrap();
+        }

         // needs to be dropped to avoid a channel waiting lock.
         drop(lmdb_writer_sx)
     });
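
The explicit drop(lmdb_writer_sx) matters even more now that the sender is cloned into data_from_obkv_documents: a crossbeam receiver iterated with a for loop only terminates once every Sender clone has been dropped, so keeping the original handle alive would leave the writer loop below waiting forever. A minimal demonstration:

use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (tx, rx) = unbounded::<u32>();
    let tx_clone = tx.clone();

    thread::spawn(move || {
        tx_clone.send(1).unwrap();
        // tx_clone is dropped when this thread ends...
    });

    // ...but the loop below only ends once ALL senders are gone, so the
    // original handle must be dropped too, as the diff does with
    // drop(lmdb_writer_sx).
    drop(tx);

    for n in rx {
        println!("got {}", n);
    }
}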
// We delete the documents that this document addition replaces. This way we are
@@ -294,7 +304,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         for typed_chunk in lmdb_writer_rx {
             let (docids, is_merged_database) =
-                write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?;
+                write_typed_chunk_into_index(typed_chunk?, &self.index, self.wtxn, index_is_empty)?;
             if !docids.is_empty() {
                 final_documents_ids |= docids;
                 let documents_seen_count = final_documents_ids.len();