diff --git a/Cargo.lock b/Cargo.lock index a29d22ec5..b22640253 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4964,7 +4964,7 @@ dependencies = [ [[package]] name = "zookeeper-client-sync" version = "0.1.0" -source = "git+https://github.com/irevoire/zookeeper-client-sync.git#26814be86726425692a40903d9ee3ec2d974012a" +source = "git+https://github.com/irevoire/zookeeper-client-sync.git#e781ad856d23b655c2c74d51833e04972e400207" dependencies = [ "futures", "once_cell", diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8db96d2fb..169f963c4 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -370,7 +370,7 @@ impl IndexScheduler { // The lowest in the list is the leader. And if we're not the leader // we watch the node right before us to be notified if he dies. // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection - let clusterized = match self.zookeeper.clone() { + let mut clusterized = match self.zookeeper.clone() { Some(zk) => { // First, load the already existing tasks in the cluster, or, if we're the first one to join the cluster, create the task directory. let tasks_watcher = match zk.create( @@ -492,7 +492,7 @@ impl IndexScheduler { let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]); - Some((zk, watchers)) + Some((zk, string_id, watchers)) } } None => None, @@ -500,12 +500,63 @@ impl IndexScheduler { let this = self.clone(); thread::spawn(move || { - if let Some((zk, watchers)) = clusterized { - // We're a follower - todo!() - } else { - // We're a leader or not running in a cluster - loop { + loop { + if let Some((zk, id, watchers)) = &mut clusterized { + // We're a follower + let event = watchers.changed(); + + if event.path.starts_with("election") { + if event.event_type == EventType::NodeDeleted { + // The person we were following died, we must see if we're the new leader + let (mut childrens, _) = zk.get_children("/election").unwrap(); + childrens.sort_unstable(); + if childrens[0].ends_with(id.as_str()) { + log::warn!("I'm the leader"); + clusterized = None; + } else { + log::warn!("The node I was watching died but there is still a leader somewhere"); + let should_watch = childrens + .into_iter() + .rfind(|path| &path[path.len() - id.len()..] < id.as_str()) + .unwrap(); + + let watcher = + zk.watch(&should_watch, AddWatchMode::Persistent).unwrap(); + + watchers.replace(event, watcher).unwrap(); + } + } else { + log::error!( + "Got unexpected event from the election watcher: {}", + event.event_type + ); + } + } else if event.path.starts_with("snapshot") { + match event.event_type { + EventType::Session => panic!("disconnected"), + EventType::NodeCreated => { + // TODO: we should stop receiving updates at this + // point and until the snapshot is imported. + log::info!("A new snapshot is in preparation: {}", event.path) + } + EventType::NodeDeleted => { + log::info!("An old snapshot has been deleted") + } + EventType::NodeDataChanged => { + log::info!("A snapshot is ready to be imported: {}", event.path); + let snapshot_id = + event.path.strip_prefix("/snapshots/snapshot-").unwrap(); + let snapshot_dir = format!("snapshots/{}", snapshot_id); + load_snapshot(&this, &snapshot_dir).unwrap(); + log::info!("Snapshot `{snapshot_id}` successfully imported"); + } + EventType::NodeChildrenChanged => { + panic!("Unknown `NodeChildrenChanged` event") + } + } + } + } else { + // We're a leader or not running in a cluster // we're either a leader or not running in a cluster, // either way we should wait until we receive a task. this.wake_up.wait();