diff --git a/Cargo.lock b/Cargo.lock index 25acf9bda..6cd5e26a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,12 @@ version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b" +[[package]] +name = "arc-swap" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d7d63395147b81a9e570bcc6243aaf71c017bd666d4909cfef0085bdda8d73" + [[package]] name = "assert-json-diff" version = "1.0.1" @@ -295,19 +301,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "async-compression" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537" -dependencies = [ - "flate2", - "futures-core", - "memchr", - "pin-project-lite 0.2.6", - "tokio 0.2.25", -] - [[package]] name = "async-stream" version = "0.3.1" @@ -775,16 +768,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "dashmap" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" -dependencies = [ - "cfg-if 1.0.0", - "num_cpus", -] - [[package]] name = "debugid" version = "0.7.2" @@ -1751,17 +1734,15 @@ dependencies = [ "actix-web", "actix-web-static-files", "anyhow", + "arc-swap", "assert-json-diff", - "async-compression", "async-stream", "async-trait", "byte-unit", - "bytemuck", "bytes 0.6.0", "cargo_toml", "chrono", "crossbeam-channel", - "dashmap", "either", "env_logger 0.8.3", "flate2", diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index c1eb6b8dc..e6912e428 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -28,15 +28,13 @@ actix-service = "2.0.0" actix-web = { version = "=4.0.0-beta.6", features = ["rustls"] } actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "6db8c3e", optional = true } anyhow = "1.0.36" -async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] } async-stream = "0.3.0" async-trait = "0.1.42" +arc-swap = "1.2.0" byte-unit = { version = "4.0.9", default-features = false, features = ["std"] } -bytemuck = "1.5.1" bytes = "0.6.0" chrono = { version = "0.4.19", features = ["serde"] } crossbeam-channel = "0.5.0" -dashmap = "4.0.2" either = "1.6.1" env_logger = "0.8.2" flate2 = "1.0.19" diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index 0bb20d3ed..43c8b0193 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -8,7 +8,7 @@ use serde_json::{Map, Value}; use crate::helpers::EnvSizer; pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; -pub use updates::{Facets, Settings, UpdateResult}; +pub use updates::{Facets, Settings}; mod search; mod updates; @@ -59,9 +59,7 @@ impl Index { }) .transpose()? .unwrap_or_else(BTreeSet::new); - let distinct_attribute = self - .distinct_attribute(&txn)? - .map(String::from); + let distinct_attribute = self.distinct_attribute(&txn)?.map(String::from); Ok(Settings { displayed_attributes: Some(Some(displayed_attributes)), diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index f91da257f..038f1f7e6 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -4,17 +4,11 @@ use std::num::NonZeroUsize; use flate2::read::GzDecoder; use log::info; -use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; +use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use serde::{de::Deserializer, Deserialize, Serialize}; use super::Index; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum UpdateResult { - DocumentsAddition(DocumentAdditionResult), - DocumentDeletion { deleted: u64 }, - Other, -} +use crate::index_controller::UpdateResult; #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -91,7 +85,7 @@ impl Index { &self, format: UpdateFormat, method: IndexDocumentsMethod, - content: impl io::Read, + content: Option, update_builder: UpdateBuilder, primary_key: Option<&str>, ) -> anyhow::Result { @@ -108,16 +102,15 @@ impl Index { builder.update_format(format); builder.index_documents_method(method); - let gzipped = false; - let reader = if gzipped { - Box::new(GzDecoder::new(content)) - } else { - Box::new(content) as Box - }; + let indexing_callback = + |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step); - let result = builder.execute(reader, |indexing_step, update_id| { - info!("update {}: {:?}", update_id, indexing_step) - }); + let gzipped = false; + let result = match content { + Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback), + Some(content) => builder.execute(content, indexing_callback), + None => builder.execute(std::io::empty(), indexing_callback), + }; info!("document addition done: {:?}", result); @@ -228,10 +221,13 @@ impl Index { pub fn delete_documents( &self, - document_ids: impl io::Read, + document_ids: Option, update_builder: UpdateBuilder, ) -> anyhow::Result { - let ids: Vec = serde_json::from_reader(document_ids)?; + let ids = match document_ids { + Some(reader) => serde_json::from_reader(reader)?, + None => Vec::::new(), + }; let mut txn = self.write_txn()?; let mut builder = update_builder.delete_documents(&mut txn, self)?; diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index e9e55606f..05130e795 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -11,13 +11,13 @@ use tokio::task::spawn_blocking; use uuid::Uuid; use crate::index::{Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::update_handler::UpdateHandler; use crate::index_controller::{ - get_arc_ownership_blocking, updates::Processing, IndexStats, UpdateMeta, + get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed, + Processing, }; use crate::option::IndexerOpts; -use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult}; +use super::{IndexError, IndexMeta, IndexMsg, IndexResult, IndexSettings, IndexStore}; pub const CONCURRENT_INDEX_MSG: usize = 10; @@ -28,7 +28,7 @@ pub struct IndexActor { } impl IndexActor { - pub fn new(receiver: mpsc::Receiver, store: S) -> Result { + pub fn new(receiver: mpsc::Receiver, store: S) -> IndexResult { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; let update_handler = Arc::new(update_handler); @@ -40,9 +40,6 @@ impl IndexActor { }) } - /// `run` poll the write_receiver and read_receiver concurrently, but while messages send - /// through the read channel are processed concurrently, the messages sent through the write - /// channel are processed one at a time. pub async fn run(mut self) { let mut receiver = self .receiver @@ -145,7 +142,7 @@ impl IndexActor { &self, uuid: Uuid, primary_key: Option, - ) -> Result { + ) -> IndexResult { let index = self.store.create(uuid, primary_key).await?; let meta = spawn_blocking(move || IndexMeta::new(&index)) .await @@ -156,9 +153,9 @@ impl IndexActor { async fn handle_update( &self, uuid: Uuid, - meta: Processing, - data: File, - ) -> Result { + meta: Processing, + data: Option, + ) -> IndexResult> { debug!("Processing update {}", meta.id()); let update_handler = self.update_handler.clone(); let index = match self.store.get(uuid).await? { @@ -171,7 +168,7 @@ impl IndexActor { .map_err(|e| IndexError::Error(e.into())) } - async fn handle_settings(&self, uuid: Uuid) -> Result { + async fn handle_settings(&self, uuid: Uuid) -> IndexResult { let index = self .store .get(uuid) @@ -188,7 +185,7 @@ impl IndexActor { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> Result> { + ) -> IndexResult> { let index = self .store .get(uuid) @@ -208,7 +205,7 @@ impl IndexActor { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> Result { + ) -> IndexResult { let index = self .store .get(uuid) @@ -223,7 +220,7 @@ impl IndexActor { .map_err(|e| IndexError::Error(e.into()))? } - async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> { let index = self.store.delete(uuid).await?; if let Some(index) = index { @@ -240,7 +237,7 @@ impl IndexActor { Ok(()) } - async fn handle_get_meta(&self, uuid: Uuid) -> Result { + async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult { match self.store.get(uuid).await? { Some(index) => { let meta = spawn_blocking(move || IndexMeta::new(&index)) @@ -256,7 +253,7 @@ impl IndexActor { &self, uuid: Uuid, index_settings: IndexSettings, - ) -> Result { + ) -> IndexResult { let index = self .store .get(uuid) @@ -283,7 +280,7 @@ impl IndexActor { .map_err(|e| IndexError::Error(e.into()))? } - async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { + async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> { use tokio::fs::create_dir_all; path.push("indexes"); @@ -313,7 +310,7 @@ impl IndexActor { Ok(()) } - async fn handle_get_stats(&self, uuid: Uuid) -> Result { + async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult { let index = self .store .get(uuid) diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index d23357d38..719e12cee 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -3,14 +3,14 @@ use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use crate::index::{Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{updates::Processing, UpdateMeta}; -use crate::index_controller::{IndexSettings, IndexStats}; - -use super::{ - IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult, +use crate::index_controller::{IndexSettings, IndexStats, Processing}; +use crate::{ + index::{Document, SearchQuery, SearchResult, Settings}, + index_controller::{Failed, Processed}, }; +use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, IndexResult, MapIndexStore}; + #[derive(Clone)] pub struct IndexActorHandleImpl { sender: mpsc::Sender, @@ -18,7 +18,11 @@ pub struct IndexActorHandleImpl { #[async_trait::async_trait] impl IndexActorHandle for IndexActorHandleImpl { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + async fn create_index( + &self, + uuid: Uuid, + primary_key: Option, + ) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::CreateIndex { ret, @@ -32,9 +36,9 @@ impl IndexActorHandle for IndexActorHandleImpl { async fn update( &self, uuid: Uuid, - meta: Processing, - data: std::fs::File, - ) -> anyhow::Result { + meta: Processing, + data: Option, + ) -> anyhow::Result> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Update { ret, @@ -46,14 +50,14 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { + async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Search { uuid, query, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn settings(&self, uuid: Uuid) -> Result { + async fn settings(&self, uuid: Uuid) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Settings { uuid, ret }; let _ = self.sender.send(msg).await; @@ -66,7 +70,7 @@ impl IndexActorHandle for IndexActorHandleImpl { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> Result> { + ) -> IndexResult> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Documents { uuid, @@ -84,7 +88,7 @@ impl IndexActorHandle for IndexActorHandleImpl { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> Result { + ) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Document { uuid, @@ -96,21 +100,25 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn delete(&self, uuid: Uuid) -> Result<()> { + async fn delete(&self, uuid: Uuid) -> IndexResult<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Delete { uuid, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn get_index_meta(&self, uuid: Uuid) -> Result { + async fn get_index_meta(&self, uuid: Uuid) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetMeta { uuid, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result { + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::UpdateIndex { uuid, @@ -121,14 +129,14 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Snapshot { uuid, path, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn get_index_stats(&self, uuid: Uuid) -> Result { + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetStats { uuid, ret }; let _ = self.sender.send(msg).await; diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index cb28f2868..d728b2564 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -4,21 +4,21 @@ use tokio::sync::oneshot; use uuid::Uuid; use crate::index::{Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{updates::Processing, IndexStats, UpdateMeta}; +use crate::index_controller::{Failed, IndexStats, Processed, Processing}; -use super::{IndexMeta, IndexSettings, Result, UpdateResult}; +use super::{IndexMeta, IndexResult, IndexSettings}; pub enum IndexMsg { CreateIndex { uuid: Uuid, primary_key: Option, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Update { uuid: Uuid, - meta: Processing, - data: std::fs::File, - ret: oneshot::Sender>, + meta: Processing, + data: Option, + ret: oneshot::Sender>>, }, Search { uuid: Uuid, @@ -27,41 +27,41 @@ pub enum IndexMsg { }, Settings { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Documents { uuid: Uuid, attributes_to_retrieve: Option>, offset: usize, limit: usize, - ret: oneshot::Sender>>, + ret: oneshot::Sender>>, }, Document { uuid: Uuid, attributes_to_retrieve: Option>, doc_id: String, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Delete { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, GetMeta { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, UpdateIndex { uuid: Uuid, index_settings: IndexSettings, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Snapshot { uuid: Uuid, path: PathBuf, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, GetStats { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, } diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index 26f7a23db..d49923fa0 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -1,5 +1,4 @@ -#[cfg(test)] -use std::sync::Arc; +use std::fs::File; use std::path::PathBuf; use chrono::{DateTime, Utc}; @@ -15,12 +14,8 @@ pub use handle_impl::IndexActorHandleImpl; use message::IndexMsg; use store::{IndexStore, MapIndexStore}; -use crate::index::UpdateResult as UResult; use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{ - updates::{Failed, Processed, Processing}, - IndexStats, UpdateMeta, -}; +use crate::index_controller::{Failed, Processed, Processing, IndexStats}; use super::IndexSettings; @@ -29,8 +24,7 @@ mod handle_impl; mod message; mod store; -pub type Result = std::result::Result; -type UpdateResult = std::result::Result, Failed>; +pub type IndexResult = std::result::Result; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -41,12 +35,12 @@ pub struct IndexMeta { } impl IndexMeta { - fn new(index: &Index) -> Result { + fn new(index: &Index) -> IndexResult { let txn = index.read_txn()?; Self::new_txn(index, &txn) } - fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { + fn new_txn(index: &Index, txn: &heed::RoTxn) -> IndexResult { let created_at = index.created_at(&txn)?; let updated_at = index.updated_at(&txn)?; let primary_key = index.primary_key(&txn)?.map(String::from); @@ -72,82 +66,19 @@ pub enum IndexError { ExistingPrimaryKey, } -#[cfg(test)] -#[async_trait::async_trait] -impl IndexActorHandle for Arc { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { - self.as_ref().create_index(uuid, primary_key).await - } - - async fn update( - &self, - uuid: Uuid, - meta: Processing, - data: std::fs::File, - ) -> anyhow::Result { - self.as_ref().update(uuid, meta, data).await - } - - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { - self.as_ref().search(uuid, query).await - } - - async fn settings(&self, uuid: Uuid) -> Result { - self.as_ref().settings(uuid).await - } - - async fn documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - self.as_ref().documents(uuid, offset, limit, attributes_to_retrieve).await - } - - async fn document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - self.as_ref().document(uuid, doc_id, attributes_to_retrieve).await - } - - async fn delete(&self, uuid: Uuid) -> Result<()> { - self.as_ref().delete(uuid).await - } - - async fn get_index_meta(&self, uuid: Uuid) -> Result { - self.as_ref().get_index_meta(uuid).await - } - - async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result { - self.as_ref().update_index(uuid, index_settings).await - } - - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - self.as_ref().snapshot(uuid, path).await - } - - async fn get_index_stats(&self, uuid: Uuid) -> Result { - self.as_ref().get_index_stats(uuid).await - } -} - #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait IndexActorHandle { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn create_index(&self, uuid: Uuid, primary_key: Option) + -> IndexResult; async fn update( &self, uuid: Uuid, - meta: Processing, - data: std::fs::File, - ) -> anyhow::Result; - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result; - async fn settings(&self, uuid: Uuid) -> Result; + meta: Processing, + data: Option, + ) -> anyhow::Result>; + async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult; + async fn settings(&self, uuid: Uuid) -> IndexResult; async fn documents( &self, @@ -155,16 +86,103 @@ pub trait IndexActorHandle { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> Result>; + ) -> IndexResult>; async fn document( &self, uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> Result; - async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn get_index_meta(&self, uuid: Uuid) -> Result; - async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result; - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; - async fn get_index_stats(&self, uuid: Uuid) -> Result; + ) -> IndexResult; + async fn delete(&self, uuid: Uuid) -> IndexResult<()>; + async fn get_index_meta(&self, uuid: Uuid) -> IndexResult; + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> IndexResult; + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult; +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use super::*; + + #[async_trait::async_trait] + /// Useful for passing around an `Arc` in tests. + impl IndexActorHandle for Arc { + async fn create_index( + &self, + uuid: Uuid, + primary_key: Option, + ) -> IndexResult { + self.as_ref().create_index(uuid, primary_key).await + } + + async fn update( + &self, + uuid: Uuid, + meta: Processing, + data: Option, + ) -> anyhow::Result> { + self.as_ref().update(uuid, meta, data).await + } + + async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult { + self.as_ref().search(uuid, query).await + } + + async fn settings(&self, uuid: Uuid) -> IndexResult { + self.as_ref().settings(uuid).await + } + + async fn documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> IndexResult> { + self.as_ref() + .documents(uuid, offset, limit, attributes_to_retrieve) + .await + } + + async fn document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> IndexResult { + self.as_ref() + .document(uuid, doc_id, attributes_to_retrieve) + .await + } + + async fn delete(&self, uuid: Uuid) -> IndexResult<()> { + self.as_ref().delete(uuid).await + } + + async fn get_index_meta(&self, uuid: Uuid) -> IndexResult { + self.as_ref().get_index_meta(uuid).await + } + + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> IndexResult { + self.as_ref().update_index(uuid, index_settings).await + } + + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + self.as_ref().snapshot(uuid, path).await + } + + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { + self.as_ref().get_index_stats(uuid).await + } + } } diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 6250f515e..44f076f2f 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -8,16 +8,16 @@ use tokio::sync::RwLock; use tokio::task::spawn_blocking; use uuid::Uuid; -use super::{IndexError, Result}; +use super::{IndexError, IndexResult}; use crate::index::Index; type AsyncMap = Arc>>; #[async_trait::async_trait] pub trait IndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; - async fn get(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>; + async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult; + async fn get(&self, uuid: Uuid) -> IndexResult>; + async fn delete(&self, uuid: Uuid) -> IndexResult>; } pub struct MapIndexStore { @@ -40,14 +40,14 @@ impl MapIndexStore { #[async_trait::async_trait] impl IndexStore for MapIndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { + async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult { let path = self.path.join(format!("index-{}", uuid)); if path.exists() { return Err(IndexError::IndexAlreadyExists); } let index_size = self.index_size; - let index = spawn_blocking(move || -> Result { + let index = spawn_blocking(move || -> IndexResult { let index = open_index(&path, index_size)?; if let Some(primary_key) = primary_key { let mut txn = index.write_txn()?; @@ -64,7 +64,7 @@ impl IndexStore for MapIndexStore { Ok(index) } - async fn get(&self, uuid: Uuid) -> Result> { + async fn get(&self, uuid: Uuid) -> IndexResult> { let guard = self.index_store.read().await; match guard.get(&uuid) { Some(index) => Ok(Some(index.clone())), @@ -86,7 +86,7 @@ impl IndexStore for MapIndexStore { } } - async fn delete(&self, uuid: Uuid) -> Result> { + async fn delete(&self, uuid: Uuid) -> IndexResult> { let db_path = self.path.join(format!("index-{}", uuid)); fs::remove_dir_all(db_path) .await @@ -96,7 +96,7 @@ impl IndexStore for MapIndexStore { } } -fn open_index(path: impl AsRef, size: usize) -> Result { +fn open_index(path: impl AsRef, size: usize) -> IndexResult { std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; let mut options = EnvOpenOptions::new(); options.map_size(size); diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 35b8b1ecf..81e6d0a5e 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -8,23 +8,19 @@ use anyhow::bail; use chrono::{DateTime, Utc}; use futures::stream::StreamExt; use log::info; -use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::FieldsDistribution; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::sleep; use uuid::Uuid; +pub use updates::*; use index_actor::IndexActorHandle; -use snapshot::load_snapshot; -use snapshot::SnapshotService; +use snapshot::{SnapshotService, load_snapshot}; use update_actor::UpdateActorHandle; -pub use updates::{Failed, Processed, Processing}; -use uuid_resolver::UuidError; -use uuid_resolver::UuidResolverHandle; +use uuid_resolver::{UuidError, UuidResolverHandle}; -use crate::index::{Document, SearchQuery, SearchResult}; -use crate::index::{Facets, Settings, UpdateResult}; +use crate::index::{Settings, Document, SearchQuery, SearchResult}; use crate::option::Opt; mod index_actor; @@ -34,8 +30,6 @@ mod update_handler; mod updates; mod uuid_resolver; -pub type UpdateStatus = updates::UpdateStatus; - #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct IndexMetadata { @@ -47,20 +41,6 @@ pub struct IndexMetadata { pub meta: index_actor::IndexMeta, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum UpdateMeta { - DocumentsAddition { - method: IndexDocumentsMethod, - format: UpdateFormat, - primary_key: Option, - }, - ClearDocuments, - DeleteDocuments, - Settings(Settings), - Facets(Facets), -} - #[derive(Clone, Debug)] pub struct IndexSettings { pub uid: Option, @@ -73,6 +53,9 @@ pub struct IndexStats { #[serde(skip)] pub size: u64, pub number_of_documents: u64, + /// Whether the current index is performing an update. It is initially `None` when the + /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is + /// later set to either true or false, we we retrieve the information from the `UpdateStore` pub is_indexing: Option, pub fields_distribution: FieldsDistribution, } @@ -180,7 +163,8 @@ impl IndexController { Err(UuidError::UnexistingIndex(name)) => { let uuid = Uuid::new_v4(); let status = perform_update(uuid).await?; - self.index_handle.create_index(uuid, None).await?; + // ignore if index creation fails now, since it may already have been created + let _ = self.index_handle.create_index(uuid, None).await; self.uuid_resolver.insert(name, uuid).await?; Ok(status) } @@ -233,7 +217,8 @@ impl IndexController { Err(UuidError::UnexistingIndex(name)) if create => { let uuid = Uuid::new_v4(); let status = perform_udpate(uuid).await?; - self.index_handle.create_index(uuid, None).await?; + // ignore if index creation fails now, since it may already have been created + let _ = self.index_handle.create_index(uuid, None).await; self.uuid_resolver.insert(name, uuid).await?; Ok(status) } @@ -378,7 +363,8 @@ impl IndexController { let uuid = self.uuid_resolver.get(uid).await?; let update_infos = self.update_handle.get_info().await?; let mut stats = self.index_handle.get_index_stats(uuid).await?; - stats.is_indexing = (Some(uuid) == update_infos.processing).into(); + // Check if the currently indexing update is from out index. + stats.is_indexing = Some(Some(uuid) == update_infos.processing); Ok(stats) } @@ -396,7 +382,7 @@ impl IndexController { Some(last.max(index.meta.updated_at)) }); - index_stats.is_indexing = (Some(index.uuid) == update_infos.processing).into(); + index_stats.is_indexing = Some(Some(index.uuid) == update_infos.processing); indexes.insert(index.uid, index_stats); } diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index 40c72b0be..2a456eb26 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -131,7 +131,8 @@ pub fn load_snapshot( #[cfg(test)] mod test { - use std::sync::Arc; + use std::iter::FromIterator; + use std::{collections::HashSet, sync::Arc}; use futures::future::{err, ok}; use rand::Rng; @@ -139,15 +140,19 @@ mod test { use uuid::Uuid; use super::*; - use crate::index_controller::update_actor::{UpdateError, MockUpdateActorHandle, UpdateActorHandleImpl}; use crate::index_controller::index_actor::MockIndexActorHandle; + use crate::index_controller::update_actor::{ + MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError, + }; use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError}; #[actix_rt::test] async fn test_normal() { let mut rng = rand::thread_rng(); let uuids_num: usize = rng.gen_range(5, 10); - let uuids = (0..uuids_num).map(|_| Uuid::new_v4()).collect::>(); + let uuids = (0..uuids_num) + .map(|_| Uuid::new_v4()) + .collect::>(); let mut uuid_resolver = MockUuidResolverHandle::new(); let uuids_clone = uuids.clone(); @@ -162,13 +167,12 @@ mod test { .expect_snapshot() .withf(move |uuid, _path| uuids_clone.contains(uuid)) .times(uuids_num) - .returning(move |_, _| { - Box::pin(ok(())) - }); + .returning(move |_, _| Box::pin(ok(()))); let dir = tempfile::tempdir_in(".").unwrap(); let handle = Arc::new(index_handle); - let update_handle = UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); + let update_handle = + UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); let snapshot_path = tempfile::tempdir_in(".").unwrap(); let snapshot_service = SnapshotService::new( @@ -214,7 +218,7 @@ mod test { uuid_resolver .expect_snapshot() .times(1) - .returning(move |_| Box::pin(ok(vec![uuid]))); + .returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); let mut update_handle = MockUpdateActorHandle::new(); update_handle diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 0b5e88270..c98df7d25 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -1,14 +1,16 @@ +use std::collections::HashSet; use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::sync::Arc; +use futures::StreamExt; use log::info; use oxidized_json_checker::JsonChecker; use tokio::fs; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; +use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; -use futures::StreamExt; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}; @@ -32,18 +34,14 @@ where path: impl AsRef, index_handle: I, ) -> anyhow::Result { - let path = path.as_ref().to_owned().join("updates"); + let path = path.as_ref().join("updates"); std::fs::create_dir_all(&path)?; let mut options = heed::EnvOpenOptions::new(); options.map_size(update_db_size); - let handle = index_handle.clone(); - let store = UpdateStore::open(options, &path, move |uuid, meta, file| { - futures::executor::block_on(handle.update(uuid, meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; + let store = UpdateStore::open(options, &path, index_handle.clone())?; std::fs::create_dir_all(path.join("update_files"))?; assert!(path.exists()); Ok(Self { @@ -95,40 +93,54 @@ where meta: UpdateMeta, mut payload: mpsc::Receiver>, ) -> Result { - let update_file_id = uuid::Uuid::new_v4(); - let path = self - .path - .join(format!("update_files/update_{}", update_file_id)); - let mut file = fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - while let Some(bytes) = payload.recv().await { - match bytes { - Ok(bytes) => { - file.write_all(bytes.as_ref()) + let file_path = match meta { + UpdateMeta::DocumentsAddition { .. } + | UpdateMeta::DeleteDocuments => { + + let update_file_id = uuid::Uuid::new_v4(); + let path = self + .path + .join(format!("update_files/update_{}", update_file_id)); + let mut file = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + let mut file_len = 0; + while let Some(bytes) = payload.recv().await { + match bytes { + Ok(bytes) => { + file_len += bytes.as_ref().len(); + file.write_all(bytes.as_ref()) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + } + Err(e) => { + return Err(UpdateError::Error(e)); + } + } + } + + if file_len != 0 { + file.flush() .await .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - Err(e) => { - return Err(UpdateError::Error(e)); + let file = file.into_std().await; + Some((file, path)) + } else { + // empty update, delete the empty file. + fs::remove_file(&path) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + None } } - } - - file.flush() - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - file.seek(SeekFrom::Start(0)) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - let mut file = file.into_std().await; + _ => None + }; let update_store = self.store.clone(); @@ -136,12 +148,9 @@ where use std::io::{copy, sink, BufReader, Seek}; // If the payload is empty, ignore the check. - if file - .metadata() - .map_err(|e| UpdateError::Error(Box::new(e)))? - .len() - > 0 - { + let path = if let Some((mut file, path)) = file_path { + // set the file back to the beginning + file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?; // Check that the json payload is valid: let reader = BufReader::new(&mut file); let mut checker = JsonChecker::new(reader); @@ -153,7 +162,10 @@ where let _: serde_json::Value = serde_json::from_reader(file) .map_err(|e| UpdateError::Error(Box::new(e)))?; } - } + Some(path) + } else { + None + }; // The payload is valid, we can register it to the update store. update_store @@ -197,17 +209,11 @@ where Ok(()) } - async fn handle_snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { + async fn handle_snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - // acquire write lock to prevent further writes during snapshot - // the update lock must be acquired BEFORE the write lock to prevent dead lock - let _lock = update_store.update_lock.lock(); - let mut txn = update_store.env.write_txn()?; - - // create db snapshot - update_store.snapshot(&mut txn, &path)?; + update_store.snapshot(&uuids, &path)?; // Perform the snapshot of each index concurently. Only a third of the capabilities of // the index actor at a time not to put too much pressure on the index actor @@ -218,7 +224,7 @@ where .map(|&uuid| handle.snapshot(uuid, path.clone())) .buffer_unordered(CONCURRENT_INDEX_MSG / 3); - futures::executor::block_on(async { + Handle::current().block_on(async { while let Some(res) = stream.next().await { res?; } @@ -234,25 +240,14 @@ where async fn handle_get_info(&self) -> Result { let update_store = self.store.clone(); - let processing = self.store.processing.clone(); let info = tokio::task::spawn_blocking(move || -> anyhow::Result { - let txn = update_store.env.read_txn()?; - let size = update_store.get_size(&txn)?; - let processing = processing - .read() - .as_ref() - .map(|(uuid, _)| uuid) - .cloned(); - let info = UpdateStoreInfo { - size, processing - }; + let info = update_store.get_info()?; Ok(info) }) .await .map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?; - Ok(info) } } diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index f79ef0e4e..999481573 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -1,12 +1,13 @@ +use std::collections::HashSet; use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use crate::index_controller::IndexActorHandle; +use crate::index_controller::{IndexActorHandle, UpdateStatus}; use super::{ - PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus, UpdateStoreInfo + PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo, }; #[derive(Clone)] @@ -63,7 +64,7 @@ where receiver.await.expect("update actor killed.") } - async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { + async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Snapshot { uuids, path, ret }; let _ = self.sender.send(msg).await; diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index 6082ad280..17b2b3579 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::path::PathBuf; use tokio::sync::{mpsc, oneshot}; @@ -26,7 +27,7 @@ pub enum UpdateMsg { ret: oneshot::Sender>, }, Snapshot { - uuids: Vec, + uuids: HashSet, path: PathBuf, ret: oneshot::Sender>, }, diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 6b7ed7b9b..e7a12b7ff 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -3,22 +3,22 @@ mod handle_impl; mod message; mod update_store; -use std::path::PathBuf; +use std::{collections::HashSet, path::PathBuf}; use thiserror::Error; use tokio::sync::mpsc; use uuid::Uuid; -use crate::index::UpdateResult; use crate::index_controller::{UpdateMeta, UpdateStatus}; use actor::UpdateActor; use message::UpdateMsg; +use update_store::UpdateStore; +pub use update_store::UpdateStoreInfo; pub use handle_impl::UpdateActorHandleImpl; pub type Result = std::result::Result; -type UpdateStore = update_store::UpdateStore; type PayloadData = std::result::Result>; #[cfg(test)] @@ -32,13 +32,6 @@ pub enum UpdateError { UnexistingUpdate(u64), } -pub struct UpdateStoreInfo { - /// Size of the update store in bytes. - pub size: u64, - /// Uuid of the currently processing update if it exists - pub processing: Option, -} - #[async_trait::async_trait] #[cfg_attr(test, automock(type Data=Vec;))] pub trait UpdateActorHandle { @@ -47,7 +40,7 @@ pub trait UpdateActorHandle { async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()>; + async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()>; async fn get_info(&self) -> Result; async fn update( &self, diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index 70f20e901..795b6c36a 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -1,148 +1,213 @@ use std::borrow::Cow; +use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; use std::fs::{copy, create_dir_all, remove_file, File}; use std::mem::size_of; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::Arc; use anyhow::Context; -use bytemuck::{Pod, Zeroable}; -use heed::types::{ByteSlice, DecodeIgnore, SerdeJson}; +use arc_swap::ArcSwap; +use heed::types::{ByteSlice, OwnedType, SerdeJson}; +use heed::zerocopy::U64; use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; -use parking_lot::{Mutex, RwLock}; -use serde::{Deserialize, Serialize}; +use parking_lot::{Mutex, MutexGuard}; +use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; +use super::UpdateMeta; use crate::helpers::EnvSizer; -use crate::index_controller::updates::*; +use crate::index_controller::{IndexActorHandle, updates::*}; #[allow(clippy::upper_case_acronyms)] -type BEU64 = heed::zerocopy::U64; +type BEU64 = U64; -struct IndexUuidUpdateIdCodec; +struct NextIdCodec; -#[repr(C)] -#[derive(Copy, Clone)] -struct IndexUuidUpdateId(Uuid, BEU64); +enum NextIdKey { + Global, + Index(Uuid), +} -// Is Uuid really zeroable (semantically)? -unsafe impl Zeroable for IndexUuidUpdateId {} -unsafe impl Pod for IndexUuidUpdateId {} +pub struct UpdateStoreInfo { + /// Size of the update store in bytes. + pub size: u64, + /// Uuid of the currently processing update if it exists + pub processing: Option, +} -impl IndexUuidUpdateId { - fn new(uuid: Uuid, update_id: u64) -> Self { - Self(uuid, BEU64::new(update_id)) +/// A data structure that allows concurrent reads AND exactly one writer. +pub struct StateLock { + lock: Mutex<()>, + data: ArcSwap, +} + +struct StateLockGuard<'a> { + _lock: MutexGuard<'a, ()>, + state: &'a StateLock, +} + +impl StateLockGuard<'_> { + fn swap(&self, state: State) -> Arc { + self.state.data.swap(Arc::new(state)) } } -const UUID_SIZE: usize = size_of::(); -const U64_SIZE: usize = size_of::(); +impl StateLock { + fn from_state(state: State) -> Self { + let lock = Mutex::new(()); + let data = ArcSwap::from(Arc::new(state)); + Self { data, lock } + } -impl<'a> BytesEncode<'a> for IndexUuidUpdateIdCodec { - type EItem = IndexUuidUpdateId; + fn read(&self) -> Arc { + self.data.load().clone() + } - fn bytes_encode(item: &'a Self::EItem) -> Option> { - let bytes = bytemuck::cast_ref::(item); - Some(Cow::Borrowed(&bytes[..])) + fn write(&self) -> StateLockGuard { + let _lock = self.lock.lock(); + let state = &self; + StateLockGuard { state, _lock } } } -impl<'a> BytesDecode<'a> for IndexUuidUpdateIdCodec { +pub enum State { + Idle, + Processing(Uuid, Processing), + Snapshoting, +} + +impl<'a> BytesEncode<'a> for NextIdCodec { + type EItem = NextIdKey; + + fn bytes_encode(item: &'a Self::EItem) -> Option> { + match item { + NextIdKey::Global => Some(Cow::Borrowed(b"__global__")), + NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())), + } + } +} + +struct PendingKeyCodec; + +impl<'a> BytesEncode<'a> for PendingKeyCodec { + type EItem = (u64, Uuid, u64); + + fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option> { + let mut bytes = Vec::with_capacity(size_of::()); + bytes.extend_from_slice(&global_id.to_be_bytes()); + bytes.extend_from_slice(uuid.as_bytes()); + bytes.extend_from_slice(&update_id.to_be_bytes()); + Some(Cow::Owned(bytes)) + } +} + +impl<'a> BytesDecode<'a> for PendingKeyCodec { + type DItem = (u64, Uuid, u64); + + fn bytes_decode(bytes: &'a [u8]) -> Option { + let global_id_bytes = bytes.get(0..size_of::())?.try_into().ok()?; + let global_id = u64::from_be_bytes(global_id_bytes); + + let uuid_bytes = bytes + .get(size_of::()..(size_of::() + size_of::()))? + .try_into() + .ok()?; + let uuid = Uuid::from_bytes(uuid_bytes); + + let update_id_bytes = bytes + .get((size_of::() + size_of::())..)? + .try_into() + .ok()?; + let update_id = u64::from_be_bytes(update_id_bytes); + + Some((global_id, uuid, update_id)) + } +} + +struct UpdateKeyCodec; + +impl<'a> BytesEncode<'a> for UpdateKeyCodec { + type EItem = (Uuid, u64); + + fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option> { + let mut bytes = Vec::with_capacity(size_of::()); + bytes.extend_from_slice(uuid.as_bytes()); + bytes.extend_from_slice(&update_id.to_be_bytes()); + Some(Cow::Owned(bytes)) + } +} + +impl<'a> BytesDecode<'a> for UpdateKeyCodec { type DItem = (Uuid, u64); fn bytes_decode(bytes: &'a [u8]) -> Option { - let bytes = bytes.try_into().ok()?; - let IndexUuidUpdateId(uuid, id) = - bytemuck::cast_ref::<[u8; UUID_SIZE + U64_SIZE], IndexUuidUpdateId>(bytes); - Some((*uuid, id.get())) + let uuid_bytes = bytes.get(0..size_of::())?.try_into().ok()?; + let uuid = Uuid::from_bytes(uuid_bytes); + + let update_id_bytes = bytes.get(size_of::()..)?.try_into().ok()?; + let update_id = u64::from_be_bytes(update_id_bytes); + + Some((uuid, update_id)) } } #[derive(Clone)] -pub struct UpdateStore { +pub struct UpdateStore { pub env: Env, - pending_meta: Database>>, - pending: Database>, - processed_meta: Database>>, - failed_meta: Database>>, - aborted_meta: Database>>, - pub processing: Arc)>>>, + /// A queue containing the updates to process, ordered by arrival. + /// The key are built as follow: + /// | global_update_id | index_uuid | update_id | + /// | 8-bytes | 16-bytes | 8-bytes | + pending_queue: Database>, + /// Map indexes to the next available update id. If NextIdKey::Global is queried, then the next + /// global update id is returned + next_update_id: Database>, + /// Contains all the performed updates meta, be they failed, aborted, or processed. + /// The keys are built as follow: + /// | Uuid | id | + /// | 16-bytes | 8-bytes | + updates: Database>, + /// Indicates the current state of the update store, + state: Arc, + /// Wake up the loop when a new event occurs. notification_sender: mpsc::Sender<()>, - /// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is - /// processing, while not preventing writes all together during an update - pub update_lock: Arc>, } -pub trait HandleUpdate { - fn handle_update( - &mut self, - index_uuid: Uuid, - meta: Processing, - content: File, - ) -> anyhow::Result, Failed>>; -} - -impl HandleUpdate for F -where - F: FnMut(Uuid, Processing, File) -> anyhow::Result, Failed>>, -{ - fn handle_update( - &mut self, - index_uuid: Uuid, - meta: Processing, - content: File, - ) -> anyhow::Result, Failed>> { - self(index_uuid, meta, content) - } -} - -impl UpdateStore -where - M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone, - N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, - E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, -{ - pub fn open( +impl UpdateStore { + pub fn open( mut options: EnvOpenOptions, - path: P, - update_handler: U, - ) -> anyhow::Result> - where - P: AsRef, - U: HandleUpdate + Sync + Clone + Send + 'static, - { + path: impl AsRef, + index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, + ) -> anyhow::Result> { options.max_dbs(5); let env = options.open(path)?; - let pending_meta = env.create_database(Some("pending-meta"))?; - let pending = env.create_database(Some("pending"))?; - let processed_meta = env.create_database(Some("processed-meta"))?; - let aborted_meta = env.create_database(Some("aborted-meta"))?; - let failed_meta = env.create_database(Some("failed-meta"))?; - let processing = Arc::new(RwLock::new(None)); + let pending_queue = env.create_database(Some("pending-queue"))?; + let next_update_id = env.create_database(Some("next-update-id"))?; + let updates = env.create_database(Some("updates"))?; let (notification_sender, mut notification_receiver) = mpsc::channel(10); // Send a first notification to trigger the process. let _ = notification_sender.send(()); - let update_lock = Arc::new(Mutex::new(())); + let state = Arc::new(StateLock::from_state(State::Idle)); // Init update loop to perform any pending updates at launch. // Since we just launched the update store, and we still own the receiving end of the // channel, this call is guaranteed to succeed. - notification_sender.try_send(()).expect("Failed to init update store"); + notification_sender + .try_send(()) + .expect("Failed to init update store"); let update_store = Arc::new(UpdateStore { env, - pending, - pending_meta, - processed_meta, - aborted_meta, notification_sender, - failed_meta, - processing, - update_lock, + next_update_id, + pending_queue, + updates, + state, }); // We need a weak reference so we can take ownership on the arc later when we @@ -154,7 +219,7 @@ where loop { match update_store_weak.upgrade() { Some(update_store) => { - let handler = update_handler.clone(); + let handler = index_handle.clone(); let res = tokio::task::spawn_blocking(move || { update_store.process_pending_update(handler) }) @@ -176,75 +241,47 @@ where Ok(update_store) } - /// Returns the new biggest id to use to store the new update. - fn new_update_id(&self, txn: &heed::RoTxn, index_uuid: Uuid) -> heed::Result { - // TODO: this is a very inneficient process for finding the next update id for each index, - // and needs to be made better. - let last_pending = self - .pending_meta - .remap_data_type::() - .prefix_iter(txn, index_uuid.as_bytes())? - .remap_key_type::() - .last() - .transpose()? - .map(|((_, id), _)| id); + /// Returns the next global update id and the next update id for a given `index_uuid`. + fn next_update_id(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<(u64, u64)> { + let global_id = self + .next_update_id + .get(txn, &NextIdKey::Global)? + .map(U64::get) + .unwrap_or_default(); + let update_id = self + .next_update_id + .get(txn, &NextIdKey::Index(index_uuid))? + .map(U64::get) + .unwrap_or_default(); - let last_processed = self - .processed_meta - .remap_data_type::() - .prefix_iter(txn, index_uuid.as_bytes())? - .remap_key_type::() - .last() - .transpose()? - .map(|((_, id), _)| id); + self.next_update_id + .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?; + self.next_update_id.put( + txn, + &NextIdKey::Index(index_uuid), + &BEU64::new(update_id + 1), + )?; - let last_aborted = self - .aborted_meta - .remap_data_type::() - .prefix_iter(txn, index_uuid.as_bytes())? - .remap_key_type::() - .last() - .transpose()? - .map(|((_, id), _)| id); - - let last_update_id = [last_pending, last_processed, last_aborted] - .iter() - .copied() - .flatten() - .max(); - - match last_update_id { - Some(last_id) => Ok(last_id + 1), - None => Ok(0), - } + Ok((global_id, update_id)) } /// Registers the update content in the pending store and the meta /// into the pending-meta store. Returns the new unique update id. pub fn register_update( &self, - meta: M, - content: impl AsRef, + meta: UpdateMeta, + content: Option>, index_uuid: Uuid, - ) -> heed::Result> { - let mut wtxn = self.env.write_txn()?; + ) -> heed::Result { + let mut txn = self.env.write_txn()?; - // We ask the update store to give us a new update id, this is safe, - // no other update can have the same id because we use a write txn before - // asking for the id and registering it so other update registering - // will be forced to wait for a new write txn. - let update_id = self.new_update_id(&wtxn, index_uuid)?; - let meta = Enqueued::new(meta, update_id); - let key = IndexUuidUpdateId::new(index_uuid, update_id); - self.pending_meta - .remap_key_type::() - .put(&mut wtxn, &key, &meta)?; + let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; + let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned())); - self.pending - .remap_key_type::() - .put(&mut wtxn, &key, &content.as_ref().to_owned())?; + self.pending_queue + .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?; - wtxn.commit()?; + txn.commit()?; self.notification_sender .blocking_send(()) @@ -255,68 +292,62 @@ where /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, mut handler: U) -> anyhow::Result> - where - U: HandleUpdate, - { - let _lock = self.update_lock.lock(); + fn process_pending_update( + &self, + index_handle: impl IndexActorHandle, + ) -> anyhow::Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; - - let first_meta = self - .pending_meta - .remap_key_type::() - .first(&rtxn)?; + let first_meta = self.pending_queue.first(&rtxn)?; + drop(rtxn); // If there is a pending update we process and only keep // a reader while processing it, not a writer. match first_meta { - Some(((index_uuid, update_id), pending)) => { - let key = IndexUuidUpdateId::new(index_uuid, update_id); - let content_path = self - .pending - .remap_key_type::() - .get(&rtxn, &key)? - .expect("associated update content"); - - // we change the state of the update from pending to processing before we pass it - // to the update handler. Processing store is non persistent to be able recover - // from a failure + Some(((global_id, index_uuid, update_id), mut pending)) => { + let content_path = pending.content.take(); let processing = pending.processing(); - self.processing - .write() - .replace((index_uuid, processing.clone())); - let file = File::open(&content_path) - .with_context(|| format!("file at path: {:?}", &content_path))?; + + // Acquire the state lock and set the current state to processing. + let state = self.state.write(); + state.swap(State::Processing(index_uuid, processing.clone())); + + let file = match content_path { + Some(ref path) => { + let file = File::open(path) + .with_context(|| format!("file at path: {:?}", &content_path))?; + Some(file) + } + None => None, + }; // Process the pending update using the provided user function. - let result = handler.handle_update(index_uuid, processing, file)?; - drop(rtxn); + let result = Handle::current() + .block_on(index_handle.update(index_uuid, processing, file))?; // Once the pending update have been successfully processed // we must remove the content from the pending and processing stores and // write the *new* meta to the processed-meta store and commit. let mut wtxn = self.env.write_txn()?; - self.processing.write().take(); - self.pending_meta - .remap_key_type::() - .delete(&mut wtxn, &key)?; + self.pending_queue + .delete(&mut wtxn, &(global_id, index_uuid, update_id))?; - remove_file(&content_path)?; - - self.pending - .remap_key_type::() - .delete(&mut wtxn, &key)?; - match result { - Ok(processed) => self - .processed_meta - .remap_key_type::() - .put(&mut wtxn, &key, &processed)?, - Err(failed) => self - .failed_meta - .remap_key_type::() - .put(&mut wtxn, &key, &failed)?, + if let Some(path) = content_path { + remove_file(&path)?; } + + let result = match result { + Ok(res) => res.into(), + Err(res) => res.into(), + }; + + self.updates.remap_key_type::().put( + &mut wtxn, + &(index_uuid, update_id), + &result, + )?; + wtxn.commit()?; + state.swap(State::Idle); Ok(Some(())) } @@ -324,245 +355,127 @@ where } } - pub fn list(&self, index_uuid: Uuid) -> anyhow::Result>> { - let rtxn = self.env.read_txn()?; - let mut updates = Vec::new(); + /// List the updates for `index_uuid`. + pub fn list(&self, index_uuid: Uuid) -> anyhow::Result> { + let mut update_list = BTreeMap::::new(); - let processing = self.processing.read(); - if let Some((uuid, ref processing)) = *processing { + let txn = self.env.read_txn()?; + + let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); + for entry in pendings { + let ((_, uuid, id), pending) = entry?; if uuid == index_uuid { - let update = UpdateStatus::from(processing.clone()); - updates.push(update); + update_list.insert(id, pending.decode()?.into()); } } - let pending = self - .pending_meta - .prefix_iter(&rtxn, index_uuid.as_bytes())? - .filter_map(|entry| { - let (_, p) = entry.ok()?; - if let Some((uuid, ref processing)) = *processing { - // Filter out the currently processing update if it is from this index. - if uuid == index_uuid && processing.id() == p.id() { - None - } else { - Some(UpdateStatus::from(p)) - } - } else { - Some(UpdateStatus::from(p)) - } - }); + let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?; + for entry in updates { + let (_, update) = entry?; + update_list.insert(update.id(), update); + } - updates.extend(pending); + // If the currently processing update is from this index, replace the corresponding pending update with this one. + match *self.state.read() { + State::Processing(uuid, ref processing) if uuid == index_uuid => { + update_list.insert(processing.id(), processing.clone().into()); + } + _ => (), + } - let aborted = self - .aborted_meta - .prefix_iter(&rtxn, index_uuid.as_bytes())? - .filter_map(Result::ok) - .map(|(_, p)| p) - .map(UpdateStatus::from); - - updates.extend(aborted); - - let processed = self - .processed_meta - .iter(&rtxn)? - .filter_map(Result::ok) - .map(|(_, p)| p) - .map(UpdateStatus::from); - - updates.extend(processed); - - let failed = self - .failed_meta - .iter(&rtxn)? - .filter_map(Result::ok) - .map(|(_, p)| p) - .map(UpdateStatus::from); - - updates.extend(failed); - - updates.sort_by_key(|u| u.id()); - - Ok(updates) + Ok(update_list.into_iter().map(|(_, v)| v).collect()) } /// Returns the update associated meta or `None` if the update doesn't exist. - pub fn meta( - &self, - index_uuid: Uuid, - update_id: u64, - ) -> heed::Result>> { - let rtxn = self.env.read_txn()?; - let key = IndexUuidUpdateId::new(index_uuid, update_id); + pub fn meta(&self, index_uuid: Uuid, update_id: u64) -> heed::Result> { + // Check if the update is the one currently processing + match *self.state.read() { + State::Processing(uuid, ref processing) + if uuid == index_uuid && processing.id() == update_id => + { + return Ok(Some(processing.clone().into())); + } + _ => (), + } - if let Some((uuid, ref meta)) = *self.processing.read() { - if uuid == index_uuid && meta.id() == update_id { - return Ok(Some(UpdateStatus::Processing(meta.clone()))); + let txn = self.env.read_txn()?; + // Else, check if it is in the updates database: + let update = self + .updates + .remap_key_type::() + .get(&txn, &(index_uuid, update_id))?; + + if let Some(update) = update { + return Ok(Some(update)); + } + + // If nothing was found yet, we resolve to iterate over the pending queue. + let pendings = self + .pending_queue + .remap_key_type::() + .iter(&txn)? + .lazily_decode_data(); + + for entry in pendings { + let ((uuid, id), pending) = entry?; + if uuid == index_uuid && id == update_id { + return Ok(Some(pending.decode()?.into())); } } - if let Some(meta) = self - .pending_meta - .remap_key_type::() - .get(&rtxn, &key)? - { - return Ok(Some(UpdateStatus::Enqueued(meta))); - } - - if let Some(meta) = self - .processed_meta - .remap_key_type::() - .get(&rtxn, &key)? - { - return Ok(Some(UpdateStatus::Processed(meta))); - } - - if let Some(meta) = self - .aborted_meta - .remap_key_type::() - .get(&rtxn, &key)? - { - return Ok(Some(UpdateStatus::Aborted(meta))); - } - - if let Some(meta) = self - .failed_meta - .remap_key_type::() - .get(&rtxn, &key)? - { - return Ok(Some(UpdateStatus::Failed(meta))); - } - + // No update was found. Ok(None) } - /// Aborts an update, an aborted update content is deleted and - /// the meta of it is moved into the aborted updates database. - /// - /// Trying to abort an update that is currently being processed, an update - /// that as already been processed or which doesn't actually exist, will - /// return `None`. - #[allow(dead_code)] - pub fn abort_update( - &self, - index_uuid: Uuid, - update_id: u64, - ) -> heed::Result>> { - let mut wtxn = self.env.write_txn()?; - let key = IndexUuidUpdateId::new(index_uuid, update_id); - - // We cannot abort an update that is currently being processed. - if self - .pending_meta - .remap_key_type::() - .first(&wtxn)? - .map(|((_, id), _)| id) - == Some(update_id) - { - return Ok(None); - } - - let pending = match self - .pending_meta - .remap_key_type::() - .get(&wtxn, &key)? - { - Some(meta) => meta, - None => return Ok(None), - }; - - let aborted = pending.abort(); - - self.aborted_meta - .remap_key_type::() - .put(&mut wtxn, &key, &aborted)?; - self.pending_meta - .remap_key_type::() - .delete(&mut wtxn, &key)?; - self.pending - .remap_key_type::() - .delete(&mut wtxn, &key)?; - - wtxn.commit()?; - - Ok(Some(aborted)) - } - - /// Aborts all the pending updates, and not the one being currently processed. - /// Returns the update metas and ids that were successfully aborted. - #[allow(dead_code)] - pub fn abort_pendings(&self, index_uuid: Uuid) -> heed::Result)>> { - let mut wtxn = self.env.write_txn()?; - let mut aborted_updates = Vec::new(); - - // We skip the first pending update as it is currently being processed. - for result in self - .pending_meta - .prefix_iter(&wtxn, index_uuid.as_bytes())? - .remap_key_type::() - .skip(1) - { - let ((_, update_id), pending) = result?; - aborted_updates.push((update_id, pending.abort())); - } - - for (id, aborted) in &aborted_updates { - let key = IndexUuidUpdateId::new(index_uuid, *id); - self.aborted_meta - .remap_key_type::() - .put(&mut wtxn, &key, &aborted)?; - self.pending_meta - .remap_key_type::() - .delete(&mut wtxn, &key)?; - self.pending - .remap_key_type::() - .delete(&mut wtxn, &key)?; - } - - wtxn.commit()?; - - Ok(aborted_updates) - } - - pub fn delete_all(&self, uuid: Uuid) -> anyhow::Result<()> { - fn delete_all( - txn: &mut heed::RwTxn, - uuid: Uuid, - db: Database, - ) -> anyhow::Result<()> - where - A: for<'a> heed::BytesDecode<'a>, - { - let mut iter = db.prefix_iter_mut(txn, uuid.as_bytes())?; - while let Some(_) = iter.next() { - iter.del_current()?; - } - Ok(()) - } - + /// Delete all updates for an index from the update store. + pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> { let mut txn = self.env.write_txn()?; + // Contains all the content file paths that we need to be removed if the deletion was successful. + let mut paths_to_remove = Vec::new(); - delete_all(&mut txn, uuid, self.pending)?; - delete_all(&mut txn, uuid, self.pending_meta)?; - delete_all(&mut txn, uuid, self.processed_meta)?; - delete_all(&mut txn, uuid, self.aborted_meta)?; - delete_all(&mut txn, uuid, self.failed_meta)?; + let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data(); - let processing = self.processing.upgradable_read(); - if let Some((processing_uuid, _)) = *processing { - if processing_uuid == uuid { - parking_lot::RwLockUpgradableReadGuard::upgrade(processing).take(); + while let Some(Ok(((_, uuid, _), pending))) = pendings.next() { + if uuid == index_uuid { + pendings.del_current()?; + let mut pending = pending.decode()?; + if let Some(path) = pending.content.take() { + paths_to_remove.push(path); + } } } + + drop(pendings); + + let mut updates = self + .updates + .prefix_iter_mut(&mut txn, index_uuid.as_bytes())? + .lazily_decode_data(); + + while let Some(_) = updates.next() { + updates.del_current()?; + } + + drop(updates); + + txn.commit()?; + + paths_to_remove.iter().for_each(|path| { + let _ = remove_file(path); + }); + + // We don't care about the currently processing update, since it will be removed by itself + // once its done processing, and we can't abort a running update. + Ok(()) } - pub fn snapshot( - &self, - txn: &mut heed::RwTxn, - path: impl AsRef, - ) -> anyhow::Result<()> { + pub fn snapshot(&self, uuids: &HashSet, path: impl AsRef) -> anyhow::Result<()> { + let state_lock = self.state.write(); + state_lock.swap(State::Snapshoting); + + let txn = self.env.write_txn()?; + let update_path = path.as_ref().join("updates"); create_dir_all(&update_path)?; @@ -571,33 +484,179 @@ where let db_path = update_path.join("data.mdb"); // create db snapshot - self.env - .copy_to_path(&db_path, CompactionOption::Enabled)?; + self.env.copy_to_path(&db_path, CompactionOption::Enabled)?; let update_files_path = update_path.join("update_files"); create_dir_all(&update_files_path)?; - for path in self.pending.iter(&txn)? { - let (_, path) = path?; - let name = path.file_name().unwrap(); - let to = update_files_path.join(name); - copy(path, to)?; + let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); + + for entry in pendings { + let ((_, uuid, _), pending) = entry?; + if uuids.contains(&uuid) { + if let Some(path) = pending.decode()?.content_path() { + let name = path.file_name().unwrap(); + let to = update_files_path.join(name); + copy(path, to)?; + } + } } Ok(()) } - pub fn get_size(&self, txn: &heed::RoTxn) -> anyhow::Result { + pub fn get_info(&self) -> anyhow::Result { let mut size = self.env.size(); + let txn = self.env.read_txn()?; - for path in self.pending.iter(txn)? { - let (_, path) = path?; - - if let Ok(metadata) = path.metadata() { - size += metadata.len() + for entry in self.pending_queue.iter(&txn)? { + let (_, pending) = entry?; + if let Some(path) = pending.content_path() { + size += File::open(path)?.metadata()?.len(); } } - Ok(size) + let processing = match *self.state.read() { + State::Processing(uuid, _) => Some(uuid), + _ => None, + }; + + Ok(UpdateStoreInfo { size, processing }) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::index_controller::{index_actor::MockIndexActorHandle, UpdateResult}; + + use futures::future::ok; + + #[actix_rt::test] + async fn test_next_id() { + let dir = tempfile::tempdir_in(".").unwrap(); + let mut options = EnvOpenOptions::new(); + let handle = Arc::new(MockIndexActorHandle::new()); + options.map_size(4096 * 100); + let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); + + let index1_uuid = Uuid::new_v4(); + let index2_uuid = Uuid::new_v4(); + + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((0, 0), ids); + + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((1, 0), ids); + + let mut txn = update_store.env.write_txn().unwrap(); + let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + txn.commit().unwrap(); + assert_eq!((2, 1), ids); + } + + #[actix_rt::test] + async fn test_register_update() { + let dir = tempfile::tempdir_in(".").unwrap(); + let mut options = EnvOpenOptions::new(); + let handle = Arc::new(MockIndexActorHandle::new()); + options.map_size(4096 * 100); + let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); + let meta = UpdateMeta::ClearDocuments; + let uuid = Uuid::new_v4(); + let store_clone = update_store.clone(); + tokio::task::spawn_blocking(move || { + store_clone + .register_update(meta, Some("here"), uuid) + .unwrap(); + }) + .await + .unwrap(); + + let txn = update_store.env.read_txn().unwrap(); + assert!(update_store + .pending_queue + .get(&txn, &(0, uuid, 0)) + .unwrap() + .is_some()); + } + + #[actix_rt::test] + async fn test_process_update() { + let dir = tempfile::tempdir_in(".").unwrap(); + let mut handle = MockIndexActorHandle::new(); + + handle + .expect_update() + .times(2) + .returning(|_index_uuid, processing, _file| { + if processing.id() == 0 { + Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) + } else { + Box::pin(ok(Err(processing.fail(String::from("err"))))) + } + }); + + let handle = Arc::new(handle); + + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let store = UpdateStore::open(options, dir.path(), handle.clone()).unwrap(); + + // wait a bit for the event loop exit. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let mut txn = store.env.write_txn().unwrap(); + + let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); + let uuid = Uuid::new_v4(); + + store + .pending_queue + .put(&mut txn, &(0, uuid, 0), &update) + .unwrap(); + + let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); + + store + .pending_queue + .put(&mut txn, &(1, uuid, 1), &update) + .unwrap(); + + txn.commit().unwrap(); + + // Process the pending, and check that it has been moved to the update databases, and + // removed from the pending database. + let store_clone = store.clone(); + tokio::task::spawn_blocking(move || { + store_clone.process_pending_update(handle.clone()).unwrap(); + store_clone.process_pending_update(handle).unwrap(); + }) + .await + .unwrap(); + + let txn = store.env.read_txn().unwrap(); + + assert!(store.pending_queue.first(&txn).unwrap().is_none()); + let update = store + .updates + .remap_key_type::() + .get(&txn, &(uuid, 0)) + .unwrap() + .unwrap(); + + assert!(matches!(update, UpdateStatus::Processed(_))); + let update = store + .updates + .remap_key_type::() + .get(&txn, &(uuid, 1)) + .unwrap() + .unwrap(); + + assert!(matches!(update, UpdateStatus::Failed(_))); } } diff --git a/meilisearch-http/src/index_controller/update_handler.rs b/meilisearch-http/src/index_controller/update_handler.rs index 1eb622cbf..130b85fe7 100644 --- a/meilisearch-http/src/index_controller/update_handler.rs +++ b/meilisearch-http/src/index_controller/update_handler.rs @@ -6,9 +6,8 @@ use grenad::CompressionType; use milli::update::UpdateBuilder; use rayon::ThreadPool; -use crate::index::UpdateResult; -use crate::index_controller::updates::{Failed, Processed, Processing}; use crate::index_controller::UpdateMeta; +use crate::index_controller::{Failed, Processed, Processing}; use crate::option::IndexerOpts; pub struct UpdateHandler { @@ -59,10 +58,10 @@ impl UpdateHandler { pub fn handle_update( &self, - meta: Processing, - content: File, + meta: Processing, + content: Option, index: Index, - ) -> Result, Failed> { + ) -> Result { use UpdateMeta::*; let update_id = meta.id(); diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 36e327cc2..1515f90e9 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -1,87 +1,121 @@ +use std::path::{Path, PathBuf}; + use chrono::{DateTime, Utc}; +use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; -#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Enqueued { - pub update_id: u64, - pub meta: M, - pub enqueued_at: DateTime, +use crate::index::{Facets, Settings}; + +pub type UpdateError = String; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum UpdateResult { + DocumentsAddition(DocumentAdditionResult), + DocumentDeletion { deleted: u64 }, + Other, } -impl Enqueued { - pub fn new(meta: M, update_id: u64) -> Self { +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum UpdateMeta { + DocumentsAddition { + method: IndexDocumentsMethod, + format: UpdateFormat, + primary_key: Option, + }, + ClearDocuments, + DeleteDocuments, + Settings(Settings), + Facets(Facets), +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Enqueued { + pub update_id: u64, + pub meta: UpdateMeta, + pub enqueued_at: DateTime, + pub content: Option, +} + +impl Enqueued { + pub fn new(meta: UpdateMeta, update_id: u64, content: Option) -> Self { Self { enqueued_at: Utc::now(), meta, update_id, + content, } } - pub fn processing(self) -> Processing { + pub fn processing(self) -> Processing { Processing { from: self, started_processing_at: Utc::now(), } } - pub fn abort(self) -> Aborted { + pub fn abort(self) -> Aborted { Aborted { from: self, aborted_at: Utc::now(), } } - pub fn meta(&self) -> &M { + pub fn meta(&self) -> &UpdateMeta { &self.meta } pub fn id(&self) -> u64 { self.update_id } + + pub fn content_path(&self) -> Option<&Path> { + self.content.as_deref() + } } -#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct Processed { - pub success: N, +pub struct Processed { + pub success: UpdateResult, pub processed_at: DateTime, #[serde(flatten)] - pub from: Processing, + pub from: Processing, } -impl Processed { +impl Processed { pub fn id(&self) -> u64 { self.from.id() } } -#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct Processing { +pub struct Processing { #[serde(flatten)] - pub from: Enqueued, + pub from: Enqueued, pub started_processing_at: DateTime, } -impl Processing { +impl Processing { pub fn id(&self) -> u64 { self.from.id() } - pub fn meta(&self) -> &M { + pub fn meta(&self) -> &UpdateMeta { self.from.meta() } - pub fn process(self, meta: N) -> Processed { + pub fn process(self, success: UpdateResult) -> Processed { Processed { - success: meta, + success, from: self, processed_at: Utc::now(), } } - pub fn fail(self, error: E) -> Failed { + pub fn fail(self, error: UpdateError) -> Failed { Failed { from: self, error, @@ -90,46 +124,46 @@ impl Processing { } } -#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct Aborted { +pub struct Aborted { #[serde(flatten)] - from: Enqueued, + from: Enqueued, aborted_at: DateTime, } -impl Aborted { +impl Aborted { pub fn id(&self) -> u64 { self.from.id() } } -#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct Failed { +pub struct Failed { #[serde(flatten)] - from: Processing, - error: E, + from: Processing, + error: UpdateError, failed_at: DateTime, } -impl Failed { +impl Failed { pub fn id(&self) -> u64 { self.from.id() } } -#[derive(Debug, PartialEq, Eq, Hash, Serialize)] +#[derive(Debug, Serialize, Deserialize)] #[serde(tag = "status", rename_all = "camelCase")] -pub enum UpdateStatus { - Processing(Processing), - Enqueued(Enqueued), - Processed(Processed), - Aborted(Aborted), - Failed(Failed), +pub enum UpdateStatus { + Processing(Processing), + Enqueued(Enqueued), + Processed(Processed), + Aborted(Aborted), + Failed(Failed), } -impl UpdateStatus { +impl UpdateStatus { pub fn id(&self) -> u64 { match self { UpdateStatus::Processing(u) => u.id(), @@ -140,7 +174,7 @@ impl UpdateStatus { } } - pub fn processed(&self) -> Option<&Processed> { + pub fn processed(&self) -> Option<&Processed> { match self { UpdateStatus::Processed(p) => Some(p), _ => None, @@ -148,32 +182,32 @@ impl UpdateStatus { } } -impl From> for UpdateStatus { - fn from(other: Enqueued) -> Self { +impl From for UpdateStatus { + fn from(other: Enqueued) -> Self { Self::Enqueued(other) } } -impl From> for UpdateStatus { - fn from(other: Aborted) -> Self { +impl From for UpdateStatus { + fn from(other: Aborted) -> Self { Self::Aborted(other) } } -impl From> for UpdateStatus { - fn from(other: Processed) -> Self { +impl From for UpdateStatus { + fn from(other: Processed) -> Self { Self::Processed(other) } } -impl From> for UpdateStatus { - fn from(other: Processing) -> Self { +impl From for UpdateStatus { + fn from(other: Processing) -> Self { Self::Processing(other) } } -impl From> for UpdateStatus { - fn from(other: Failed) -> Self { +impl From for UpdateStatus { + fn from(other: Failed) -> Self { Self::Failed(other) } } diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs index 27ffaa05e..253326276 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{collections::HashSet, path::PathBuf}; use log::{info, warn}; use tokio::sync::mpsc; @@ -78,7 +78,7 @@ impl UuidResolverActor { Ok(result) } - async fn handle_snapshot(&self, path: PathBuf) -> Result> { + async fn handle_snapshot(&self, path: PathBuf) -> Result> { self.store.snapshot(path).await } diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs index c522e87e6..db4c482bd 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; @@ -67,7 +68,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .expect("Uuid resolver actor has been killed")?) } - async fn snapshot(&self, path: PathBuf) -> Result> { + async fn snapshot(&self, path: PathBuf) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::SnapshotRequest { path, ret }; let _ = self.sender.send(msg).await; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs index e7d29f05f..a72bf0587 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/message.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::path::PathBuf; use tokio::sync::oneshot; @@ -28,7 +29,7 @@ pub enum UuidResolveMsg { }, SnapshotRequest { path: PathBuf, - ret: oneshot::Sender>>, + ret: oneshot::Sender>>, }, GetSize { ret: oneshot::Sender>, diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs index 33a089ddb..ef17133ff 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs @@ -3,6 +3,7 @@ mod handle_impl; mod message; mod store; +use std::collections::HashSet; use std::path::PathBuf; use thiserror::Error; @@ -29,7 +30,7 @@ pub trait UuidResolverHandle { async fn create(&self, name: String) -> anyhow::Result; async fn delete(&self, name: String) -> anyhow::Result; async fn list(&self) -> anyhow::Result>; - async fn snapshot(&self, path: PathBuf) -> Result>; + async fn snapshot(&self, path: PathBuf) -> Result>; async fn get_size(&self) -> Result; } diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index 1f057830b..29c034c44 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -1,5 +1,6 @@ -use std::fs::create_dir_all; use std::path::{Path, PathBuf}; +use std::collections::HashSet; +use std::fs::create_dir_all; use heed::{ types::{ByteSlice, Str}, @@ -19,7 +20,7 @@ pub trait UuidStore { async fn delete(&self, uid: String) -> Result>; async fn list(&self) -> Result>; async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, path: PathBuf) -> Result>; + async fn snapshot(&self, path: PathBuf) -> Result>; async fn get_size(&self) -> Result; } @@ -129,17 +130,17 @@ impl UuidStore for HeedUuidStore { .await? } - async fn snapshot(&self, mut path: PathBuf) -> Result> { + async fn snapshot(&self, mut path: PathBuf) -> Result> { let env = self.env.clone(); let db = self.db; tokio::task::spawn_blocking(move || { // Write transaction to acquire a lock on the database. let txn = env.write_txn()?; - let mut entries = Vec::new(); + let mut entries = HashSet::new(); for entry in db.iter(&txn)? { let (_, uuid) = entry?; let uuid = Uuid::from_slice(uuid)?; - entries.push(uuid) + entries.insert(uuid); } // only perform snapshot if there are indexes diff --git a/meilisearch-http/src/routes/document.rs b/meilisearch-http/src/routes/document.rs index 357a2b16a..4f211bf09 100644 --- a/meilisearch-http/src/routes/document.rs +++ b/meilisearch-http/src/routes/document.rs @@ -107,14 +107,11 @@ async fn get_all_documents( path: web::Path, params: web::Query, ) -> Result { - let attributes_to_retrieve = params - .attributes_to_retrieve - .as_ref() - .and_then(|attrs| { + let attributes_to_retrieve = params.attributes_to_retrieve.as_ref().and_then(|attrs| { let mut names = Vec::new(); for name in attrs.split(',').map(String::from) { if name == "*" { - return None + return None; } names.push(name); } diff --git a/meilisearch-http/tests/common/index.rs b/meilisearch-http/tests/common/index.rs index 86aa80c3d..adb7fef3e 100644 --- a/meilisearch-http/tests/common/index.rs +++ b/meilisearch-http/tests/common/index.rs @@ -185,12 +185,9 @@ impl Index<'_> { self.service.get(url).await } - make_settings_test_routes!( - distinct_attribute - ); + make_settings_test_routes!(distinct_attribute); } - pub struct GetDocumentOptions; #[derive(Debug, Default)] diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs index 87e5cecb7..9de5fe9db 100644 --- a/meilisearch-http/tests/documents/add_documents.rs +++ b/meilisearch-http/tests/documents/add_documents.rs @@ -77,8 +77,8 @@ async fn document_addition_with_primary_key() { "content": "foo", } ]); - let (_response, code) = index.add_documents(documents, Some("primary")).await; - assert_eq!(code, 202); + let (response, code) = index.add_documents(documents, Some("primary")).await; + assert_eq!(code, 202, "response: {}", response); index.wait_update_id(0).await; @@ -189,8 +189,8 @@ async fn replace_document() { } ]); - let (_response, code) = index.add_documents(documents, None).await; - assert_eq!(code, 202); + let (response, code) = index.add_documents(documents, None).await; + assert_eq!(code, 202, "response: {}", response); index.wait_update_id(0).await; @@ -260,8 +260,8 @@ async fn update_document() { } ]); - let (_response, code) = index.update_documents(documents, None).await; - assert_eq!(code, 202); + let (response, code) = index.update_documents(documents, None).await; + assert_eq!(code, 202, "response: {}", response); index.wait_update_id(1).await; diff --git a/meilisearch-http/tests/settings/distinct.rs b/meilisearch-http/tests/settings/distinct.rs index a3aec6baf..818f200fd 100644 --- a/meilisearch-http/tests/settings/distinct.rs +++ b/meilisearch-http/tests/settings/distinct.rs @@ -6,14 +6,18 @@ async fn set_and_reset_distinct_attribute() { let server = Server::new().await; let index = server.index("test"); - let (_response, _code) = index.update_settings(json!({ "distinctAttribute": "test"})).await; + let (_response, _code) = index + .update_settings(json!({ "distinctAttribute": "test"})) + .await; index.wait_update_id(0).await; let (response, _) = index.settings().await; assert_eq!(response["distinctAttribute"], "test"); - index.update_settings(json!({ "distinctAttribute": null })).await; + index + .update_settings(json!({ "distinctAttribute": null })) + .await; index.wait_update_id(1).await; diff --git a/meilisearch-http/tests/settings/mod.rs b/meilisearch-http/tests/settings/mod.rs index b7102cc5f..05339cb37 100644 --- a/meilisearch-http/tests/settings/mod.rs +++ b/meilisearch-http/tests/settings/mod.rs @@ -1,2 +1,2 @@ -mod get_settings; mod distinct; +mod get_settings;