diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 4ec0b064c..ff7cdfc64 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -366,94 +366,35 @@ impl IndexScheduler {
         #[cfg(test)]
         self.inner().breakpoint(Breakpoint::Init);

-        let latch = match self.zookeeper {
-            Some(ref zookeeper) => {
-                let id = Uuid::new_v4().to_string();
-                let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
-                let wake_up = self.wake_up.clone();
-                let latchc = latch.clone();
-                zookeeper
-                    .add_watch("/election", AddWatchMode::PersistentRecursive, move |_| {
-                        if latchc.has_leadership() {
-                            log::info!("I am the leader!");
-                        } else {
-                            log::info!("I am *not* the leader!");
-                        }
-                        thread::sleep(Duration::from_secs(1));
-                        wake_up.signal();
-                    })
-                    .unwrap();
-
-                latch.start().unwrap();
-
-                // Join the potential leaders list.
-                // The lowest in the list is the leader. And if we're not the leader
-                // we watch the node right before us to be notified if he dies.
-                // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
-                let latchc = latch.clone();
-                let this = self.clone();
-                let watcher =
-                    zookeeper.watch("/snapshots", AddWatchMode::PersistentRecursive).unwrap();
-                watcher.run_on_change(move |event| {
-                    if !latchc.has_leadership() {
-                        let WatchedEvent { event_type, path, .. } = event;
-                        match event_type {
-                            EventType::NodeCreated => {
-                                log::info!("The snapshot {} is in preparation", path);
-                            }
-                            EventType::NodeDataChanged => {
-                                let snapshot_id =
-                                    path.strip_prefix("/snapshots/snapshot-").unwrap();
-                                let snapshot_dir = format!("snapshots/{}", snapshot_id);
-                                load_snapshot(&this, &snapshot_dir).unwrap();
-                            }
-                            otherwise => panic!("{otherwise:?}"),
-                        }
-                    }
-                });
-
-                {
-                    // TODO we must lock the IndexSchedulerInner here to make sure that we don't
-                    // load this snapshot after the upper watcher find a more recent one.
-                    let (mut snapshots, _) = zookeeper.get_children("/snapshots").unwrap();
-                    snapshots.sort_unstable();
-                    for snapshot_name in dbg!(snapshots).iter().rev() {
-                        let (_, snapshot_id) = snapshot_name.rsplit_once('-').unwrap();
-                        let snapshot_path = format!("/snapshots/{snapshot_name}");
-                        match zookeeper.get_data(&snapshot_path) {
-                            Ok((data, _stat)) => {
-                                if data == b"ok" {
-                                    eprintln!("Loading snapshot {snapshot_path}");
-                                    let s3_snapshot_path = format!("snapshots/{snapshot_id:0>10}");
-                                    load_snapshot(self, &s3_snapshot_path).unwrap();
-                                    break;
-                                }
-                            }
-                            Err(ZkError::NoNode) => (),
-                            Err(e) => panic!("Impossible to get data: {e}"),
-                        }
-                    }
-                }
-
-                match zookeeper.create(
+        // Join the potential leaders list.
+        // The lowest in the list is the leader. And if we're not the leader
+        // we watch the node right before us to be notified if he dies.
+        // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
+        let clusterized = match self.zookeeper {
+            Some(zk) => {
+                // First, load the already existing tasks in the cluster, or, if we're the first one to join the cluster, create the task directory.
+                let tasks_watcher = match zk.create(
                     "/tasks",
                     &[],
                     &CreateMode::Persistent.with_acls(Acls::anyone_all()),
                 ) {
-                    Ok(_) => (),
+                    Ok(_) => zk.watch("/tasks", AddWatchMode::PersistentRecursive).unwrap(),
                     Err(ZkError::NodeExists) => {
                         log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");

-                        let (children, _) = zookeeper
+                        // We want to install the watcher asap on the tasks just to be sure we don't miss anything while importing the already existing tasks.
+                        let tasks_watcher =
+                            zk.watch("/tasks", AddWatchMode::PersistentRecursive).unwrap();
+                        let (children, _) = zk
                             .get_children("/tasks")
-                            .expect("Internal, the /tasks directory was deleted during execution."); // TODO change me
+                            .expect("Internal, the /tasks directory was deleted during execution.");
                         log::info!("Importing {} tasks", children.len());
                         let inner = self.inner();
                         let mut wtxn = inner.env.write_txn().unwrap();

                         for path in children {
                             log::info!(" Importing {}", path);
-                            match zookeeper.get_data(&format!("/tasks/{}", &path)) {
+                            match zk.get_data(&format!("/tasks/{}", &path)) {
                                 Ok((data, _stat)) => {
                                     if data != b"ok" {
                                         log::info!(" Task {} was not \"ok\", skipping.", path);
@@ -472,20 +413,17 @@ impl IndexScheduler {
                                 }
                                 Err(e) => panic!("{e}"),
                             }
-                            // else the file was deleted while we were inserting the key. We ignore it.
-                            // TODO: What happens if someone updates the files before we have the time
-                            // to setup the watcher
                         }
                         wtxn.commit().unwrap();
+                        tasks_watcher
                     }
                     Err(e) => panic!("{e}"),
-                }
+                };

-                // TODO: fix unwrap by returning a clear error.
+                // Then insert all the received tasks into our task queue
                 let this = self.clone();
-                let zookeeperc = zookeeper.clone();
-                let watcher = zookeeper.watch("/tasks", AddWatchMode::PersistentRecursive).unwrap();
-                watcher.run_on_change(move |event| {
+                let zookeeperc = zk.clone();
+                tasks_watcher.run_on_change(move |event| {
                     let WatchedEvent { event_type, path, .. } = event;
                     match event_type {
                         EventType::NodeDataChanged => {
@@ -513,19 +451,65 @@ impl IndexScheduler {
                     this.wake_up.signal();
                 });

-                Some(latch)
+                // First, create the directory that'll be used for the leader election
+                match zk.create(
+                    "/election",
+                    &[],
+                    &CreateMode::Persistent.with_acls(Acls::anyone_all()),
+                ) {
+                    Ok(_) => (),
+                    Err(ZkError::NodeExists) => (),
+                    Err(e) => return Err(e.into()),
+                };
+
+                let (_, id) = zk
+                    .create(
+                        "/election/node-",
+                        &[],
+                        &CreateMode::EphemeralSequential.with_acls(Acls::anyone_all()),
+                    )
+                    .unwrap();
+                let (mut childrens, _) = zk.get_children("/election").unwrap();
+                childrens.sort_unstable();
+
+                let string_id = id.to_string();
+
+                // there is at least us in the children of the election
+                if childrens[0].ends_with(&string_id) {
+                    log::warn!("I'm the leader");
+                    None
+                } else {
+                    let should_watch = childrens
+                        .into_iter()
+                        .rfind(|path| path[path.len() - string_id.len()..] < *string_id)
+                        .unwrap();
+                    log::warn!("I'm a follower. When `{should_watch}` dies I should check if I'm the new leader");
+
+                    let leader_watcher =
+                        zk.watch(&format!("/election/{should_watch}"), AddWatchMode::Persistent).unwrap();
+                    let snapshot_watcher =
+                        zk.watch("/snapshot", AddWatchMode::PersistentRecursive).unwrap();
+
+                    let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]);
+
+                    Some((zk, watchers))
+                }
             }
             None => None,
         };

         let this = self.clone();
         thread::spawn(move || {
-            loop {
-                // we're either a leader or not running in a cluster,
-                // either way we should wait until we receive a task.
-                this.wake_up.wait();
+            if let Some((zk, watchers)) = clusterized {
+                // We're a follower
+                todo!()
+            } else {
+                // We're a leader or not running in a cluster
+                loop {
+                    // we're either a leader or not running in a cluster,
+                    // either way we should wait until we receive a task.
+                    this.wake_up.wait();

-                if latch.as_ref().map_or(true, |latch| latch.has_leadership()) {
                     let inner = this.inner();
                     match inner.tick() {
                         Ok(TickOutcome::TickAgain(n)) => {