diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 8f892587d..9b6b057c3 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -43,7 +43,7 @@ impl IndexStore for MapIndexStore { let mut lock = self.index_store.write().await; if let Some(index) = lock.get(&uuid) { - return Ok(index.clone()) + return Ok(index.clone()); } let path = self.path.join(format!("index-{}", uuid)); if path.exists() { diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 7779f2556..76cba7e07 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::io::SeekFrom; use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use log::info; @@ -19,6 +20,7 @@ pub struct UpdateActor { store: Arc, inbox: mpsc::Receiver>, index_handle: I, + must_exit: Arc, } impl UpdateActor @@ -39,14 +41,17 @@ where let mut options = heed::EnvOpenOptions::new(); options.map_size(update_db_size); - let store = UpdateStore::open(options, &path, index_handle.clone())?; + let must_exit = Arc::new(AtomicBool::new(false)); + + let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?; std::fs::create_dir_all(path.join("update_files"))?; - assert!(path.exists()); + Ok(Self { path, store, inbox, index_handle, + must_exit, }) } @@ -56,7 +61,13 @@ where info!("Started update actor."); loop { - match self.inbox.recv().await { + let msg = self.inbox.recv().await; + + if self.must_exit.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + + match msg { Some(Update { uuid, meta, diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index cc5ba9757..7844bf855 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -7,7 +7,8 @@ use uuid::Uuid; use crate::index_controller::{IndexActorHandle, UpdateStatus}; use super::{ - PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo, + PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateError, UpdateMeta, UpdateMsg, + UpdateStoreInfo, }; #[derive(Clone)] @@ -47,42 +48,72 @@ where async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::ListUpdates { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn update_status(&self, uuid: Uuid, id: u64) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::GetUpdate { uuid, id, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn delete(&self, uuid: Uuid) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Delete { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Snapshot { uuids, path, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Dump { uuids, path, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn get_info(&self) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::GetInfo { ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn update( @@ -98,7 +129,12 @@ where meta, ret, }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } } diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index ba89eebe3..b854cca70 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -30,6 +30,10 @@ pub enum UpdateError { UnexistingUpdate(u64), #[error("Internal error processing update: {0}")] Internal(String), + #[error( + "Update store was shut down due to a fatal error, please check your logs for more info." + )] + FatalUpdateStoreError, } macro_rules! internal_error { diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 331e7b2bb..7ab854be7 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -3,6 +3,7 @@ pub mod dump; use std::fs::{copy, create_dir_all, remove_file, File}; use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{ collections::{BTreeMap, HashSet}, @@ -98,7 +99,7 @@ pub struct UpdateStore { /// | 16-bytes | 8-bytes | updates: Database>, /// Indicates the current state of the update store, - pub state: Arc, + state: Arc, /// Wake up the loop when a new event occurs. notification_sender: mpsc::Sender<()>, path: PathBuf, @@ -138,6 +139,7 @@ impl UpdateStore { options: EnvOpenOptions, path: impl AsRef, index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, + must_exit: Arc, ) -> anyhow::Result> { let (update_store, mut notification_receiver) = Self::new(options, path)?; let update_store = Arc::new(update_store); @@ -171,7 +173,11 @@ impl UpdateStore { match res { Ok(Some(_)) => (), Ok(None) => break, - Err(e) => error!("error while processing update: {}", e), + Err(e) => { + error!("Fatal error while processing update that requires the update store to shutdown: {}", e); + must_exit.store(true, Ordering::SeqCst); + break 'outer; + } } } // the ownership on the arc has been taken, we need to exit. @@ -181,6 +187,8 @@ impl UpdateStore { } }); + error!("Update store loop exited."); + Ok(update_store) } @@ -286,63 +294,79 @@ impl UpdateStore { // If there is a pending update we process and only keep // a reader while processing it, not a writer. match first_meta { - Some(((global_id, index_uuid, update_id), mut pending)) => { - let content_path = pending.content.take(); + Some(((global_id, index_uuid, _), mut pending)) => { + let content = pending.content.take(); let processing = pending.processing(); - // Acquire the state lock and set the current state to processing. // txn must *always* be acquired after state lock, or it will dead lock. let state = self.state.write(); state.swap(State::Processing(index_uuid, processing.clone())); - let file = match content_path { - Some(uuid) => { - let path = update_uuid_to_file_path(&self.path, uuid); - let file = File::open(path)?; - Some(file) - } - None => None, - }; + let result = + self.perform_update(content, processing, index_handle, index_uuid, global_id); - // Process the pending update using the provided user function. - let handle = Handle::current(); - let result = match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { - Ok(result) => result, - Err(e) => Err(processing.fail(e.to_string())), - }; - - // Once the pending update have been successfully processed - // we must remove the content from the pending and processing stores and - // write the *new* meta to the processed-meta store and commit. - let mut wtxn = self.env.write_txn()?; - self.pending_queue - .delete(&mut wtxn, &(global_id, index_uuid, update_id))?; - - if let Some(uuid) = content_path { - let path = update_uuid_to_file_path(&self.path, uuid); - remove_file(&path)?; - } - - let result = match result { - Ok(res) => res.into(), - Err(res) => res.into(), - }; - - self.updates.remap_key_type::().put( - &mut wtxn, - &(index_uuid, update_id), - &result, - )?; - - wtxn.commit()?; state.swap(State::Idle); - Ok(Some(())) + result } None => Ok(None), } } + fn perform_update( + &self, + content: Option, + processing: Processing, + index_handle: impl IndexActorHandle, + index_uuid: Uuid, + global_id: u64, + ) -> anyhow::Result> { + let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid)); + let update_id = processing.id(); + + let file = match content_path { + Some(ref path) => { + let file = File::open(path)?; + Some(file) + } + None => None, + }; + + // Process the pending update using the provided user function. + let handle = Handle::current(); + let result = + match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { + Ok(result) => result, + Err(e) => Err(processing.fail(e.to_string())), + }; + + // Once the pending update have been successfully processed + // we must remove the content from the pending and processing stores and + // write the *new* meta to the processed-meta store and commit. + let mut wtxn = self.env.write_txn()?; + self.pending_queue + .delete(&mut wtxn, &(global_id, index_uuid, update_id))?; + + let result = match result { + Ok(res) => res.into(), + Err(res) => res.into(), + }; + + self.updates.remap_key_type::().put( + &mut wtxn, + &(index_uuid, update_id), + &result, + )?; + + wtxn.commit()?; + + if let Some(ref path) = content_path { + remove_file(&path)?; + } + + Ok(Some(())) + } + /// List the updates for `index_uuid`. pub fn list(&self, index_uuid: Uuid) -> anyhow::Result> { let mut update_list = BTreeMap::::new(); @@ -561,7 +585,13 @@ mod test { let mut options = EnvOpenOptions::new(); let handle = Arc::new(MockIndexActorHandle::new()); options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); + let update_store = UpdateStore::open( + options, + dir.path(), + handle, + Arc::new(AtomicBool::new(false)), + ) + .unwrap(); let index1_uuid = Uuid::new_v4(); let index2_uuid = Uuid::new_v4(); @@ -588,7 +618,13 @@ mod test { let mut options = EnvOpenOptions::new(); let handle = Arc::new(MockIndexActorHandle::new()); options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); + let update_store = UpdateStore::open( + options, + dir.path(), + handle, + Arc::new(AtomicBool::new(false)), + ) + .unwrap(); let meta = UpdateMeta::ClearDocuments; let uuid = Uuid::new_v4(); let store_clone = update_store.clone(); @@ -626,7 +662,13 @@ mod test { let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100); - let store = UpdateStore::open(options, dir.path(), handle.clone()).unwrap(); + let store = UpdateStore::open( + options, + dir.path(), + handle.clone(), + Arc::new(AtomicBool::new(false)), + ) + .unwrap(); // wait a bit for the event loop exit. tokio::time::sleep(std::time::Duration::from_millis(50)).await; diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 303289df3..0dc1c2534 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -3,7 +3,7 @@ use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::index::{Unchecked, Settings}; +use crate::index::{Settings, Unchecked}; pub type UpdateError = String; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index 5f7c23f97..bab223bb3 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -8,7 +8,7 @@ use heed::{CompactionOption, Database, Env, EnvOpenOptions}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{Result, UUID_STORE_SIZE, UuidResolverError}; +use super::{Result, UuidResolverError, UUID_STORE_SIZE}; use crate::helpers::EnvSizer; #[derive(Serialize, Deserialize)] @@ -96,7 +96,7 @@ impl HeedUuidStore { let mut txn = env.write_txn()?; if db.get(&txn, &name)?.is_some() { - return Err(UuidResolverError::NameAlreadyExist) + return Err(UuidResolverError::NameAlreadyExist); } db.put(&mut txn, &name, uuid.as_bytes())?;