From 93afeedceabd90cec287ee8f11e436a9151fa4d8 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 16 Nov 2022 09:50:47 +0100 Subject: [PATCH 1/2] Spawn threads with names --- index-scheduler/src/index_mapper.rs | 35 ++++++++++++---------- index-scheduler/src/lib.rs | 46 ++++++++++++++++------------- meilisearch-http/src/lib.rs | 15 ++++++---- 3 files changed, 54 insertions(+), 42 deletions(-) diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 80e4127c0..a647012fe 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -126,24 +126,27 @@ impl IndexMapper { let index_map = self.index_map.clone(); let index_path = self.base_path.join(uuid.to_string()); let index_name = name.to_string(); - thread::spawn(move || { - // We first wait to be sure that the previously opened index is effectively closed. - // This can take a lot of time, this is why we do that in a seperate thread. - if let Some(closing_event) = closing_event { - closing_event.wait(); - } + thread::Builder::new() + .name(String::from("index_deleter")) + .spawn(move || { + // We first wait to be sure that the previously opened index is effectively closed. + // This can take a lot of time, this is why we do that in a seperate thread. + if let Some(closing_event) = closing_event { + closing_event.wait(); + } - // Then we remove the content from disk. - if let Err(e) = fs::remove_dir_all(&index_path) { - error!( - "An error happened when deleting the index {} ({}): {}", - index_name, uuid, e - ); - } + // Then we remove the content from disk. + if let Err(e) = fs::remove_dir_all(&index_path) { + error!( + "An error happened when deleting the index {} ({}): {}", + index_name, uuid, e + ); + } - // Finally we remove the entry from the index map. - assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted))); - }); + // Finally we remove the entry from the index map. + assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted))); + }) + .unwrap(); Ok(()) } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 1655acdac..956f85631 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -412,28 +412,31 @@ impl IndexScheduler { /// only once per index scheduler. fn run(&self) { let run = self.private_clone(); - std::thread::spawn(move || loop { - run.wake_up.wait(); + std::thread::Builder::new() + .name(String::from("scheduler")) + .spawn(move || loop { + run.wake_up.wait(); - match run.tick() { - Ok(0) => (), - Ok(_) => run.wake_up.signal(), - Err(e) => { - log::error!("{}", e); - // Wait one second when an irrecoverable error occurs. - if matches!( - e, - Error::CorruptedTaskQueue - | Error::TaskDatabaseUpdate(_) - | Error::HeedTransaction(_) - | Error::CreateBatch(_) - ) { - std::thread::sleep(Duration::from_secs(1)); + match run.tick() { + Ok(0) => (), + Ok(_) => run.wake_up.signal(), + Err(e) => { + log::error!("{}", e); + // Wait one second when an irrecoverable error occurs. + if matches!( + e, + Error::CorruptedTaskQueue + | Error::TaskDatabaseUpdate(_) + | Error::HeedTransaction(_) + | Error::CreateBatch(_) + ) { + std::thread::sleep(Duration::from_secs(1)); + } + run.wake_up.signal(); } - run.wake_up.signal(); } - } - }); + }) + .unwrap(); } pub fn indexer_config(&self) -> &IndexerConfig { @@ -925,7 +928,10 @@ impl IndexScheduler { // 2. Process the tasks let res = { let cloned_index_scheduler = self.private_clone(); - let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch)); + let handle = std::thread::Builder::new() + .name(String::from("batch-operation")) + .spawn(move || cloned_index_scheduler.process_batch(batch)) + .unwrap(); handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) }; diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index 9a3ce857e..d47dced57 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -204,12 +204,15 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Auth if opt.schedule_snapshot { let snapshot_delay = Duration::from_secs(opt.snapshot_interval_sec); let index_scheduler = index_scheduler.clone(); - thread::spawn(move || loop { - thread::sleep(snapshot_delay); - if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) { - error!("Error while registering snapshot: {}", e); - } - }); + thread::Builder::new() + .name(String::from("register-snapshot-tasks")) + .spawn(move || loop { + thread::sleep(snapshot_delay); + if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) { + error!("Error while registering snapshot: {}", e); + } + }) + .unwrap(); } Ok((index_scheduler, auth_controller)) From 1a1ede96de6f882a69fb5cd2a9a0dc52ca22b7ab Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 16 Nov 2022 10:28:25 +0100 Subject: [PATCH 2/2] Spawn rayon threads with names --- meilisearch-http/src/option.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs index 91608d0d2..a73e4ce19 100644 --- a/meilisearch-http/src/option.rs +++ b/meilisearch-http/src/option.rs @@ -501,8 +501,10 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { type Error = anyhow::Error; fn try_from(other: &IndexerOpts) -> Result { - let thread_pool = - rayon::ThreadPoolBuilder::new().num_threads(*other.max_indexing_threads).build()?; + let thread_pool = rayon::ThreadPoolBuilder::new() + .thread_name(|index| format!("indexing-thread:{index}")) + .num_threads(*other.max_indexing_threads) + .build()?; Ok(Self { log_every_n: Some(other.log_every_n),