Fix indentation

This commit is contained in:
Clément Renault 2024-11-07 15:08:56 +01:00
parent 0e4e9e866a
commit 01f8f30a7a
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -170,42 +170,41 @@ where
// TODO manage the errors correctly // TODO manage the errors correctly
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.in_place_scope(|_s| { pool.in_place_scope(|_s| {
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter(); let _entered = span.enter();
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
// document but we need to create a function that collects and compresses documents. // document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.documents(); let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender, embedders); let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_documents(); let (finished_steps, step_name) = steps::extract_documents();
extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
for document_extractor_data in datastore {
for document_extractor_data in datastore { let document_extractor_data = document_extractor_data.0.into_inner();
let document_extractor_data = document_extractor_data.0.into_inner(); for (field, delta) in document_extractor_data.field_distribution_delta {
for (field, delta) in document_extractor_data.field_distribution_delta { let current = field_distribution.entry(field).or_default();
let current = field_distribution.entry(field).or_default(); // adding the delta should never cause a negative result, as we are removing fields that previously existed.
// adding the delta should never cause a negative result, as we are removing fields that previously existed. *current = current.saturating_add_signed(delta);
*current = current.saturating_add_signed(delta);
}
document_extractor_data.docids_delta.apply_to(document_ids);
} }
document_extractor_data.docids_delta.apply_to(document_ids);
}
field_distribution.retain(|_, v| *v == 0); field_distribution.retain(|_, v| *v == 0);
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let current_num_threads = rayon::current_num_threads(); let current_num_threads = rayon::current_num_threads();
let max_memory = TEN_GIB / current_num_threads; let max_memory = TEN_GIB / current_num_threads;
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads"); eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
let grenad_parameters = GrenadParameters { let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory), max_memory: Some(max_memory),
..GrenadParameters::default() ..GrenadParameters::default()
}; };
let facet_field_ids_delta; let facet_field_ids_delta;
{ {
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
@ -324,65 +323,65 @@ where
)?; )?;
} }
'vectors: { 'vectors: {
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter(); let _entered = span.enter();
let index_embeddings = index.embedding_configs(&rtxn)?; let index_embeddings = index.embedding_configs(&rtxn)?;
if index_embeddings.is_empty() { if index_embeddings.is_empty() {
break 'vectors; break 'vectors;
} }
let embedding_sender = extractor_sender.embeddings(); let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings(); let (finished_steps, step_name) = steps::extract_embeddings();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
let mut user_provided = HashMap::new(); let mut user_provided = HashMap::new();
for data in datastore { for data in datastore {
let data = data.into_inner().0; let data = data.into_inner().0;
for (embedder, deladd) in data.into_iter() { for (embedder, deladd) in data.into_iter() {
let user_provided = user_provided.entry(embedder).or_insert(Default::default()); let user_provided = user_provided.entry(embedder).or_insert(Default::default());
if let Some(del) = deladd.del { if let Some(del) = deladd.del {
*user_provided -= del; *user_provided -= del;
} }
if let Some(add) = deladd.add { if let Some(add) = deladd.add {
*user_provided |= add; *user_provided |= add;
}
} }
} }
embedding_sender.finish(user_provided).unwrap();
} }
{ embedding_sender.finish(user_provided).unwrap();
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); }
let _entered = span.enter();
let (finished_steps, step_name) = steps::write_db();
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
}
// TODO THIS IS TOO MUCH {
// - [ ] Extract fieldid docid facet number let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
// - [ ] Extract fieldid docid facet string let _entered = span.enter();
// - [ ] Extract facetid string fst let (finished_steps, step_name) = steps::write_db();
// - [ ] Extract facetid normalized string strings (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
}
// TODO Inverted Indexes again // TODO THIS IS TOO MUCH
// - [x] Extract fieldid facet isempty docids // - [ ] Extract fieldid docid facet number
// - [x] Extract fieldid facet isnull docids // - [ ] Extract fieldid docid facet string
// - [x] Extract fieldid facet exists docids // - [ ] Extract facetid string fst
// - [ ] Extract facetid normalized string strings
// TODO This is the normal system // TODO Inverted Indexes again
// - [x] Extract fieldid facet number docids // - [x] Extract fieldid facet isempty docids
// - [x] Extract fieldid facet string docids // - [x] Extract fieldid facet isnull docids
// - [x] Extract fieldid facet exists docids
Result::Ok(facet_field_ids_delta) // TODO This is the normal system
}) // - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids
Result::Ok(facet_field_ids_delta)
})
})?; })?;
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);