This commit is contained in:
Tamo 2023-10-31 17:39:10 +01:00
parent c573261ac4
commit c3a8d4b7fb

View File

@ -366,94 +366,35 @@ impl IndexScheduler {
#[cfg(test)]
self.inner().breakpoint(Breakpoint::Init);
let latch = match self.zookeeper {
Some(ref zookeeper) => {
let id = Uuid::new_v4().to_string();
let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
let wake_up = self.wake_up.clone();
let latchc = latch.clone();
zookeeper
.add_watch("/election", AddWatchMode::PersistentRecursive, move |_| {
if latchc.has_leadership() {
log::info!("I am the leader!");
} else {
log::info!("I am *not* the leader!");
}
thread::sleep(Duration::from_secs(1));
wake_up.signal();
})
.unwrap();
latch.start().unwrap();
// Join the potential leaders list.
// The lowest in the list is the leader. And if we're not the leader
// we watch the node right before us to be notified if he dies.
// See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
let latchc = latch.clone();
let this = self.clone();
let watcher =
zookeeper.watch("/snapshots", AddWatchMode::PersistentRecursive).unwrap();
watcher.run_on_change(move |event| {
if !latchc.has_leadership() {
let WatchedEvent { event_type, path, .. } = event;
match event_type {
EventType::NodeCreated => {
log::info!("The snapshot {} is in preparation", path);
}
EventType::NodeDataChanged => {
let snapshot_id =
path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir = format!("snapshots/{}", snapshot_id);
load_snapshot(&this, &snapshot_dir).unwrap();
}
otherwise => panic!("{otherwise:?}"),
}
}
});
{
// TODO we must lock the IndexSchedulerInner here to make sure that we don't
// load this snapshot after the upper watcher find a more recent one.
let (mut snapshots, _) = zookeeper.get_children("/snapshots").unwrap();
snapshots.sort_unstable();
for snapshot_name in dbg!(snapshots).iter().rev() {
let (_, snapshot_id) = snapshot_name.rsplit_once('-').unwrap();
let snapshot_path = format!("/snapshots/{snapshot_name}");
match zookeeper.get_data(&snapshot_path) {
Ok((data, _stat)) => {
if data == b"ok" {
eprintln!("Loading snapshot {snapshot_path}");
let s3_snapshot_path = format!("snapshots/{snapshot_id:0>10}");
load_snapshot(self, &s3_snapshot_path).unwrap();
break;
}
}
Err(ZkError::NoNode) => (),
Err(e) => panic!("Impossible to get data: {e}"),
}
}
}
match zookeeper.create(
let clusterized = match self.zookeeper {
Some(zk) => {
// First, load the already existing tasks in the cluster, or, if we're the first one to join the cluster, create the task directory.
let tasks_watcher = match zk.create(
"/tasks",
&[],
&CreateMode::Persistent.with_acls(Acls::anyone_all()),
) {
Ok(_) => (),
Ok(_) => zk.watch("/tasks", AddWatchMode::PersistentRecursive).unwrap(),
Err(ZkError::NodeExists) => {
log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");
let (children, _) = zookeeper
// We want to install the watcher asap on the tasks just to be sure we don't miss anything while importing the already existing tasks.
let tasks_watcher =
zk.watch("/tasks", AddWatchMode::PersistentRecursive).unwrap();
let (children, _) = zk
.get_children("/tasks")
.expect("Internal, the /tasks directory was deleted during execution."); // TODO change me
.expect("Internal, the /tasks directory was deleted during execution.");
log::info!("Importing {} tasks", children.len());
let inner = self.inner();
let mut wtxn = inner.env.write_txn().unwrap();
for path in children {
log::info!(" Importing {}", path);
match zookeeper.get_data(&format!("/tasks/{}", &path)) {
match zk.get_data(&format!("/tasks/{}", &path)) {
Ok((data, _stat)) => {
if data != b"ok" {
log::info!(" Task {} was not \"ok\", skipping.", path);
@ -472,20 +413,17 @@ impl IndexScheduler {
}
Err(e) => panic!("{e}"),
}
// else the file was deleted while we were inserting the key. We ignore it.
// TODO: What happens if someone updates the files before we have the time
// to setup the watcher
}
wtxn.commit().unwrap();
tasks_watcher
}
Err(e) => panic!("{e}"),
}
};
// TODO: fix unwrap by returning a clear error.
// Then insert all the received tasks into our task queue
let this = self.clone();
let zookeeperc = zookeeper.clone();
let watcher = zookeeper.watch("/tasks", AddWatchMode::PersistentRecursive).unwrap();
watcher.run_on_change(move |event| {
let zookeeperc = zk.clone();
tasks_watcher.run_on_change(move |event| {
let WatchedEvent { event_type, path, .. } = event;
match event_type {
EventType::NodeDataChanged => {
@ -513,19 +451,65 @@ impl IndexScheduler {
this.wake_up.signal();
});
Some(latch)
// First, create the directory that'll be used for the leader election
match zk.create(
"/election",
&[],
&CreateMode::Persistent.with_acls(Acls::anyone_all()),
) {
Ok(_) => (),
Err(ZkError::NodeExists) => (),
Err(e) => return Err(e.into()),
};
let (_, id) = zk
.create(
"/election/node-",
&[],
&CreateMode::EphemeralSequential.with_acls(Acls::anyone_all()),
)
.unwrap();
let (mut childrens, _) = zk.get_children("/election").unwrap();
childrens.sort_unstable();
let string_id = id.to_string();
// there is at least us in the childrens of election
if childrens[0].ends_with(id.to_string()) {
log::warn!("I'm the leader");
None
} else {
let should_watch = childrens
.into_iter()
.rfind(|path| path[path.len() - id.len()..] < id)
.unwrap();
log::warn!("I'm a follower When `{should_watch}` die I should check if I'm the new leader");
let leader_watcher =
zk.watch("/election/{watch}", AddWatchMode::Persistent).unwrap();
let snapshot_watcher =
zk.watch("/snapshot", AddWatchMode::PersistentRecursive).unwrap();
let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]);
Some((zk, watchers))
}
}
None => None,
};
let this = self.clone();
thread::spawn(move || {
if let Some((zk, watchers)) = clusterized {
// We're a follower
todo!()
} else {
// We're a leader or not running in a cluster
loop {
// we're either a leader or not running in a cluster,
// either way we should wait until we receive a task.
this.wake_up.wait();
if latch.as_ref().map_or(true, |latch| latch.has_leadership()) {
let inner = this.inner();
match inner.tick() {
Ok(TickOutcome::TickAgain(n)) => {