fix all the bugs on the snapshots export/import pipeline

This commit is contained in:
Tamo 2023-11-06 12:34:48 +01:00
parent c5ec817f52
commit d1bc7ec58a

View File

@ -485,10 +485,11 @@ impl IndexScheduler {
.unwrap();
log::warn!("I'm a follower When `{should_watch}` die I should check if I'm the new leader");
let leader_watcher =
zk.watch("/election/{watch}", AddWatchMode::Persistent).unwrap();
let leader_watcher = zk
.watch(&format!("/election/{should_watch}"), AddWatchMode::Persistent)
.unwrap();
let snapshot_watcher =
zk.watch("/snapshot", AddWatchMode::PersistentRecursive).unwrap();
zk.watch("/snapshots", AddWatchMode::PersistentRecursive).unwrap();
let watchers = zk.multi_watcher([leader_watcher, snapshot_watcher]);
@ -534,7 +535,7 @@ impl IndexScheduler {
event.event_type
);
}
} else if event.path.starts_with("snapshot") {
} else if event.path.starts_with("/snapshots") {
match event.event_type {
EventType::Session => panic!("disconnected"),
EventType::NodeCreated => {
@ -547,8 +548,7 @@ impl IndexScheduler {
}
EventType::NodeDataChanged => {
log::info!("A snapshot is ready to be imported: {}", event.path);
let snapshot_id =
event.path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_id = event.path.strip_prefix("/snapshots/").unwrap();
let snapshot_dir = format!("snapshots/{}", snapshot_id);
load_snapshot(&this, &snapshot_dir).unwrap();
log::info!("Snapshot `{snapshot_id}` successfully imported");
@ -581,7 +581,7 @@ impl IndexScheduler {
let rtxn = inner.env.read_txn().unwrap();
let (_, snapshot_id) = zookeeper
.create(
"/snapshots/snapshot-",
"/snapshots/",
&[],
&CreateMode::PersistentSequential
.with_acls(Acls::anyone_all()),
@ -589,7 +589,7 @@ impl IndexScheduler {
.unwrap();
let zk_snapshots = format!("snapshots");
let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}");
let snapshot_dir = format!("{zk_snapshots}/{snapshot_id}");
let s3 = inner.options.s3.as_ref().unwrap();
@ -603,7 +603,6 @@ impl IndexScheduler {
// 2. Snapshot the index-scheduler LMDB env
log::info!("Snapshotting the tasks");
let env = inner.env.clone();
let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}");
let temp = TempDir::new().unwrap();
let mut file = env
@ -642,7 +641,7 @@ impl IndexScheduler {
// we must notify everyone that we dropped a new snapshot on the s3
let _stat = zookeeper.set_data(
&format!("/snapshots/snapshot-{snapshot_id}"),
&format!("/snapshots/{snapshot_id}"),
b"ok",
None,
);
@ -919,13 +918,13 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
log::info!("Downloading the index scheduler database.");
let tasks_snapshot = format!("{path}/tasks.mdb");
s3.get_object_to_writer(tasks_snapshot, &mut tasks_file)?;
s3.get_object_to_writer(tasks_snapshot, &mut tasks_file).unwrap();
log::info!("Downloading the indexes databases");
let indexes_files = tempfile::TempDir::new_in(&base_path)?;
let src = format!("{path}/indexes");
let uuids = s3.list_objects(&src)?.into_iter().map(|lbr| {
let uuids = s3.list_objects(&src)?.map(|lbr| {
let key = lbr.unwrap().key;
let (_, name) = key.rsplit_once('/').unwrap();
name.to_string()