diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index b0fe0b432..dcb64a788 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -746,7 +746,12 @@ impl IndexScheduler { this.register_raw_task(&mut wtxn, &task).unwrap(); wtxn.commit().unwrap(); } - otherwise => panic!("{otherwise:?}"), + WatchedEventType::None + | WatchedEventType::NodeCreated + | WatchedEventType::NodeDeleted => (), + WatchedEventType::NodeChildrenChanged + | WatchedEventType::DataWatchRemoved + | WatchedEventType::ChildWatchRemoved => panic!("{event_type:?}"), } this.wake_up.signal(); @@ -775,7 +780,7 @@ impl IndexScheduler { continue; } - let snapshot_id = zookeeper + let snapshot_path = zookeeper .create( "/snapshots/snapshot-", vec![], @@ -784,11 +789,15 @@ impl IndexScheduler { ) .unwrap(); - dbg!(&snapshot_id); + let snapshot_id = snapshot_path + .rsplit_once('-') + .map(|(_, id)| id.parse::().unwrap()) + .unwrap(); + let zk_snapshots = format!("{}/zk-snapshots", env!("HOME")); std::fs::create_dir_all(&zk_snapshots).unwrap(); let snapshot_dir = - PathBuf::from(format!("{zk_snapshots}/{snapshot_id}")); + PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}")); std::fs::create_dir(&snapshot_dir).unwrap(); // 1. Snapshot the version file. @@ -796,13 +805,10 @@ impl IndexScheduler { std::fs::copy(&this.version_file_path, dst).unwrap(); // 2. Snapshot the index-scheduler LMDB env - let dst = snapshot_dir.join("tasks"); - std::fs::create_dir_all(&dst).unwrap(); - log::info!("Snapshotting the tasks"); let env = this.env.clone(); env.copy_to_path( - dst.join("tasks.mdb"), + snapshot_dir.join("tasks.mdb"), heed::CompactionOption::Enabled, ) .unwrap(); @@ -839,7 +845,7 @@ impl IndexScheduler { // we must notify everyone that we dropped a new snapshot on the s3 let _stat = zookeeper.set_data( - &format!("/snapshots/snapshot-{}", snapshot_id), + &format!("/snapshots/snapshot-{:0>10?}", snapshot_id), vec![], None, ); @@ -854,14 +860,9 @@ impl IndexScheduler { .clone(); // we don't want to hold the mutex log::info!("Deleting {} processed tasks", processed.len()); for task in processed { + let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32)); let _ = zookeeper // we don't want to crash if we can't delete an update file. - .delete( - &format!( - "/tasks/task-{:0>10}", - task as i32 - ), - None, - ) + .delete(&node, None) .unwrap(); // TODO: Delete the update files associated with the deleted tasks } @@ -1288,7 +1289,7 @@ impl IndexScheduler { // TODO: ugly unwrap zookeeper .set_data( - &format!("/tasks/task-{}", id), + &format!("/tasks/task-{:0>10?}", id), serde_json::to_vec_pretty(&task).unwrap(), None, )