diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 9cca6557b..7278918aa 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -103,8 +103,8 @@ impl IndexActor { } => { let _ = ret.send(self.handle_create_index(uuid, primary_key).await); } - Update { ret, meta, data } => { - let _ = ret.send(self.handle_update(meta, data).await); + Update { ret, meta, data, uuid } => { + let _ = ret.send(self.handle_update(uuid, meta, data).await); } Search { ret, query, uuid } => { let _ = ret.send(self.handle_search(uuid, query).await); @@ -180,25 +180,25 @@ impl IndexActor { async fn handle_update( &self, + uuid: Uuid, meta: Processing, data: File, ) -> Result { - async fn get_result(actor: &IndexActor, meta: Processing, data: File) -> Result { + let get_result = || async { debug!("Processing update {}", meta.id()); - let uuid = *meta.index_uuid(); - let update_handler = actor.update_handler.clone(); - let index = match actor.store.get(uuid).await? { + let update_handler = self.update_handler.clone(); + let index = match self.store.get(uuid).await? { Some(index) => index, - None => actor.store.create(uuid, None).await?, + None => self.store.create(uuid, None).await?, }; spawn_blocking(move || update_handler.handle_update(meta, data, index)) .await .map_err(|e| IndexError::Error(e.into())) - } + }; - *self.processing.write().await = Some(*meta.index_uuid()); - let result = get_result(self, meta, data).await; + *self.processing.write().await = Some(uuid); + let result = get_result().await; *self.processing.write().await = None; result diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 93406c13b..5a734cdbf 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -32,11 +32,12 @@ impl IndexActorHandle for IndexActorHandleImpl { async fn update( &self, + uuid: Uuid, meta: Processing, data: std::fs::File, ) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Update { ret, meta, data }; + let msg = IndexMsg::Update { ret, meta, data, uuid }; let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index 6da0f8628..cb28f2868 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -15,6 +15,7 @@ pub enum IndexMsg { ret: oneshot::Sender>, }, Update { + uuid: Uuid, meta: Processing, data: std::fs::File, ret: oneshot::Sender>, diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index 426eb29e4..70a69822d 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -75,6 +75,7 @@ pub trait IndexActorHandle { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; async fn update( &self, + uuid: Uuid, meta: Processing, data: std::fs::File, ) -> anyhow::Result; diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 8361c45cc..cf8d036c7 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -233,7 +233,6 @@ impl IndexController { let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?; let uuid = self.uuid_resolver.create(uid.clone()).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?; - let _ = self.update_handle.create(uuid).await?; let meta = IndexMetadata { name: uid.clone(), uid, diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index f725dda84..8cf86d126 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -1,5 +1,6 @@ use std::io::SeekFrom; use std::path::{Path, PathBuf}; +use std::sync::Arc; use log::info; use oxidized_json_checker::JsonChecker; @@ -8,31 +9,44 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::sync::mpsc; use uuid::Uuid; +use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore}; use crate::index_controller::index_actor::IndexActorHandle; -use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus}; +use crate::index_controller::{UpdateMeta, UpdateStatus}; -use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStoreStore}; -pub struct UpdateActor { +pub struct UpdateActor { path: PathBuf, - store: S, + store: Arc, inbox: mpsc::Receiver>, index_handle: I, } -impl UpdateActor +impl UpdateActor where D: AsRef<[u8]> + Sized + 'static, - S: UpdateStoreStore, I: IndexActorHandle + Clone + Send + Sync + 'static, { pub fn new( - store: S, + update_db_size: usize, inbox: mpsc::Receiver>, path: impl AsRef, index_handle: I, ) -> anyhow::Result { - let path = path.as_ref().to_owned(); + let path = path + .as_ref() + .to_owned() + .join("updates"); + + std::fs::create_dir_all(&path)?; + + let mut options = heed::EnvOpenOptions::new(); + options.map_size(update_db_size); + + let handle = index_handle.clone(); + let store = UpdateStore::open(options, &path, move |uuid, meta, file| { + futures::executor::block_on(handle.update(uuid, meta, file)) + }) + .map_err(|e| UpdateError::Error(e.into()))?; std::fs::create_dir_all(path.join("update_files"))?; assert!(path.exists()); Ok(Self { @@ -67,9 +81,6 @@ where Some(Delete { uuid, ret }) => { let _ = ret.send(self.handle_delete(uuid).await); } - Some(Create { uuid, ret }) => { - let _ = ret.send(self.handle_create(uuid).await); - } Some(Snapshot { uuid, path, ret }) => { let _ = ret.send(self.handle_snapshot(uuid, path).await); } @@ -87,7 +98,6 @@ where meta: UpdateMeta, mut payload: mpsc::Receiver>, ) -> Result { - let update_store = self.store.get_or_create(uuid).await?; let update_file_id = uuid::Uuid::new_v4(); let path = self .path @@ -123,6 +133,8 @@ where let mut file = file.into_std().await; + let update_store = self.store.clone(); + tokio::task::spawn_blocking(move || { use std::io::{copy, sink, BufReader, Seek}; @@ -157,11 +169,10 @@ where } async fn handle_list_updates(&self, uuid: Uuid) -> Result> { - let update_store = self.store.get(uuid).await?; + let update_store = self.store.clone(); tokio::task::spawn_blocking(move || { let result = update_store - .ok_or(UpdateError::UnexistingIndex(uuid))? - .list() + .list(uuid) .map_err(|e| UpdateError::Error(e.into()))?; Ok(result) }) @@ -170,60 +181,45 @@ where } async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { - let store = self - .store - .get(uuid) - .await? - .ok_or(UpdateError::UnexistingIndex(uuid))?; + let store = self.store.clone(); let result = store - .meta(id) + .meta(uuid, id) .map_err(|e| UpdateError::Error(Box::new(e)))? .ok_or(UpdateError::UnexistingUpdate(id))?; Ok(result) } async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let store = self.store.delete(uuid).await?; + let store = self.store.clone(); - if let Some(store) = store { - tokio::task::spawn(async move { - let store = get_arc_ownership_blocking(store).await; - tokio::task::spawn_blocking(move || { - store.prepare_for_closing().wait(); - info!("Update store {} was closed.", uuid); - }); - }); - } + tokio::task::spawn_blocking(move || store.delete_all(uuid)) + .await + .map_err(|e| UpdateError::Error(e.into()))? + .map_err(|e| UpdateError::Error(e.into()))?; Ok(()) } - async fn handle_create(&self, uuid: Uuid) -> Result<()> { - let _ = self.store.get_or_create(uuid).await?; - Ok(()) - } - async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let index_handle = self.index_handle.clone(); - if let Some(update_store) = self.store.get(uuid).await? { - tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - // acquire write lock to prevent further writes during snapshot - // the update lock must be acquired BEFORE the write lock to prevent dead lock - let _lock = update_store.update_lock.lock(); - let mut txn = update_store.env.write_txn()?; + let update_store = self.store.clone(); + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + // acquire write lock to prevent further writes during snapshot + // the update lock must be acquired BEFORE the write lock to prevent dead lock + let _lock = update_store.update_lock.lock(); + let mut txn = update_store.env.write_txn()?; - // create db snapshot - update_store.snapshot(&mut txn, &path, uuid)?; + // create db snapshot + update_store.snapshot(&mut txn, &path, uuid)?; - futures::executor::block_on( - async move { index_handle.snapshot(uuid, path).await }, - )?; - Ok(()) - }) - .await + futures::executor::block_on( + async move { index_handle.snapshot(uuid, path).await }, + )?; + Ok(()) + }) + .await .map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?; - } Ok(()) } diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index 860cc2bc8..bbb52d17c 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -6,7 +6,7 @@ use uuid::Uuid; use crate::index_controller::IndexActorHandle; use super::{ - MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, + PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus, }; @@ -29,8 +29,7 @@ where { let path = path.as_ref().to_owned().join("updates"); let (sender, receiver) = mpsc::channel(100); - let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size); - let actor = UpdateActor::new(store, receiver, path, index_handle)?; + let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?; tokio::task::spawn(actor.run()); @@ -65,13 +64,6 @@ where receiver.await.expect("update actor killed.") } - async fn create(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Create { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Snapshot { uuid, path, ret }; diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index f8150c00a..c863c803e 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -25,10 +25,6 @@ pub enum UpdateMsg { uuid: Uuid, ret: oneshot::Sender>, }, - Create { - uuid: Uuid, - ret: oneshot::Sender>, - }, Snapshot { uuid: Uuid, path: PathBuf, diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 228b47b02..1c9461f6a 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -1,7 +1,6 @@ mod actor; mod handle_impl; mod message; -mod store; mod update_store; use std::path::PathBuf; @@ -15,7 +14,6 @@ use crate::index_controller::{UpdateMeta, UpdateStatus}; use actor::UpdateActor; use message::UpdateMsg; -use store::{MapUpdateStoreStore, UpdateStoreStore}; pub use handle_impl::UpdateActorHandleImpl; @@ -30,8 +28,6 @@ use mockall::automock; pub enum UpdateError { #[error("error with update: {0}")] Error(Box), - #[error("Index {0} doesn't exist.")] - UnexistingIndex(Uuid), #[error("Update {0} doesn't exist.")] UnexistingUpdate(u64), } @@ -44,7 +40,6 @@ pub trait UpdateActorHandle { async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn create(&self, uuid: Uuid) -> Result<()>; async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; async fn get_size(&self, uuid: Uuid) -> Result; async fn update( diff --git a/meilisearch-http/src/index_controller/update_actor/store.rs b/meilisearch-http/src/index_controller/update_actor/store.rs deleted file mode 100644 index 676182a62..000000000 --- a/meilisearch-http/src/index_controller/update_actor/store.rs +++ /dev/null @@ -1,111 +0,0 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use tokio::fs; -use tokio::sync::RwLock; -use uuid::Uuid; - -use super::{Result, UpdateError, UpdateStore}; -use crate::index_controller::IndexActorHandle; - -#[async_trait::async_trait] -pub trait UpdateStoreStore { - async fn get_or_create(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>>; - async fn get(&self, uuid: Uuid) -> Result>>; -} - -pub struct MapUpdateStoreStore { - db: Arc>>>, - index_handle: I, - path: PathBuf, - update_store_size: usize, -} - -impl MapUpdateStoreStore { - pub fn new(index_handle: I, path: impl AsRef, update_store_size: usize) -> Self { - let db = Arc::new(RwLock::new(HashMap::new())); - let path = path.as_ref().to_owned(); - Self { - db, - index_handle, - path, - update_store_size, - } - } -} - -#[async_trait::async_trait] -impl UpdateStoreStore for MapUpdateStoreStore -where - I: IndexActorHandle + Clone + Send + Sync + 'static, -{ - async fn get_or_create(&self, uuid: Uuid) -> Result> { - match self.db.write().await.entry(uuid) { - Entry::Vacant(e) => { - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let path = self.path.clone().join(format!("updates-{}", e.key())); - fs::create_dir_all(&path).await.unwrap(); - let index_handle = self.index_handle.clone(); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = e.insert(store); - Ok(store.clone()) - } - Entry::Occupied(e) => Ok(e.get().clone()), - } - } - - async fn get(&self, uuid: Uuid) -> Result>> { - let guard = self.db.read().await; - match guard.get(&uuid) { - Some(uuid) => Ok(Some(uuid.clone())), - None => { - // The index is not found in the found in the loaded indexes, so we attempt to load - // it from disk. We need to acquire a write lock **before** attempting to open the - // index, because someone could be trying to open it at the same time as us. - drop(guard); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if path.exists() { - let mut guard = self.db.write().await; - match guard.entry(uuid) { - Entry::Vacant(entry) => { - // We can safely load the index - let index_handle = self.index_handle.clone(); - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = entry.insert(store); - Ok(Some(store.clone())) - } - Entry::Occupied(entry) => { - // The index was loaded while we attempted to to iter - Ok(Some(entry.get().clone())) - } - } - } else { - Ok(None) - } - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result>> { - let store = self.db.write().await.remove(&uuid); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if store.is_some() || path.exists() { - fs::remove_dir_all(path).await.unwrap(); - } - Ok(store) - } -} diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index efd827619..c0c76283c 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -170,10 +170,6 @@ where Ok(update_store) } - pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { - self.env.prepare_for_closing() - } - /// Returns the new biggest id to use to store the new update. fn new_update_id(&self, txn: &heed::RoTxn, index_uuid: Uuid) -> heed::Result { // TODO: this is a very inneficient process for finding the next update id for each index, @@ -510,6 +506,38 @@ where Ok(aborted_updates) } + pub fn delete_all(&self, uuid: Uuid) -> anyhow::Result<()> { + fn delete_all( + txn: &mut heed::RwTxn, + uuid: Uuid, + db: Database + ) -> anyhow::Result<()> + where A: for<'a> heed::BytesDecode<'a> + { + let mut iter = db.prefix_iter_mut(txn, uuid.as_bytes())?; + while let Some(_) = iter.next() { + iter.del_current()?; + } + Ok(()) + } + + let mut txn = self.env.write_txn()?; + + delete_all(&mut txn, uuid, self.pending)?; + delete_all(&mut txn, uuid, self.pending_meta)?; + delete_all(&mut txn, uuid, self.processed_meta)?; + delete_all(&mut txn, uuid, self.aborted_meta)?; + delete_all(&mut txn, uuid, self.failed_meta)?; + + let processing = self.processing.upgradable_read(); + if let Some((processing_uuid, _)) = *processing { + if processing_uuid == uuid { + parking_lot::RwLockUpgradableReadGuard::upgrade(processing).take(); + } + } + Ok(()) + } + pub fn snapshot( &self, txn: &mut heed::RwTxn, diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 60db5d3df..36e327cc2 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -1,6 +1,5 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use uuid::Uuid; #[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -74,10 +73,6 @@ impl Processing { self.from.meta() } - pub fn index_uuid(&self) -> &Uuid { - &self.from.index_uuid - } - pub fn process(self, meta: N) -> Processed { Processed { success: meta,