React to changes towards the cluster members

This commit is contained in:
Clément Renault 2023-08-30 17:23:08 +02:00
parent 2d1434da81
commit 8c3ad57ef9
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -38,6 +38,7 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use dump::{KindDump, TaskDump, UpdateFile};
@ -615,6 +616,14 @@ impl IndexScheduler {
let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
latch.start().unwrap();
let this = self.private_clone();
zookeeper
.add_watch("/election", AddWatchMode::PersistentRecursive, move |_| {
thread::sleep(Duration::from_secs(1));
this.wake_up.signal();
})
.unwrap();
// Join the potential leaders list.
// The lowest in the list is the leader. And if we're not the leader
// we watch the node right before us to be notified if he dies.
@ -773,7 +782,7 @@ impl IndexScheduler {
};
let this = self.private_clone();
std::thread::spawn(move || {
thread::spawn(move || {
loop {
// we're either a leader or not running in a cluster,
// either way we should wait until we receive a task.
@ -876,7 +885,7 @@ impl IndexScheduler {
.clone(); // we don't want to hold the mutex
log::info!("Deleting {} processed tasks", processed.len());
for task in processed {
let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32));
let node = format!("/tasks/task-{:0>10?}", task as i32);
let _ = zookeeper // we don't want to crash if we can't delete an update file.
.delete(&node, None)
.unwrap();
@ -889,7 +898,7 @@ impl IndexScheduler {
log::error!("{}", e);
// Wait one second when an irrecoverable error occurs.
if !e.is_recoverable() {
std::thread::sleep(Duration::from_secs(1));
thread::sleep(Duration::from_secs(1));
}
}
}
@ -1433,7 +1442,7 @@ impl IndexScheduler {
// 2. Process the tasks
let res = {
let cloned_index_scheduler = self.private_clone();
let handle = std::thread::Builder::new()
let handle = thread::Builder::new()
.name(String::from("batch-operation"))
.spawn(move || cloned_index_scheduler.process_batch(batch))
.unwrap();