Move the load snapshot step into a function

This commit is contained in:
Clément Renault 2023-09-12 16:05:02 +02:00
parent f37fdceb15
commit 9b01506cee
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -403,104 +403,10 @@ impl IndexScheduler {
} }
WatchedEventType::NodeDataChanged => { WatchedEventType::NodeDataChanged => {
let path = path.unwrap(); let path = path.unwrap();
log::info!("Importing snapshot {}", path);
let snapshot_id = let snapshot_id =
path.strip_prefix("/snapshots/snapshot-").unwrap(); path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir = format!("snapshots/{}", snapshot_id); let snapshot_dir = format!("snapshots/{}", snapshot_id);
load_snapshot(&this, &snapshot_dir).unwrap();
let inner = this.inner();
let s3 = inner.options.s3.as_ref().unwrap();
// 1. TODO: Ensure the snapshot version file is the same as our version.
// 2. Download all the databases
let base_path = {
let mut path = inner.env.path().to_path_buf();
assert!(path.pop());
path
};
let mut tasks_file =
tempfile::NamedTempFile::new_in(inner.env.path()).unwrap();
log::info!("Downloading the index scheduler database.");
let tasks_snapshot = format!("{snapshot_dir}/tasks.mdb");
let status = s3
.get_object_to_writer(tasks_snapshot, &mut tasks_file)
.unwrap();
assert!(matches!(status, 200 | 202));
log::info!("Downloading the indexes databases");
let indexes_files =
tempfile::TempDir::new_in(&base_path).unwrap();
let src = format!("{snapshot_dir}/indexes");
let uuids =
s3.list(src.clone(), None).unwrap().into_iter().flat_map(
|lbr| {
lbr.contents.into_iter().map(|o| {
let mut iter = o.key.rsplit('.');
iter.nth(1).unwrap().to_string()
})
},
);
for uuid in uuids {
log::info!("\tDownloading the index {}", uuid);
std::fs::create_dir_all(
indexes_files.path().join(&uuid).with_extension(""),
)
.unwrap();
let path = indexes_files
.path()
.join(&uuid)
.with_extension("")
.join("data.mdb");
let mut file = File::create(path).unwrap();
s3.get_object_to_writer(
format!("{src}/{uuid}.mdb"),
&mut file,
)
.unwrap();
}
// 3. Lock the index-mapper and close all the env
drop(inner);
let mut lock = this.inner.write();
let mut raw_inner = lock.take().unwrap();
// Wait for others to finish what they started
let indexes = raw_inner.index_mapper.clear();
let mut pfcs: Vec<_> = indexes
.into_iter()
.map(|index| index.prepare_for_closing())
.collect();
pfcs.push(raw_inner.env.prepare_for_closing());
pfcs.into_iter().for_each(|pfc| pfc.wait());
// Let's replace all the folders/files.
std::fs::rename(
&tasks_file,
base_path.join("tasks").join("data.mdb"),
)
.unwrap();
let dst_indexes = base_path.join("indexes");
std::fs::remove_dir_all(&dst_indexes).unwrap();
std::fs::create_dir_all(&dst_indexes).unwrap();
std::fs::rename(indexes_files.into_path(), dst_indexes)
.unwrap();
let mut inner = IndexSchedulerInner::new(
raw_inner.options,
#[cfg(test)]
raw_inner.test_breakpoint_sdr,
#[cfg(test)]
raw_inner.planned_failures,
)
.unwrap();
// We replace the newly created wake-up signal with the old one
inner.wake_up = raw_inner.wake_up;
*lock = Some(inner);
} }
otherwise => panic!("{otherwise:?}"), otherwise => panic!("{otherwise:?}"),
} }
@ -966,6 +872,79 @@ impl IndexScheduler {
} }
} }
fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
log::info!("Importing snapshot {}", path);
let inner = this.inner();
let s3 = inner.options.s3.as_ref().unwrap();
// 1. TODO: Ensure the snapshot version file is the same as our version.
// 2. Download all the databases
let base_path = {
let mut path = inner.env.path().to_path_buf();
assert!(path.pop());
path
};
let mut tasks_file = tempfile::NamedTempFile::new_in(inner.env.path())?;
log::info!("Downloading the index scheduler database.");
let tasks_snapshot = format!("{path}/tasks.mdb");
let status = s3.get_object_to_writer(tasks_snapshot, &mut tasks_file)?;
assert!(matches!(status, 200 | 202));
log::info!("Downloading the indexes databases");
let indexes_files = tempfile::TempDir::new_in(&base_path)?;
let src = format!("{path}/indexes");
let uuids = s3.list(src.clone(), None)?.into_iter().flat_map(|lbr| {
lbr.contents.into_iter().map(|o| {
let mut iter = o.key.rsplit('.');
iter.nth(1).unwrap().to_string()
})
});
for uuid in uuids {
log::info!("\tDownloading the index {}", uuid);
std::fs::create_dir_all(indexes_files.path().join(&uuid).with_extension(""))?;
let path = indexes_files.path().join(&uuid).with_extension("").join("data.mdb");
let mut file = File::create(path)?;
s3.get_object_to_writer(format!("{src}/{uuid}.mdb"), &mut file)?;
}
// 3. Lock the index-mapper and close all the env
drop(inner);
let mut lock = this.inner.write();
let mut raw_inner = lock.take().unwrap();
// Wait for others to finish what they started
let indexes = raw_inner.index_mapper.clear();
let mut pfcs: Vec<_> = indexes.into_iter().map(|index| index.prepare_for_closing()).collect();
pfcs.push(raw_inner.env.prepare_for_closing());
pfcs.into_iter().for_each(|pfc| pfc.wait());
// Let's replace all the folders/files.
std::fs::rename(&tasks_file, base_path.join("tasks").join("data.mdb"))?;
let dst_indexes = base_path.join("indexes");
std::fs::remove_dir_all(&dst_indexes)?;
std::fs::create_dir_all(&dst_indexes)?;
std::fs::rename(indexes_files.into_path(), dst_indexes)?;
let mut inner = IndexSchedulerInner::new(
raw_inner.options,
#[cfg(test)]
raw_inner.test_breakpoint_sdr,
#[cfg(test)]
raw_inner.planned_failures,
)?;
// We replace the newly created wake-up signal with the old one
inner.wake_up = raw_inner.wake_up;
*lock = Some(inner);
Ok(())
}
/// This is the internal structure that keeps the indexes alive. /// This is the internal structure that keeps the indexes alive.
pub struct IndexSchedulerInner { pub struct IndexSchedulerInner {
/// The LMDB environment which the DBs are associated with. /// The LMDB environment which the DBs are associated with.