diff --git a/meilisearch-http/src/helpers/compression.rs b/meilisearch-http/src/helpers/compression.rs index bbf14d578..7e8b5e3f3 100644 --- a/meilisearch-http/src/helpers/compression.rs +++ b/meilisearch-http/src/helpers/compression.rs @@ -17,11 +17,11 @@ pub fn to_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> { Ok(()) } -pub fn from_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> { - let f = File::open(src)?; +pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> Result<(), Error> { + let f = File::open(&src)?; let gz = GzDecoder::new(f); let mut ar = Archive::new(gz); - create_dir_all(dest)?; - ar.unpack(dest)?; + create_dir_all(&dest)?; + ar.unpack(&dest)?; Ok(()) } diff --git a/meilisearch-http/src/index_controller/index_actor.rs b/meilisearch-http/src/index_controller/index_actor.rs index bfb7127f3..4b5c68b56 100644 --- a/meilisearch-http/src/index_controller/index_actor.rs +++ b/meilisearch-http/src/index_controller/index_actor.rs @@ -26,6 +26,7 @@ use crate::index_controller::{ UpdateMeta, }; use crate::option::IndexerOpts; +use crate::helpers::compression; pub type Result = std::result::Result; type AsyncMap = Arc>>; @@ -107,7 +108,7 @@ enum IndexMsg { uuid: Uuid, path: PathBuf, ret: oneshot::Sender>, - } + }, } struct IndexActor { @@ -426,7 +427,9 @@ impl IndexActor { spawn_blocking(move || -> anyhow::Result<()> { // Get write txn to wait for ongoing write transaction before snapshot. let _txn = index.write_txn()?; - index.env.copy_to_path(index_path, CompactionOption::Enabled)?; + index + .env + .copy_to_path(index_path, CompactionOption::Enabled)?; Ok(()) }); } @@ -455,6 +458,22 @@ impl IndexActorHandle { }) } + pub fn from_snapshot( + path: impl AsRef, + index_size: usize, + snapshot_path: impl AsRef, + ) -> anyhow::Result { + let snapshot_path = snapshot_path.as_ref().join("indexes"); + let indexes_path = path.as_ref().join("indexes"); + for entry in snapshot_path.read_dir()? { + let entry = entry?; + let src = snapshot_path.join(entry.file_name()); + let dest = indexes_path.join(entry.file_name()); + compression::from_tar_gz(src, dest)?; + } + Self::new(path, index_size) + } + pub async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::CreateIndex { @@ -558,11 +577,7 @@ impl IndexActorHandle { pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Snapshot { - uuid, - path, - ret, - }; + let msg = IndexMsg::Snapshot { uuid, path, ret }; let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index fc0fd3d46..d2d11d44b 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -1,10 +1,10 @@ mod index_actor; +mod snapshot; mod update_actor; mod update_handler; mod update_store; mod updates; mod uuid_resolver; -mod snapshot; use std::path::Path; use std::sync::Arc; @@ -22,8 +22,8 @@ use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Facets, Settings, UpdateResult}; use crate::option::Opt; -pub use updates::{Failed, Processed, Processing}; use snapshot::SnapshotService; +pub use updates::{Failed, Processed, Processing}; pub type UpdateStatus = updates::UpdateStatus; @@ -63,17 +63,40 @@ pub struct IndexController { } impl IndexController { - pub fn new( - path: impl AsRef, - options: &Opt, - ) -> anyhow::Result { + pub fn new(path: impl AsRef, options: &Opt) -> anyhow::Result { let index_size = options.max_mdb_size.get_bytes() as usize; let update_store_size = options.max_udb_size.get_bytes() as usize; - let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; - let index_handle = index_actor::IndexActorHandle::new(&path, index_size)?; - let update_handle = - update_actor::UpdateActorHandle::new(index_handle.clone(), &path, update_store_size)?; + let uuid_resolver; + let index_handle; + let update_handle; + + match options.import_snapshot { + Some(ref snapshot_path) => { + uuid_resolver = + uuid_resolver::UuidResolverHandle::from_snapshot(&path, &snapshot_path)?; + index_handle = index_actor::IndexActorHandle::from_snapshot( + &path, + index_size, + &snapshot_path, + )?; + update_handle = update_actor::UpdateActorHandle::from_snapshot( + index_handle.clone(), + &path, + update_store_size, + &snapshot_path, + )?; + } + None => { + uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; + index_handle = index_actor::IndexActorHandle::new(&path, index_size)?; + update_handle = update_actor::UpdateActorHandle::new( + index_handle.clone(), + &path, + update_store_size, + )?; + } + } if options.schedule_snapshot { let snapshot_service = SnapshotService::new( @@ -81,7 +104,7 @@ impl IndexController { uuid_resolver.clone(), update_handle.clone(), Duration::from_secs(options.snapshot_interval_sec), - options.snapshot_dir.clone() + options.snapshot_dir.clone(), ); tokio::task::spawn(snapshot_service.run()); @@ -196,7 +219,11 @@ impl IndexController { let uuid = self.uuid_resolver.create(uid.clone()).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?; let _ = self.update_handle.create(uuid).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; Ok(meta) } @@ -227,7 +254,11 @@ impl IndexController { for (uid, uuid) in uuids { let meta = self.index_handle.get_index_meta(uuid).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; ret.push(meta); } @@ -280,7 +311,11 @@ impl IndexController { let uuid = self.uuid_resolver.resolve(uid.clone()).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; Ok(meta) } @@ -293,7 +328,11 @@ impl IndexController { pub async fn get_index(&self, uid: String) -> anyhow::Result { let uuid = self.uuid_resolver.resolve(uid.clone()).await?; let meta = self.index_handle.get_index_meta(uuid).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; Ok(meta) } } diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs index 64eab5221..9b2a90370 100644 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ b/meilisearch-http/src/index_controller/update_actor.rs @@ -1,5 +1,5 @@ use std::collections::{hash_map::Entry, HashMap}; -use std::fs::{create_dir_all, remove_dir_all}; +use std::fs; use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -16,6 +16,7 @@ use uuid::Uuid; use super::get_arc_ownership_blocking; use crate::index::UpdateResult; use crate::index_controller::{UpdateMeta, UpdateStatus}; +use crate::helpers::compression; pub type Result = std::result::Result; type UpdateStore = super::update_store::UpdateStore; @@ -88,7 +89,7 @@ where index_handle: IndexActorHandle, ) -> anyhow::Result { let path = path.as_ref().to_owned(); - create_dir_all(path.join("update_files"))?; + fs::create_dir_all(path.join("update_files"))?; assert!(path.exists()); Ok(Self { store, @@ -305,6 +306,41 @@ where Ok(Self { sender }) } + pub fn from_snapshot( + index_handle: IndexActorHandle, + path: impl AsRef, + update_store_size: usize, + snapshot: impl AsRef, + ) -> anyhow::Result { + let src = snapshot.as_ref().join("updates"); + let dst = path.as_ref().join("updates"); + fs::create_dir_all(&dst)?; + + // restore the update stores + for entry in src.read_dir()? { + let entry = entry?; + // filter out the update_files directory. + if entry.file_type()?.is_file() { + let src = src.join(entry.file_name()); + let dest = dst.join(entry.file_name()); + compression::from_tar_gz(src, dest)?; + } + } + + // restore the update files + let src = src.join("update_files"); + let dst = dst.join("update_files"); + fs::create_dir_all(&dst)?; + for entry in src.read_dir()? { + let entry = entry?; + let src = src.join(entry.file_name()); + let dst = dst.join(entry.file_name()); + fs::copy(src, dst)?; + } + + Self::new(index_handle, path, update_store_size) + } + pub async fn update( &self, meta: UpdateMeta, @@ -393,7 +429,7 @@ impl UpdateStoreStore for MapUpdateStoreStore { let update_store_size = self.update_store_size; options.map_size(update_store_size); let path = self.path.clone().join(format!("updates-{}", e.key())); - create_dir_all(&path).unwrap(); + fs::create_dir_all(&path).unwrap(); let index_handle = self.index_handle.clone(); let store = UpdateStore::open(options, &path, move |meta, file| { futures::executor::block_on(index_handle.update(meta, file)) @@ -448,7 +484,7 @@ impl UpdateStoreStore for MapUpdateStoreStore { let store = self.db.write().await.remove(&uuid); let path = self.path.clone().join(format!("updates-{}", uuid)); if store.is_some() || path.exists() { - remove_dir_all(path).unwrap(); + fs::remove_dir_all(path).unwrap(); } Ok(store) } diff --git a/meilisearch-http/src/index_controller/uuid_resolver.rs b/meilisearch-http/src/index_controller/uuid_resolver.rs index c31d776b3..7d755e9b6 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver.rs @@ -3,13 +3,16 @@ use std::path::{Path, PathBuf}; use heed::{ types::{ByteSlice, Str}, - Database, Env, EnvOpenOptions, + Database, Env, EnvOpenOptions,CompactionOption }; use log::{info, warn}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use heed::CompactionOption; + +use crate::helpers::compression; + +const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB pub type Result = std::result::Result; @@ -140,6 +143,17 @@ impl UuidResolverHandle { Ok(Self { sender }) } + pub fn from_snapshot( + db_path: impl AsRef, + snapshot_path: impl AsRef + ) -> anyhow::Result { + let (sender, reveiver) = mpsc::channel(100); + let store = HeedUuidStore::from_snapshot(snapshot_path, db_path)?; + let actor = UuidResolverActor::new(reveiver, store); + tokio::spawn(actor.run()); + Ok(Self { sender }) + } + pub async fn resolve(&self, name: String) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Resolve { uid: name, ret }; @@ -232,11 +246,17 @@ impl HeedUuidStore { let path = path.as_ref().join("index_uuids"); create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); - options.map_size(1_073_741_824); // 1GB + options.map_size(UUID_STORE_SIZE); // 1GB let env = options.open(path)?; let db = env.create_database(None)?; Ok(Self { env, db }) } + + fn from_snapshot(snapshot: impl AsRef, path: impl AsRef) -> anyhow::Result { + let snapshot = snapshot.as_ref().join("uuids"); + compression::from_tar_gz(snapshot, &path)?; + Self::new(path) + } } #[async_trait::async_trait]