diff --git a/meilisearch-http/src/index/update_handler.rs b/meilisearch-http/src/index/update_handler.rs index f3977a00d..7f857785e 100644 --- a/meilisearch-http/src/index/update_handler.rs +++ b/meilisearch-http/src/index/update_handler.rs @@ -1,12 +1,13 @@ use std::fs::File; +use std::sync::{mpsc, Arc}; use crate::index::Index; use milli::update::UpdateBuilder; use milli::CompressionType; use rayon::ThreadPool; -use crate::index_controller::UpdateMeta; -use crate::index_controller::{Failed, Processed, Processing}; +use crate::index_controller::{Aborted, Done, Failed, Processed, Processing}; +use crate::index_controller::{UpdateMeta, UpdateResult}; use crate::option::IndexerOpts; pub struct UpdateHandler { @@ -54,15 +55,17 @@ impl UpdateHandler { pub fn handle_update( &self, + channel: mpsc::Sender<(mpsc::Sender, Result)>, meta: Processing, content: Option, index: Index, - ) -> Result { + ) -> Result { use UpdateMeta::*; let update_id = meta.id(); let update_builder = self.update_builder(update_id); + let mut wtxn = index.write_txn().unwrap(); let result = match meta.meta() { DocumentsAddition { @@ -70,20 +73,47 @@ impl UpdateHandler { format, primary_key, } => index.update_documents( + &mut wtxn, *format, *method, content, update_builder, primary_key.as_deref(), ), - ClearDocuments => index.clear_documents(update_builder), - DeleteDocuments { ids } => index.delete_documents(ids, update_builder), - Settings(settings) => index.update_settings(&settings.clone().check(), update_builder), + ClearDocuments => index.clear_documents(&mut wtxn, update_builder), + DeleteDocuments { ids } => index.delete_documents(&mut wtxn, ids, update_builder), + Settings(settings) => { + index.update_settings(&mut wtxn, &settings.clone().check(), update_builder) + } }; - match result { + let result = match result { Ok(result) => Ok(meta.process(result)), Err(e) => Err(meta.fail(e.into())), + }; + + + let (sender, receiver) = mpsc::channel(); + channel.send((sender, result)); + + // here we should decide how we want to handle a failure. probably by closing the channel + // right: for now I'm just going to panic + + let meta = result.unwrap(); + + match receiver.recv() { + Ok(Hello::Abort) => Err(meta.abort()), + Ok(Hello::Commit) => wtxn + .commit() + .map(|ok| meta.commit()) + .map_err(|e| meta.abort()), + Err(e) => panic!("update actor died {}", e), } } } + +/// MARIN: I can't find any good name for this and I'm not even sure we need a new enum +pub enum Hello { + Commit, + Abort, +} diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 924e6b1ef..62b7f43be 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -4,6 +4,7 @@ use std::marker::PhantomData; use std::num::NonZeroUsize; use flate2::read::GzDecoder; +use heed::RwTxn; use log::{debug, info, trace}; use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat}; use serde::{Deserialize, Serialize, Serializer}; @@ -160,24 +161,17 @@ pub struct Facets { } impl Index { - pub fn update_documents( - &self, + pub fn update_documents<'a>( + &'a self, + txn: &mut RwTxn<'a, 'a>, format: UpdateFormat, method: IndexDocumentsMethod, content: Option, update_builder: UpdateBuilder, primary_key: Option<&str>, ) -> Result { - let mut txn = self.write_txn()?; - let result = self.update_documents_txn( - &mut txn, - format, - method, - content, - update_builder, - primary_key, - )?; - txn.commit()?; + let result = + self.update_documents_txn(txn, format, method, content, update_builder, primary_key)?; Ok(result) } @@ -220,16 +214,14 @@ impl Index { Ok(UpdateResult::DocumentsAddition(addition)) } - pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result { - // We must use the write transaction of the update here. - let mut wtxn = self.write_txn()?; - let builder = update_builder.clear_documents(&mut wtxn, self); - + pub fn clear_documents<'a>( + &'a self, + wtxn: &mut RwTxn<'a, 'a>, + update_builder: UpdateBuilder, + ) -> Result { + let builder = update_builder.clear_documents(wtxn, self); let _count = builder.execute()?; - - wtxn.commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into) + Ok(UpdateResult::Other) } pub fn update_settings_txn<'a, 'b>( @@ -302,8 +294,9 @@ impl Index { Ok(UpdateResult::Other) } - pub fn update_settings( - &self, + pub fn update_settings<'a>( + &'a self, + txn: &mut RwTxn<'a, 'a>, settings: &Settings, update_builder: UpdateBuilder, ) -> Result { @@ -313,12 +306,12 @@ impl Index { Ok(result) } - pub fn delete_documents( - &self, + pub fn delete_documents<'a>( + &'a self, + txn: &mut RwTxn<'a, 'a>, document_ids: &[String], update_builder: UpdateBuilder, ) -> Result { - let mut txn = self.write_txn()?; let mut builder = update_builder.delete_documents(&mut txn, self)?; // We ignore unexisting document ids @@ -327,9 +320,7 @@ impl Index { }); let deleted = builder.execute()?; - txn.commit() - .and(Ok(UpdateResult::DocumentDeletion { deleted })) - .map_err(Into::into) + Ok(UpdateResult::DocumentDeletion { deleted }) } } diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index fc40a5090..1194db6fb 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -7,16 +7,16 @@ use futures::stream::StreamExt; use heed::CompactionOption; use log::debug; use milli::update::UpdateBuilder; +use tokio::sync::oneshot; use tokio::task::spawn_blocking; use tokio::{fs, sync::mpsc}; use uuid::Uuid; +use crate::index::update_handler::Hello; use crate::index::{ update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings, }; -use crate::index_controller::{ - get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing, -}; +use crate::index_controller::{Aborted, Done, Failed, IndexStats, Processed, Processing, get_arc_ownership_blocking}; use crate::option::IndexerOpts; use super::error::{IndexActorError, Result}; @@ -81,11 +81,12 @@ impl IndexActor { } Update { ret, + channel, meta, data, uuid, } => { - let _ = ret.send(self.handle_update(uuid, meta, data).await); + let _ = ret.send(self.handle_update(channel, uuid, meta, data).await); } Search { ret, query, uuid } => { let _ = ret.send(self.handle_search(uuid, query).await); @@ -163,10 +164,11 @@ impl IndexActor { async fn handle_update( &self, + channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender, std::result::Result)>, uuid: Uuid, meta: Processing, data: Option, - ) -> Result> { + ) -> Result> { debug!("Processing update {}", meta.id()); let update_handler = self.update_handler.clone(); let index = match self.store.get(uuid).await? { @@ -174,7 +176,7 @@ impl IndexActor { None => self.store.create(uuid, None).await?, }; - Ok(spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?) + Ok(spawn_blocking(move || update_handler.handle_update(channel, meta, data, index)).await?) } async fn handle_settings(&self, uuid: Uuid) -> Result> { diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index ceb2a8226..e41bb1d7a 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -4,10 +4,7 @@ use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use crate::{ - index::Checked, - index_controller::{IndexSettings, IndexStats, Processing}, -}; +use crate::{index::{Checked, update_handler::Hello}, index_controller::{IndexSettings, IndexStats, Processing}}; use crate::{ index::{Document, SearchQuery, SearchResult, Settings}, index_controller::{Failed, Processed}, @@ -36,13 +33,15 @@ impl IndexActorHandle for IndexActorHandleImpl { async fn update( &self, + channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender, std::result::Result)>, uuid: Uuid, meta: Processing, data: Option, - ) -> Result> { + ) -> Result), Failed>> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Update { ret, + channel, meta, data, uuid, diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index 415b90e4b..be647aac8 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -4,6 +4,7 @@ use tokio::sync::oneshot; use uuid::Uuid; use super::error::Result as IndexResult; +use crate::index::update_handler::Hello; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index_controller::{Failed, IndexStats, Processed, Processing}; @@ -18,9 +19,10 @@ pub enum IndexMsg { }, Update { uuid: Uuid, + channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender, std::result::Result)>, meta: Processing, data: Option, - ret: oneshot::Sender>>, + ret: oneshot::Sender), Failed>>>, }, Search { uuid: Uuid, diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index faad75e01..010e2604c 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -13,6 +13,7 @@ pub use handle_impl::IndexActorHandleImpl; use message::IndexMsg; use store::{IndexStore, MapIndexStore}; +use crate::index::update_handler::Hello; use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; use crate::index_controller::{Failed, IndexStats, Processed, Processing}; use error::Result; @@ -57,6 +58,7 @@ pub trait IndexActorHandle { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; async fn update( &self, + channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender, std::result::Result)>, uuid: Uuid, meta: Processing, data: Option, diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index e23e05b52..7ddea3f99 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -2,6 +2,7 @@ mod codec; pub mod dump; use std::fs::{copy, create_dir_all, remove_file, File}; +use std::io::BufRead; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -29,6 +30,7 @@ use codec::*; use super::error::Result; use super::UpdateMeta; use crate::helpers::EnvSizer; +use crate::index::update_handler::Hello; use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; #[allow(clippy::upper_case_acronyms)] @@ -329,13 +331,29 @@ impl UpdateStore { None => None, }; + let (sender, receiver) = std::sync::mpsc::channel(); // Process the pending update using the provided user function. - let handle = Handle::current(); - let result = - match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { - Ok(result) => result, - Err(e) => Err(processing.fail(e.into())), + let handle = + tokio::task::spawn(index_handle.update(sender, index_uuid, processing.clone(), file)); + + let (sender2, receiver) = std::sync::mpsc::channel(); + // TODO: we should not panic here + let (sender, result) = receiver.recv().unwrap(); + let mut line = String::new(); + loop { + std::io::stdin().lock().read_line(&mut line).unwrap(); + match line.as_str() { + "commit" => { + sender.send((sender2, Hello::Commit)); + break; + } + "abort" => { + sender.send((sender2, Hello::Abort)); + break; + } + _ => (), }; + } // Once the pending update have been successfully processed // we must remove the content from the pending and processing stores and