it compiles, now tokio is complaining that we block the main thread (and is right), I need to patch the zookeeper client to not rely on tokio anymore

This commit is contained in:
Tamo 2023-11-02 12:55:16 +01:00
parent 366144146b
commit 0c18962b13
2 changed files with 60 additions and 9 deletions

2
Cargo.lock generated
View File

@ -4964,7 +4964,7 @@ dependencies = [
[[package]]
name = "zookeeper-client-sync"
version = "0.1.0"
source = "git+https://github.com/irevoire/zookeeper-client-sync.git#26814be86726425692a40903d9ee3ec2d974012a"
source = "git+https://github.com/irevoire/zookeeper-client-sync.git#e781ad856d23b655c2c74d51833e04972e400207"
dependencies = [
"futures",
"once_cell",

View File

@ -370,7 +370,7 @@ impl IndexScheduler {
// The lowest in the list is the leader. And if we're not the leader
// we watch the node right before us to be notified if he dies.
// See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
let clusterized = match self.zookeeper.clone() {
let mut clusterized = match self.zookeeper.clone() {
Some(zk) => {
// First, load the already existing tasks in the cluster, or, if we're the first one to join the cluster, create the task directory.
let tasks_watcher = match zk.create(
@ -492,7 +492,7 @@ impl IndexScheduler {
let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]);
Some((zk, watchers))
Some((zk, string_id, watchers))
}
}
None => None,
@ -500,12 +500,63 @@ impl IndexScheduler {
let this = self.clone();
thread::spawn(move || {
if let Some((zk, watchers)) = clusterized {
// We're a follower
todo!()
} else {
// We're a leader or not running in a cluster
loop {
loop {
if let Some((zk, id, watchers)) = &mut clusterized {
// We're a follower
let event = watchers.changed();
if event.path.starts_with("election") {
if event.event_type == EventType::NodeDeleted {
// The person we were following died, we must see if we're the new leader
let (mut childrens, _) = zk.get_children("/election").unwrap();
childrens.sort_unstable();
if childrens[0].ends_with(id.as_str()) {
log::warn!("I'm the leader");
clusterized = None;
} else {
log::warn!("The node I was watching died but there is still a leader somewhere");
let should_watch = childrens
.into_iter()
.rfind(|path| &path[path.len() - id.len()..] < id.as_str())
.unwrap();
let watcher =
zk.watch(&should_watch, AddWatchMode::Persistent).unwrap();
watchers.replace(event, watcher).unwrap();
}
} else {
log::error!(
"Got unexpected event from the election watcher: {}",
event.event_type
);
}
} else if event.path.starts_with("snapshot") {
match event.event_type {
EventType::Session => panic!("disconnected"),
EventType::NodeCreated => {
// TODO: we should stop receiving updates at this
// point and until the snapshot is imported.
log::info!("A new snapshot is in preparation: {}", event.path)
}
EventType::NodeDeleted => {
log::info!("An old snapshot has been deleted")
}
EventType::NodeDataChanged => {
log::info!("A snapshot is ready to be imported: {}", event.path);
let snapshot_id =
event.path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir = format!("snapshots/{}", snapshot_id);
load_snapshot(&this, &snapshot_dir).unwrap();
log::info!("Snapshot `{snapshot_id}` successfully imported");
}
EventType::NodeChildrenChanged => {
panic!("Unknown `NodeChildrenChanged` event")
}
}
}
} else {
// We're a leader or not running in a cluster
// we're either a leader or not running in a cluster,
// either way we should wait until we receive a task.
this.wake_up.wait();