mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 20:15:07 +08:00
Merge #3061
3061: Name spawned threads r=irevoire a=dureuill # Pull Request ## Related issue None, this is to improve debuggability ## What does this PR do? - This PR replaces the raw `thread::spawn(...)` calls by `thread::Builder::new().name(...).spawn(...).unwrap()` calls so that we can give meaningful names to threads. - This PR also setup the `rayon` thread pool to give a name to its threads. - This improves debuggability, as the thread names are reported by debuggers: <img width="411" alt="Capture d’écran 2022-11-16 à 10 26 27" src="https://user-images.githubusercontent.com/41078892/202141870-a88663aa-d2f8-494f-b4da-709fdbd072ba.png"> (screen showing vscode's debugger and its main/scheduler/indexing threads) ## PR checklist Please check if your PR fulfills the following requirements: - [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)? - [x] Have you read the contributing guidelines? - [x] Have you made sure that the title is accurate and descriptive of the changes? Thank you so much for contributing to Meilisearch! Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
8ddec58430
@ -126,24 +126,27 @@ impl IndexMapper {
|
|||||||
let index_map = self.index_map.clone();
|
let index_map = self.index_map.clone();
|
||||||
let index_path = self.base_path.join(uuid.to_string());
|
let index_path = self.base_path.join(uuid.to_string());
|
||||||
let index_name = name.to_string();
|
let index_name = name.to_string();
|
||||||
thread::spawn(move || {
|
thread::Builder::new()
|
||||||
// We first wait to be sure that the previously opened index is effectively closed.
|
.name(String::from("index_deleter"))
|
||||||
// This can take a lot of time, this is why we do that in a seperate thread.
|
.spawn(move || {
|
||||||
if let Some(closing_event) = closing_event {
|
// We first wait to be sure that the previously opened index is effectively closed.
|
||||||
closing_event.wait();
|
// This can take a lot of time, this is why we do that in a seperate thread.
|
||||||
}
|
if let Some(closing_event) = closing_event {
|
||||||
|
closing_event.wait();
|
||||||
|
}
|
||||||
|
|
||||||
// Then we remove the content from disk.
|
// Then we remove the content from disk.
|
||||||
if let Err(e) = fs::remove_dir_all(&index_path) {
|
if let Err(e) = fs::remove_dir_all(&index_path) {
|
||||||
error!(
|
error!(
|
||||||
"An error happened when deleting the index {} ({}): {}",
|
"An error happened when deleting the index {} ({}): {}",
|
||||||
index_name, uuid, e
|
index_name, uuid, e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally we remove the entry from the index map.
|
// Finally we remove the entry from the index map.
|
||||||
assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted)));
|
assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted)));
|
||||||
});
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -412,28 +412,31 @@ impl IndexScheduler {
|
|||||||
/// only once per index scheduler.
|
/// only once per index scheduler.
|
||||||
fn run(&self) {
|
fn run(&self) {
|
||||||
let run = self.private_clone();
|
let run = self.private_clone();
|
||||||
std::thread::spawn(move || loop {
|
std::thread::Builder::new()
|
||||||
run.wake_up.wait();
|
.name(String::from("scheduler"))
|
||||||
|
.spawn(move || loop {
|
||||||
|
run.wake_up.wait();
|
||||||
|
|
||||||
match run.tick() {
|
match run.tick() {
|
||||||
Ok(0) => (),
|
Ok(0) => (),
|
||||||
Ok(_) => run.wake_up.signal(),
|
Ok(_) => run.wake_up.signal(),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("{}", e);
|
log::error!("{}", e);
|
||||||
// Wait one second when an irrecoverable error occurs.
|
// Wait one second when an irrecoverable error occurs.
|
||||||
if matches!(
|
if matches!(
|
||||||
e,
|
e,
|
||||||
Error::CorruptedTaskQueue
|
Error::CorruptedTaskQueue
|
||||||
| Error::TaskDatabaseUpdate(_)
|
| Error::TaskDatabaseUpdate(_)
|
||||||
| Error::HeedTransaction(_)
|
| Error::HeedTransaction(_)
|
||||||
| Error::CreateBatch(_)
|
| Error::CreateBatch(_)
|
||||||
) {
|
) {
|
||||||
std::thread::sleep(Duration::from_secs(1));
|
std::thread::sleep(Duration::from_secs(1));
|
||||||
|
}
|
||||||
|
run.wake_up.signal();
|
||||||
}
|
}
|
||||||
run.wake_up.signal();
|
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
});
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn indexer_config(&self) -> &IndexerConfig {
|
pub fn indexer_config(&self) -> &IndexerConfig {
|
||||||
@ -925,7 +928,10 @@ impl IndexScheduler {
|
|||||||
// 2. Process the tasks
|
// 2. Process the tasks
|
||||||
let res = {
|
let res = {
|
||||||
let cloned_index_scheduler = self.private_clone();
|
let cloned_index_scheduler = self.private_clone();
|
||||||
let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch));
|
let handle = std::thread::Builder::new()
|
||||||
|
.name(String::from("batch-operation"))
|
||||||
|
.spawn(move || cloned_index_scheduler.process_batch(batch))
|
||||||
|
.unwrap();
|
||||||
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -204,12 +204,15 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
|
|||||||
if opt.schedule_snapshot {
|
if opt.schedule_snapshot {
|
||||||
let snapshot_delay = Duration::from_secs(opt.snapshot_interval_sec);
|
let snapshot_delay = Duration::from_secs(opt.snapshot_interval_sec);
|
||||||
let index_scheduler = index_scheduler.clone();
|
let index_scheduler = index_scheduler.clone();
|
||||||
thread::spawn(move || loop {
|
thread::Builder::new()
|
||||||
thread::sleep(snapshot_delay);
|
.name(String::from("register-snapshot-tasks"))
|
||||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
.spawn(move || loop {
|
||||||
error!("Error while registering snapshot: {}", e);
|
thread::sleep(snapshot_delay);
|
||||||
}
|
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
||||||
});
|
error!("Error while registering snapshot: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((index_scheduler, auth_controller))
|
Ok((index_scheduler, auth_controller))
|
||||||
|
@ -501,8 +501,10 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
|||||||
type Error = anyhow::Error;
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
||||||
let thread_pool =
|
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||||
rayon::ThreadPoolBuilder::new().num_threads(*other.max_indexing_threads).build()?;
|
.thread_name(|index| format!("indexing-thread:{index}"))
|
||||||
|
.num_threads(*other.max_indexing_threads)
|
||||||
|
.build()?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
log_every_n: Some(other.log_every_n),
|
log_every_n: Some(other.log_every_n),
|
||||||
|
Loading…
Reference in New Issue
Block a user