Add more precise spans

Author: Louis Dureuil
Date:   2024-11-25 16:09:15 +01:00
Parent: 5560452ef9
Commit: aa460819a7

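Summary: each extraction phase and each merge phase of the indexer now runs inside its own block that opens a dedicated `tracing` span (`indexing::documents::extract` vs `indexing::documents::merge`, plus a single `indexing::write_db` span around the write loop) and drops it on exit, so trace profiles attribute time to each phase separately instead of lumping it all together. A minimal sketch of the pattern, using the real `tracing` macros but hypothetical `extract_phase`/`merge_phase` stand-ins for the indexer's actual work:

    use tracing::trace_span;

    // Hypothetical stand-ins for the indexer's real work.
    fn extract_phase() {}
    fn merge_phase() {}

    fn main() {
        // Without a subscriber installed the spans are no-ops, so this runs as-is.
        let indexer_span = trace_span!(target: "indexing::documents", "indexing");
        let _indexer_entered = indexer_span.enter();

        {
            // The guard is dropped at the end of this block, closing the span
            // there: the merge below is not counted as extraction time.
            let span = trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
            let _entered = span.enter();
            extract_phase();
        }
        {
            let span = trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
            let _entered = span.enter();
            merge_phase();
        }
    }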

@@ -109,55 +109,71 @@ where
         let rtxn = index.read_txn()?;
         // document but we need to create a function that collects and compresses documents.
         let document_sender = extractor_sender.documents();
         let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
         let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-        extract(document_changes,
-            &document_extractor,
-            indexing_context,
-            &mut extractor_allocs,
-            &datastore,
-            Step::ExtractingDocuments,
-        )?;
-
-        for document_extractor_data in datastore {
-            let document_extractor_data = document_extractor_data.0.into_inner();
-            for (field, delta) in document_extractor_data.field_distribution_delta {
-                let current = field_distribution.entry(field).or_default();
-                // adding the delta should never cause a negative result, as we are removing fields that previously existed.
-                *current = current.saturating_add_signed(delta);
-            }
-            document_extractor_data.docids_delta.apply_to(document_ids);
-        }
-
-        field_distribution.retain(|_, v| *v != 0);
+        {
+            let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
+            let _entered = span.enter();
+            extract(document_changes,
+                &document_extractor,
+                indexing_context,
+                &mut extractor_allocs,
+                &datastore,
+                Step::ExtractingDocuments,
+            )?;
+        }
+        {
+            let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
+            let _entered = span.enter();
+
+            for document_extractor_data in datastore {
+                let document_extractor_data = document_extractor_data.0.into_inner();
+                for (field, delta) in document_extractor_data.field_distribution_delta {
+                    let current = field_distribution.entry(field).or_default();
+                    // adding the delta should never cause a negative result, as we are removing fields that previously existed.
+                    *current = current.saturating_add_signed(delta);
+                }
+                document_extractor_data.docids_delta.apply_to(document_ids);
+            }
+
+            field_distribution.retain(|_, v| *v != 0);
+        }
 
         let facet_field_ids_delta;
         {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
-            let _entered = span.enter();
-
-            facet_field_ids_delta = merge_and_send_facet_docids(
-                FacetedDocidsExtractor::run_extraction(
-                    grenad_parameters,
-                    document_changes,
-                    indexing_context,
-                    &mut extractor_allocs,
-                    &extractor_sender.field_id_docid_facet_sender(),
-                    Step::ExtractingFacets
-                )?,
-                FacetDatabases::new(index),
-                index,
-                extractor_sender.facet_docids(),
-            )?;
+            let caches = {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
+                let _entered = span.enter();
+
+                FacetedDocidsExtractor::run_extraction(
+                    grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    &extractor_sender.field_id_docid_facet_sender(),
+                    Step::ExtractingFacets
+                )?
+            };
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
+                let _entered = span.enter();
+
+                facet_field_ids_delta = merge_and_send_facet_docids(
+                    caches,
+                    FacetDatabases::new(index),
+                    index,
+                    extractor_sender.facet_docids(),
+                )?;
+            }
         }
         {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
-            let _entered = span.enter();
             let WordDocidsCaches {
@@ -166,15 +182,19 @@ where
                 exact_word_docids,
                 word_position_docids,
                 fid_word_count_docids,
-            } = WordDocidsExtractors::run_extraction(
-                grenad_parameters,
-                document_changes,
-                indexing_context,
-                &mut extractor_allocs,
-                Step::ExtractingWords
-            )?;
+            } = {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
+                let _entered = span.enter();
+
+                WordDocidsExtractors::run_extraction(
+                    grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    Step::ExtractingWords
+                )?
+            };
 
-            // TODO Word Docids Merger
             {
                 let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
                 let _entered = span.enter();
@@ -187,7 +207,6 @@ where
                 )?;
             }
 
-            // Word Fid Docids Merging
             {
                 let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
                 let _entered = span.enter();
@@ -200,7 +219,6 @@ where
                 )?;
             }
 
-            // Exact Word Docids Merging
             {
                 let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
                 let _entered = span.enter();
@@ -213,7 +231,6 @@ where
                 )?;
             }
 
-            // Word Position Docids Merging
             {
                 let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
                 let _entered = span.enter();
@@ -226,7 +243,6 @@ where
                 )?;
             }
 
-            // Fid Word Count Docids Merging
             {
                 let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
                 let _entered = span.enter();
@@ -244,30 +260,34 @@ where
         // this works only if the settings didn't change during this transaction.
         let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
         if proximity_precision == ProximityPrecision::ByWord {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
-            let _entered = span.enter();
-
-            let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
-                grenad_parameters,
-                document_changes,
-                indexing_context,
-                &mut extractor_allocs,
-                Step::ExtractingWordProximity,
-            )?;
-
-            merge_and_send_docids(
-                caches,
-                index.word_pair_proximity_docids.remap_types(),
-                index,
-                extractor_sender.docids::<WordPairProximityDocids>(),
-                &indexing_context.must_stop_processing,
-            )?;
+            let caches = {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
+                let _entered = span.enter();
+
+                <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
+                    grenad_parameters,
+                    document_changes,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    Step::ExtractingWordProximity,
+                )?
+            };
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
+                let _entered = span.enter();
+
+                merge_and_send_docids(
+                    caches,
+                    index.word_pair_proximity_docids.remap_types(),
+                    index,
+                    extractor_sender.docids::<WordPairProximityDocids>(),
+                    &indexing_context.must_stop_processing,
+                )?;
+            }
         }
 
         'vectors: {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
-            let _entered = span.enter();
-
             let mut index_embeddings = index.embedding_configs(&rtxn)?;
             if index_embeddings.is_empty() {
@@ -277,13 +297,22 @@ where
             let embedding_sender = extractor_sender.embeddings();
             let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
             let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?;
-
-            for config in &mut index_embeddings {
-                'data: for data in datastore.iter_mut() {
-                    let data = &mut data.get_mut().0;
-                    let Some(deladd) = data.remove(&config.name) else { continue 'data; };
-                    deladd.apply_to(&mut config.user_provided);
-                }
-            }
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
+                let _entered = span.enter();
+
+                extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?;
+            }
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
+                let _entered = span.enter();
+
+                for config in &mut index_embeddings {
+                    'data: for data in datastore.iter_mut() {
+                        let data = &mut data.get_mut().0;
+                        let Some(deladd) = data.remove(&config.name) else { continue 'data; };
+                        deladd.apply_to(&mut config.user_provided);
+                    }
+                }
+            }
@@ -291,21 +320,24 @@ where
         }
 
         'geo: {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
-            let _entered = span.enter();
-
             let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
                 break 'geo;
             };
             let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            extract(
-                document_changes,
-                &extractor,
-                indexing_context,
-                &mut extractor_allocs,
-                &datastore,
-                Step::WritingGeoPoints
-            )?;
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
+                let _entered = span.enter();
+
+                extract(
+                    document_changes,
+                    &extractor,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    &datastore,
+                    Step::WritingGeoPoints
+                )?;
+            }
 
             merge_and_send_rtree(
                 datastore,
@@ -316,11 +348,7 @@ where
             )?;
         }
 
-        {
-            let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
-            let _entered = span.enter();
-            (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));
-        }
+        (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));
 
         Result::Ok(facet_field_ids_delta)
     })?;
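A recurring detail in the hunks above: when the merge step needs the extraction's output, the extraction is evaluated as a block expression, so the span guard drops before merging begins while the extracted caches flow out of the block. A sketch of that shape, with hypothetical `run_extraction`/`merge` stand-ins for the extractor calls:

    use tracing::trace_span;

    // Hypothetical stand-ins for calls such as
    // FacetedDocidsExtractor::run_extraction and merge_and_send_facet_docids.
    fn run_extraction() -> Vec<u32> { vec![1, 2, 3] }
    fn merge(caches: Vec<u32>) { drop(caches); }

    fn main() {
        let caches = {
            let span = trace_span!(target: "indexing::documents::extract", "faceted");
            let _entered = span.enter();
            run_extraction() // block value; the guard drops after it is produced
        }; // the extraction span ends here, covering only extraction

        {
            let span = trace_span!(target: "indexing::documents::merge", "faceted");
            let _entered = span.enter();
            merge(caches); // merge time lands in its own span
        }
    }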
@@ -352,90 +380,103 @@ where
         .collect();

     let mut arroy_writers = arroy_writers?;
-    for operation in writer_receiver {
-        match operation {
-            WriterOperation::DbOperation(db_operation) => {
-                let database = db_operation.database(index);
-                let database_name = db_operation.database_name();
-                match db_operation.entry() {
-                    EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) {
-                        Ok(false) => unreachable!("We tried to delete an unknown key"),
-                        Ok(_) => (),
-                        Err(error) => {
-                            return Err(Error::InternalError(InternalError::StoreDeletion {
-                                database_name,
-                                key: e.entry().to_owned(),
-                                error,
-                            }));
-                        }
-                    },
-                    EntryOperation::Write(e) => {
-                        if let Err(error) = database.put(wtxn, e.key(), e.value()) {
-                            return Err(Error::InternalError(InternalError::StorePut {
-                                database_name,
-                                key: e.key().to_owned(),
-                                value_length: e.value().len(),
-                                error,
-                            }));
-                        }
-                    }
-                }
-            }
-            WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation {
-                ArroyOperation::DeleteVectors { docid } => {
-                    for (
-                        _embedder_index,
-                        (_embedder_name, _embedder, writer, dimensions),
-                    ) in &mut arroy_writers
-                    {
-                        let dimensions = *dimensions;
-                        writer.del_items(wtxn, dimensions, docid)?;
-                    }
-                }
-                ArroyOperation::SetVectors {
-                    docid,
-                    embedder_id,
-                    embeddings: raw_embeddings,
-                } => {
-                    let (_, _, writer, dimensions) = arroy_writers
-                        .get(&embedder_id)
-                        .expect("requested a missing embedder");
-                    let mut embeddings = Embeddings::new(*dimensions);
-                    for embedding in raw_embeddings {
-                        embeddings.append(embedding).unwrap();
-                    }
-                    writer.del_items(wtxn, *dimensions, docid)?;
-                    writer.add_items(wtxn, docid, &embeddings)?;
-                }
-                ArroyOperation::SetVector { docid, embedder_id, embedding } => {
-                    let (_, _, writer, dimensions) = arroy_writers
-                        .get(&embedder_id)
-                        .expect("requested a missing embedder");
-                    writer.del_items(wtxn, *dimensions, docid)?;
-                    writer.add_item(wtxn, docid, &embedding)?;
-                }
-                ArroyOperation::Finish { configs } => {
-                    let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
-                    let _entered = span.enter();
-                    (indexing_context.send_progress)(Progress::from_step(
-                        Step::WritingEmbeddingsToDatabase,
-                    ));
-                    for (
-                        _embedder_index,
-                        (_embedder_name, _embedder, writer, dimensions),
-                    ) in &mut arroy_writers
-                    {
-                        let dimensions = *dimensions;
-                        writer.build_and_quantize(
-                            wtxn,
-                            &mut rng,
-                            dimensions,
-                            false,
-                            &indexing_context.must_stop_processing,
-                        )?;
-                    }
-                    index.put_embedding_configs(wtxn, configs)?;
-                }
-            },
-        }
-    }
+    {
+        let span = tracing::trace_span!(target: "indexing::write_db", "all");
+        let _entered = span.enter();
+
+        for operation in writer_receiver {
+            match operation {
+                WriterOperation::DbOperation(db_operation) => {
+                    let database = db_operation.database(index);
+                    let database_name = db_operation.database_name();
+                    match db_operation.entry() {
+                        EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) {
+                            Ok(false) => unreachable!("We tried to delete an unknown key"),
+                            Ok(_) => (),
+                            Err(error) => {
+                                return Err(Error::InternalError(
+                                    InternalError::StoreDeletion {
+                                        database_name,
+                                        key: e.entry().to_owned(),
+                                        error,
+                                    },
+                                ));
+                            }
+                        },
+                        EntryOperation::Write(e) => {
+                            if let Err(error) = database.put(wtxn, e.key(), e.value()) {
+                                return Err(Error::InternalError(InternalError::StorePut {
+                                    database_name,
+                                    key: e.key().to_owned(),
+                                    value_length: e.value().len(),
+                                    error,
+                                }));
+                            }
+                        }
+                    }
+                }
+                WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation {
+                    ArroyOperation::DeleteVectors { docid } => {
+                        for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
+                            &mut arroy_writers
+                        {
+                            let dimensions = *dimensions;
+                            writer.del_items(wtxn, dimensions, docid)?;
+                        }
+                    }
+                    ArroyOperation::SetVectors {
+                        docid,
+                        embedder_id,
+                        embeddings: raw_embeddings,
+                    } => {
+                        let (_, _, writer, dimensions) =
+                            arroy_writers.get(&embedder_id).expect("requested a missing embedder");
+                        // TODO: switch to Embeddings
+                        let mut embeddings = Embeddings::new(*dimensions);
+                        for embedding in raw_embeddings {
+                            embeddings.append(embedding).unwrap();
+                        }
+                        writer.del_items(wtxn, *dimensions, docid)?;
+                        writer.add_items(wtxn, docid, &embeddings)?;
+                    }
+                    ArroyOperation::SetVector { docid, embedder_id, embedding } => {
+                        let (_, _, writer, dimensions) =
+                            arroy_writers.get(&embedder_id).expect("requested a missing embedder");
+                        writer.del_items(wtxn, *dimensions, docid)?;
+                        writer.add_item(wtxn, docid, &embedding)?;
+                    }
+                    ArroyOperation::Finish { configs } => {
+                        let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
+                        let _entered = span.enter();
+                        (indexing_context.send_progress)(Progress::from_step(
+                            Step::WritingEmbeddingsToDatabase,
+                        ));
+                        for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
+                            &mut arroy_writers
+                        {
+                            let dimensions = *dimensions;
+                            writer.build_and_quantize(
+                                wtxn,
+                                &mut rng,
+                                dimensions,
+                                false,
+                                &indexing_context.must_stop_processing,
+                            )?;
+                        }
+                        index.put_embedding_configs(wtxn, configs)?;
+                    }
+                },
+            }
+        }
+    }
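The final hunk applies the same idea to the write side: the whole loop draining `writer_receiver` is wrapped in one `indexing::write_db` span named "all", replacing the old placeholder "FINISH" span, so the total single-threaded database-write time shows up as one unit in a trace profile. A sketch of that shape, with a plain mpsc channel standing in for the indexer's writer channel:

    use std::sync::mpsc;
    use tracing::trace_span;

    fn main() {
        // Hypothetical stand-in for the writer channel and its operations.
        let (sender, writer_receiver) = mpsc::channel::<Vec<u8>>();
        sender.send(vec![1, 2, 3]).unwrap();
        drop(sender); // close the channel so the loop below terminates

        {
            let span = trace_span!(target: "indexing::write_db", "all");
            let _entered = span.enter();
            for operation in writer_receiver {
                // stand-in for applying an EntryOperation to the database
                let _ = operation;
            }
        } // the span closes here, covering the entire write loop
    }

Net effect of the commit: extraction, merge, and database-write phases each report their own timing, and the stale "Merging" TODO comments plus the artificial "FINISH" span are removed.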