From 9b01506cee89da877a4485d3802cfaee4dcdeeb2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Tue, 12 Sep 2023 16:05:02 +0200
Subject: [PATCH] Move the load snapshot step into a function

---
 index-scheduler/src/lib.rs | 169 ++++++++++++++++---------------------
 1 file changed, 74 insertions(+), 95 deletions(-)

diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index eedceb27f..2a875b0d8 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -403,104 +403,10 @@ impl IndexScheduler {
                     }
                     WatchedEventType::NodeDataChanged => {
                         let path = path.unwrap();
-                        log::info!("Importing snapshot {}", path);
                         let snapshot_id =
                             path.strip_prefix("/snapshots/snapshot-").unwrap();
                         let snapshot_dir = format!("snapshots/{}", snapshot_id);
-
-                        let inner = this.inner();
-                        let s3 = inner.options.s3.as_ref().unwrap();
-
-                        // 1. TODO: Ensure the snapshot version file is the same as our version.
-
-                        // 2. Download all the databases
-                        let base_path = {
-                            let mut path = inner.env.path().to_path_buf();
-                            assert!(path.pop());
-                            path
-                        };
-
-                        let mut tasks_file =
-                            tempfile::NamedTempFile::new_in(inner.env.path()).unwrap();
-
-                        log::info!("Downloading the index scheduler database.");
-                        let tasks_snapshot = format!("{snapshot_dir}/tasks.mdb");
-                        let status = s3
-                            .get_object_to_writer(tasks_snapshot, &mut tasks_file)
-                            .unwrap();
-                        assert!(matches!(status, 200 | 202));
-
-                        log::info!("Downloading the indexes databases");
-                        let indexes_files =
-                            tempfile::TempDir::new_in(&base_path).unwrap();
-
-                        let src = format!("{snapshot_dir}/indexes");
-                        let uuids =
-                            s3.list(src.clone(), None).unwrap().into_iter().flat_map(
-                                |lbr| {
-                                    lbr.contents.into_iter().map(|o| {
-                                        let mut iter = o.key.rsplit('.');
-                                        iter.nth(1).unwrap().to_string()
-                                    })
-                                },
-                            );
-                        for uuid in uuids {
-                            log::info!("\tDownloading the index {}", uuid);
-                            std::fs::create_dir_all(
-                                indexes_files.path().join(&uuid).with_extension(""),
-                            )
-                            .unwrap();
-                            let path = indexes_files
-                                .path()
-                                .join(&uuid)
-                                .with_extension("")
-                                .join("data.mdb");
-                            let mut file = File::create(path).unwrap();
-                            s3.get_object_to_writer(
-                                format!("{src}/{uuid}.mdb"),
-                                &mut file,
-                            )
-                            .unwrap();
-                        }
-
-                        // 3. Lock the index-mapper and close all the env
-                        drop(inner);
-                        let mut lock = this.inner.write();
-                        let mut raw_inner = lock.take().unwrap();
-
-                        // Wait for others to finish what they started
-                        let indexes = raw_inner.index_mapper.clear();
-                        let mut pfcs: Vec<_> = indexes
-                            .into_iter()
-                            .map(|index| index.prepare_for_closing())
-                            .collect();
-                        pfcs.push(raw_inner.env.prepare_for_closing());
-                        pfcs.into_iter().for_each(|pfc| pfc.wait());
-
-                        // Let's replace all the folders/files.
-                        std::fs::rename(
-                            &tasks_file,
-                            base_path.join("tasks").join("data.mdb"),
-                        )
-                        .unwrap();
-                        let dst_indexes = base_path.join("indexes");
-                        std::fs::remove_dir_all(&dst_indexes).unwrap();
-                        std::fs::create_dir_all(&dst_indexes).unwrap();
-                        std::fs::rename(indexes_files.into_path(), dst_indexes)
-                            .unwrap();
-
-                        let mut inner = IndexSchedulerInner::new(
-                            raw_inner.options,
-                            #[cfg(test)]
-                            raw_inner.test_breakpoint_sdr,
-                            #[cfg(test)]
-                            raw_inner.planned_failures,
-                        )
-                        .unwrap();
-
-                        // We replace the newly created wake-up signal with the old one
-                        inner.wake_up = raw_inner.wake_up;
-                        *lock = Some(inner);
+                        load_snapshot(&this, &snapshot_dir).unwrap();
                     }
                     otherwise => panic!("{otherwise:?}"),
                 }
@@ -966,6 +872,79 @@ impl IndexScheduler {
     }
 }
 
+fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
+    log::info!("Importing snapshot {}", path);
+
+    let inner = this.inner();
+    let s3 = inner.options.s3.as_ref().unwrap();
+
+    // 1. TODO: Ensure the snapshot version file is the same as our version.
+
+    // 2. Download all the databases
+    let base_path = {
+        let mut path = inner.env.path().to_path_buf();
+        assert!(path.pop());
+        path
+    };
+
+    let mut tasks_file = tempfile::NamedTempFile::new_in(inner.env.path())?;
+
+    log::info!("Downloading the index scheduler database.");
+    let tasks_snapshot = format!("{path}/tasks.mdb");
+    let status = s3.get_object_to_writer(tasks_snapshot, &mut tasks_file)?;
+    assert!(matches!(status, 200 | 202));
+
+    log::info!("Downloading the indexes databases");
+    let indexes_files = tempfile::TempDir::new_in(&base_path)?;
+
+    let src = format!("{path}/indexes");
+    let uuids = s3.list(src.clone(), None)?.into_iter().flat_map(|lbr| {
+        lbr.contents.into_iter().map(|o| {
+            let mut iter = o.key.rsplit('.');
+            iter.nth(1).unwrap().to_string()
+        })
+    });
+    for uuid in uuids {
+        log::info!("\tDownloading the index {}", uuid);
+        std::fs::create_dir_all(indexes_files.path().join(&uuid).with_extension(""))?;
+        let path = indexes_files.path().join(&uuid).with_extension("").join("data.mdb");
+        let mut file = File::create(path)?;
+        s3.get_object_to_writer(format!("{src}/{uuid}.mdb"), &mut file)?;
+    }
+
+    // 3. Lock the index-mapper and close all the env
+    drop(inner);
+    let mut lock = this.inner.write();
+    let mut raw_inner = lock.take().unwrap();
+
+    // Wait for others to finish what they started
+    let indexes = raw_inner.index_mapper.clear();
+    let mut pfcs: Vec<_> = indexes.into_iter().map(|index| index.prepare_for_closing()).collect();
+    pfcs.push(raw_inner.env.prepare_for_closing());
+    pfcs.into_iter().for_each(|pfc| pfc.wait());
+
+    // Let's replace all the folders/files.
+    std::fs::rename(&tasks_file, base_path.join("tasks").join("data.mdb"))?;
+    let dst_indexes = base_path.join("indexes");
+    std::fs::remove_dir_all(&dst_indexes)?;
+    std::fs::create_dir_all(&dst_indexes)?;
+    std::fs::rename(indexes_files.into_path(), dst_indexes)?;
+
+    let mut inner = IndexSchedulerInner::new(
+        raw_inner.options,
+        #[cfg(test)]
+        raw_inner.test_breakpoint_sdr,
+        #[cfg(test)]
+        raw_inner.planned_failures,
+    )?;
+
+    // We replace the newly created wake-up signal with the old one
+    inner.wake_up = raw_inner.wake_up;
+    *lock = Some(inner);
+
+    Ok(())
+}
+
 /// This is the internal structure that keeps the indexes alive.
 pub struct IndexSchedulerInner {
     /// The LMDB environment which the DBs are associated with.
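A note on the pattern this patch preserves: both the old inline code and the new `load_snapshot` download each database into a temporary location created next to the destination (on the same filesystem) and then `std::fs::rename` it into place, so a reader never observes a half-written `data.mdb`. Below is a minimal, self-contained sketch of that swap for a single file. It is not part of the patch; `replace_file_atomically` and its `fetch` callback are hypothetical names standing in for the S3 download, and it assumes the `tempfile` crate used above.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::Path;

/// Sketch of the download-then-swap pattern: write the new bytes into a
/// temp file created in the destination's own directory, then rename it
/// over the old file.
fn replace_file_atomically(
    dst: &Path,
    fetch: impl FnOnce(&mut File) -> io::Result<()>,
) -> io::Result<()> {
    // Creating the temp file next to `dst` keeps the final rename(2) on
    // one filesystem, which is what makes it atomic.
    let dir = dst.parent().expect("destination must have a parent directory");
    let mut tmp = tempfile::NamedTempFile::new_in(dir)?;
    // e.g. s3.get_object_to_writer(key, tmp.as_file_mut()) in the patch.
    fetch(tmp.as_file_mut())?;
    // Make sure the bytes hit the disk before the swap.
    tmp.as_file_mut().sync_all()?;
    // `persist` renames the temp file over `dst`: readers see the old
    // file or the new one, never a mix.
    tmp.persist(dst)?;
    Ok(())
}

fn main() -> io::Result<()> {
    let dst = Path::new("/tmp/example/data.mdb");
    std::fs::create_dir_all(dst.parent().unwrap())?;
    replace_file_atomically(dst, |file| file.write_all(b"new database bytes"))
}
```

The directory swap for `indexes` (`remove_dir_all` + `create_dir_all` + `rename`) has no such atomicity guarantee, which is presumably why `load_snapshot` first takes the write lock, clears the index mapper, and waits for every index and the scheduler env to finish closing before it touches any files.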