3913: Expose a Puffin server to profile the indexing process r=Kerollmops a=Kerollmops

This PR exposes a puffin HTTP server to expose the internal timing it takes to index documents, delete documents, or update the settings of an index.

<img width="1752" alt="Capture d’écran 2023-07-10 à 18 44 58" src="https://github.com/meilisearch/meilisearch/assets/3610253/a3c7a6bf-db5b-42f4-8be1-c4e31c869843">

## To be done

 - [x] Move the puffin HTTP server under a feature flag.
 - [x] Use [the `puffin::set_scopes_on` function](https://docs.rs/puffin/latest/puffin/fn.set_scopes_on.html) to toggle it (by using the feature directly).
     When this function is called with `false`, [a call to `profile_scope!` talked 1-2ns](https://docs.rs/puffin/latest/puffin/fn.set_scopes_on.html).
 - [x] Create a _PROFILING.md_ file explaining how to use it.
   - [x] Explain that merging scopes on the interface is not always useful.
 - [x] Add more info on the number of batched tasks (using the `puffin::profile_scope!` macro data).
   - I added more info, but that's more continuous work when we consider we need more info here and there.
 - [x] Clean up some scopes, and don't touch too much code to inject puffin.
   - I am not sure that the _index_documents/mod.rs_ function is that complex with the addition of the scope.
 - [x] Think about what we consider frames. One indexation operation or the wall program. When must we stop the frame, then?
   - What we consider a frame is one single `IndexScheduler::tick` execution.
   - We can change that later.

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2023-07-19 09:44:01 +00:00 committed by GitHub
commit 2dfbb6813a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 231 additions and 17 deletions

39
Cargo.lock generated
View File

@ -1973,6 +1973,7 @@ dependencies = [
"meilisearch-types", "meilisearch-types",
"nelson", "nelson",
"page_size 0.5.0", "page_size 0.5.0",
"puffin",
"roaring", "roaring",
"serde", "serde",
"serde_json", "serde_json",
@ -2498,6 +2499,12 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "lz4_flex"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b8c72594ac26bfd34f2d99dfced2edfaddfe8a476e3ff2ca0eb293d925c4f83"
[[package]] [[package]]
name = "manifest-dir-macros" name = "manifest-dir-macros"
version = "0.1.17" version = "0.1.17"
@ -2587,6 +2594,8 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"platform-dirs", "platform-dirs",
"prometheus", "prometheus",
"puffin",
"puffin_http",
"rand", "rand",
"rayon", "rayon",
"regex", "regex",
@ -2731,6 +2740,7 @@ dependencies = [
"obkv", "obkv",
"once_cell", "once_cell",
"ordered-float", "ordered-float",
"puffin",
"rand", "rand",
"rand_pcg", "rand_pcg",
"rayon", "rayon",
@ -3256,6 +3266,35 @@ version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "puffin"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76425abd4e1a0ad4bd6995dd974b52f414fca9974171df8e3708b3e660d05a21"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"cfg-if",
"instant",
"lz4_flex",
"once_cell",
"parking_lot",
"serde",
]
[[package]]
name = "puffin_http"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bffc600c35913d282ae1e96a6ffcdf36dc7a7cdb9310e0ba15914d258c8193"
dependencies = [
"anyhow",
"crossbeam-channel",
"log",
"puffin",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.28" version = "1.0.28"

19
PROFILING.md Normal file
View File

@ -0,0 +1,19 @@
# Profiling Meilisearch
Search engine technologies are complex pieces of software that require thorough profiling tools. We chose to use [Puffin](https://github.com/EmbarkStudios/puffin), which the Rust gaming industry uses extensively. You can export and import the profiling reports using the top bar's _File_ menu options.
![An example profiling with Puffin viewer](assets/profiling-example.png)
## Profiling the Indexing Process
When you enable the `profile-with-puffin` feature of Meilisearch, a Puffin HTTP server will run on Meilisearch and listen on the default _0.0.0.0:8585_ address. This server will record a "frame" whenever it executes the `IndexScheduler::tick` method.
Once your Meilisearch is running and awaits new indexation operations, you must [install and run the `puffin_viewer` tool](https://github.com/EmbarkStudios/puffin/tree/main/puffin_viewer) to see the profiling results. I advise you to run the viewer with the `RUST_LOG=puffin_http::client=debug` environment variable to see the client trying to connect to your server.
Another piece of advice on the Puffin viewer UI interface is to consider the _Merge children with same ID_ option. It can hide the exact actual timings at which events were sent. Please turn it off when you see strange gaps on the Flamegraph. It can help.
## Profiling the Search Process
We still need to take the time to profile the search side of the engine with Puffin. It would require time to profile the filtering phase, query parsing, creation, and execution. We could even profile the Actix HTTP server.
The only issue we see is the framing system. Puffin requires a global frame-based profiling phase, which collides with Meilisearch's ability to accept and answer multiple requests on different threads simultaneously.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

View File

@ -22,6 +22,7 @@ log = "0.4.17"
meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.5.0" page_size = "0.5.0"
puffin = "0.16.0"
roaring = { version = "0.10.1", features = ["serde"] } roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] } serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] } serde_json = { version = "1.0.95", features = ["preserve_order"] }

View File

@ -471,6 +471,8 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?; self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?;
puffin::profile_function!();
let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
@ -575,6 +577,9 @@ impl IndexScheduler {
self.maybe_fail(crate::tests::FailureLocation::PanicInsideProcessBatch)?; self.maybe_fail(crate::tests::FailureLocation::PanicInsideProcessBatch)?;
self.breakpoint(crate::Breakpoint::InsideProcessBatch); self.breakpoint(crate::Breakpoint::InsideProcessBatch);
} }
puffin::profile_function!(format!("{:?}", batch));
match batch { match batch {
Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => { Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
@ -1111,6 +1116,8 @@ impl IndexScheduler {
index: &'i Index, index: &'i Index,
operation: IndexOperation, operation: IndexOperation,
) -> Result<Vec<Task>> { ) -> Result<Vec<Task>> {
puffin::profile_function!();
match operation { match operation {
IndexOperation::DocumentClear { mut tasks, .. } => { IndexOperation::DocumentClear { mut tasks, .. } => {
let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?; let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?;

View File

@ -1032,6 +1032,8 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start); self.breakpoint(Breakpoint::Start);
} }
puffin::GlobalProfiler::lock().new_frame();
self.cleanup_task_queue()?; self.cleanup_task_queue()?;
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;

View File

@ -67,6 +67,8 @@ permissive-json-pointer = { path = "../permissive-json-pointer" }
pin-project-lite = "0.2.9" pin-project-lite = "0.2.9"
platform-dirs = "0.3.0" platform-dirs = "0.3.0"
prometheus = { version = "0.13.3", features = ["process"] } prometheus = { version = "0.13.3", features = ["process"] }
puffin = "0.16.0"
puffin_http = { version = "0.13.0", optional = true }
rand = "0.8.5" rand = "0.8.5"
rayon = "1.7.0" rayon = "1.7.0"
regex = "1.7.3" regex = "1.7.3"
@ -133,6 +135,7 @@ zip = { version = "0.6.4", optional = true }
[features] [features]
default = ["analytics", "meilisearch-types/all-tokenizations", "mini-dashboard"] default = ["analytics", "meilisearch-types/all-tokenizations", "mini-dashboard"]
analytics = ["segment"] analytics = ["segment"]
profile-with-puffin = ["dep:puffin_http"]
mini-dashboard = [ mini-dashboard = [
"actix-web-static-files", "actix-web-static-files",
"static-files", "static-files",

View File

@ -29,6 +29,10 @@ fn setup(opt: &Opt) -> anyhow::Result<()> {
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let (opt, config_read_from) = Opt::try_build()?; let (opt, config_read_from) = Opt::try_build()?;
#[cfg(feature = "profile-with-puffin")]
let _server = puffin_http::Server::new(&format!("0.0.0.0:{}", puffin_http::DEFAULT_PORT))?;
puffin::set_scopes_on(cfg!(feature = "profile-with-puffin"));
anyhow::ensure!( anyhow::ensure!(
!(cfg!(windows) && opt.experimental_reduce_indexing_memory_usage), !(cfg!(windows) && opt.experimental_reduce_indexing_memory_usage),
"The `experimental-reduce-indexing-memory-usage` flag is not supported on Windows" "The `experimental-reduce-indexing-memory-usage` flag is not supported on Windows"

View File

@ -67,6 +67,9 @@ filter-parser = { path = "../filter-parser" }
# documents words self-join # documents words self-join
itertools = "0.10.5" itertools = "0.10.5"
# profiling
puffin = "0.16.0"
# logging # logging
log = "0.4.17" log = "0.4.17"
logging_timer = "1.1.0" logging_timer = "1.1.0"

View File

@ -15,6 +15,8 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
} }
pub fn execute(self) -> Result<u64> { pub fn execute(self) -> Result<u64> {
puffin::profile_function!();
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?; self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
let Index { let Index {
env: _env, env: _env,

View File

@ -110,6 +110,8 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
Some(docid) Some(docid)
} }
pub fn execute(self) -> Result<DocumentDeletionResult> { pub fn execute(self) -> Result<DocumentDeletionResult> {
puffin::profile_function!();
let DetailedDocumentDeletionResult { deleted_documents, remaining_documents } = let DetailedDocumentDeletionResult { deleted_documents, remaining_documents } =
self.execute_inner()?; self.execute_inner()?;

View File

@ -31,6 +31,8 @@ pub fn enrich_documents_batch<R: Read + Seek>(
autogenerate_docids: bool, autogenerate_docids: bool,
reader: DocumentsBatchReader<R>, reader: DocumentsBatchReader<R>,
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> { ) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
puffin::profile_function!();
let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index(); let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?; let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;

View File

@ -30,6 +30,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
stop_words: Option<&fst::Set<&[u8]>>, stop_words: Option<&fst::Set<&[u8]>>,
max_positions_per_attributes: Option<u32>, max_positions_per_attributes: Option<u32>,
) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> { ) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
puffin::profile_function!();
let max_positions_per_attributes = max_positions_per_attributes let max_positions_per_attributes = max_positions_per_attributes
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();

View File

@ -20,6 +20,8 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
docid_fid_facet_number: grenad::Reader<R>, docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut facet_number_docids_sorter = create_sorter( let mut facet_number_docids_sorter = create_sorter(

View File

@ -18,6 +18,8 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
docid_fid_facet_string: grenad::Reader<R>, docid_fid_facet_string: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut facet_string_docids_sorter = create_sorter( let mut facet_string_docids_sorter = create_sorter(

View File

@ -34,6 +34,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
faceted_fields: &HashSet<FieldId>, faceted_fields: &HashSet<FieldId>,
) -> Result<ExtractedFacetValues> { ) -> Result<ExtractedFacetValues> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut fid_docid_facet_numbers_sorter = create_sorter( let mut fid_docid_facet_numbers_sorter = create_sorter(

View File

@ -22,6 +22,8 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>, docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut fid_word_count_docids_sorter = create_sorter( let mut fid_word_count_docids_sorter = create_sorter(

View File

@ -19,6 +19,8 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
primary_key_id: FieldId, primary_key_id: FieldId,
(lat_fid, lng_fid): (FieldId, FieldId), (lat_fid, lng_fid): (FieldId, FieldId),
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let mut writer = create_writer( let mut writer = create_writer(
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,

View File

@ -19,6 +19,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
primary_key_id: FieldId, primary_key_id: FieldId,
vectors_fid: FieldId, vectors_fid: FieldId,
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let mut writer = create_writer( let mut writer = create_writer(
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,

View File

@ -27,6 +27,8 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
exact_attributes: &HashSet<FieldId>, exact_attributes: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> { ) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_docids_sorter = create_sorter( let mut word_docids_sorter = create_sorter(

View File

@ -15,6 +15,8 @@ pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>, docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_fid_docids_sorter = create_sorter( let mut word_fid_docids_sorter = create_sorter(

View File

@ -21,6 +21,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>, docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_pair_proximity_docids_sorter = create_sorter( let mut word_pair_proximity_docids_sorter = create_sorter(

View File

@ -18,6 +18,8 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>, docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> { ) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_position_docids_sorter = create_sorter( let mut word_position_docids_sorter = create_sorter(

View File

@ -52,6 +52,8 @@ pub(crate) fn data_from_obkv_documents(
max_positions_per_attributes: Option<u32>, max_positions_per_attributes: Option<u32>,
exact_attributes: HashSet<FieldId>, exact_attributes: HashSet<FieldId>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
original_obkv_chunks original_obkv_chunks
.par_bridge() .par_bridge()
.map(|original_documents_chunk| { .map(|original_documents_chunk| {
@ -238,11 +240,13 @@ fn spawn_extraction_task<FE, FS, M>(
M::Output: Send, M::Output: Send,
{ {
rayon::spawn(move || { rayon::spawn(move || {
puffin::profile_scope!("extract_multiple_chunks", name);
let chunks: Result<M> = let chunks: Result<M> =
chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect(); chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect();
rayon::spawn(move || match chunks { rayon::spawn(move || match chunks {
Ok(chunks) => { Ok(chunks) => {
debug!("merge {} database", name); debug!("merge {} database", name);
puffin::profile_scope!("merge_multiple_chunks", name);
let reader = chunks.merge(merge_fn, &indexer); let reader = chunks.merge(merge_fn, &indexer);
let _ = lmdb_writer_sx.send(reader.map(serialize_fn)); let _ = lmdb_writer_sx.send(reader.map(serialize_fn));
} }

View File

@ -214,6 +214,7 @@ pub fn sorter_into_lmdb_database(
sorter: Sorter<MergeFn>, sorter: Sorter<MergeFn>,
merge: MergeFn, merge: MergeFn,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
debug!("Writing MTBL sorter..."); debug!("Writing MTBL sorter...");
let before = Instant::now(); let before = Instant::now();

View File

@ -137,6 +137,8 @@ where
mut self, mut self,
reader: DocumentsBatchReader<R>, reader: DocumentsBatchReader<R>,
) -> Result<(Self, StdResult<u64, UserError>)> { ) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add // Early return when there is no document to add
if reader.is_empty() { if reader.is_empty() {
return Ok((self, Ok(0))); return Ok((self, Ok(0)));
@ -175,6 +177,8 @@ where
mut self, mut self,
to_delete: Vec<String>, to_delete: Vec<String>,
) -> Result<(Self, StdResult<u64, UserError>)> { ) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add // Early return when there is no document to add
if to_delete.is_empty() { if to_delete.is_empty() {
return Ok((self, Ok(0))); return Ok((self, Ok(0)));
@ -194,6 +198,8 @@ where
#[logging_timer::time("IndexDocuments::{}")] #[logging_timer::time("IndexDocuments::{}")]
pub fn execute(mut self) -> Result<DocumentAdditionResult> { pub fn execute(mut self) -> Result<DocumentAdditionResult> {
puffin::profile_function!();
if self.added_documents == 0 { if self.added_documents == 0 {
let number_of_documents = self.index.number_of_documents(self.wtxn)?; let number_of_documents = self.index.number_of_documents(self.wtxn)?;
return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents }); return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents });
@ -232,6 +238,8 @@ where
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
{ {
puffin::profile_function!();
let TransformOutput { let TransformOutput {
primary_key, primary_key,
fields_ids_map, fields_ids_map,
@ -322,6 +330,7 @@ where
// Run extraction pipeline in parallel. // Run extraction pipeline in parallel.
pool.install(|| { pool.install(|| {
puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks // split obkv file into several chunks
let original_chunk_iter = let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size); grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
@ -477,6 +486,8 @@ where
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
{ {
puffin::profile_function!();
// Merged databases are already been indexed, we start from this count; // Merged databases are already been indexed, we start from this count;
let mut databases_seen = MERGED_DATABASE_COUNT; let mut databases_seen = MERGED_DATABASE_COUNT;
@ -511,26 +522,36 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation)); return Err(Error::InternalError(InternalError::AbortedIndexation));
} }
let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; let current_prefix_fst;
let common_prefix_fst_words_tmp;
let common_prefix_fst_words: Vec<_>;
let new_prefix_fst_words;
let del_prefix_fst_words;
{
puffin::profile_scope!("compute_prefix_diffs");
current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// We retrieve the common words between the previous and new prefix word fst. // We retrieve the common words between the previous and new prefix word fst.
let common_prefix_fst_words = fst_stream_into_vec( common_prefix_fst_words_tmp = fst_stream_into_vec(
previous_words_prefixes_fst.op().add(&current_prefix_fst).intersection(), previous_words_prefixes_fst.op().add(&current_prefix_fst).intersection(),
); );
let common_prefix_fst_words: Vec<_> = common_prefix_fst_words common_prefix_fst_words = common_prefix_fst_words_tmp
.as_slice() .as_slice()
.linear_group_by_key(|x| x.chars().next().unwrap()) .linear_group_by_key(|x| x.chars().next().unwrap())
.collect(); .collect();
// We retrieve the newly added words between the previous and new prefix word fst. // We retrieve the newly added words between the previous and new prefix word fst.
let new_prefix_fst_words = fst_stream_into_vec( new_prefix_fst_words = fst_stream_into_vec(
current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(), current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(),
); );
// We compute the set of prefixes that are no more part of the prefix fst. // We compute the set of prefixes that are no more part of the prefix fst.
let del_prefix_fst_words = fst_stream_into_hashset( del_prefix_fst_words = fst_stream_into_hashset(
previous_words_prefixes_fst.op().add(&current_prefix_fst).difference(), previous_words_prefixes_fst.op().add(&current_prefix_fst).difference(),
); );
}
databases_seen += 1; databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -668,6 +689,8 @@ fn execute_word_prefix_docids(
common_prefix_fst_words: &[&[String]], common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>, del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
let cursor = reader.into_cursor()?; let cursor = reader.into_cursor()?;
let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db); let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db);
builder.chunk_compression_type = indexer_config.chunk_compression_type; builder.chunk_compression_type = indexer_config.chunk_compression_type;

View File

@ -558,6 +558,8 @@ impl<'a, 'i> Transform<'a, 'i> {
where where
F: Fn(UpdateIndexingStep) + Sync, F: Fn(UpdateIndexingStep) + Sync,
{ {
puffin::profile_function!();
let primary_key = self let primary_key = self
.index .index
.primary_key(wtxn)? .primary_key(wtxn)?

View File

@ -49,6 +49,66 @@ pub(crate) enum TypedChunk {
ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>), ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>),
} }
impl TypedChunk {
pub fn to_debug_string(&self) -> String {
match self {
TypedChunk::FieldIdDocidFacetStrings(grenad) => {
format!("FieldIdDocidFacetStrings {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdDocidFacetNumbers(grenad) => {
format!("FieldIdDocidFacetNumbers {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::Documents(grenad) => {
format!("Documents {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdWordcountDocids(grenad) => {
format!("FieldIdWordcountDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::NewDocumentsIds(grenad) => {
format!("NewDocumentsIds {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => format!(
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {} }}",
word_docids_reader.len(),
exact_word_docids_reader.len()
),
TypedChunk::WordPositionDocids(grenad) => {
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordFidDocids(grenad) => {
format!("WordFidDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordPairProximityDocids(grenad) => {
format!("WordPairProximityDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetStringDocids(grenad) => {
format!("FieldIdFacetStringDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetNumberDocids(grenad) => {
format!("FieldIdFacetNumberDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetExistsDocids(grenad) => {
format!("FieldIdFacetExistsDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsNullDocids(grenad) => {
format!("FieldIdFacetIsNullDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsEmptyDocids(grenad) => {
format!("FieldIdFacetIsEmptyDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::GeoPoints(grenad) => {
format!("GeoPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::VectorPoints(grenad) => {
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::ScriptLanguageDocids(grenad) => {
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", grenad.len())
}
}
}
}
/// Write typed chunk in the corresponding LMDB database of the provided index. /// Write typed chunk in the corresponding LMDB database of the provided index.
/// Return new documents seen. /// Return new documents seen.
pub(crate) fn write_typed_chunk_into_index( pub(crate) fn write_typed_chunk_into_index(
@ -57,6 +117,8 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
index_is_empty: bool, index_is_empty: bool,
) -> Result<(RoaringBitmap, bool)> { ) -> Result<(RoaringBitmap, bool)> {
puffin::profile_function!(typed_chunk.to_debug_string());
let mut is_merged_database = false; let mut is_merged_database = false;
match typed_chunk { match typed_chunk {
TypedChunk::Documents(obkv_documents_iter) => { TypedChunk::Documents(obkv_documents_iter) => {
@ -336,6 +398,8 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>, FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>, FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{ {
puffin::profile_function!(format!("number of entries: {}", data.len()));
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let database = database.remap_types::<ByteSlice, ByteSlice>(); let database = database.remap_types::<ByteSlice, ByteSlice>();
@ -378,6 +442,8 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>, FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>, FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{ {
puffin::profile_function!(format!("number of entries: {}", data.len()));
if !index_is_empty { if !index_is_empty {
return write_entries_into_database( return write_entries_into_database(
data, data,

View File

@ -50,6 +50,8 @@ impl<'t, 'u, 'i> PrefixWordPairsProximityDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&'a [String]], common_prefix_fst_words: &[&'a [String]],
del_prefix_fst_words: &HashSet<Vec<u8>>, del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
index_word_prefix_database( index_word_prefix_database(
self.wtxn, self.wtxn,
self.index.word_pair_proximity_docids, self.index.word_pair_proximity_docids,

View File

@ -27,6 +27,8 @@ pub fn index_prefix_word_database(
chunk_compression_type: CompressionType, chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>, chunk_compression_level: Option<u32>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
let max_proximity = max_proximity - 1; let max_proximity = max_proximity - 1;
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");

View File

@ -191,6 +191,7 @@ pub fn index_word_prefix_database(
chunk_compression_type: CompressionType, chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>, chunk_compression_level: Option<u32>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");
// Make a prefix trie from the common prefixes that are shorter than self.max_prefix_length // Make a prefix trie from the common prefixes that are shorter than self.max_prefix_length

View File

@ -303,6 +303,8 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
{ {
puffin::profile_function!();
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
// if the settings are set before any document update, we don't need to do anything, and // if the settings are set before any document update, we don't need to do anything, and
// will set the primary key during the first document addition. // will set the primary key during the first document addition.

View File

@ -45,6 +45,8 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&[String]], common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>, del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
// It is forbidden to keep a mutable reference into the database // It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file. // and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter( let mut prefix_docids_sorter = create_sorter(

View File

@ -50,6 +50,7 @@ impl<'t, 'u, 'i> WordPrefixIntegerDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&[String]], common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>, del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
debug!("Computing and writing the word levels integers docids into LMDB on disk..."); debug!("Computing and writing the word levels integers docids into LMDB on disk...");
let mut prefix_integer_docids_sorter = create_sorter( let mut prefix_integer_docids_sorter = create_sorter(

View File

@ -42,6 +42,8 @@ impl<'t, 'u, 'i> WordsPrefixesFst<'t, 'u, 'i> {
#[logging_timer::time("WordsPrefixesFst::{}")] #[logging_timer::time("WordsPrefixesFst::{}")]
pub fn execute(self) -> Result<()> { pub fn execute(self) -> Result<()> {
puffin::profile_function!();
let words_fst = self.index.words_fst(self.wtxn)?; let words_fst = self.index.words_fst(self.wtxn)?;
let mut current_prefix = vec![SmallString32::new(); self.max_prefix_length]; let mut current_prefix = vec![SmallString32::new(); self.max_prefix_length];