Introduce an atomic to catch panics in thread pools

This commit is contained in:
Clément Renault 2024-04-22 16:25:58 +02:00
parent aa0bbbb246
commit 0c7003c5df
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 31 additions and 10 deletions

View File

@ -6,6 +6,7 @@ use std::num::ParseIntError;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{env, fmt, fs}; use std::{env, fmt, fs};
@ -666,15 +667,23 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> { fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
let pool_panic_catched = Arc::new(AtomicBool::new(false));
let thread_pool = rayon::ThreadPoolBuilder::new() let thread_pool = rayon::ThreadPoolBuilder::new()
.thread_name(|index| format!("indexing-thread:{index}")) .thread_name(|index| format!("indexing-thread:{index}"))
.num_threads(*other.max_indexing_threads) .num_threads(*other.max_indexing_threads)
.panic_handler({
// TODO What should we do with this Box<dyn Any + Send>.
// So, let's just set a value to true to cancel the task with a message for now.
let panic_cathed = pool_panic_catched.clone();
move |_result| panic_cathed.store(true, Ordering::SeqCst)
})
.build()?; .build()?;
Ok(Self { Ok(Self {
log_every_n: Some(DEFAULT_LOG_EVERY_N), log_every_n: Some(DEFAULT_LOG_EVERY_N),
max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize), max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
thread_pool: Some(thread_pool), thread_pool: Some(thread_pool),
pool_panic_catched,
max_positions_per_attributes: None, max_positions_per_attributes: None,
skip_index_budget: other.skip_index_budget, skip_index_budget: other.skip_index_budget,
..Default::default() ..Default::default()

View File

@ -8,6 +8,7 @@ use std::collections::{HashMap, HashSet};
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use std::num::NonZeroU32; use std::num::NonZeroU32;
use std::result::Result as StdResult; use std::result::Result as StdResult;
use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
@ -296,20 +297,24 @@ where
let settings_diff = Arc::new(settings_diff); let settings_diff = Arc::new(settings_diff);
let backup_pool; let backup_pool;
let pool_catched_panic = self.indexer_config.pool_panic_catched.clone();
let pool = match self.indexer_config.thread_pool { let pool = match self.indexer_config.thread_pool {
Some(ref pool) => pool, Some(ref pool) => pool,
#[cfg(not(test))]
None => { None => {
// We initialize a bakcup pool with the default // We initialize a backup pool with the default
// settings if none have already been set. // settings if none have already been set.
backup_pool = rayon::ThreadPoolBuilder::new().build()?; let mut pool_builder = rayon::ThreadPoolBuilder::new();
&backup_pool pool_builder = pool_builder.panic_handler({
} let catched_panic = pool_catched_panic.clone();
#[cfg(test)] move |_result| catched_panic.store(true, Ordering::SeqCst)
None => { });
// We initialize a bakcup pool with the default
// settings if none have already been set. #[cfg(test)]
backup_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?; {
pool_builder = pool_builder.num_threads(1);
}
backup_pool = pool_builder.build()?;
&backup_pool &backup_pool
} }
}; };

View File

@ -1,3 +1,6 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use grenad::CompressionType; use grenad::CompressionType;
use rayon::ThreadPool; use rayon::ThreadPool;
@ -10,6 +13,9 @@ pub struct IndexerConfig {
pub chunk_compression_type: CompressionType, pub chunk_compression_type: CompressionType,
pub chunk_compression_level: Option<u32>, pub chunk_compression_level: Option<u32>,
pub thread_pool: Option<ThreadPool>, pub thread_pool: Option<ThreadPool>,
/// Set to true if the thread pool catched a panic
/// and we must abort the task
pub pool_panic_catched: Arc<AtomicBool>,
pub max_positions_per_attributes: Option<u32>, pub max_positions_per_attributes: Option<u32>,
pub skip_index_budget: bool, pub skip_index_budget: bool,
} }
@ -24,6 +30,7 @@ impl Default for IndexerConfig {
chunk_compression_type: CompressionType::None, chunk_compression_type: CompressionType::None,
chunk_compression_level: None, chunk_compression_level: None,
thread_pool: None, thread_pool: None,
pool_panic_catched: Arc::default(),
max_positions_per_attributes: None, max_positions_per_attributes: None,
skip_index_budget: false, skip_index_budget: false,
} }