diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 0764be428..da8602c25 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -485,10 +485,11 @@ impl IndexScheduler { .unwrap(); log::warn!("I'm a follower When `{should_watch}` die I should check if I'm the new leader"); - let leader_watcher = - zk.watch("/election/{watch}", AddWatchMode::Persistent).unwrap(); + let leader_watcher = zk + .watch(&format!("/election/{should_watch}"), AddWatchMode::Persistent) + .unwrap(); let snapshot_watcher = - zk.watch("/snapshot", AddWatchMode::PersistentRecursive).unwrap(); + zk.watch("/snapshots", AddWatchMode::PersistentRecursive).unwrap(); let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]); @@ -534,7 +535,7 @@ impl IndexScheduler { event.event_type ); } - } else if event.path.starts_with("snapshot") { + } else if event.path.starts_with("/snapshots") { match event.event_type { EventType::Session => panic!("disconnected"), EventType::NodeCreated => { @@ -547,8 +548,7 @@ impl IndexScheduler { } EventType::NodeDataChanged => { log::info!("A snapshot is ready to be imported: {}", event.path); - let snapshot_id = - event.path.strip_prefix("/snapshots/snapshot-").unwrap(); + let snapshot_id = event.path.strip_prefix("/snapshots/").unwrap(); let snapshot_dir = format!("snapshots/{}", snapshot_id); load_snapshot(&this, &snapshot_dir).unwrap(); log::info!("Snapshot `{snapshot_id}` successfully imported"); @@ -581,7 +581,7 @@ impl IndexScheduler { let rtxn = inner.env.read_txn().unwrap(); let (_, snapshot_id) = zookeeper .create( - "/snapshots/snapshot-", + "/snapshots/", &[], &CreateMode::PersistentSequential .with_acls(Acls::anyone_all()), @@ -589,7 +589,7 @@ impl IndexScheduler { .unwrap(); let zk_snapshots = format!("snapshots"); - let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}"); + let snapshot_dir = format!("{zk_snapshots}/{snapshot_id}"); let s3 = inner.options.s3.as_ref().unwrap(); @@ -603,7 +603,6 @@ impl IndexScheduler { // 2. Snapshot the index-scheduler LMDB env log::info!("Snapshotting the tasks"); let env = inner.env.clone(); - let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}"); let temp = TempDir::new().unwrap(); let mut file = env @@ -642,7 +641,7 @@ impl IndexScheduler { // we must notify everyone that we dropped a new snapshot on the s3 let _stat = zookeeper.set_data( - &format!("/snapshots/snapshot-{snapshot_id}"), + &format!("/snapshots/{snapshot_id}"), b"ok", None, ); @@ -919,13 +918,13 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> { log::info!("Downloading the index scheduler database."); let tasks_snapshot = format!("{path}/tasks.mdb"); - s3.get_object_to_writer(tasks_snapshot, &mut tasks_file)?; + s3.get_object_to_writer(tasks_snapshot, &mut tasks_file).unwrap(); log::info!("Downloading the indexes databases"); let indexes_files = tempfile::TempDir::new_in(&base_path)?; let src = format!("{path}/indexes"); - let uuids = s3.list_objects(&src)?.into_iter().map(|lbr| { + let uuids = s3.list_objects(&src)?.map(|lbr| { let key = lbr.unwrap().key; let (_, name) = key.rsplit_once('/').unwrap(); name.to_string()