diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 35339271b..e9ae84fff 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -38,6 +38,7 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, RwLock}; +use std::thread; use std::time::Duration; use dump::{KindDump, TaskDump, UpdateFile}; @@ -615,6 +616,14 @@ impl IndexScheduler { let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string()); latch.start().unwrap(); + let this = self.private_clone(); + zookeeper + .add_watch("/election", AddWatchMode::PersistentRecursive, move |_| { + thread::sleep(Duration::from_secs(1)); + this.wake_up.signal(); + }) + .unwrap(); + // Join the potential leaders list. // The lowest in the list is the leader. And if we're not the leader // we watch the node right before us to be notified if he dies. @@ -773,7 +782,7 @@ impl IndexScheduler { }; let this = self.private_clone(); - std::thread::spawn(move || { + thread::spawn(move || { loop { // we're either a leader or not running in a cluster, // either way we should wait until we receive a task. @@ -876,7 +885,7 @@ impl IndexScheduler { .clone(); // we don't want to hold the mutex log::info!("Deleting {} processed tasks", processed.len()); for task in processed { - let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32)); + let node = format!("/tasks/task-{:0>10?}", task as i32); let _ = zookeeper // we don't want to crash if we can't delete an update file. .delete(&node, None) .unwrap(); @@ -889,7 +898,7 @@ impl IndexScheduler { log::error!("{}", e); // Wait one second when an irrecoverable error occurs. if !e.is_recoverable() { - std::thread::sleep(Duration::from_secs(1)); + thread::sleep(Duration::from_secs(1)); } } } @@ -1433,7 +1442,7 @@ impl IndexScheduler { // 2. Process the tasks let res = { let cloned_index_scheduler = self.private_clone(); - let handle = std::thread::Builder::new() + let handle = thread::Builder::new() .name(String::from("batch-operation")) .spawn(move || cloned_index_scheduler.process_batch(batch)) .unwrap();