Fixup a lot of small issues on the ZK config

This commit is contained in:
Clément Renault 2023-08-30 16:42:55 +02:00
parent 0c7d7c68bc
commit c488a4a351
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -746,7 +746,12 @@ impl IndexScheduler {
this.register_raw_task(&mut wtxn, &task).unwrap(); this.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
} }
otherwise => panic!("{otherwise:?}"), WatchedEventType::None
| WatchedEventType::NodeCreated
| WatchedEventType::NodeDeleted => (),
WatchedEventType::NodeChildrenChanged
| WatchedEventType::DataWatchRemoved
| WatchedEventType::ChildWatchRemoved => panic!("{event_type:?}"),
} }
this.wake_up.signal(); this.wake_up.signal();
@ -775,7 +780,7 @@ impl IndexScheduler {
continue; continue;
} }
let snapshot_id = zookeeper let snapshot_path = zookeeper
.create( .create(
"/snapshots/snapshot-", "/snapshots/snapshot-",
vec![], vec![],
@ -784,11 +789,15 @@ impl IndexScheduler {
) )
.unwrap(); .unwrap();
dbg!(&snapshot_id); let snapshot_id = snapshot_path
.rsplit_once('-')
.map(|(_, id)| id.parse::<u32>().unwrap())
.unwrap();
let zk_snapshots = format!("{}/zk-snapshots", env!("HOME")); let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
std::fs::create_dir_all(&zk_snapshots).unwrap(); std::fs::create_dir_all(&zk_snapshots).unwrap();
let snapshot_dir = let snapshot_dir =
PathBuf::from(format!("{zk_snapshots}/{snapshot_id}")); PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}"));
std::fs::create_dir(&snapshot_dir).unwrap(); std::fs::create_dir(&snapshot_dir).unwrap();
// 1. Snapshot the version file. // 1. Snapshot the version file.
@ -796,13 +805,10 @@ impl IndexScheduler {
std::fs::copy(&this.version_file_path, dst).unwrap(); std::fs::copy(&this.version_file_path, dst).unwrap();
// 2. Snapshot the index-scheduler LMDB env // 2. Snapshot the index-scheduler LMDB env
let dst = snapshot_dir.join("tasks");
std::fs::create_dir_all(&dst).unwrap();
log::info!("Snapshotting the tasks"); log::info!("Snapshotting the tasks");
let env = this.env.clone(); let env = this.env.clone();
env.copy_to_path( env.copy_to_path(
dst.join("tasks.mdb"), snapshot_dir.join("tasks.mdb"),
heed::CompactionOption::Enabled, heed::CompactionOption::Enabled,
) )
.unwrap(); .unwrap();
@ -839,7 +845,7 @@ impl IndexScheduler {
// we must notify everyone that we dropped a new snapshot on the s3 // we must notify everyone that we dropped a new snapshot on the s3
let _stat = zookeeper.set_data( let _stat = zookeeper.set_data(
&format!("/snapshots/snapshot-{}", snapshot_id), &format!("/snapshots/snapshot-{:0>10?}", snapshot_id),
vec![], vec![],
None, None,
); );
@ -854,14 +860,9 @@ impl IndexScheduler {
.clone(); // we don't want to hold the mutex .clone(); // we don't want to hold the mutex
log::info!("Deleting {} processed tasks", processed.len()); log::info!("Deleting {} processed tasks", processed.len());
for task in processed { for task in processed {
let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32));
let _ = zookeeper // we don't want to crash if we can't delete an update file. let _ = zookeeper // we don't want to crash if we can't delete an update file.
.delete( .delete(&node, None)
&format!(
"/tasks/task-{:0>10}",
task as i32
),
None,
)
.unwrap(); .unwrap();
// TODO: Delete the update files associated with the deleted tasks // TODO: Delete the update files associated with the deleted tasks
} }
@ -1288,7 +1289,7 @@ impl IndexScheduler {
// TODO: ugly unwrap // TODO: ugly unwrap
zookeeper zookeeper
.set_data( .set_data(
&format!("/tasks/task-{}", id), &format!("/tasks/task-{:0>10?}", id),
serde_json::to_vec_pretty(&task).unwrap(), serde_json::to_vec_pretty(&task).unwrap(),
None, None,
) )