From 5353be74c351cb524226fb211aae4982ee258b60 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 22 Sep 2021 15:07:04 +0200 Subject: [PATCH] refactor index actor --- Cargo.lock | 12 + meilisearch-http/src/routes/mod.rs | 3 +- meilisearch-lib/Cargo.toml | 1 + meilisearch-lib/src/index/updates.rs | 3 + meilisearch-lib/src/index_controller/error.rs | 2 +- .../index_actor/handle_impl.rs | 161 --------- .../index_controller/index_actor/message.rs | 74 ----- .../src/index_controller/index_actor/mod.rs | 166 ---------- .../{index_actor => indexes}/error.rs | 16 + .../src/index_controller/indexes/message.rs | 212 ++++++++++++ .../{index_actor/actor.rs => indexes/mod.rs} | 169 ++++++++-- .../{index_actor => indexes}/store.rs | 0 meilisearch-lib/src/index_controller/mod.rs | 312 +++++++++--------- .../src/index_controller/updates/error.rs | 15 +- .../src/index_controller/updates/message.rs | 1 + .../src/index_controller/updates/mod.rs | 14 +- .../index_controller/updates/store/dump.rs | 9 +- .../src/index_controller/updates/store/mod.rs | 16 +- 18 files changed, 590 insertions(+), 596 deletions(-) delete mode 100644 meilisearch-lib/src/index_controller/index_actor/handle_impl.rs delete mode 100644 meilisearch-lib/src/index_controller/index_actor/message.rs delete mode 100644 meilisearch-lib/src/index_controller/index_actor/mod.rs rename meilisearch-lib/src/index_controller/{index_actor => indexes}/error.rs (77%) create mode 100644 meilisearch-lib/src/index_controller/indexes/message.rs rename meilisearch-lib/src/index_controller/{index_actor/actor.rs => indexes/mod.rs} (70%) rename meilisearch-lib/src/index_controller/{index_actor => indexes}/store.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 33660f836..d08b1a83c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -769,6 +769,17 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2 1.0.29", + "quote 1.0.9", + "syn 1.0.76", +] + [[package]] name = "derive_more" version = "0.99.16" @@ -1674,6 +1685,7 @@ dependencies = [ "bytes", "chrono", "crossbeam-channel", + "derivative", "either", "env_logger", "erased-serde", diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 6c99d1766..a38689bd9 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -3,9 +3,10 @@ use std::time::Duration; use actix_web::{web, HttpResponse}; use chrono::{DateTime, Utc}; use log::debug; +use meilisearch_lib::index_controller::updates::status::{UpdateResult, UpdateStatus}; use serde::{Deserialize, Serialize}; -use meilisearch_lib::{MeiliSearch, UpdateResult, UpdateStatus, RegisterUpdate}; +use meilisearch_lib::{MeiliSearch, RegisterUpdate}; use meilisearch_lib::index::{Settings, Unchecked}; use crate::error::ResponseError; diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index 7ef4ecad7..0d9f6520b 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -61,6 +61,7 @@ serdeval = "0.1.0" sysinfo = "0.20.2" tokio-stream = "0.1.7" erased-serde = "0.3.16" +derivative = "2.2.0" [dev-dependencies] actix-rt = "2.2.0" diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs index e6012f4ab..c83862f9b 100644 --- a/meilisearch-lib/src/index/updates.rs +++ b/meilisearch-lib/src/index/updates.rs @@ -35,6 +35,9 @@ pub struct Checked; #[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct Unchecked; +/// Holds all the settings for an index. `T` can either be `Checked` if they represents settings +/// whose validity is guaranteed, or `Unchecked` if they need to be validated. In the later case, a +/// call to `check` will return a `Settings` from a `Settings`. #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] #[serde(rename_all = "camelCase")] diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index ddf698d29..8c60e9103 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -6,7 +6,7 @@ use meilisearch_error::ErrorCode; use crate::index::error::IndexError; use super::dump_actor::error::DumpActorError; -use super::index_actor::error::IndexActorError; +use super::indexes::error::IndexActorError; use super::updates::error::UpdateActorError; use super::uuid_resolver::error::UuidResolverError; diff --git a/meilisearch-lib/src/index_controller/index_actor/handle_impl.rs b/meilisearch-lib/src/index_controller/index_actor/handle_impl.rs deleted file mode 100644 index de295af6d..000000000 --- a/meilisearch-lib/src/index_controller/index_actor/handle_impl.rs +++ /dev/null @@ -1,161 +0,0 @@ -use crate::{index_controller::updates::status::{Failed, Processed, Processing}, options::IndexerOpts}; -use std::path::{Path, PathBuf}; - -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; - -use crate::{ - index::Checked, - index_controller::{IndexSettings, IndexStats}, -}; -use crate::{ - index::{Document, SearchQuery, SearchResult, Settings}, -}; - -use super::error::Result; -use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore}; - -#[derive(Clone)] -pub struct IndexActorHandleImpl { - sender: mpsc::Sender, -} - -#[async_trait::async_trait] -impl IndexActorHandle for IndexActorHandleImpl { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::CreateIndex { - ret, - uuid, - primary_key, - }; - let _ = self.sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") - } - - async fn update( - &self, - uuid: Uuid, - meta: Processing, - ) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Update { - ret, - meta, - uuid, - }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Search { uuid, query, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn settings(&self, uuid: Uuid) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Settings { uuid, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Documents { - uuid, - ret, - offset, - attributes_to_retrieve, - limit, - }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Document { - uuid, - ret, - doc_id, - attributes_to_retrieve, - }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn delete(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Delete { uuid, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn get_index_meta(&self, uuid: Uuid) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::GetMeta { uuid, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::UpdateIndex { - uuid, - index_settings, - ret, - }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Snapshot { uuid, path, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Dump { uuid, path, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - async fn get_index_stats(&self, uuid: Uuid) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::GetStats { uuid, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } -} - -impl IndexActorHandleImpl { - pub fn new( - path: impl AsRef, - index_size: usize, - options: &IndexerOpts, - ) -> anyhow::Result { - let (sender, receiver) = mpsc::channel(100); - - let store = MapIndexStore::new(&path, index_size); - let actor = IndexActor::new(receiver, store, options)?; - tokio::task::spawn(actor.run()); - Ok(Self { sender }) - } -} diff --git a/meilisearch-lib/src/index_controller/index_actor/message.rs b/meilisearch-lib/src/index_controller/index_actor/message.rs deleted file mode 100644 index 55aaf5bc7..000000000 --- a/meilisearch-lib/src/index_controller/index_actor/message.rs +++ /dev/null @@ -1,74 +0,0 @@ -use std::path::PathBuf; - -use tokio::sync::oneshot; -use uuid::Uuid; - -use super::error::Result as IndexResult; -use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::IndexStats; -use crate::index_controller::updates::status::{Failed, Processed, Processing}; - -use super::{IndexMeta, IndexSettings}; - -#[allow(clippy::large_enum_variant)] -pub enum IndexMsg { - CreateIndex { - uuid: Uuid, - primary_key: Option, - ret: oneshot::Sender>, - }, - Update { - uuid: Uuid, - meta: Processing, - ret: oneshot::Sender>>, - }, - Search { - uuid: Uuid, - query: SearchQuery, - ret: oneshot::Sender>, - }, - Settings { - uuid: Uuid, - ret: oneshot::Sender>>, - }, - Documents { - uuid: Uuid, - attributes_to_retrieve: Option>, - offset: usize, - limit: usize, - ret: oneshot::Sender>>, - }, - Document { - uuid: Uuid, - attributes_to_retrieve: Option>, - doc_id: String, - ret: oneshot::Sender>, - }, - Delete { - uuid: Uuid, - ret: oneshot::Sender>, - }, - GetMeta { - uuid: Uuid, - ret: oneshot::Sender>, - }, - UpdateIndex { - uuid: Uuid, - index_settings: IndexSettings, - ret: oneshot::Sender>, - }, - Snapshot { - uuid: Uuid, - path: PathBuf, - ret: oneshot::Sender>, - }, - Dump { - uuid: Uuid, - path: PathBuf, - ret: oneshot::Sender>, - }, - GetStats { - uuid: Uuid, - ret: oneshot::Sender>, - }, -} diff --git a/meilisearch-lib/src/index_controller/index_actor/mod.rs b/meilisearch-lib/src/index_controller/index_actor/mod.rs deleted file mode 100644 index 8f2ac4d2d..000000000 --- a/meilisearch-lib/src/index_controller/index_actor/mod.rs +++ /dev/null @@ -1,166 +0,0 @@ -use std::path::PathBuf; - -use chrono::{DateTime, Utc}; -#[cfg(test)] -use mockall::automock; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use actor::IndexActor; -pub use actor::CONCURRENT_INDEX_MSG; -pub use handle_impl::IndexActorHandleImpl; -use message::IndexMsg; -use store::{IndexStore, MapIndexStore}; - -use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; -use error::Result; - -use super::{IndexSettings, IndexStats, updates::status::{Failed, Processed, Processing}}; - -mod actor; -pub mod error; -mod handle_impl; -mod message; -mod store; - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexMeta { - created_at: DateTime, - pub updated_at: DateTime, - pub primary_key: Option, -} - -impl IndexMeta { - fn new(index: &Index) -> Result { - let txn = index.read_txn()?; - Self::new_txn(index, &txn) - } - - fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { - let created_at = index.created_at(txn)?; - let updated_at = index.updated_at(txn)?; - let primary_key = index.primary_key(txn)?.map(String::from); - Ok(Self { - created_at, - updated_at, - primary_key, - }) - } -} - -#[async_trait::async_trait] -#[cfg_attr(test, automock)] -pub trait IndexActorHandle { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; - async fn update( - &self, - uuid: Uuid, - meta: Processing, - ) -> Result>; - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result; - async fn settings(&self, uuid: Uuid) -> Result>; - - async fn documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result>; - async fn document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result; - async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn get_index_meta(&self, uuid: Uuid) -> Result; - async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result; - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; - async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()>; - async fn get_index_stats(&self, uuid: Uuid) -> Result; -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use super::*; - - #[async_trait::async_trait] - /// Useful for passing around an `Arc` in tests. - impl IndexActorHandle for Arc { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { - self.as_ref().create_index(uuid, primary_key).await - } - - async fn update( - &self, - uuid: Uuid, - meta: Processing, - data: Option, - ) -> Result> { - self.as_ref().update(uuid, meta, data).await - } - - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { - self.as_ref().search(uuid, query).await - } - - async fn settings(&self, uuid: Uuid) -> Result> { - self.as_ref().settings(uuid).await - } - - async fn documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - self.as_ref() - .documents(uuid, offset, limit, attributes_to_retrieve) - .await - } - - async fn document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - self.as_ref() - .document(uuid, doc_id, attributes_to_retrieve) - .await - } - - async fn delete(&self, uuid: Uuid) -> Result<()> { - self.as_ref().delete(uuid).await - } - - async fn get_index_meta(&self, uuid: Uuid) -> Result { - self.as_ref().get_index_meta(uuid).await - } - - async fn update_index( - &self, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - self.as_ref().update_index(uuid, index_settings).await - } - - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - self.as_ref().snapshot(uuid, path).await - } - - async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - self.as_ref().dump(uuid, path).await - } - - async fn get_index_stats(&self, uuid: Uuid) -> Result { - self.as_ref().get_index_stats(uuid).await - } - } -} diff --git a/meilisearch-lib/src/index_controller/index_actor/error.rs b/meilisearch-lib/src/index_controller/indexes/error.rs similarity index 77% rename from meilisearch-lib/src/index_controller/index_actor/error.rs rename to meilisearch-lib/src/index_controller/indexes/error.rs index 12a81796b..51fe273f7 100644 --- a/meilisearch-lib/src/index_controller/index_actor/error.rs +++ b/meilisearch-lib/src/index_controller/indexes/error.rs @@ -1,3 +1,5 @@ +use std::fmt; + use meilisearch_error::{Code, ErrorCode}; use crate::{error::MilliError, index::error::IndexError}; @@ -20,6 +22,20 @@ pub enum IndexActorError { Milli(#[from] milli::Error), } +impl From> for IndexActorError +where T: Send + Sync + 'static + fmt::Debug +{ + fn from(other: tokio::sync::mpsc::error::SendError) -> Self { + Self::Internal(Box::new(other)) + } +} + +impl From for IndexActorError { + fn from(other: tokio::sync::oneshot::error::RecvError) -> Self { + Self::Internal(Box::new(other)) + } +} + macro_rules! internal_error { ($($other:path), *) => { $( diff --git a/meilisearch-lib/src/index_controller/indexes/message.rs b/meilisearch-lib/src/index_controller/indexes/message.rs new file mode 100644 index 000000000..e9c67d0ab --- /dev/null +++ b/meilisearch-lib/src/index_controller/indexes/message.rs @@ -0,0 +1,212 @@ +use std::path::PathBuf; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::error::Result; +use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; +use crate::index_controller::updates::status::{Failed, Processed, Processing}; +use crate::index_controller::{IndexSettings, IndexStats}; + +use super::IndexMeta; + +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum IndexMsg { + CreateIndex { + uuid: Uuid, + primary_key: Option, + ret: oneshot::Sender>, + }, + Update { + uuid: Uuid, + meta: Processing, + ret: oneshot::Sender>>, + }, + Search { + uuid: Uuid, + query: SearchQuery, + ret: oneshot::Sender>, + }, + Settings { + uuid: Uuid, + ret: oneshot::Sender>>, + }, + Documents { + uuid: Uuid, + attributes_to_retrieve: Option>, + offset: usize, + limit: usize, + ret: oneshot::Sender>>, + }, + Document { + uuid: Uuid, + attributes_to_retrieve: Option>, + doc_id: String, + ret: oneshot::Sender>, + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, + GetMeta { + uuid: Uuid, + ret: oneshot::Sender>, + }, + UpdateIndex { + uuid: Uuid, + index_settings: IndexSettings, + ret: oneshot::Sender>, + }, + Snapshot { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, + Dump { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, + GetStats { + uuid: Uuid, + ret: oneshot::Sender>, + }, +} + +impl IndexMsg { + pub async fn search( + sender: &mpsc::Sender, + uuid: Uuid, + query: SearchQuery, + ) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Search { + ret, + uuid, + query, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn update_index( + sender: &mpsc::Sender, + uuid: Uuid, + index_settings: IndexSettings, + ) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::UpdateIndex { + ret, + uuid, + index_settings, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn create_index( + sender: &mpsc::Sender, + uuid: Uuid, + primary_key: Option, + ) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::CreateIndex { + ret, + uuid, + primary_key, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn index_meta(sender: &mpsc::Sender, uuid: Uuid) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::GetMeta { ret, uuid }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn index_stats(sender: &mpsc::Sender, uuid: Uuid) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::GetStats { ret, uuid }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn settings(sender: &mpsc::Sender, uuid: Uuid) -> Result> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Settings { ret, uuid }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn documents( + sender: &mpsc::Sender, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Documents { + ret, + uuid, + attributes_to_retrieve, + offset, + limit, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn document( + sender: &mpsc::Sender, + uuid: Uuid, + attributes_to_retrieve: Option>, + doc_id: String, + ) -> Result { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Document { + ret, + uuid, + attributes_to_retrieve, + doc_id, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn update(sender: &mpsc::Sender, uuid: Uuid, meta: Processing) -> Result> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Update { + ret, + uuid, + meta, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn snapshot(sender: &mpsc::Sender, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Snapshot { + uuid, + path, + ret, + }; + sender.send(msg).await?; + rcv.await? + } + + pub async fn dump(sender: &mpsc::Sender, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::Dump { + uuid, + ret, + path, + }; + sender.send(msg).await?; + rcv.await? + } +} diff --git a/meilisearch-lib/src/index_controller/index_actor/actor.rs b/meilisearch-lib/src/index_controller/indexes/mod.rs similarity index 70% rename from meilisearch-lib/src/index_controller/index_actor/actor.rs rename to meilisearch-lib/src/index_controller/indexes/mod.rs index 6e7d13760..bac492364 100644 --- a/meilisearch-lib/src/index_controller/index_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/indexes/mod.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use async_stream::stream; @@ -8,22 +8,74 @@ use log::debug; use milli::update::UpdateBuilder; use tokio::task::spawn_blocking; use tokio::{fs, sync::mpsc}; -use uuid::Uuid; -use crate::index::{ - update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings, -}; -use crate::index_controller::{ - get_arc_ownership_blocking, IndexStats, -}; +use crate::index::update_handler::UpdateHandler; use crate::index_controller::updates::status::{Failed, Processed, Processing}; +use crate::index_controller::{get_arc_ownership_blocking, IndexStats}; use crate::options::IndexerOpts; -use super::error::{IndexActorError, Result}; -use super::{IndexMeta, IndexMsg, IndexSettings, IndexStore}; - pub const CONCURRENT_INDEX_MSG: usize = 10; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +pub use message::IndexMsg; + +use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; +use error::Result; + +use self::error::IndexActorError; +use self::store::{IndexStore, MapIndexStore}; + +use super::IndexSettings; + +pub mod error; +mod message; +mod store; + +pub type IndexHandlerSender = mpsc::Sender; + +pub fn create_indexes_handler( + db_path: impl AsRef, + index_size: usize, + indexer_options: &IndexerOpts, +) -> anyhow::Result { + let (sender, receiver) = mpsc::channel(100); + let store = MapIndexStore::new(&db_path, index_size); + let actor = IndexActor::new(receiver, store, indexer_options)?; + + tokio::task::spawn(actor.run()); + + Ok(sender) +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMeta { + created_at: DateTime, + pub updated_at: DateTime, + pub primary_key: Option, +} + +impl IndexMeta { + fn new(index: &Index) -> Result { + let txn = index.read_txn()?; + Self::new_txn(index, &txn) + } + + fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { + let created_at = index.created_at(txn)?; + let updated_at = index.updated_at(txn)?; + let primary_key = index.primary_key(txn)?.map(String::from); + Ok(Self { + created_at, + updated_at, + primary_key, + }) + } +} + pub struct IndexActor { receiver: Option>, update_handler: Arc, @@ -31,15 +83,15 @@ pub struct IndexActor { } impl IndexActor -where S: IndexStore + Sync + Send, +where + S: IndexStore + Sync + Send, { pub fn new( receiver: mpsc::Receiver, store: S, options: &IndexerOpts, ) -> anyhow::Result { - let update_handler = UpdateHandler::new(options)?; - let update_handler = Arc::new(update_handler); + let update_handler = Arc::new(UpdateHandler::new(options)?); let receiver = Some(receiver); Ok(Self { @@ -82,11 +134,7 @@ where S: IndexStore + Sync + Send, } => { let _ = ret.send(self.handle_create_index(uuid, primary_key).await); } - Update { - ret, - meta, - uuid, - } => { + Update { ret, meta, uuid } => { let _ = ret.send(self.handle_update(uuid, meta).await); } Search { ret, query, uuid } => { @@ -350,3 +398,86 @@ where S: IndexStore + Sync + Send, .await? } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use super::*; + + #[async_trait::async_trait] + /// Useful for passing around an `Arc` in tests. + impl IndexActorHandle for Arc { + async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + self.as_ref().create_index(uuid, primary_key).await + } + + async fn update( + &self, + uuid: Uuid, + meta: Processing, + data: Option, + ) -> Result> { + self.as_ref().update(uuid, meta, data).await + } + + async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { + self.as_ref().search(uuid, query).await + } + + async fn settings(&self, uuid: Uuid) -> Result> { + self.as_ref().settings(uuid).await + } + + async fn documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result> { + self.as_ref() + .documents(uuid, offset, limit, attributes_to_retrieve) + .await + } + + async fn document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + self.as_ref() + .document(uuid, doc_id, attributes_to_retrieve) + .await + } + + async fn delete(&self, uuid: Uuid) -> Result<()> { + self.as_ref().delete(uuid).await + } + + async fn get_index_meta(&self, uuid: Uuid) -> Result { + self.as_ref().get_index_meta(uuid).await + } + + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> Result { + self.as_ref().update_index(uuid, index_settings).await + } + + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + self.as_ref().snapshot(uuid, path).await + } + + async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + self.as_ref().dump(uuid, path).await + } + + async fn get_index_stats(&self, uuid: Uuid) -> Result { + self.as_ref().get_index_stats(uuid).await + } + } +} diff --git a/meilisearch-lib/src/index_controller/index_actor/store.rs b/meilisearch-lib/src/index_controller/indexes/store.rs similarity index 100% rename from meilisearch-lib/src/index_controller/index_actor/store.rs rename to meilisearch-lib/src/index_controller/indexes/store.rs diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index da92eca20..bd3f4c07b 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -8,36 +8,38 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::Stream; use log::info; -use milli::FieldDistribution; use milli::update::IndexDocumentsMethod; +use milli::FieldDistribution; use serde::{Deserialize, Serialize}; use tokio::time::sleep; use uuid::Uuid; use dump_actor::DumpActorHandle; pub use dump_actor::{DumpInfo, DumpStatus}; -use index_actor::IndexActorHandle; use snapshot::load_snapshot; use uuid_resolver::error::UuidResolverError; -use crate::options::IndexerOpts; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; +use crate::options::IndexerOpts; use error::Result; use self::dump_actor::load_dump; -use self::updates::UpdateMsg; +use self::indexes::IndexMsg; use self::updates::status::UpdateStatus; +use self::updates::UpdateMsg; use self::uuid_resolver::UuidResolverMsg; mod dump_actor; pub mod error; -pub mod index_actor; +pub mod indexes; mod snapshot; +pub mod update_file_store; pub mod updates; mod uuid_resolver; -pub mod update_file_store; -pub type Payload = Box> + Send + Sync + 'static + Unpin>; +pub type Payload = Box< + dyn Stream> + Send + Sync + 'static + Unpin, +>; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -47,7 +49,7 @@ pub struct IndexMetadata { pub uid: String, name: String, #[serde(flatten)] - pub meta: index_actor::IndexMeta, + pub meta: indexes::IndexMeta, } #[derive(Clone, Debug)] @@ -72,16 +74,16 @@ pub struct IndexStats { #[derive(Clone)] pub struct IndexController { uuid_resolver: uuid_resolver::UuidResolverSender, - index_handle: index_actor::IndexActorHandleImpl, + index_handle: indexes::IndexHandlerSender, update_handle: updates::UpdateSender, dump_handle: dump_actor::DumpActorHandleImpl, } +#[derive(Debug)] pub enum DocumentAdditionFormat { Json, } - #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Stats { @@ -90,13 +92,16 @@ pub struct Stats { pub indexes: BTreeMap, } +#[derive(derivative::Derivative)] +#[derivative(Debug)] pub enum Update { DocumentAddition { + #[derivative(Debug="ignore")] payload: Payload, primary_key: Option, method: IndexDocumentsMethod, format: DocumentAdditionFormat, - } + }, } #[derive(Default, Debug)] @@ -112,9 +117,17 @@ pub struct IndexControllerBuilder { } impl IndexControllerBuilder { - pub fn build(self, db_path: impl AsRef, indexer_options: IndexerOpts) -> anyhow::Result { - let index_size = self.max_index_size.ok_or_else(|| anyhow::anyhow!("Missing index size"))?; - let update_store_size = self.max_index_size.ok_or_else(|| anyhow::anyhow!("Missing update database size"))?; + pub fn build( + self, + db_path: impl AsRef, + indexer_options: IndexerOpts, + ) -> anyhow::Result { + let index_size = self + .max_index_size + .ok_or_else(|| anyhow::anyhow!("Missing index size"))?; + let update_store_size = self + .max_index_size + .ok_or_else(|| anyhow::anyhow!("Missing update database size"))?; if let Some(ref path) = self.import_snapshot { info!("Loading from snapshot {:?}", path); @@ -137,18 +150,15 @@ impl IndexControllerBuilder { std::fs::create_dir_all(db_path.as_ref())?; let uuid_resolver = uuid_resolver::create_uuid_resolver(&db_path)?; - let index_handle = - index_actor::IndexActorHandleImpl::new(&db_path, index_size, &indexer_options)?; + let index_handle = indexes::create_indexes_handler(&db_path, index_size, &indexer_options)?; #[allow(unreachable_code)] - let update_handle = updates::create_update_handler( - todo!(), - &db_path, - update_store_size, - )?; + let update_handle = updates::create_update_handler(index_handle.clone(), &db_path, update_store_size)?; let dump_handle = dump_actor::DumpActorHandleImpl::new( - &self.dump_dst.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?, + &self + .dump_dst + .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?, uuid_resolver.clone(), update_handle.clone(), index_size, @@ -156,19 +166,19 @@ impl IndexControllerBuilder { )?; //if options.schedule_snapshot { - //let snapshot_service = SnapshotService::new( - //uuid_resolver.clone(), - //update_handle.clone(), - //Duration::from_secs(options.snapshot_interval_sec), - //options.snapshot_dir.clone(), - //options - //.db_path - //.file_name() - //.map(|n| n.to_owned().into_string().expect("invalid path")) - //.unwrap_or_else(|| String::from("data.ms")), - //); + //let snapshot_service = SnapshotService::new( + //uuid_resolver.clone(), + //update_handle.clone(), + //Duration::from_secs(options.snapshot_interval_sec), + //options.snapshot_dir.clone(), + //options + //.db_path + //.file_name() + //.map(|n| n.to_owned().into_string().expect("invalid path")) + //.unwrap_or_else(|| String::from("data.ms")), + //); - //tokio::task::spawn(snapshot_service.run()); + //tokio::task::spawn(snapshot_service.run()); //} Ok(IndexController { @@ -197,7 +207,10 @@ impl IndexControllerBuilder { } /// Set the index controller builder's ignore snapshot if db exists. - pub fn set_ignore_snapshot_if_db_exists(&mut self, ignore_snapshot_if_db_exists: bool) -> &mut Self { + pub fn set_ignore_snapshot_if_db_exists( + &mut self, + ignore_snapshot_if_db_exists: bool, + ) -> &mut Self { self.ignore_snapshot_if_db_exists = ignore_snapshot_if_db_exists; self } @@ -238,12 +251,12 @@ impl IndexController { Ok(uuid) => { let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; Ok(update_result) - }, + } Err(UuidResolverError::UnexistingIndex(name)) => { let uuid = Uuid::new_v4(); let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; // ignore if index creation fails now, since it may already have been created - let _ = self.index_handle.create_index(uuid, None).await; + let _ = IndexMsg::create_index(&self.index_handle, uuid, None).await?; UuidResolverMsg::insert(&self.uuid_resolver, uuid, name).await?; Ok(update_result) @@ -253,128 +266,128 @@ impl IndexController { } //pub async fn add_documents( - //&self, - //uid: String, - //method: milli::update::IndexDocumentsMethod, - //payload: Payload, - //primary_key: Option, + //&self, + //uid: String, + //method: milli::update::IndexDocumentsMethod, + //payload: Payload, + //primary_key: Option, //) -> Result { - //let perform_update = |uuid| async move { - //let meta = UpdateMeta::DocumentsAddition { - //method, - //primary_key, - //}; - //let (sender, receiver) = mpsc::channel(10); + //let perform_update = |uuid| async move { + //let meta = UpdateMeta::DocumentsAddition { + //method, + //primary_key, + //}; + //let (sender, receiver) = mpsc::channel(10); - //// It is necessary to spawn a local task to send the payload to the update handle to - //// prevent dead_locking between the update_handle::update that waits for the update to be - //// registered and the update_actor that waits for the the payload to be sent to it. - //tokio::task::spawn_local(async move { - //payload - //.for_each(|r| async { - //let _ = sender.send(r).await; - //}) - //.await - //}); + //// It is necessary to spawn a local task to send the payload to the update handle to + //// prevent dead_locking between the update_handle::update that waits for the update to be + //// registered and the update_actor that waits for the the payload to be sent to it. + //tokio::task::spawn_local(async move { + //payload + //.for_each(|r| async { + //let _ = sender.send(r).await; + //}) + //.await + //}); - //// This must be done *AFTER* spawning the task. - //self.update_handle.update(meta, receiver, uuid).await - //}; + //// This must be done *AFTER* spawning the task. + //self.update_handle.update(meta, receiver, uuid).await + //}; - //match self.uuid_resolver.get(uid).await { - //Ok(uuid) => Ok(perform_update(uuid).await?), - //Err(UuidResolverError::UnexistingIndex(name)) => { - //let uuid = Uuid::new_v4(); - //let status = perform_update(uuid).await?; - //// ignore if index creation fails now, since it may already have been created - //let _ = self.index_handle.create_index(uuid, None).await; - //self.uuid_resolver.insert(name, uuid).await?; - //Ok(status) - //} - //Err(e) => Err(e.into()), - //} + //match self.uuid_resolver.get(uid).await { + //Ok(uuid) => Ok(perform_update(uuid).await?), + //Err(UuidResolverError::UnexistingIndex(name)) => { + //let uuid = Uuid::new_v4(); + //let status = perform_update(uuid).await?; + //// ignore if index creation fails now, since it may already have been created + //let _ = self.index_handle.create_index(uuid, None).await; + //self.uuid_resolver.insert(name, uuid).await?; + //Ok(status) + //} + //Err(e) => Err(e.into()), + //} //} //pub async fn clear_documents(&self, uid: String) -> Result { - //let uuid = self.uuid_resolver.get(uid).await?; - //let meta = UpdateMeta::ClearDocuments; - //let (_, receiver) = mpsc::channel(1); - //let status = self.update_handle.update(meta, receiver, uuid).await?; - //Ok(status) + //let uuid = self.uuid_resolver.get(uid).await?; + //let meta = UpdateMeta::ClearDocuments; + //let (_, receiver) = mpsc::channel(1); + //let status = self.update_handle.update(meta, receiver, uuid).await?; + //Ok(status) //} //pub async fn delete_documents( - //&self, - //uid: String, - //documents: Vec, + //&self, + //uid: String, + //documents: Vec, //) -> Result { - //let uuid = self.uuid_resolver.get(uid).await?; - //let meta = UpdateMeta::DeleteDocuments { ids: documents }; - //let (_, receiver) = mpsc::channel(1); - //let status = self.update_handle.update(meta, receiver, uuid).await?; - //Ok(status) + //let uuid = self.uuid_resolver.get(uid).await?; + //let meta = UpdateMeta::DeleteDocuments { ids: documents }; + //let (_, receiver) = mpsc::channel(1); + //let status = self.update_handle.update(meta, receiver, uuid).await?; + //Ok(status) //} //pub async fn update_settings( - //&self, - //uid: String, - //settings: Settings, - //create: bool, + //&self, + //uid: String, + //settings: Settings, + //create: bool, //) -> Result { - //let perform_udpate = |uuid| async move { - //let meta = UpdateMeta::Settings(settings.into_unchecked()); - //// Nothing so send, drop the sender right away, as not to block the update actor. - //let (_, receiver) = mpsc::channel(1); - //self.update_handle.update(meta, receiver, uuid).await - //}; + //let perform_udpate = |uuid| async move { + //let meta = UpdateMeta::Settings(settings.into_unchecked()); + //// Nothing so send, drop the sender right away, as not to block the update actor. + //let (_, receiver) = mpsc::channel(1); + //self.update_handle.update(meta, receiver, uuid).await + //}; - //match self.uuid_resolver.get(uid).await { - //Ok(uuid) => Ok(perform_udpate(uuid).await?), - //Err(UuidResolverError::UnexistingIndex(name)) if create => { - //let uuid = Uuid::new_v4(); - //let status = perform_udpate(uuid).await?; - //// ignore if index creation fails now, since it may already have been created - //let _ = self.index_handle.create_index(uuid, None).await; - //self.uuid_resolver.insert(name, uuid).await?; - //Ok(status) - //} - //Err(e) => Err(e.into()), - //} + //match self.uuid_resolver.get(uid).await { + //Ok(uuid) => Ok(perform_udpate(uuid).await?), + //Err(UuidResolverError::UnexistingIndex(name)) if create => { + //let uuid = Uuid::new_v4(); + //let status = perform_udpate(uuid).await?; + //// ignore if index creation fails now, since it may already have been created + //let _ = self.index_handle.create_index(uuid, None).await; + //self.uuid_resolver.insert(name, uuid).await?; + //Ok(status) + //} + //Err(e) => Err(e.into()), + //} //} //pub async fn create_index(&self, index_settings: IndexSettings) -> Result { - //let IndexSettings { uid, primary_key } = index_settings; - //let uid = uid.ok_or(IndexControllerError::MissingUid)?; - //let uuid = Uuid::new_v4(); - //let meta = self.index_handle.create_index(uuid, primary_key).await?; - //self.uuid_resolver.insert(uid.clone(), uuid).await?; - //let meta = IndexMetadata { - //uuid, - //name: uid.clone(), - //uid, - //meta, - //}; + //let IndexSettings { uid, primary_key } = index_settings; + //let uid = uid.ok_or(IndexControllerError::MissingUid)?; + //let uuid = Uuid::new_v4(); + //let meta = self.index_handle.create_index(uuid, primary_key).await?; + //self.uuid_resolver.insert(uid.clone(), uuid).await?; + //let meta = IndexMetadata { + //uuid, + //name: uid.clone(), + //uid, + //meta, + //}; - //Ok(meta) + //Ok(meta) //} //pub async fn delete_index(&self, uid: String) -> Result<()> { - //let uuid = self.uuid_resolver.delete(uid).await?; + //let uuid = self.uuid_resolver.delete(uid).await?; - //// We remove the index from the resolver synchronously, and effectively perform the index - //// deletion as a background task. - //let update_handle = self.update_handle.clone(); - //let index_handle = self.index_handle.clone(); - //tokio::spawn(async move { - //if let Err(e) = update_handle.delete(uuid).await { - //error!("Error while deleting index: {}", e); - //} - //if let Err(e) = index_handle.delete(uuid).await { - //error!("Error while deleting index: {}", e); - //} - //}); + //// We remove the index from the resolver synchronously, and effectively perform the index + //// deletion as a background task. + //let update_handle = self.update_handle.clone(); + //let index_handle = self.index_handle.clone(); + //tokio::spawn(async move { + //if let Err(e) = update_handle.delete(uuid).await { + //error!("Error while deleting index: {}", e); + //} + //if let Err(e) = index_handle.delete(uuid).await { + //error!("Error while deleting index: {}", e); + //} + //}); - //Ok(()) + //Ok(()) //} pub async fn update_status(&self, uid: String, id: u64) -> Result { @@ -393,7 +406,7 @@ impl IndexController { let uuids = UuidResolverMsg::list(&self.uuid_resolver).await?; let mut ret = Vec::new(); for (uid, uuid) in uuids { - let meta = self.index_handle.get_index_meta(uuid).await?; + let meta = IndexMsg::index_meta(&self.index_handle, uuid).await?; let meta = IndexMetadata { uuid, name: uid.clone(), @@ -408,7 +421,7 @@ impl IndexController { pub async fn settings(&self, uid: String) -> Result> { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let settings = self.index_handle.settings(uuid).await?; + let settings = IndexMsg::settings(&self.index_handle, uuid).await?; Ok(settings) } @@ -420,10 +433,14 @@ impl IndexController { attributes_to_retrieve: Option>, ) -> Result> { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let documents = self - .index_handle - .documents(uuid, offset, limit, attributes_to_retrieve) - .await?; + let documents = IndexMsg::documents( + &self.index_handle, + uuid, + offset, + limit, + attributes_to_retrieve, + ) + .await?; Ok(documents) } @@ -434,10 +451,7 @@ impl IndexController { attributes_to_retrieve: Option>, ) -> Result { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let document = self - .index_handle - .document(uuid, doc_id, attributes_to_retrieve) - .await?; + let document = IndexMsg::document(&self.index_handle, uuid, attributes_to_retrieve, doc_id).await?; Ok(document) } @@ -451,7 +465,7 @@ impl IndexController { } let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?; - let meta = self.index_handle.update_index(uuid, index_settings).await?; + let meta = IndexMsg::update_index(&self.index_handle, uuid, index_settings).await?; let meta = IndexMetadata { uuid, name: uid.clone(), @@ -463,13 +477,13 @@ impl IndexController { pub async fn search(&self, uid: String, query: SearchQuery) -> Result { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let result = self.index_handle.search(uuid, query).await?; + let result = IndexMsg::search(&self.index_handle, uuid, query).await?; Ok(result) } pub async fn get_index(&self, uid: String) -> Result { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?; - let meta = self.index_handle.get_index_meta(uuid).await?; + let meta = IndexMsg::index_meta(&self.index_handle, uuid).await?; let meta = IndexMetadata { uuid, name: uid.clone(), @@ -487,7 +501,7 @@ impl IndexController { pub async fn get_index_stats(&self, uid: String) -> Result { let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let update_infos = UpdateMsg::get_info(&self.update_handle).await?; - let mut stats = self.index_handle.get_index_stats(uuid).await?; + let mut stats = IndexMsg::index_stats(&self.index_handle, uuid).await?; // Check if the currently indexing update is from out index. stats.is_indexing = Some(Some(uuid) == update_infos.processing); Ok(stats) @@ -500,7 +514,7 @@ impl IndexController { let mut indexes = BTreeMap::new(); for index in self.list_indexes().await? { - let mut index_stats = self.index_handle.get_index_stats(index.uuid).await?; + let mut index_stats = IndexMsg::index_stats(&self.index_handle, index.uuid).await?; database_size += index_stats.size; last_update = last_update.map_or(Some(index.meta.updated_at), |last| { diff --git a/meilisearch-lib/src/index_controller/updates/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs index 0a457c977..858631f69 100644 --- a/meilisearch-lib/src/index_controller/updates/error.rs +++ b/meilisearch-lib/src/index_controller/updates/error.rs @@ -1,8 +1,9 @@ +use std::fmt; use std::error::Error; use meilisearch_error::{Code, ErrorCode}; -use crate::index_controller::index_actor::error::IndexActorError; +use crate::index_controller::indexes::error::IndexActorError; pub type Result = std::result::Result; @@ -25,15 +26,17 @@ pub enum UpdateActorError { PayloadError(#[from] actix_web::error::PayloadError), } -impl From> for UpdateActorError { - fn from(_: tokio::sync::mpsc::error::SendError) -> Self { - Self::FatalUpdateStoreError +impl From> for UpdateActorError +where T: Sync + Send + 'static + fmt::Debug +{ + fn from(other: tokio::sync::mpsc::error::SendError) -> Self { + Self::Internal(Box::new(other)) } } impl From for UpdateActorError { - fn from(_: tokio::sync::oneshot::error::RecvError) -> Self { - Self::FatalUpdateStoreError + fn from(other: tokio::sync::oneshot::error::RecvError) -> Self { + Self::Internal(Box::new(other)) } } diff --git a/meilisearch-lib/src/index_controller/updates/message.rs b/meilisearch-lib/src/index_controller/updates/message.rs index fe6e1360b..09dc7443a 100644 --- a/meilisearch-lib/src/index_controller/updates/message.rs +++ b/meilisearch-lib/src/index_controller/updates/message.rs @@ -7,6 +7,7 @@ use uuid::Uuid; use super::error::Result; use super::{Update, UpdateStatus, UpdateStoreInfo}; +#[derive(Debug)] pub enum UpdateMsg { Update { uuid: Uuid, diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index f281250a6..750ca7c46 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -27,19 +27,19 @@ use self::store::{UpdateStore, UpdateStoreInfo}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; +use super::indexes::IndexHandlerSender; use super::{DocumentAdditionFormat, Payload, Update}; pub type UpdateSender = mpsc::Sender; -type IndexSender = mpsc::Sender<()>; pub fn create_update_handler( - index_sender: IndexSender, + index_sender: IndexHandlerSender, db_path: impl AsRef, update_store_size: usize, ) -> anyhow::Result { let path = db_path.as_ref().to_owned(); let (sender, receiver) = mpsc::channel(100); - let actor = UpdateHandler::new(update_store_size, receiver, path, index_sender)?; + let actor = UpdateLoop::new(update_store_size, receiver, path, index_sender)?; tokio::task::spawn_local(actor.run()); @@ -96,20 +96,20 @@ impl> + Unpin> io::Rea } } -pub struct UpdateHandler { +pub struct UpdateLoop { store: Arc, inbox: Option>, update_file_store: UpdateFileStore, - index_handle: IndexSender, + index_handle: IndexHandlerSender, must_exit: Arc, } -impl UpdateHandler { +impl UpdateLoop { pub fn new( update_db_size: usize, inbox: mpsc::Receiver, path: impl AsRef, - index_handle: IndexSender, + index_handle: IndexHandlerSender, ) -> anyhow::Result { let path = path.as_ref().to_owned(); std::fs::create_dir_all(&path)?; diff --git a/meilisearch-lib/src/index_controller/updates/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs index ccb09a309..689678cc4 100644 --- a/meilisearch-lib/src/index_controller/updates/store/dump.rs +++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::{Result, State, UpdateStore}; -use crate::index_controller::{updates::{IndexSender, status::UpdateStatus}}; +use crate::index_controller::{indexes::{IndexHandlerSender, IndexMsg}, updates::{status::UpdateStatus}}; #[derive(Serialize, Deserialize)] struct UpdateEntry { @@ -23,7 +23,7 @@ impl UpdateStore { &self, uuids: &HashSet, path: PathBuf, - handle: IndexSender, + handle: IndexHandlerSender, ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Dumping); @@ -172,12 +172,11 @@ impl UpdateStore { async fn dump_indexes( uuids: &HashSet, - handle: IndexSender, + handle: IndexHandlerSender, path: impl AsRef, ) -> Result<()> { for uuid in uuids { - //handle.dump(*uuid, path.as_ref().to_owned()).await?; - todo!() + IndexMsg::dump(&handle, *uuid, path.as_ref().to_owned()).await?; } Ok(()) diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index be8c5f859..25eb840c9 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -30,14 +30,16 @@ use super::RegisterUpdate; use super::error::Result; use super::status::{Enqueued, Processing}; use crate::EnvSizer; +use crate::index_controller::indexes::{CONCURRENT_INDEX_MSG, IndexHandlerSender, IndexMsg}; use crate::index_controller::update_files_path; -use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*}; +use crate::index_controller::updates::*; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; const UPDATE_DIR: &str = "update_files"; +#[derive(Debug)] pub struct UpdateStoreInfo { /// Size of the update store in bytes. pub size: u64, @@ -146,7 +148,7 @@ impl UpdateStore { pub fn open( options: EnvOpenOptions, path: impl AsRef, - index_handle: IndexSender, + index_handle: IndexHandlerSender, must_exit: Arc, ) -> anyhow::Result> { let (update_store, mut notification_receiver) = Self::new(options, path)?; @@ -284,7 +286,7 @@ impl UpdateStore { /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, index_handle: IndexSender) -> Result> { + fn process_pending_update(&self, index_handle: IndexHandlerSender) -> Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_queue.first(&rtxn)?; @@ -314,7 +316,7 @@ impl UpdateStore { fn perform_update( &self, processing: Processing, - index_handle: IndexSender, + index_handle: IndexHandlerSender, index_uuid: Uuid, global_id: u64, ) -> Result> { @@ -322,7 +324,7 @@ impl UpdateStore { let handle = Handle::current(); let update_id = processing.id(); let result = - match handle.block_on(/*index_handle.update(index_uuid, processing.clone())*/ todo!()) { + match handle.block_on(IndexMsg::update(&index_handle, index_uuid, processing.clone())) { Ok(result) => result, Err(e) => Err(processing.fail(e)), }; @@ -484,7 +486,7 @@ impl UpdateStore { &self, uuids: &HashSet, path: impl AsRef, - handle: IndexSender, + handle: IndexHandlerSender, ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); @@ -525,7 +527,7 @@ impl UpdateStore { // Perform the snapshot of each index concurently. Only a third of the capabilities of // the index actor at a time not to put too much pressure on the index actor let mut stream = futures::stream::iter(uuids.iter()) - .map(move |uuid| todo!() /*handle.snapshot(*uuid, path.clone())*/) + .map(move |uuid| IndexMsg::snapshot(handle,*uuid, path.clone())) .buffer_unordered(CONCURRENT_INDEX_MSG / 3); Handle::current().block_on(async {