restore snapshots

This commit is contained in:
mpostma 2021-03-22 10:17:38 +01:00
parent 7f6a54cb12
commit 4847884165
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 143 additions and 33 deletions

View File

@ -17,11 +17,11 @@ pub fn to_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> {
Ok(()) Ok(())
} }
pub fn from_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> { pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> Result<(), Error> {
let f = File::open(src)?; let f = File::open(&src)?;
let gz = GzDecoder::new(f); let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz); let mut ar = Archive::new(gz);
create_dir_all(dest)?; create_dir_all(&dest)?;
ar.unpack(dest)?; ar.unpack(&dest)?;
Ok(()) Ok(())
} }

View File

@ -26,6 +26,7 @@ use crate::index_controller::{
UpdateMeta, UpdateMeta,
}; };
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use crate::helpers::compression;
pub type Result<T> = std::result::Result<T, IndexError>; pub type Result<T> = std::result::Result<T, IndexError>;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
@ -107,7 +108,7 @@ enum IndexMsg {
uuid: Uuid, uuid: Uuid,
path: PathBuf, path: PathBuf,
ret: oneshot::Sender<Result<()>>, ret: oneshot::Sender<Result<()>>,
} },
} }
struct IndexActor<S> { struct IndexActor<S> {
@ -426,7 +427,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
spawn_blocking(move || -> anyhow::Result<()> { spawn_blocking(move || -> anyhow::Result<()> {
// Get write txn to wait for ongoing write transaction before snapshot. // Get write txn to wait for ongoing write transaction before snapshot.
let _txn = index.write_txn()?; let _txn = index.write_txn()?;
index.env.copy_to_path(index_path, CompactionOption::Enabled)?; index
.env
.copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(()) Ok(())
}); });
} }
@ -455,6 +458,22 @@ impl IndexActorHandle {
}) })
} }
/// Builds an index actor handle after restoring the indexes stored in a snapshot.
///
/// Every entry of `<snapshot_path>/indexes` is handed to
/// `compression::from_tar_gz` and unpacked into `<path>/indexes/<entry name>`.
/// Once every entry has been unpacked, the handle is created with
/// `Self::new(path, index_size)`.
///
/// # Errors
///
/// Fails if the snapshot's `indexes` directory cannot be read, if any entry
/// cannot be unpacked, or if `Self::new` fails.
pub fn from_snapshot(
    path: impl AsRef<Path>,
    index_size: usize,
    snapshot_path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
    let archives_dir = snapshot_path.as_ref().join("indexes");
    let restore_dir = path.as_ref().join("indexes");

    for archive in archives_dir.read_dir()? {
        let archive = archive?;
        // The unpacked index keeps the name of the snapshot entry it comes from.
        let target = restore_dir.join(archive.file_name());
        compression::from_tar_gz(archive.path(), target)?;
    }

    Self::new(path, index_size)
}
pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> { pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex { let msg = IndexMsg::CreateIndex {
@ -558,11 +577,7 @@ impl IndexActorHandle {
pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Snapshot { let msg = IndexMsg::Snapshot { uuid, path, ret };
uuid,
path,
ret,
};
let _ = self.read_sender.send(msg).await; let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }

View File

@ -1,10 +1,10 @@
mod index_actor; mod index_actor;
mod snapshot;
mod update_actor; mod update_actor;
mod update_handler; mod update_handler;
mod update_store; mod update_store;
mod updates; mod updates;
mod uuid_resolver; mod uuid_resolver;
mod snapshot;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@ -22,8 +22,8 @@ use crate::index::{Document, SearchQuery, SearchResult};
use crate::index::{Facets, Settings, UpdateResult}; use crate::index::{Facets, Settings, UpdateResult};
use crate::option::Opt; use crate::option::Opt;
pub use updates::{Failed, Processed, Processing};
use snapshot::SnapshotService; use snapshot::SnapshotService;
pub use updates::{Failed, Processed, Processing};
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>; pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
@ -63,17 +63,40 @@ pub struct IndexController {
} }
impl IndexController { impl IndexController {
pub fn new( pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
path: impl AsRef<Path>,
options: &Opt,
) -> anyhow::Result<Self> {
let index_size = options.max_mdb_size.get_bytes() as usize; let index_size = options.max_mdb_size.get_bytes() as usize;
let update_store_size = options.max_udb_size.get_bytes() as usize; let update_store_size = options.max_udb_size.get_bytes() as usize;
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; let uuid_resolver;
let index_handle = index_actor::IndexActorHandle::new(&path, index_size)?; let index_handle;
let update_handle = let update_handle;
update_actor::UpdateActorHandle::new(index_handle.clone(), &path, update_store_size)?;
match options.import_snapshot {
Some(ref snapshot_path) => {
uuid_resolver =
uuid_resolver::UuidResolverHandle::from_snapshot(&path, &snapshot_path)?;
index_handle = index_actor::IndexActorHandle::from_snapshot(
&path,
index_size,
&snapshot_path,
)?;
update_handle = update_actor::UpdateActorHandle::from_snapshot(
index_handle.clone(),
&path,
update_store_size,
&snapshot_path,
)?;
}
None => {
uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?;
index_handle = index_actor::IndexActorHandle::new(&path, index_size)?;
update_handle = update_actor::UpdateActorHandle::new(
index_handle.clone(),
&path,
update_store_size,
)?;
}
}
if options.schedule_snapshot { if options.schedule_snapshot {
let snapshot_service = SnapshotService::new( let snapshot_service = SnapshotService::new(
@ -81,7 +104,7 @@ impl IndexController {
uuid_resolver.clone(), uuid_resolver.clone(),
update_handle.clone(), update_handle.clone(),
Duration::from_secs(options.snapshot_interval_sec), Duration::from_secs(options.snapshot_interval_sec),
options.snapshot_dir.clone() options.snapshot_dir.clone(),
); );
tokio::task::spawn(snapshot_service.run()); tokio::task::spawn(snapshot_service.run());
@ -196,7 +219,11 @@ impl IndexController {
let uuid = self.uuid_resolver.create(uid.clone()).await?; let uuid = self.uuid_resolver.create(uid.clone()).await?;
let meta = self.index_handle.create_index(uuid, primary_key).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?;
let _ = self.update_handle.create(uuid).await?; let _ = self.update_handle.create(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta) Ok(meta)
} }
@ -227,7 +254,11 @@ impl IndexController {
for (uid, uuid) in uuids { for (uid, uuid) in uuids {
let meta = self.index_handle.get_index_meta(uuid).await?; let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
ret.push(meta); ret.push(meta);
} }
@ -280,7 +311,11 @@ impl IndexController {
let uuid = self.uuid_resolver.resolve(uid.clone()).await?; let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let meta = self.index_handle.update_index(uuid, index_settings).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta) Ok(meta)
} }
@ -293,7 +328,11 @@ impl IndexController {
pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> { pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> {
let uuid = self.uuid_resolver.resolve(uid.clone()).await?; let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let meta = self.index_handle.get_index_meta(uuid).await?; let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta) Ok(meta)
} }
} }

View File

@ -1,5 +1,5 @@
use std::collections::{hash_map::Entry, HashMap}; use std::collections::{hash_map::Entry, HashMap};
use std::fs::{create_dir_all, remove_dir_all}; use std::fs;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
@ -16,6 +16,7 @@ use uuid::Uuid;
use super::get_arc_ownership_blocking; use super::get_arc_ownership_blocking;
use crate::index::UpdateResult; use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
use crate::helpers::compression;
pub type Result<T> = std::result::Result<T, UpdateError>; pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
@ -88,7 +89,7 @@ where
index_handle: IndexActorHandle, index_handle: IndexActorHandle,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
create_dir_all(path.join("update_files"))?; fs::create_dir_all(path.join("update_files"))?;
assert!(path.exists()); assert!(path.exists());
Ok(Self { Ok(Self {
store, store,
@ -305,6 +306,41 @@ where
Ok(Self { sender }) Ok(Self { sender })
} }
/// Builds an update actor handle after restoring the update data stored in a snapshot.
///
/// The restoration happens in two passes over `<snapshot>/updates`:
///
/// 1. every regular file directly inside it is unpacked with
///    `compression::from_tar_gz` into `<path>/updates/<file name>`;
/// 2. every entry of its `update_files` sub-directory is copied as-is into
///    `<path>/updates/update_files`.
///
/// The handle is then created with
/// `Self::new(index_handle, path, update_store_size)`.
///
/// # Errors
///
/// Fails if a destination directory cannot be created, if either snapshot
/// directory cannot be read, if an entry cannot be unpacked or copied, or if
/// `Self::new` fails.
pub fn from_snapshot(
    index_handle: IndexActorHandle,
    path: impl AsRef<Path>,
    update_store_size: usize,
    snapshot: impl AsRef<Path>,
) -> anyhow::Result<Self> {
    let snapshot_updates = snapshot.as_ref().join("updates");
    let restored_updates = path.as_ref().join("updates");
    fs::create_dir_all(&restored_updates)?;

    // First pass: the update stores, saved as archives next to `update_files`.
    for item in snapshot_updates.read_dir()? {
        let item = item?;
        // Only regular files are archives; this skips the `update_files` directory.
        if !item.file_type()?.is_file() {
            continue;
        }
        let target = restored_updates.join(item.file_name());
        compression::from_tar_gz(item.path(), target)?;
    }

    // Second pass: the update files, which are copied without decompression.
    let snapshot_files = snapshot_updates.join("update_files");
    let restored_files = restored_updates.join("update_files");
    fs::create_dir_all(&restored_files)?;
    for item in snapshot_files.read_dir()? {
        let item = item?;
        let target = restored_files.join(item.file_name());
        fs::copy(item.path(), target)?;
    }

    Self::new(index_handle, path, update_store_size)
}
pub async fn update( pub async fn update(
&self, &self,
meta: UpdateMeta, meta: UpdateMeta,
@ -393,7 +429,7 @@ impl UpdateStoreStore for MapUpdateStoreStore {
let update_store_size = self.update_store_size; let update_store_size = self.update_store_size;
options.map_size(update_store_size); options.map_size(update_store_size);
let path = self.path.clone().join(format!("updates-{}", e.key())); let path = self.path.clone().join(format!("updates-{}", e.key()));
create_dir_all(&path).unwrap(); fs::create_dir_all(&path).unwrap();
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
let store = UpdateStore::open(options, &path, move |meta, file| { let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file)) futures::executor::block_on(index_handle.update(meta, file))
@ -448,7 +484,7 @@ impl UpdateStoreStore for MapUpdateStoreStore {
let store = self.db.write().await.remove(&uuid); let store = self.db.write().await.remove(&uuid);
let path = self.path.clone().join(format!("updates-{}", uuid)); let path = self.path.clone().join(format!("updates-{}", uuid));
if store.is_some() || path.exists() { if store.is_some() || path.exists() {
remove_dir_all(path).unwrap(); fs::remove_dir_all(path).unwrap();
} }
Ok(store) Ok(store)
} }

View File

@ -3,13 +3,16 @@ use std::path::{Path, PathBuf};
use heed::{ use heed::{
types::{ByteSlice, Str}, types::{ByteSlice, Str},
Database, Env, EnvOpenOptions, Database, Env, EnvOpenOptions,CompactionOption
}; };
use log::{info, warn}; use log::{info, warn};
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use heed::CompactionOption;
use crate::helpers::compression;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
pub type Result<T> = std::result::Result<T, UuidError>; pub type Result<T> = std::result::Result<T, UuidError>;
@ -140,6 +143,17 @@ impl UuidResolverHandle {
Ok(Self { sender }) Ok(Self { sender })
} }
/// Builds a uuid resolver handle backed by a store restored from a snapshot.
///
/// The store is obtained from `HeedUuidStore::from_snapshot`, which takes the
/// snapshot path first and the database path second. A `UuidResolverActor`
/// owning that store is then spawned on the tokio runtime, and the returned
/// handle keeps the sending half of the actor's channel (capacity 100).
///
/// # Errors
///
/// Fails if the store cannot be restored from the snapshot.
pub fn from_snapshot(
    db_path: impl AsRef<Path>,
    snapshot_path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
    let store = HeedUuidStore::from_snapshot(snapshot_path, db_path)?;
    let (sender, receiver) = mpsc::channel(100);
    tokio::spawn(UuidResolverActor::new(receiver, store).run());
    Ok(Self { sender })
}
pub async fn resolve(&self, name: String) -> anyhow::Result<Uuid> { pub async fn resolve(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Resolve { uid: name, ret }; let msg = UuidResolveMsg::Resolve { uid: name, ret };
@ -232,11 +246,17 @@ impl HeedUuidStore {
let path = path.as_ref().join("index_uuids"); let path = path.as_ref().join("index_uuids");
create_dir_all(&path)?; create_dir_all(&path)?;
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(1_073_741_824); // 1GB options.map_size(UUID_STORE_SIZE); // 1GB
let env = options.open(path)?; let env = options.open(path)?;
let db = env.create_database(None)?; let db = env.create_database(None)?;
Ok(Self { env, db }) Ok(Self { env, db })
} }
/// Opens a uuid store after restoring it from a snapshot.
///
/// The `uuids` archive found in `snapshot` is unpacked into `path` with
/// `compression::from_tar_gz`, then the store is opened with `Self::new(path)`.
///
/// # Errors
///
/// Fails if the archive cannot be unpacked or if `Self::new` fails.
fn from_snapshot(snapshot: impl AsRef<Path>, path: impl AsRef<Path>) -> anyhow::Result<Self> {
    let archive = snapshot.as_ref().join("uuids");
    compression::from_tar_gz(archive, path.as_ref())?;
    Self::new(path)
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]