diff --git a/meilisearch-lib/src/index/update_handler.rs b/meilisearch-lib/src/index/update_handler.rs index 8fba55341..95ae2f556 100644 --- a/meilisearch-lib/src/index/update_handler.rs +++ b/meilisearch-lib/src/index/update_handler.rs @@ -3,8 +3,8 @@ use milli::update::UpdateBuilder; use milli::CompressionType; use rayon::ThreadPool; -use crate::index_controller::update_actor::RegisterUpdate; -use crate::index_controller::{Failed, Processed, Processing}; +use crate::index_controller::updates::RegisterUpdate; +use crate::index_controller::updates::status::{Failed, Processed, Processing}; use crate::options::IndexerOpts; pub struct UpdateHandler { diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs index 6c7ae1416..e6012f4ab 100644 --- a/meilisearch-lib/src/index/updates.rs +++ b/meilisearch-lib/src/index/updates.rs @@ -8,7 +8,7 @@ use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder}; use serde::{Deserialize, Serialize, Serializer}; use uuid::Uuid; -use crate::index_controller::UpdateResult; +use crate::index_controller::updates::status::UpdateResult; use super::Index; use super::error::Result; diff --git a/meilisearch-lib/src/index_controller/dump_actor/actor.rs b/meilisearch-lib/src/index_controller/dump_actor/actor.rs index f82101bc1..881f3e5b8 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/actor.rs @@ -7,19 +7,18 @@ use chrono::Utc; use futures::{lock::Mutex, stream::StreamExt}; use log::{error, trace}; use tokio::sync::{mpsc, oneshot, RwLock}; -use update_actor::UpdateActorHandle; use super::error::{DumpActorError, Result}; use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask}; use crate::index_controller::uuid_resolver::UuidResolverSender; -use crate::index_controller::update_actor; +use crate::index_controller::updates::UpdateSender; pub const CONCURRENT_DUMP_MSG: usize = 10; -pub struct DumpActor { +pub struct DumpActor { inbox: Option>, uuid_resolver: UuidResolverSender, - update: Update, + update: UpdateSender, dump_path: PathBuf, lock: Arc>, dump_infos: Arc>>, @@ -32,14 +31,11 @@ fn generate_uid() -> String { Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() } -impl DumpActor -where - Update: UpdateActorHandle + Send + Sync + Clone + 'static, -{ +impl DumpActor { pub fn new( inbox: mpsc::Receiver, uuid_resolver: UuidResolverSender, - update: Update, + update: UpdateSender, dump_path: impl AsRef, index_db_size: usize, update_db_size: usize, diff --git a/meilisearch-lib/src/index_controller/dump_actor/error.rs b/meilisearch-lib/src/index_controller/dump_actor/error.rs index b6bddb5ea..eb6f08c00 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/error.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/error.rs @@ -1,6 +1,6 @@ use meilisearch_error::{Code, ErrorCode}; -use crate::index_controller::update_actor::error::UpdateActorError; +use crate::index_controller::updates::error::UpdateActorError; use crate::index_controller::uuid_resolver::error::UuidResolverError; pub type Result = std::result::Result; diff --git a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs index 544cb89c6..a629ff753 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs @@ -33,7 +33,7 @@ impl DumpActorHandleImpl { pub fn new( path: impl AsRef, uuid_resolver: UuidResolverSender, - update: crate::index_controller::update_actor::UpdateActorHandleImpl, + update: crate::index_controller::updates::UpdateSender, index_db_size: usize, update_db_size: usize, ) -> anyhow::Result { diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs index 7b7a8236c..c50e8a722 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs @@ -5,7 +5,8 @@ use log::info; use serde::{Deserialize, Serialize}; use crate::index::Index; -use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::store::HeedUuidStore}; +use crate::index_controller::updates::store::UpdateStore; +use crate::index_controller::{uuid_resolver::store::HeedUuidStore}; use crate::options::IndexerOpts; #[derive(Serialize, Deserialize, Debug)] diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 445966a56..7db682e98 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -16,9 +16,10 @@ pub use actor::DumpActor; pub use handle_impl::*; pub use message::DumpMsg; -use super::update_actor::UpdateActorHandle; +use super::updates::UpdateSender; use super::uuid_resolver::UuidResolverSender; use crate::index_controller::dump_actor::error::DumpActorError; +use crate::index_controller::updates::UpdateMsg; use crate::index_controller::uuid_resolver::UuidResolverMsg; use crate::options::IndexerOpts; use error::Result; @@ -151,20 +152,16 @@ pub fn load_dump( Ok(()) } -struct DumpTask

{ +struct DumpTask { path: PathBuf, uuid_resolver: UuidResolverSender, - update_handle: P, + update_handle: UpdateSender, uid: String, update_db_size: usize, index_db_size: usize, } -impl

DumpTask

-where - P: UpdateActorHandle + Send + Sync + Clone + 'static, - -{ +impl DumpTask { async fn run(self) -> Result<()> { trace!("Performing dump."); @@ -182,9 +179,7 @@ where let uuids = UuidResolverMsg::dump(&self.uuid_resolver, temp_dump_path.clone()).await?; - self.update_handle - .dump(uuids, temp_dump_path.clone()) - .await?; + UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?; let dump_path = tokio::task::spawn_blocking(move || -> Result { let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index d3be7d7b7..ddf698d29 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -7,7 +7,7 @@ use crate::index::error::IndexError; use super::dump_actor::error::DumpActorError; use super::index_actor::error::IndexActorError; -use super::update_actor::error::UpdateActorError; +use super::updates::error::UpdateActorError; use super::uuid_resolver::error::UuidResolverError; pub type Result = std::result::Result; diff --git a/meilisearch-lib/src/index_controller/index_actor/actor.rs b/meilisearch-lib/src/index_controller/index_actor/actor.rs index cee656b97..6e7d13760 100644 --- a/meilisearch-lib/src/index_controller/index_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/index_actor/actor.rs @@ -14,8 +14,9 @@ use crate::index::{ update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings, }; use crate::index_controller::{ - get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing, + get_arc_ownership_blocking, IndexStats, }; +use crate::index_controller::updates::status::{Failed, Processed, Processing}; use crate::options::IndexerOpts; use super::error::{IndexActorError, Result}; diff --git a/meilisearch-lib/src/index_controller/index_actor/handle_impl.rs b/meilisearch-lib/src/index_controller/index_actor/handle_impl.rs index 8cc66edee..de295af6d 100644 --- a/meilisearch-lib/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-lib/src/index_controller/index_actor/handle_impl.rs @@ -1,4 +1,4 @@ -use crate::options::IndexerOpts; +use crate::{index_controller::updates::status::{Failed, Processed, Processing}, options::IndexerOpts}; use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; @@ -6,11 +6,10 @@ use uuid::Uuid; use crate::{ index::Checked, - index_controller::{IndexSettings, IndexStats, Processing}, + index_controller::{IndexSettings, IndexStats}, }; use crate::{ index::{Document, SearchQuery, SearchResult, Settings}, - index_controller::{Failed, Processed}, }; use super::error::Result; diff --git a/meilisearch-lib/src/index_controller/index_actor/message.rs b/meilisearch-lib/src/index_controller/index_actor/message.rs index 1b93ec34f..55aaf5bc7 100644 --- a/meilisearch-lib/src/index_controller/index_actor/message.rs +++ b/meilisearch-lib/src/index_controller/index_actor/message.rs @@ -5,7 +5,8 @@ use uuid::Uuid; use super::error::Result as IndexResult; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{Failed, IndexStats, Processed, Processing}; +use crate::index_controller::IndexStats; +use crate::index_controller::updates::status::{Failed, Processed, Processing}; use super::{IndexMeta, IndexSettings}; diff --git a/meilisearch-lib/src/index_controller/index_actor/mod.rs b/meilisearch-lib/src/index_controller/index_actor/mod.rs index bf5833222..8f2ac4d2d 100644 --- a/meilisearch-lib/src/index_controller/index_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/index_actor/mod.rs @@ -13,10 +13,9 @@ use message::IndexMsg; use store::{IndexStore, MapIndexStore}; use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{Failed, IndexStats, Processed, Processing}; use error::Result; -use super::IndexSettings; +use super::{IndexSettings, IndexStats, updates::status::{Failed, Processed, Processing}}; mod actor; pub mod error; diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index f22fec33f..da92eca20 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -18,8 +18,6 @@ use dump_actor::DumpActorHandle; pub use dump_actor::{DumpInfo, DumpStatus}; use index_actor::IndexActorHandle; use snapshot::load_snapshot; -use update_actor::UpdateActorHandle; -pub use updates::*; use uuid_resolver::error::UuidResolverError; use crate::options::IndexerOpts; @@ -27,14 +25,15 @@ use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use error::Result; use self::dump_actor::load_dump; +use self::updates::UpdateMsg; +use self::updates::status::UpdateStatus; use self::uuid_resolver::UuidResolverMsg; mod dump_actor; pub mod error; pub mod index_actor; mod snapshot; -pub mod update_actor; -mod updates; +pub mod updates; mod uuid_resolver; pub mod update_file_store; @@ -74,7 +73,7 @@ pub struct IndexStats { pub struct IndexController { uuid_resolver: uuid_resolver::UuidResolverSender, index_handle: index_actor::IndexActorHandleImpl, - update_handle: update_actor::UpdateActorHandleImpl, + update_handle: updates::UpdateSender, dump_handle: dump_actor::DumpActorHandleImpl, } @@ -140,8 +139,10 @@ impl IndexControllerBuilder { let uuid_resolver = uuid_resolver::create_uuid_resolver(&db_path)?; let index_handle = index_actor::IndexActorHandleImpl::new(&db_path, index_size, &indexer_options)?; - let update_handle = update_actor::UpdateActorHandleImpl::new( - index_handle.clone(), + + #[allow(unreachable_code)] + let update_handle = updates::create_update_handler( + todo!(), &db_path, update_store_size, )?; @@ -235,12 +236,12 @@ impl IndexController { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.to_string()).await; match uuid { Ok(uuid) => { - let update_result = self.update_handle.update(uuid, update).await?; + let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; Ok(update_result) }, Err(UuidResolverError::UnexistingIndex(name)) => { let uuid = Uuid::new_v4(); - let update_result = self.update_handle.update(uuid, update).await?; + let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; // ignore if index creation fails now, since it may already have been created let _ = self.index_handle.create_index(uuid, None).await; UuidResolverMsg::insert(&self.uuid_resolver, uuid, name).await?; @@ -378,13 +379,13 @@ impl IndexController { pub async fn update_status(&self, uid: String, id: u64) -> Result { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let result = self.update_handle.update_status(uuid, id).await?; + let result = UpdateMsg::get_update(&self.update_handle, uuid, id).await?; Ok(result) } pub async fn all_update_status(&self, uid: String) -> Result> { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let result = self.update_handle.get_all_updates_status(uuid).await?; + let result = UpdateMsg::list_updates(&self.update_handle, uuid).await?; Ok(result) } @@ -485,7 +486,7 @@ impl IndexController { pub async fn get_index_stats(&self, uid: String) -> Result { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let update_infos = self.update_handle.get_info().await?; + let update_infos = UpdateMsg::get_info(&self.update_handle).await?; let mut stats = self.index_handle.get_index_stats(uuid).await?; // Check if the currently indexing update is from out index. stats.is_indexing = Some(Some(uuid) == update_infos.processing); @@ -493,7 +494,7 @@ impl IndexController { } pub async fn get_all_stats(&self) -> Result { - let update_infos = self.update_handle.get_info().await?; + let update_infos = UpdateMsg::get_info(&self.update_handle).await?; let mut database_size = self.get_uuids_size().await? + update_infos.size; let mut last_update: Option> = None; let mut indexes = BTreeMap::new(); diff --git a/meilisearch-lib/src/index_controller/snapshot.rs b/meilisearch-lib/src/index_controller/snapshot.rs index c2f600bbc..7c999fd74 100644 --- a/meilisearch-lib/src/index_controller/snapshot.rs +++ b/meilisearch-lib/src/index_controller/snapshot.rs @@ -132,7 +132,7 @@ mod test { use super::*; use crate::index_controller::index_actor::MockIndexActorHandle; - use crate::index_controller::update_actor::{ + use crate::index_controller::updates::{ error::UpdateActorError, MockUpdateActorHandle, UpdateActorHandleImpl, }; use crate::index_controller::uuid_resolver::{ diff --git a/meilisearch-lib/src/index_controller/update_actor/handle_impl.rs b/meilisearch-lib/src/index_controller/update_actor/handle_impl.rs deleted file mode 100644 index e1df0b5d4..000000000 --- a/meilisearch-lib/src/index_controller/update_actor/handle_impl.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::collections::HashSet; -use std::path::{Path, PathBuf}; - -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; - -use crate::index_controller::{IndexActorHandle, Update, UpdateStatus}; - -use super::error::Result; -use super::{UpdateActor, UpdateActorHandle, UpdateMsg, UpdateStoreInfo}; - -#[derive(Clone)] -pub struct UpdateActorHandleImpl { - sender: mpsc::Sender, -} - -impl UpdateActorHandleImpl { - pub fn new( - index_handle: I, - path: impl AsRef, - update_store_size: usize, - ) -> anyhow::Result - where - I: IndexActorHandle + Clone + Sync + Send +'static, - { - let path = path.as_ref().to_owned(); - let (sender, receiver) = mpsc::channel(100); - let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?; - - tokio::task::spawn_local(actor.run()); - - Ok(Self { sender }) - } -} - -#[async_trait::async_trait] -impl UpdateActorHandle for UpdateActorHandleImpl { - async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::ListUpdates { uuid, ret }; - self.sender.send(msg).await?; - receiver.await? - } - - async fn update_status(&self, uuid: Uuid, id: u64) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::GetUpdate { uuid, id, ret }; - self.sender.send(msg).await?; - receiver.await? - } - - async fn delete(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Delete { uuid, ret }; - self.sender.send(msg).await?; - receiver.await? - } - - async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Snapshot { uuids, path, ret }; - self.sender.send(msg).await?; - receiver.await? - } - - async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Dump { uuids, path, ret }; - self.sender.send(msg).await?; - receiver.await? - } - - async fn get_info(&self) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::GetInfo { ret }; - self.sender.send(msg).await?; - receiver.await? - } - - async fn update( - &self, - uuid: Uuid, - update: Update, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Update { - uuid, - update, - ret, - }; - self.sender.send(msg).await?; - receiver.await? - } -} diff --git a/meilisearch-lib/src/index_controller/update_actor/message.rs b/meilisearch-lib/src/index_controller/update_actor/message.rs deleted file mode 100644 index 40cc3360c..000000000 --- a/meilisearch-lib/src/index_controller/update_actor/message.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::collections::HashSet; -use std::path::PathBuf; - -use tokio::sync::oneshot; -use uuid::Uuid; - -use super::error::Result; -use super::{UpdateStatus, UpdateStoreInfo, Update}; - -pub enum UpdateMsg { - Update { - uuid: Uuid, - update: Update, - ret: oneshot::Sender>, - }, - ListUpdates { - uuid: Uuid, - ret: oneshot::Sender>>, - }, - GetUpdate { - uuid: Uuid, - ret: oneshot::Sender>, - id: u64, - }, - Delete { - uuid: Uuid, - ret: oneshot::Sender>, - }, - Snapshot { - uuids: HashSet, - path: PathBuf, - ret: oneshot::Sender>, - }, - Dump { - uuids: HashSet, - path: PathBuf, - ret: oneshot::Sender>, - }, - GetInfo { - ret: oneshot::Sender>, - }, -} diff --git a/meilisearch-lib/src/index_controller/update_actor/mod.rs b/meilisearch-lib/src/index_controller/update_actor/mod.rs deleted file mode 100644 index b83cf491c..000000000 --- a/meilisearch-lib/src/index_controller/update_actor/mod.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::{collections::HashSet, path::PathBuf}; - -use milli::update::IndexDocumentsMethod; -use uuid::Uuid; -use serde::{Serialize, Deserialize}; - -use crate::index_controller::UpdateStatus; -use super::Update; - -use actor::UpdateActor; -use error::Result; -use message::UpdateMsg; - -pub use handle_impl::UpdateActorHandleImpl; -pub use store::{UpdateStore, UpdateStoreInfo}; - -mod actor; -pub mod error; -mod handle_impl; -mod message; -pub mod store; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum RegisterUpdate { - DocumentAddition { - primary_key: Option, - method: IndexDocumentsMethod, - content_uuid: Uuid, - } -} - - -#[cfg(test)] -use mockall::automock; - -#[async_trait::async_trait] -pub trait UpdateActorHandle { - async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; - async fn update_status(&self, uuid: Uuid, id: u64) -> Result; - async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, uuid: HashSet, path: PathBuf) -> Result<()>; - async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()>; - async fn get_info(&self) -> Result; - async fn update( - &self, - uuid: Uuid, - update: Update, - ) -> Result; -} diff --git a/meilisearch-lib/src/index_controller/update_actor/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs similarity index 100% rename from meilisearch-lib/src/index_controller/update_actor/error.rs rename to meilisearch-lib/src/index_controller/updates/error.rs diff --git a/meilisearch-lib/src/index_controller/updates/message.rs b/meilisearch-lib/src/index_controller/updates/message.rs new file mode 100644 index 000000000..fe6e1360b --- /dev/null +++ b/meilisearch-lib/src/index_controller/updates/message.rs @@ -0,0 +1,112 @@ +use std::collections::HashSet; +use std::path::PathBuf; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::error::Result; +use super::{Update, UpdateStatus, UpdateStoreInfo}; + +pub enum UpdateMsg { + Update { + uuid: Uuid, + update: Update, + ret: oneshot::Sender>, + }, + ListUpdates { + uuid: Uuid, + ret: oneshot::Sender>>, + }, + GetUpdate { + uuid: Uuid, + ret: oneshot::Sender>, + id: u64, + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Snapshot { + uuids: HashSet, + path: PathBuf, + ret: oneshot::Sender>, + }, + Dump { + uuids: HashSet, + path: PathBuf, + ret: oneshot::Sender>, + }, + GetInfo { + ret: oneshot::Sender>, + }, +} + +impl UpdateMsg { + pub async fn dump( + sender: &mpsc::Sender, + uuids: HashSet, + path: PathBuf, + ) -> Result<()> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Dump { + path, + uuids, + ret, + }; + sender.send(msg).await?; + rcv.await? + } + pub async fn update( + sender: &mpsc::Sender, + uuid: Uuid, + update: Update, + ) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Update { + uuid, + update, + ret, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn get_update( + sender: &mpsc::Sender, + uuid: Uuid, + id: u64, + ) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::GetUpdate { + uuid, + id, + ret, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn list_updates( + sender: &mpsc::Sender, + uuid: Uuid, + ) -> Result> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::ListUpdates { + uuid, + ret, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn get_info( + sender: &mpsc::Sender, + ) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::GetInfo { + ret, + }; + sender.send(msg).await?; + rcv.await? + } +} diff --git a/meilisearch-lib/src/index_controller/update_actor/actor.rs b/meilisearch-lib/src/index_controller/updates/mod.rs similarity index 71% rename from meilisearch-lib/src/index_controller/update_actor/actor.rs rename to meilisearch-lib/src/index_controller/updates/mod.rs index 01e34e000..f281250a6 100644 --- a/meilisearch-lib/src/index_controller/update_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -1,3 +1,8 @@ +pub mod error; +mod message; +pub mod status; +pub mod store; + use std::collections::HashSet; use std::io; use std::path::{Path, PathBuf}; @@ -10,25 +15,47 @@ use bytes::Bytes; use futures::{Stream, StreamExt}; use log::trace; use milli::documents::DocumentBatchBuilder; +use milli::update::IndexDocumentsMethod; +use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use tokio::sync::mpsc; use uuid::Uuid; -use super::error::{Result, UpdateActorError}; -use super::RegisterUpdate; -use super::{UpdateMsg, UpdateStore, UpdateStoreInfo, Update}; -use crate::index_controller::index_actor::IndexActorHandle; +use self::error::{Result, UpdateActorError}; +pub use self::message::UpdateMsg; +use self::store::{UpdateStore, UpdateStoreInfo}; use crate::index_controller::update_file_store::UpdateFileStore; -use crate::index_controller::{DocumentAdditionFormat, Payload, UpdateStatus}; +use status::UpdateStatus; -pub struct UpdateActor { - store: Arc, - inbox: Option>, - update_file_store: UpdateFileStore, - index_handle: I, - must_exit: Arc, +use super::{DocumentAdditionFormat, Payload, Update}; + +pub type UpdateSender = mpsc::Sender; +type IndexSender = mpsc::Sender<()>; + +pub fn create_update_handler( + index_sender: IndexSender, + db_path: impl AsRef, + update_store_size: usize, +) -> anyhow::Result { + let path = db_path.as_ref().to_owned(); + let (sender, receiver) = mpsc::channel(100); + let actor = UpdateHandler::new(update_store_size, receiver, path, index_sender)?; + + tokio::task::spawn_local(actor.run()); + + Ok(sender) } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum RegisterUpdate { + DocumentAddition { + primary_key: Option, + method: IndexDocumentsMethod, + content_uuid: Uuid, + }, +} + +/// A wrapper type to implement read on a `Stream>`. struct StreamReader { stream: S, current: Option, @@ -36,13 +63,18 @@ struct StreamReader { impl StreamReader { fn new(stream: S) -> Self { - Self { stream, current: None } + Self { + stream, + current: None, + } } - } -impl> + Unpin> io::Read for StreamReader { +impl> + Unpin> io::Read + for StreamReader +{ fn read(&mut self, buf: &mut [u8]) -> io::Result { + // TODO: optimize buf filling match self.current.take() { Some(mut bytes) => { let copied = bytes.split_to(buf.len()); @@ -52,29 +84,32 @@ impl> + Unpin> io::Rea } Ok(copied.len()) } - None => { - match tokio::runtime::Handle::current().block_on(self.stream.next()) { - Some(Ok(bytes)) => { - self.current.replace(bytes); - self.read(buf) - }, - Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)), - None => return Ok(0), + None => match tokio::runtime::Handle::current().block_on(self.stream.next()) { + Some(Ok(bytes)) => { + self.current.replace(bytes); + self.read(buf) } - } + Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)), + None => return Ok(0), + }, } } } -impl UpdateActor -where - I: IndexActorHandle + Clone + Sync + Send + 'static, -{ +pub struct UpdateHandler { + store: Arc, + inbox: Option>, + update_file_store: UpdateFileStore, + index_handle: IndexSender, + must_exit: Arc, +} + +impl UpdateHandler { pub fn new( update_db_size: usize, inbox: mpsc::Receiver, path: impl AsRef, - index_handle: I, + index_handle: IndexSender, ) -> anyhow::Result { let path = path.as_ref().to_owned(); std::fs::create_dir_all(&path)?; @@ -88,14 +123,14 @@ where let inbox = Some(inbox); - let update_file_store = UpdateFileStore::new(&path).unwrap(); + let update_file_store = UpdateFileStore::new(&path).unwrap(); Ok(Self { store, inbox, index_handle, must_exit, - update_file_store + update_file_store, }) } @@ -128,11 +163,7 @@ where stream .for_each_concurrent(Some(10), |msg| async { match msg { - Update { - uuid, - update, - ret, - } => { + Update { uuid, update, ret } => { let _ = ret.send(self.handle_update(uuid, update).await); } ListUpdates { uuid, ret } => { @@ -158,23 +189,30 @@ where .await; } - async fn handle_update( - &self, - index_uuid: Uuid, - update: Update, - ) -> Result { + async fn handle_update(&self, index_uuid: Uuid, update: Update) -> Result { let registration = match update { - Update::DocumentAddition { payload, primary_key, method, format } => { + Update::DocumentAddition { + payload, + primary_key, + method, + format, + } => { let content_uuid = match format { DocumentAdditionFormat::Json => self.documents_from_json(payload).await?, }; - RegisterUpdate::DocumentAddition { primary_key, method, content_uuid } + RegisterUpdate::DocumentAddition { + primary_key, + method, + content_uuid, + } } }; let store = self.store.clone(); - let status = tokio::task::spawn_blocking(move || store.register_update(index_uuid, registration)).await??; + let status = + tokio::task::spawn_blocking(move || store.register_update(index_uuid, registration)) + .await??; Ok(status.into()) } @@ -185,14 +223,16 @@ where let (uuid, mut file) = file_store.new_update().unwrap(); let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap(); - let documents: Vec> = serde_json::from_reader(StreamReader::new(payload))?; + let documents: Vec> = + serde_json::from_reader(StreamReader::new(payload))?; builder.add_documents(documents).unwrap(); builder.finish().unwrap(); file.persist(); Ok(uuid) - }).await? + }) + .await? } async fn handle_list_updates(&self, uuid: Uuid) -> Result> { @@ -256,5 +296,4 @@ where Ok(info) } - } diff --git a/meilisearch-lib/src/index_controller/updates.rs b/meilisearch-lib/src/index_controller/updates/status.rs similarity index 98% rename from meilisearch-lib/src/index_controller/updates.rs rename to meilisearch-lib/src/index_controller/updates/status.rs index efe48e5e5..7716473ab 100644 --- a/meilisearch-lib/src/index_controller/updates.rs +++ b/meilisearch-lib/src/index_controller/updates/status.rs @@ -6,9 +6,7 @@ use meilisearch_error::{Code, ErrorCode}; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; use serde::{Deserialize, Serialize}; -use crate::index::{Settings, Unchecked}; - -use super::update_actor::RegisterUpdate; +use crate::{RegisterUpdate, index::{Settings, Unchecked}}; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UpdateResult { diff --git a/meilisearch-lib/src/index_controller/update_actor/store/codec.rs b/meilisearch-lib/src/index_controller/updates/store/codec.rs similarity index 100% rename from meilisearch-lib/src/index_controller/update_actor/store/codec.rs rename to meilisearch-lib/src/index_controller/updates/store/codec.rs diff --git a/meilisearch-lib/src/index_controller/update_actor/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs similarity index 95% rename from meilisearch-lib/src/index_controller/update_actor/store/dump.rs rename to meilisearch-lib/src/index_controller/updates/store/dump.rs index 5f3605999..ccb09a309 100644 --- a/meilisearch-lib/src/index_controller/update_actor/store/dump.rs +++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs @@ -10,10 +10,7 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::{Result, State, UpdateStore}; -use crate::index_controller::{ - index_actor::IndexActorHandle, - UpdateStatus, -}; +use crate::index_controller::{updates::{IndexSender, status::UpdateStatus}}; #[derive(Serialize, Deserialize)] struct UpdateEntry { @@ -26,7 +23,7 @@ impl UpdateStore { &self, uuids: &HashSet, path: PathBuf, - handle: impl IndexActorHandle, + handle: IndexSender, ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Dumping); @@ -175,11 +172,12 @@ impl UpdateStore { async fn dump_indexes( uuids: &HashSet, - handle: impl IndexActorHandle, + handle: IndexSender, path: impl AsRef, ) -> Result<()> { for uuid in uuids { - handle.dump(*uuid, path.as_ref().to_owned()).await?; + //handle.dump(*uuid, path.as_ref().to_owned()).await?; + todo!() } Ok(()) diff --git a/meilisearch-lib/src/index_controller/update_actor/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs similarity index 98% rename from meilisearch-lib/src/index_controller/update_actor/store/mod.rs rename to meilisearch-lib/src/index_controller/updates/store/mod.rs index 62fcbd5ad..be8c5f859 100644 --- a/meilisearch-lib/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -28,9 +28,10 @@ use codec::*; use super::RegisterUpdate; use super::error::Result; +use super::status::{Enqueued, Processing}; use crate::EnvSizer; use crate::index_controller::update_files_path; -use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; +use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*}; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; @@ -145,7 +146,7 @@ impl UpdateStore { pub fn open( options: EnvOpenOptions, path: impl AsRef, - index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, + index_handle: IndexSender, must_exit: Arc, ) -> anyhow::Result> { let (update_store, mut notification_receiver) = Self::new(options, path)?; @@ -283,7 +284,7 @@ impl UpdateStore { /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, index_handle: impl IndexActorHandle) -> Result> { + fn process_pending_update(&self, index_handle: IndexSender) -> Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_queue.first(&rtxn)?; @@ -313,7 +314,7 @@ impl UpdateStore { fn perform_update( &self, processing: Processing, - index_handle: impl IndexActorHandle, + index_handle: IndexSender, index_uuid: Uuid, global_id: u64, ) -> Result> { @@ -321,7 +322,7 @@ impl UpdateStore { let handle = Handle::current(); let update_id = processing.id(); let result = - match handle.block_on(index_handle.update(index_uuid, processing.clone())) { + match handle.block_on(/*index_handle.update(index_uuid, processing.clone())*/ todo!()) { Ok(result) => result, Err(e) => Err(processing.fail(e)), }; @@ -483,7 +484,7 @@ impl UpdateStore { &self, uuids: &HashSet, path: impl AsRef, - handle: impl IndexActorHandle + Clone, + handle: IndexSender, ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); @@ -524,7 +525,7 @@ impl UpdateStore { // Perform the snapshot of each index concurently. Only a third of the capabilities of // the index actor at a time not to put too much pressure on the index actor let mut stream = futures::stream::iter(uuids.iter()) - .map(move |uuid| handle.snapshot(*uuid, path.clone())) + .map(move |uuid| todo!() /*handle.snapshot(*uuid, path.clone())*/) .buffer_unordered(CONCURRENT_INDEX_MSG / 3); Handle::current().block_on(async { diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index 9f6be4361..64f93695e 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -5,7 +5,7 @@ pub mod options; pub mod index; pub mod index_controller; -pub use index_controller::{UpdateResult, UpdateStatus, IndexController as MeiliSearch, update_actor::RegisterUpdate}; +pub use index_controller::{IndexController as MeiliSearch, updates::RegisterUpdate}; use walkdir::WalkDir;