starts creating snapshot, the import is still missing

This commit is contained in:
Tamo 2023-08-10 15:00:25 +02:00
parent 61ccfaf9bc
commit 777eebb759

View File

@ -676,7 +676,31 @@ impl IndexScheduler {
zk::WatchedEvent { event_type, session_state, path } = snapshot_watcher.changed() => match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
zk::EventType::NodeCreated => {
println!("I should load a snapshot - {}", path);
log::info!("The snapshot {} is in preparation", path);
}
zk::EventType::NodeDataChanged => {
log::info!("Importing snapshot {}", path);
let snapshot_id = path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir =
PathBuf::from(format!("~/zk-snapshots/{}", snapshot_id));
// TODO: everything
// 1. TODO: Ensure the snapshot version file is the same as our version.
// 2. Download and import the index-scheduler database
log::info!("Importing the index scheduler.");
let tasks =
snapshot_dir.join("tasks.mdb");
// 3. Snapshot every indexes
log::info!("Importing the indexes");
let dst = snapshot_dir.join("indexes");
let mut indexes = tokio::fs::read_dir(dst).await.unwrap();
while let Some(uuid) = indexes.next_entry().await.unwrap() {
// TODO: Import the index
}
}
_ => (),
},
@ -702,17 +726,88 @@ impl IndexScheduler {
continue;
}
// TODO:
// - create a new snapshot on disk/s3
// we must notify everyone that we dropped a new snapshot on the s3
let options = zk::CreateMode::EphemeralSequential
.with_acls(zk::Acls::anyone_all());
let (_stat, id) = zk
let (_stat, snapshot_id) = zk
.create("/snapshots/snapshot-", &[], &options)
.await
.unwrap();
log::info!("Notified that there was a new snapshot {id}");
tokio::fs::create_dir_all("~/zk-snapshots").await.unwrap();
let snapshot_dir =
PathBuf::from(format!("~/zk-snapshots/{snapshot_id}"));
tokio::fs::create_dir(&snapshot_dir).await.unwrap();
// 1. Snapshot the version file.
let dst =
snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
tokio::fs::copy(&run.version_file_path, dst).await.unwrap();
// 2. Snapshot the index-scheduler LMDB env
let dst = snapshot_dir.join("tasks");
tokio::fs::create_dir_all(&dst).await.unwrap();
log::info!("Snapshotting the tasks");
let env = run.env.clone();
tokio::task::spawn_blocking(move || {
env.copy_to_path(
dst.join("tasks.mdb"),
heed::CompactionOption::Enabled,
)
.unwrap();
})
.await
.unwrap();
// 3. Snapshot every indexes
log::info!("Snapshotting the indexes");
let dst = snapshot_dir.join("indexes");
tokio::fs::create_dir_all(&dst).await.unwrap();
let this = run.private_clone();
let indexes = tokio::task::spawn_blocking(move || {
let rtxn = this.env.read_txn().unwrap();
this.index_mapper
.index_mapping
.iter(&rtxn)
.unwrap()
.map(|ret| ret.unwrap())
.map(|(name, uuid)| (name.to_string(), uuid))
.collect::<Vec<_>>()
})
.await
.unwrap();
for (name, uuid) in indexes {
log::info!(" Snapshotting index {name}");
let this = run.private_clone();
let dst = dst.clone();
tokio::task::spawn_blocking(move || {
let rtxn = this.env.read_txn().unwrap();
let index =
this.index_mapper.index(&rtxn, &name).unwrap();
index
.copy_to_path(
dst.join(format!("{uuid}.mdb")),
heed::CompactionOption::Enabled,
)
.unwrap();
})
.await
.unwrap();
}
// we must notify everyone that we dropped a new snapshot on the s3
let _stat = zk
.set_data(
&format!("/snapshots/snapshot-{}", snapshot_id),
&[],
None,
)
.await
.unwrap();
log::info!(
"Notified everyone about the new snapshot {snapshot_id}"
);
// We can now delete all the tasks that has been processed
let processed = run
@ -732,6 +827,7 @@ impl IndexScheduler {
None,
)
.await;
// TODO: Delete the update files associated with the deleted tasks
}
}
}