diff --git a/Cargo.lock b/Cargo.lock index 13f023140..f802ac4ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "actix-codec" version = "0.4.0" @@ -286,6 +288,12 @@ version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b" +[[package]] +name = "arc-swap" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d7d63395147b81a9e570bcc6243aaf71c017bd666d4909cfef0085bdda8d73" + [[package]] name = "assert-json-diff" version = "1.0.1" @@ -295,19 +303,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "async-compression" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537" -dependencies = [ - "flate2", - "futures-core", - "memchr", - "pin-project-lite 0.2.6", - "tokio 0.2.25", -] - [[package]] name = "async-stream" version = "0.3.1" @@ -775,16 +770,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "dashmap" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" -dependencies = [ - "cfg-if 1.0.0", - "num_cpus", -] - [[package]] name = "debugid" version = "0.7.2" @@ -1751,8 +1736,8 @@ dependencies = [ "actix-web", "actix-web-static-files", "anyhow", + "arc-swap", "assert-json-diff", - "async-compression", "async-stream", "async-trait", "byte-unit", @@ -1760,7 +1745,6 @@ dependencies = [ "cargo_toml", "chrono", "crossbeam-channel", - "dashmap", "either", "env_logger 0.8.3", "flate2", diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index c5aef1a56..e6912e428 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -28,14 +28,13 @@ actix-service = "2.0.0" actix-web = { version = "=4.0.0-beta.6", features = ["rustls"] } actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "6db8c3e", optional = true } anyhow = "1.0.36" -async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] } async-stream = "0.3.0" async-trait = "0.1.42" +arc-swap = "1.2.0" byte-unit = { version = "4.0.9", default-features = false, features = ["std"] } bytes = "0.6.0" chrono = { version = "0.4.19", features = ["serde"] } crossbeam-channel = "0.5.0" -dashmap = "4.0.2" either = "1.6.1" env_logger = "0.8.2" flate2 = "1.0.19" diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index 3aee18217..ec64192f7 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -1,12 +1,10 @@ -use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; -use chrono::{DateTime, Utc}; use sha2::Digest; use crate::index::Settings; -use crate::index_controller::{IndexController, IndexStats}; +use crate::index_controller::{IndexController, IndexStats, Stats}; use crate::index_controller::{IndexMetadata, IndexSettings}; use crate::option::Opt; @@ -39,13 +37,6 @@ pub struct ApiKeys { pub master: Option, } -#[derive(Default)] -pub struct Stats { - pub database_size: u64, - pub last_update: Option>, - pub indexes: HashMap, -} - impl ApiKeys { pub fn generate_missing_api_keys(&mut self) { if let Some(master_key) = &self.master { @@ -77,11 +68,7 @@ impl Data { 
api_keys.generate_missing_api_keys(); - let inner = DataInner { - index_controller, - options, - api_keys, - }; + let inner = DataInner { index_controller, api_keys, options }; let inner = Arc::new(inner); Ok(Data { inner }) @@ -114,31 +101,11 @@ impl Data { } pub async fn get_index_stats(&self, uid: String) -> anyhow::Result { - Ok(self.index_controller.get_stats(uid).await?) + Ok(self.index_controller.get_index_stats(uid).await?) } - pub async fn get_stats(&self) -> anyhow::Result { - let mut stats = Stats::default(); - stats.database_size += self.index_controller.get_uuids_size().await?; - - for index in self.index_controller.list_indexes().await? { - let index_stats = self.index_controller.get_stats(index.uid.clone()).await?; - - stats.database_size += index_stats.size; - stats.database_size += self - .index_controller - .get_updates_size(index.uid.clone()) - .await?; - - stats.last_update = Some(match stats.last_update { - Some(last_update) => last_update.max(index.meta.updated_at), - None => index.meta.updated_at, - }); - - stats.indexes.insert(index.uid, index_stats); - } - - Ok(stats) + pub async fn get_all_stats(&self) -> anyhow::Result { + Ok(self.index_controller.get_all_stats().await?) } #[inline] diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index 57a939aaa..43c8b0193 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -6,9 +6,9 @@ use anyhow::{bail, Context}; use milli::obkv_to_json; use serde_json::{Map, Value}; -pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; -pub use updates::{Facets, Settings, UpdateResult}; use crate::helpers::EnvSizer; +pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; +pub use updates::{Facets, Settings}; mod search; mod updates; @@ -59,9 +59,7 @@ impl Index { }) .transpose()? .unwrap_or_else(BTreeSet::new); - let distinct_attribute = self - .distinct_attribute(&txn)? 
- .map(String::from); + let distinct_attribute = self.distinct_attribute(&txn)?.map(String::from); Ok(Settings { displayed_attributes: Some(Some(displayed_attributes)), diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index f91da257f..038f1f7e6 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -4,17 +4,11 @@ use std::num::NonZeroUsize; use flate2::read::GzDecoder; use log::info; -use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; +use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use serde::{de::Deserializer, Deserialize, Serialize}; use super::Index; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum UpdateResult { - DocumentsAddition(DocumentAdditionResult), - DocumentDeletion { deleted: u64 }, - Other, -} +use crate::index_controller::UpdateResult; #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -91,7 +85,7 @@ impl Index { &self, format: UpdateFormat, method: IndexDocumentsMethod, - content: impl io::Read, + content: Option, update_builder: UpdateBuilder, primary_key: Option<&str>, ) -> anyhow::Result { @@ -108,16 +102,15 @@ impl Index { builder.update_format(format); builder.index_documents_method(method); - let gzipped = false; - let reader = if gzipped { - Box::new(GzDecoder::new(content)) - } else { - Box::new(content) as Box - }; + let indexing_callback = + |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step); - let result = builder.execute(reader, |indexing_step, update_id| { - info!("update {}: {:?}", update_id, indexing_step) - }); + let gzipped = false; + let result = match content { + Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback), + Some(content) => builder.execute(content, indexing_callback), + None => builder.execute(std::io::empty(), indexing_callback), + }; info!("document addition done: {:?}", result); @@ -228,10 +221,13 @@ impl Index { pub fn delete_documents( &self, - document_ids: impl io::Read, + document_ids: Option, update_builder: UpdateBuilder, ) -> anyhow::Result { - let ids: Vec = serde_json::from_reader(document_ids)?; + let ids = match document_ids { + Some(reader) => serde_json::from_reader(reader)?, + None => Vec::::new(), + }; let mut txn = self.write_txn()?; let mut builder = update_builder.delete_documents(&mut txn, self)?; diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 9cca6557b..06d2b0f60 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -1,96 +1,59 @@ use std::fs::File; -use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use async_stream::stream; -use futures::pin_mut; use futures::stream::StreamExt; use heed::CompactionOption; use log::debug; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::mpsc; use tokio::task::spawn_blocking; use uuid::Uuid; use crate::index::{Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::update_handler::UpdateHandler; use crate::index_controller::{ - get_arc_ownership_blocking, updates::Processing, IndexStats, UpdateMeta, + get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed, + Processing, }; use crate::option::IndexerOpts; -use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, 
IndexStore, Result, UpdateResult}; +use super::{IndexError, IndexMeta, IndexMsg, IndexResult, IndexSettings, IndexStore}; + +pub const CONCURRENT_INDEX_MSG: usize = 10; pub struct IndexActor { - read_receiver: Option>, - write_receiver: Option>, + receiver: Option>, update_handler: Arc, - processing: RwLock>, store: S, } impl IndexActor { - pub fn new( - read_receiver: mpsc::Receiver, - write_receiver: mpsc::Receiver, - store: S, - ) -> Result { + pub fn new(receiver: mpsc::Receiver, store: S) -> IndexResult { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; let update_handler = Arc::new(update_handler); - let read_receiver = Some(read_receiver); - let write_receiver = Some(write_receiver); - Ok(Self { - read_receiver, - write_receiver, - update_handler, - processing: RwLock::new(None), - store, - }) + let receiver = Some(receiver); + Ok(Self { receiver, update_handler, store }) } - /// `run` poll the write_receiver and read_receiver concurrently, but while messages send - /// through the read channel are processed concurrently, the messages sent through the write - /// channel are processed one at a time. pub async fn run(mut self) { - let mut read_receiver = self - .read_receiver + let mut receiver = self + .receiver .take() .expect("Index Actor must have a inbox at this point."); - let read_stream = stream! { + let stream = stream! { loop { - match read_receiver.recv().await { + match receiver.recv().await { Some(msg) => yield msg, None => break, } } }; - let mut write_receiver = self - .write_receiver - .take() - .expect("Index Actor must have a inbox at this point."); - - let write_stream = stream! { - loop { - match write_receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - pin_mut!(write_stream); - pin_mut!(read_stream); - - let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)); - let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg)); - - let fut1: Box + Unpin + Send> = Box::new(fut1); - let fut2: Box + Unpin + Send> = Box::new(fut2); - - tokio::join!(fut1, fut2); + stream + .for_each_concurrent(Some(CONCURRENT_INDEX_MSG), |msg| self.handle_message(msg)) + .await; } async fn handle_message(&self, msg: IndexMsg) { @@ -103,8 +66,13 @@ impl IndexActor { } => { let _ = ret.send(self.handle_create_index(uuid, primary_key).await); } - Update { ret, meta, data } => { - let _ = ret.send(self.handle_update(meta, data).await); + Update { + ret, + meta, + data, + uuid, + } => { + let _ = ret.send(self.handle_update(uuid, meta, data).await); } Search { ret, query, uuid } => { let _ = ret.send(self.handle_search(uuid, query).await); @@ -170,7 +138,7 @@ impl IndexActor { &self, uuid: Uuid, primary_key: Option, - ) -> Result { + ) -> IndexResult { let index = self.store.create(uuid, primary_key).await?; let meta = spawn_blocking(move || IndexMeta::new(&index)) .await @@ -180,31 +148,23 @@ impl IndexActor { async fn handle_update( &self, - meta: Processing, - data: File, - ) -> Result { - async fn get_result(actor: &IndexActor, meta: Processing, data: File) -> Result { - debug!("Processing update {}", meta.id()); - let uuid = *meta.index_uuid(); - let update_handler = actor.update_handler.clone(); - let index = match actor.store.get(uuid).await? 
{ - Some(index) => index, - None => actor.store.create(uuid, None).await?, - }; + uuid: Uuid, + meta: Processing, + data: Option, + ) -> IndexResult> { + debug!("Processing update {}", meta.id()); + let update_handler = self.update_handler.clone(); + let index = match self.store.get(uuid).await? { + Some(index) => index, + None => self.store.create(uuid, None).await?, + }; - spawn_blocking(move || update_handler.handle_update(meta, data, index)) - .await - .map_err(|e| IndexError::Error(e.into())) - } - - *self.processing.write().await = Some(*meta.index_uuid()); - let result = get_result(self, meta, data).await; - *self.processing.write().await = None; - - result + spawn_blocking(move || update_handler.handle_update(meta, data, index)) + .await + .map_err(|e| IndexError::Error(e.into())) } - async fn handle_settings(&self, uuid: Uuid) -> Result { + async fn handle_settings(&self, uuid: Uuid) -> IndexResult { let index = self .store .get(uuid) @@ -221,7 +181,7 @@ impl IndexActor { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> Result> { + ) -> IndexResult> { let index = self .store .get(uuid) @@ -241,7 +201,7 @@ impl IndexActor { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> Result { + ) -> IndexResult { let index = self .store .get(uuid) @@ -256,7 +216,7 @@ impl IndexActor { .map_err(|e| IndexError::Error(e.into()))? } - async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> { let index = self.store.delete(uuid).await?; if let Some(index) = index { @@ -273,7 +233,7 @@ impl IndexActor { Ok(()) } - async fn handle_get_meta(&self, uuid: Uuid) -> Result { + async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult { match self.store.get(uuid).await? { Some(index) => { let meta = spawn_blocking(move || IndexMeta::new(&index)) @@ -289,7 +249,7 @@ impl IndexActor { &self, uuid: Uuid, index_settings: IndexSettings, - ) -> Result { + ) -> IndexResult { let index = self .store .get(uuid) @@ -316,7 +276,7 @@ impl IndexActor { .map_err(|e| IndexError::Error(e.into()))? } - async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { + async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> { use tokio::fs::create_dir_all; path.push("indexes"); @@ -346,23 +306,20 @@ impl IndexActor { Ok(()) } - async fn handle_get_stats(&self, uuid: Uuid) -> Result { + async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult { let index = self .store .get(uuid) .await? 
.ok_or(IndexError::UnexistingIndex)?; - let processing = self.processing.read().await; - let is_indexing = *processing == Some(uuid); - spawn_blocking(move || { let rtxn = index.read_txn()?; Ok(IndexStats { size: index.size(), number_of_documents: index.number_of_documents(&rtxn)?, - is_indexing, + is_indexing: None, fields_distribution: index.fields_distribution(&rtxn)?, }) }) diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 93406c13b..719e12cee 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -3,55 +3,64 @@ use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use crate::index::{Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{updates::Processing, UpdateMeta}; -use crate::index_controller::{IndexSettings, IndexStats}; - -use super::{ - IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult, +use crate::index_controller::{IndexSettings, IndexStats, Processing}; +use crate::{ + index::{Document, SearchQuery, SearchResult, Settings}, + index_controller::{Failed, Processed}, }; +use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, IndexResult, MapIndexStore}; + #[derive(Clone)] pub struct IndexActorHandleImpl { - read_sender: mpsc::Sender, - write_sender: mpsc::Sender, + sender: mpsc::Sender, } #[async_trait::async_trait] impl IndexActorHandle for IndexActorHandleImpl { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + async fn create_index( + &self, + uuid: Uuid, + primary_key: Option, + ) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::CreateIndex { ret, uuid, primary_key, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } async fn update( &self, - meta: Processing, - data: std::fs::File, - ) -> anyhow::Result { + uuid: Uuid, + meta: Processing, + data: Option, + ) -> anyhow::Result> { let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Update { ret, meta, data }; - let _ = self.read_sender.send(msg).await; + let msg = IndexMsg::Update { + ret, + meta, + data, + uuid, + }; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { + async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Search { uuid, query, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn settings(&self, uuid: Uuid) -> Result { + async fn settings(&self, uuid: Uuid) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Settings { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) 
} @@ -61,7 +70,7 @@ impl IndexActorHandle for IndexActorHandleImpl { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> Result> { + ) -> IndexResult> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Documents { uuid, @@ -70,7 +79,7 @@ impl IndexActorHandle for IndexActorHandleImpl { attributes_to_retrieve, limit, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -79,7 +88,7 @@ impl IndexActorHandle for IndexActorHandleImpl { uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> Result { + ) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Document { uuid, @@ -87,61 +96,61 @@ impl IndexActorHandle for IndexActorHandleImpl { doc_id, attributes_to_retrieve, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn delete(&self, uuid: Uuid) -> Result<()> { + async fn delete(&self, uuid: Uuid) -> IndexResult<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Delete { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn get_index_meta(&self, uuid: Uuid) -> Result { + async fn get_index_meta(&self, uuid: Uuid) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetMeta { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result { + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::UpdateIndex { uuid, index_settings, ret, }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Snapshot { uuid, path, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn get_index_stats(&self, uuid: Uuid) -> Result { + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetStats { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) 
} } impl IndexActorHandleImpl { pub fn new(path: impl AsRef, index_size: usize) -> anyhow::Result { - let (read_sender, read_receiver) = mpsc::channel(100); - let (write_sender, write_receiver) = mpsc::channel(100); + let (sender, receiver) = mpsc::channel(100); let store = MapIndexStore::new(path, index_size); - let actor = IndexActor::new(read_receiver, write_receiver, store)?; + let actor = IndexActor::new(receiver, store)?; tokio::task::spawn(actor.run()); - Ok(Self { - read_sender, - write_sender, - }) + Ok(Self { sender }) } } diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index 6da0f8628..d728b2564 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -4,20 +4,21 @@ use tokio::sync::oneshot; use uuid::Uuid; use crate::index::{Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{updates::Processing, IndexStats, UpdateMeta}; +use crate::index_controller::{Failed, IndexStats, Processed, Processing}; -use super::{IndexMeta, IndexSettings, Result, UpdateResult}; +use super::{IndexMeta, IndexResult, IndexSettings}; pub enum IndexMsg { CreateIndex { uuid: Uuid, primary_key: Option, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Update { - meta: Processing, - data: std::fs::File, - ret: oneshot::Sender>, + uuid: Uuid, + meta: Processing, + data: Option, + ret: oneshot::Sender>>, }, Search { uuid: Uuid, @@ -26,41 +27,41 @@ pub enum IndexMsg { }, Settings { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Documents { uuid: Uuid, attributes_to_retrieve: Option>, offset: usize, limit: usize, - ret: oneshot::Sender>>, + ret: oneshot::Sender>>, }, Document { uuid: Uuid, attributes_to_retrieve: Option>, doc_id: String, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Delete { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, GetMeta { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, UpdateIndex { uuid: Uuid, index_settings: IndexSettings, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, Snapshot { uuid: Uuid, path: PathBuf, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, GetStats { uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, }, } diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index 426eb29e4..ef5ea524c 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -1,3 +1,4 @@ +use std::fs::File; use std::path::PathBuf; use chrono::{DateTime, Utc}; @@ -8,16 +9,13 @@ use thiserror::Error; use uuid::Uuid; use actor::IndexActor; +pub use actor::CONCURRENT_INDEX_MSG; pub use handle_impl::IndexActorHandleImpl; use message::IndexMsg; use store::{IndexStore, MapIndexStore}; -use crate::index::UpdateResult as UResult; use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{ - updates::{Failed, Processed, Processing}, - IndexStats, UpdateMeta, -}; +use crate::index_controller::{Failed, Processed, Processing, IndexStats}; use super::IndexSettings; @@ -26,8 +24,7 @@ mod handle_impl; mod message; mod store; -pub type Result = std::result::Result; -type UpdateResult = std::result::Result, Failed>; +pub type IndexResult = std::result::Result; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ 
-38,20 +35,16 @@ pub struct IndexMeta { } impl IndexMeta { - fn new(index: &Index) -> Result { + fn new(index: &Index) -> IndexResult { let txn = index.read_txn()?; Self::new_txn(index, &txn) } - fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { + fn new_txn(index: &Index, txn: &heed::RoTxn) -> IndexResult { let created_at = index.created_at(&txn)?; let updated_at = index.updated_at(&txn)?; let primary_key = index.primary_key(&txn)?.map(String::from); - Ok(Self { - primary_key, - updated_at, - created_at, - }) + Ok(Self { created_at, updated_at, primary_key }) } } @@ -72,14 +65,16 @@ pub enum IndexError { #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait IndexActorHandle { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn create_index(&self, uuid: Uuid, primary_key: Option) + -> IndexResult; async fn update( &self, - meta: Processing, - data: std::fs::File, - ) -> anyhow::Result; - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result; - async fn settings(&self, uuid: Uuid) -> Result; + uuid: Uuid, + meta: Processing, + data: Option, + ) -> anyhow::Result>; + async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult; + async fn settings(&self, uuid: Uuid) -> IndexResult; async fn documents( &self, @@ -87,16 +82,103 @@ pub trait IndexActorHandle { offset: usize, limit: usize, attributes_to_retrieve: Option>, - ) -> Result>; + ) -> IndexResult>; async fn document( &self, uuid: Uuid, doc_id: String, attributes_to_retrieve: Option>, - ) -> Result; - async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn get_index_meta(&self, uuid: Uuid) -> Result; - async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result; - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; - async fn get_index_stats(&self, uuid: Uuid) -> Result; + ) -> IndexResult; + async fn delete(&self, uuid: Uuid) -> IndexResult<()>; + async fn get_index_meta(&self, uuid: Uuid) -> IndexResult; + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> IndexResult; + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult; +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use super::*; + + #[async_trait::async_trait] + /// Useful for passing around an `Arc` in tests. 
+ impl IndexActorHandle for Arc { + async fn create_index( + &self, + uuid: Uuid, + primary_key: Option, + ) -> IndexResult { + self.as_ref().create_index(uuid, primary_key).await + } + + async fn update( + &self, + uuid: Uuid, + meta: Processing, + data: Option, + ) -> anyhow::Result> { + self.as_ref().update(uuid, meta, data).await + } + + async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult { + self.as_ref().search(uuid, query).await + } + + async fn settings(&self, uuid: Uuid) -> IndexResult { + self.as_ref().settings(uuid).await + } + + async fn documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> IndexResult> { + self.as_ref() + .documents(uuid, offset, limit, attributes_to_retrieve) + .await + } + + async fn document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> IndexResult { + self.as_ref() + .document(uuid, doc_id, attributes_to_retrieve) + .await + } + + async fn delete(&self, uuid: Uuid) -> IndexResult<()> { + self.as_ref().delete(uuid).await + } + + async fn get_index_meta(&self, uuid: Uuid) -> IndexResult { + self.as_ref().get_index_meta(uuid).await + } + + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> IndexResult { + self.as_ref().update_index(uuid, index_settings).await + } + + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + self.as_ref().snapshot(uuid, path).await + } + + async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { + self.as_ref().get_index_stats(uuid).await + } + } } diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 6250f515e..44f076f2f 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -8,16 +8,16 @@ use tokio::sync::RwLock; use tokio::task::spawn_blocking; use uuid::Uuid; -use super::{IndexError, Result}; +use super::{IndexError, IndexResult}; use crate::index::Index; type AsyncMap = Arc>>; #[async_trait::async_trait] pub trait IndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; - async fn get(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>; + async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult; + async fn get(&self, uuid: Uuid) -> IndexResult>; + async fn delete(&self, uuid: Uuid) -> IndexResult>; } pub struct MapIndexStore { @@ -40,14 +40,14 @@ impl MapIndexStore { #[async_trait::async_trait] impl IndexStore for MapIndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { + async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult { let path = self.path.join(format!("index-{}", uuid)); if path.exists() { return Err(IndexError::IndexAlreadyExists); } let index_size = self.index_size; - let index = spawn_blocking(move || -> Result { + let index = spawn_blocking(move || -> IndexResult { let index = open_index(&path, index_size)?; if let Some(primary_key) = primary_key { let mut txn = index.write_txn()?; @@ -64,7 +64,7 @@ impl IndexStore for MapIndexStore { Ok(index) } - async fn get(&self, uuid: Uuid) -> Result> { + async fn get(&self, uuid: Uuid) -> IndexResult> { let guard = self.index_store.read().await; match guard.get(&uuid) { Some(index) => Ok(Some(index.clone())), @@ -86,7 +86,7 @@ impl IndexStore for MapIndexStore { } } - async fn delete(&self, uuid: Uuid) -> Result> { + async fn 
delete(&self, uuid: Uuid) -> IndexResult> { let db_path = self.path.join(format!("index-{}", uuid)); fs::remove_dir_all(db_path) .await @@ -96,7 +96,7 @@ impl IndexStore for MapIndexStore { } } -fn open_index(path: impl AsRef, size: usize) -> Result { +fn open_index(path: impl AsRef, size: usize) -> IndexResult { std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; let mut options = EnvOpenOptions::new(); options.map_size(size); diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 8361c45cc..81e6d0a5e 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -1,28 +1,26 @@ +use std::collections::BTreeMap; use std::path::Path; use std::sync::Arc; use std::time::Duration; use actix_web::web::{Bytes, Payload}; use anyhow::bail; +use chrono::{DateTime, Utc}; use futures::stream::StreamExt; use log::info; -use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::FieldsDistribution; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::sleep; use uuid::Uuid; +pub use updates::*; use index_actor::IndexActorHandle; -use snapshot::load_snapshot; -use snapshot::SnapshotService; +use snapshot::{SnapshotService, load_snapshot}; use update_actor::UpdateActorHandle; -pub use updates::{Failed, Processed, Processing}; -use uuid_resolver::UuidError; -use uuid_resolver::UuidResolverHandle; +use uuid_resolver::{UuidError, UuidResolverHandle}; -use crate::index::{Document, SearchQuery, SearchResult}; -use crate::index::{Facets, Settings, UpdateResult}; +use crate::index::{Settings, Document, SearchQuery, SearchResult}; use crate::option::Opt; mod index_actor; @@ -32,42 +30,33 @@ mod update_handler; mod updates; mod uuid_resolver; -pub type UpdateStatus = updates::UpdateStatus; - #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct IndexMetadata { + #[serde(skip)] + pub uuid: Uuid, pub uid: String, name: String, #[serde(flatten)] pub meta: index_actor::IndexMeta, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum UpdateMeta { - DocumentsAddition { - method: IndexDocumentsMethod, - format: UpdateFormat, - primary_key: Option, - }, - ClearDocuments, - DeleteDocuments, - Settings(Settings), - Facets(Facets), -} - #[derive(Clone, Debug)] pub struct IndexSettings { pub uid: Option, pub primary_key: Option, } -#[derive(Clone, Debug)] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] pub struct IndexStats { + #[serde(skip)] pub size: u64, pub number_of_documents: u64, - pub is_indexing: bool, + /// Whether the current index is performing an update. It is initially `None` when the + /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. 
It is + /// later set to either `true` or `false` when we retrieve the information from the `UpdateStore`. + pub is_indexing: Option<bool>, pub fields_distribution: FieldsDistribution, } @@ -77,6 +66,14 @@ pub struct IndexController { update_handle: update_actor::UpdateActorHandleImpl<Bytes>, } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Stats { + pub database_size: u64, + pub last_update: Option<DateTime<Utc>>, + pub indexes: BTreeMap<String, IndexStats>, +} + impl IndexController { pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> { let index_size = options.max_mdb_size.get_bytes() as usize; @@ -166,6 +163,8 @@ impl IndexController { Err(UuidError::UnexistingIndex(name)) => { let uuid = Uuid::new_v4(); let status = perform_update(uuid).await?; + // ignore if index creation fails now, since it may already have been created + let _ = self.index_handle.create_index(uuid, None).await; self.uuid_resolver.insert(name, uuid).await?; Ok(status) } @@ -218,6 +217,8 @@ impl IndexController { Err(UuidError::UnexistingIndex(name)) if create => { let uuid = Uuid::new_v4(); let status = perform_udpate(uuid).await?; + // ignore if index creation fails now, since it may already have been created + let _ = self.index_handle.create_index(uuid, None).await; self.uuid_resolver.insert(name, uuid).await?; Ok(status) } @@ -233,8 +234,8 @@ impl IndexController { let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?; let uuid = self.uuid_resolver.create(uid.clone()).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?; - let _ = self.update_handle.create(uuid).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -270,6 +271,7 @@ impl IndexController { for (uid, uuid) in uuids { let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -327,6 +329,7 @@ impl IndexController { let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -344,6 +347,7 @@ impl IndexController { let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -351,21 +355,44 @@ impl IndexController { Ok(meta) } - pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> { - let uuid = self.uuid_resolver.get(uid.clone()).await?; - - Ok(self.index_handle.get_index_stats(uuid).await?) - } - - pub async fn get_updates_size(&self, uid: String) -> anyhow::Result<u64> { - let uuid = self.uuid_resolver.get(uid.clone()).await?; - - Ok(self.update_handle.get_size(uuid).await?) - } - pub async fn get_uuids_size(&self) -> anyhow::Result<u64> { Ok(self.uuid_resolver.get_size().await?) } + + pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> { + let uuid = self.uuid_resolver.get(uid).await?; + let update_infos = self.update_handle.get_info().await?; + let mut stats = self.index_handle.get_index_stats(uuid).await?; + // Check if the currently indexing update is from our index. + stats.is_indexing = Some(Some(uuid) == update_infos.processing); + Ok(stats) + } + + pub async fn get_all_stats(&self) -> anyhow::Result<Stats> { + let update_infos = self.update_handle.get_info().await?; + let mut database_size = self.get_uuids_size().await? + update_infos.size; + let mut last_update: Option<DateTime<Utc>> = None; + let mut indexes = BTreeMap::new(); + + for index in self.list_indexes().await?
{ + let mut index_stats = self.index_handle.get_index_stats(index.uuid).await?; + database_size += index_stats.size; + + last_update = last_update.map_or(Some(index.meta.updated_at), |last| { + Some(last.max(index.meta.updated_at)) + }); + + index_stats.is_indexing = Some(Some(index.uuid) == update_infos.processing); + + indexes.insert(index.uid, index_stats); + } + + Ok(Stats { + database_size, + last_update, + indexes, + }) + } } pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index 8557fe04e..2a456eb26 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -71,16 +71,9 @@ where return Ok(()); } - let tasks = uuids - .iter() - .map(|&uuid| { - self.update_handle - .snapshot(uuid, temp_snapshot_path.clone()) - }) - .collect::>(); - - futures::future::try_join_all(tasks).await?; - + self.update_handle + .snapshot(uuids, temp_snapshot_path.clone()) + .await?; let snapshot_dir = self.snapshot_path.clone(); let snapshot_path = self .snapshot_path @@ -138,20 +131,28 @@ pub fn load_snapshot( #[cfg(test)] mod test { + use std::iter::FromIterator; + use std::{collections::HashSet, sync::Arc}; + use futures::future::{err, ok}; use rand::Rng; use tokio::time::timeout; use uuid::Uuid; use super::*; - use crate::index_controller::update_actor::{MockUpdateActorHandle, UpdateError}; + use crate::index_controller::index_actor::MockIndexActorHandle; + use crate::index_controller::update_actor::{ + MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError, + }; use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError}; #[actix_rt::test] async fn test_normal() { let mut rng = rand::thread_rng(); - let uuids_num = rng.gen_range(5, 10); - let uuids = (0..uuids_num).map(|_| Uuid::new_v4()).collect::>(); + let uuids_num: usize = rng.gen_range(5, 10); + let uuids = (0..uuids_num) + .map(|_| Uuid::new_v4()) + .collect::>(); let mut uuid_resolver = MockUuidResolverHandle::new(); let uuids_clone = uuids.clone(); @@ -160,14 +161,19 @@ mod test { .times(1) .returning(move |_| Box::pin(ok(uuids_clone.clone()))); - let mut update_handle = MockUpdateActorHandle::new(); let uuids_clone = uuids.clone(); - update_handle + let mut index_handle = MockIndexActorHandle::new(); + index_handle .expect_snapshot() .withf(move |uuid, _path| uuids_clone.contains(uuid)) .times(uuids_num) .returning(move |_, _| Box::pin(ok(()))); + let dir = tempfile::tempdir_in(".").unwrap(); + let handle = Arc::new(index_handle); + let update_handle = + UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); + let snapshot_path = tempfile::tempdir_in(".").unwrap(); let snapshot_service = SnapshotService::new( uuid_resolver, @@ -212,7 +218,7 @@ mod test { uuid_resolver .expect_snapshot() .times(1) - .returning(move |_| Box::pin(ok(vec![uuid]))); + .returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); let mut update_handle = MockUpdateActorHandle::new(); update_handle diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index f725dda84..e47edc5bc 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -1,46 +1,50 @@ +use std::collections::HashSet; use std::io::SeekFrom; use std::path::{Path, PathBuf}; +use std::sync::Arc; +use 
futures::StreamExt; use log::info; use oxidized_json_checker::JsonChecker; use tokio::fs; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; +use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; -use crate::index_controller::index_actor::IndexActorHandle; -use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus}; +use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; +use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}; +use crate::index_controller::{UpdateMeta, UpdateStatus}; -use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStoreStore}; - -pub struct UpdateActor { +pub struct UpdateActor { path: PathBuf, - store: S, + store: Arc, inbox: mpsc::Receiver>, index_handle: I, } -impl UpdateActor +impl UpdateActor where D: AsRef<[u8]> + Sized + 'static, - S: UpdateStoreStore, I: IndexActorHandle + Clone + Send + Sync + 'static, { pub fn new( - store: S, + update_db_size: usize, inbox: mpsc::Receiver>, path: impl AsRef, index_handle: I, ) -> anyhow::Result { - let path = path.as_ref().to_owned(); + let path = path.as_ref().join("updates"); + + std::fs::create_dir_all(&path)?; + + let mut options = heed::EnvOpenOptions::new(); + options.map_size(update_db_size); + + let store = UpdateStore::open(options, &path, index_handle.clone())?; std::fs::create_dir_all(path.join("update_files"))?; assert!(path.exists()); - Ok(Self { - store, - inbox, - path, - index_handle, - }) + Ok(Self { path, store, inbox, index_handle }) } pub async fn run(mut self) { @@ -67,14 +71,11 @@ where Some(Delete { uuid, ret }) => { let _ = ret.send(self.handle_delete(uuid).await); } - Some(Create { uuid, ret }) => { - let _ = ret.send(self.handle_create(uuid).await); + Some(Snapshot { uuids, path, ret }) => { + let _ = ret.send(self.handle_snapshot(uuids, path).await); } - Some(Snapshot { uuid, path, ret }) => { - let _ = ret.send(self.handle_snapshot(uuid, path).await); - } - Some(GetSize { uuid, ret }) => { - let _ = ret.send(self.handle_get_size(uuid).await); + Some(GetInfo { ret }) => { + let _ = ret.send(self.handle_get_info().await); } None => break, } @@ -87,52 +88,64 @@ where meta: UpdateMeta, mut payload: mpsc::Receiver>, ) -> Result { - let update_store = self.store.get_or_create(uuid).await?; - let update_file_id = uuid::Uuid::new_v4(); - let path = self - .path - .join(format!("update_files/update_{}", update_file_id)); - let mut file = fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - while let Some(bytes) = payload.recv().await { - match bytes { - Ok(bytes) => { - file.write_all(bytes.as_ref()) + let file_path = match meta { + UpdateMeta::DocumentsAddition { .. 
} + | UpdateMeta::DeleteDocuments => { + + let update_file_id = uuid::Uuid::new_v4(); + let path = self + .path + .join(format!("update_files/update_{}", update_file_id)); + let mut file = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + let mut file_len = 0; + while let Some(bytes) = payload.recv().await { + match bytes { + Ok(bytes) => { + file_len += bytes.as_ref().len(); + file.write_all(bytes.as_ref()) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + } + Err(e) => { + return Err(UpdateError::Error(e)); + } + } + } + + if file_len != 0 { + file.flush() .await .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - Err(e) => { - return Err(UpdateError::Error(e)); + let file = file.into_std().await; + Some((file, path)) + } else { + // empty update, delete the empty file. + fs::remove_file(&path) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + None } } - } + _ => None + }; - file.flush() - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - file.seek(SeekFrom::Start(0)) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - let mut file = file.into_std().await; + let update_store = self.store.clone(); tokio::task::spawn_blocking(move || { use std::io::{copy, sink, BufReader, Seek}; // If the payload is empty, ignore the check. - if file - .metadata() - .map_err(|e| UpdateError::Error(Box::new(e)))? - .len() - > 0 - { + let path = if let Some((mut file, path)) = file_path { + // set the file back to the beginning + file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?; // Check that the json payload is valid: let reader = BufReader::new(&mut file); let mut checker = JsonChecker::new(reader); @@ -144,7 +157,10 @@ where let _: serde_json::Value = serde_json::from_reader(file) .map_err(|e| UpdateError::Error(Box::new(e)))?; } - } + Some(path) + } else { + None + }; // The payload is valid, we can register it to the update store. update_store @@ -157,11 +173,10 @@ where } async fn handle_list_updates(&self, uuid: Uuid) -> Result> { - let update_store = self.store.get(uuid).await?; + let update_store = self.store.clone(); tokio::task::spawn_blocking(move || { let result = update_store - .ok_or(UpdateError::UnexistingIndex(uuid))? - .list() + .list(uuid) .map_err(|e| UpdateError::Error(e.into()))?; Ok(result) }) @@ -170,77 +185,64 @@ where } async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { - let store = self - .store - .get(uuid) - .await? - .ok_or(UpdateError::UnexistingIndex(uuid))?; + let store = self.store.clone(); let result = store - .meta(id) + .meta(uuid, id) .map_err(|e| UpdateError::Error(Box::new(e)))? .ok_or(UpdateError::UnexistingUpdate(id))?; Ok(result) } async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let store = self.store.delete(uuid).await?; + let store = self.store.clone(); - if let Some(store) = store { - tokio::task::spawn(async move { - let store = get_arc_ownership_blocking(store).await; - tokio::task::spawn_blocking(move || { - store.prepare_for_closing().wait(); - info!("Update store {} was closed.", uuid); - }); - }); - } - - Ok(()) - } - - async fn handle_create(&self, uuid: Uuid) -> Result<()> { - let _ = self.store.get_or_create(uuid).await?; - Ok(()) - } - - async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - let index_handle = self.index_handle.clone(); - if let Some(update_store) = self.store.get(uuid).await? 
{ - tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - // acquire write lock to prevent further writes during snapshot - // the update lock must be acquired BEFORE the write lock to prevent dead lock - let _lock = update_store.update_lock.lock(); - let mut txn = update_store.env.write_txn()?; - - // create db snapshot - update_store.snapshot(&mut txn, &path, uuid)?; - - futures::executor::block_on( - async move { index_handle.snapshot(uuid, path).await }, - )?; - Ok(()) - }) .await .map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?; - } Ok(()) } - async fn handle_get_size(&self, uuid: Uuid) -> Result<u64> { - let size = match self.store.get(uuid).await? { - Some(update_store) => tokio::task::spawn_blocking(move || -> anyhow::Result<u64> { - let txn = update_store.env.read_txn()?; + async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> { + let index_handle = self.index_handle.clone(); + let update_store = self.store.clone(); + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + update_store.snapshot(&uuids, &path)?; - update_store.get_size(&txn) + // Perform the snapshot of each index concurrently. Use only a third of the index + // actor's message capacity at a time, so as not to put too much pressure on it. + let path = &path; + let handle = &index_handle; + + let mut stream = futures::stream::iter(uuids.iter()) + .map(|&uuid| handle.snapshot(uuid, path.clone())) + .buffer_unordered(CONCURRENT_INDEX_MSG / 3); + + Handle::current().block_on(async { + while let Some(res) = stream.next().await { + res?; + } + Ok(()) + }) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?, - None => 0, - }; + }) + .await + .map_err(|e| UpdateError::Error(e.into()))? + .map_err(|e| UpdateError::Error(e.into()))?; - Ok(size) + Ok(()) + } + + async fn handle_get_info(&self) -> Result<UpdateStoreInfo> { + let update_store = self.store.clone(); + let info = tokio::task::spawn_blocking(move || -> anyhow::Result<UpdateStoreInfo> { + let info = update_store.get_info()?; + Ok(info) + }) + .await + .map_err(|e| UpdateError::Error(e.into()))?
+ .map_err(|e| UpdateError::Error(e.into()))?; + + Ok(info) } } diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index 860cc2bc8..999481573 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -1,13 +1,13 @@ +use std::collections::HashSet; use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use crate::index_controller::IndexActorHandle; +use crate::index_controller::{IndexActorHandle, UpdateStatus}; use super::{ - MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, - UpdateMsg, UpdateStatus, + PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo, }; #[derive(Clone)] @@ -27,10 +27,9 @@ where where I: IndexActorHandle + Clone + Send + Sync + 'static, { - let path = path.as_ref().to_owned().join("updates"); + let path = path.as_ref().to_owned(); let (sender, receiver) = mpsc::channel(100); - let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size); - let actor = UpdateActor::new(store, receiver, path, index_handle)?; + let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?; tokio::task::spawn(actor.run()); @@ -65,23 +64,16 @@ where receiver.await.expect("update actor killed.") } - async fn create(&self, uuid: Uuid) -> Result<()> { + async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Create { uuid, ret }; + let msg = UpdateMsg::Snapshot { uuids, path, ret }; let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + async fn get_info(&self) -> Result { let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Snapshot { uuid, path, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - async fn get_size(&self, uuid: Uuid) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::GetSize { uuid, ret }; + let msg = UpdateMsg::GetInfo { ret }; let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index f8150c00a..17b2b3579 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -1,9 +1,10 @@ +use std::collections::HashSet; use std::path::PathBuf; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use super::{PayloadData, Result, UpdateMeta, UpdateStatus}; +use super::{PayloadData, Result, UpdateMeta, UpdateStatus, UpdateStoreInfo}; pub enum UpdateMsg { Update { @@ -25,17 +26,12 @@ pub enum UpdateMsg { uuid: Uuid, ret: oneshot::Sender>, }, - Create { - uuid: Uuid, - ret: oneshot::Sender>, - }, Snapshot { - uuid: Uuid, + uuids: HashSet, path: PathBuf, ret: oneshot::Sender>, }, - GetSize { - uuid: Uuid, - ret: oneshot::Sender>, + GetInfo { + ret: oneshot::Sender>, }, } diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 228b47b02..e7a12b7ff 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ 
b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -1,26 +1,24 @@ mod actor; mod handle_impl; mod message; -mod store; mod update_store; -use std::path::PathBuf; +use std::{collections::HashSet, path::PathBuf}; use thiserror::Error; use tokio::sync::mpsc; use uuid::Uuid; -use crate::index::UpdateResult; use crate::index_controller::{UpdateMeta, UpdateStatus}; use actor::UpdateActor; use message::UpdateMsg; -use store::{MapUpdateStoreStore, UpdateStoreStore}; +use update_store::UpdateStore; +pub use update_store::UpdateStoreInfo; pub use handle_impl::UpdateActorHandleImpl; pub type Result = std::result::Result; -type UpdateStore = update_store::UpdateStore; type PayloadData = std::result::Result>; #[cfg(test)] @@ -30,8 +28,6 @@ use mockall::automock; pub enum UpdateError { #[error("error with update: {0}")] Error(Box), - #[error("Index {0} doesn't exist.")] - UnexistingIndex(Uuid), #[error("Update {0} doesn't exist.")] UnexistingUpdate(u64), } @@ -44,9 +40,8 @@ pub trait UpdateActorHandle { async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn create(&self, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; - async fn get_size(&self, uuid: Uuid) -> Result; + async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()>; + async fn get_info(&self) -> Result; async fn update( &self, meta: UpdateMeta, diff --git a/meilisearch-http/src/index_controller/update_actor/store.rs b/meilisearch-http/src/index_controller/update_actor/store.rs deleted file mode 100644 index 676182a62..000000000 --- a/meilisearch-http/src/index_controller/update_actor/store.rs +++ /dev/null @@ -1,111 +0,0 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use tokio::fs; -use tokio::sync::RwLock; -use uuid::Uuid; - -use super::{Result, UpdateError, UpdateStore}; -use crate::index_controller::IndexActorHandle; - -#[async_trait::async_trait] -pub trait UpdateStoreStore { - async fn get_or_create(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>>; - async fn get(&self, uuid: Uuid) -> Result>>; -} - -pub struct MapUpdateStoreStore { - db: Arc>>>, - index_handle: I, - path: PathBuf, - update_store_size: usize, -} - -impl MapUpdateStoreStore { - pub fn new(index_handle: I, path: impl AsRef, update_store_size: usize) -> Self { - let db = Arc::new(RwLock::new(HashMap::new())); - let path = path.as_ref().to_owned(); - Self { - db, - index_handle, - path, - update_store_size, - } - } -} - -#[async_trait::async_trait] -impl UpdateStoreStore for MapUpdateStoreStore -where - I: IndexActorHandle + Clone + Send + Sync + 'static, -{ - async fn get_or_create(&self, uuid: Uuid) -> Result> { - match self.db.write().await.entry(uuid) { - Entry::Vacant(e) => { - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let path = self.path.clone().join(format!("updates-{}", e.key())); - fs::create_dir_all(&path).await.unwrap(); - let index_handle = self.index_handle.clone(); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = e.insert(store); - Ok(store.clone()) - } - Entry::Occupied(e) => Ok(e.get().clone()), - } - } - 
- async fn get(&self, uuid: Uuid) -> Result>> { - let guard = self.db.read().await; - match guard.get(&uuid) { - Some(uuid) => Ok(Some(uuid.clone())), - None => { - // The index is not found in the found in the loaded indexes, so we attempt to load - // it from disk. We need to acquire a write lock **before** attempting to open the - // index, because someone could be trying to open it at the same time as us. - drop(guard); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if path.exists() { - let mut guard = self.db.write().await; - match guard.entry(uuid) { - Entry::Vacant(entry) => { - // We can safely load the index - let index_handle = self.index_handle.clone(); - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = entry.insert(store); - Ok(Some(store.clone())) - } - Entry::Occupied(entry) => { - // The index was loaded while we attempted to to iter - Ok(Some(entry.get().clone())) - } - } - } else { - Ok(None) - } - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result>> { - let store = self.db.write().await.remove(&uuid); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if store.is_some() || path.exists() { - fs::remove_dir_all(path).await.unwrap(); - } - Ok(store) - } -} diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index f1895829b..6a916af33 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -1,99 +1,208 @@ -use std::fs::File; -use std::fs::{copy, create_dir_all, remove_file}; -use std::path::{Path, PathBuf}; +use std::borrow::Cow; +use std::collections::{BTreeMap, HashSet}; +use std::convert::TryInto; +use std::fs::{copy, create_dir_all, remove_file, File}; +use std::mem::size_of; +use std::path::Path; use std::sync::Arc; -use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; -use heed::{CompactionOption, Database, Env, EnvOpenOptions}; -use parking_lot::{Mutex, RwLock}; -use serde::{Deserialize, Serialize}; +use anyhow::Context; +use arc_swap::ArcSwap; +use heed::types::{ByteSlice, OwnedType, SerdeJson}; +use heed::zerocopy::U64; +use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; +use parking_lot::{Mutex, MutexGuard}; +use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; +use super::UpdateMeta; use crate::helpers::EnvSizer; -use crate::index_controller::updates::*; +use crate::index_controller::{IndexActorHandle, updates::*}; #[allow(clippy::upper_case_acronyms)] -type BEU64 = heed::zerocopy::U64; +type BEU64 = U64; -#[derive(Clone)] -pub struct UpdateStore { - pub env: Env, - pending_meta: Database, SerdeJson>>, - pending: Database, SerdeJson>, - processed_meta: Database, SerdeJson>>, - failed_meta: Database, SerdeJson>>, - aborted_meta: Database, SerdeJson>>, - processing: Arc>>>, - notification_sender: mpsc::Sender<()>, - /// A lock on the update loop. 
diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs
index f1895829b..6a916af33 100644
--- a/meilisearch-http/src/index_controller/update_actor/update_store.rs
+++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs
@@ -1,99 +1,208 @@
-use std::fs::File;
-use std::fs::{copy, create_dir_all, remove_file};
-use std::path::{Path, PathBuf};
+use std::borrow::Cow;
+use std::collections::{BTreeMap, HashSet};
+use std::convert::TryInto;
+use std::fs::{copy, create_dir_all, remove_file, File};
+use std::mem::size_of;
+use std::path::Path;
 use std::sync::Arc;
 
-use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
-use heed::{CompactionOption, Database, Env, EnvOpenOptions};
-use parking_lot::{Mutex, RwLock};
-use serde::{Deserialize, Serialize};
+use anyhow::Context;
+use arc_swap::ArcSwap;
+use heed::types::{ByteSlice, OwnedType, SerdeJson};
+use heed::zerocopy::U64;
+use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
+use parking_lot::{Mutex, MutexGuard};
+use tokio::runtime::Handle;
 use tokio::sync::mpsc;
 use uuid::Uuid;
 
+use super::UpdateMeta;
 use crate::helpers::EnvSizer;
-use crate::index_controller::updates::*;
+use crate::index_controller::{IndexActorHandle, updates::*};
 
 #[allow(clippy::upper_case_acronyms)]
-type BEU64 = heed::zerocopy::U64<u64>;
+type BEU64 = U64<u64>;
 
-#[derive(Clone)]
-pub struct UpdateStore<M, N, E> {
-    pub env: Env,
-    pending_meta: Database<OwnedType<BEU64>, SerdeJson<Enqueued<M>>>,
-    pending: Database<OwnedType<BEU64>, SerdeJson<PathBuf>>,
-    processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
-    failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
-    aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
-    processing: Arc<RwLock<Option<Processing<M>>>>,
-    notification_sender: mpsc::Sender<()>,
-    /// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is
-    /// processing, while not preventing writes all together during an update
-    pub update_lock: Arc<Mutex<()>>,
+struct NextIdCodec;
+
+enum NextIdKey {
+    Global,
+    Index(Uuid),
 }
 
-pub trait HandleUpdate<M, N, E> {
-    fn handle_update(
-        &mut self,
-        meta: Processing<M>,
-        content: File,
-    ) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>;
+pub struct UpdateStoreInfo {
+    /// Size of the update store in bytes.
+    pub size: u64,
+    /// Uuid of the currently processing update if it exists.
+    pub processing: Option<Uuid>,
 }
 
-impl<M, N, E, F> HandleUpdate<M, N, E> for F
-where
-    F: FnMut(Processing<M>, File) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>,
-{
-    fn handle_update(
-        &mut self,
-        meta: Processing<M>,
-        content: File,
-    ) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>> {
-        self(meta, content)
+/// A data structure that allows concurrent reads AND exactly one writer.
+pub struct StateLock {
+    lock: Mutex<()>,
+    data: ArcSwap<State>,
+}
+
+struct StateLockGuard<'a> {
+    _lock: MutexGuard<'a, ()>,
+    state: &'a StateLock,
+}
+
+impl StateLockGuard<'_> {
+    fn swap(&self, state: State) -> Arc<State> {
+        self.state.data.swap(Arc::new(state))
     }
 }
 
-impl<M, N, E> UpdateStore<M, N, E>
-where
-    M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone,
-    N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
-    E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
-{
-    pub fn open(
+impl StateLock {
+    fn from_state(state: State) -> Self {
+        let lock = Mutex::new(());
+        let data = ArcSwap::from(Arc::new(state));
+        Self { lock, data }
+    }
+
+    fn read(&self) -> Arc<State> {
+        self.data.load().clone()
+    }
+
+    fn write(&self) -> StateLockGuard {
+        let _lock = self.lock.lock();
+        let state = &self;
+        StateLockGuard { _lock, state }
+    }
+}
+
+#[allow(clippy::large_enum_variant)]
+pub enum State {
+    Idle,
+    Processing(Uuid, Processing),
+    Snapshoting,
+}
+
+impl<'a> BytesEncode<'a> for NextIdCodec {
+    type EItem = NextIdKey;
+
+    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
+        match item {
+            NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
+            NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
+        }
+    }
+}
+
+struct PendingKeyCodec;
+
+impl<'a> BytesEncode<'a> for PendingKeyCodec {
+    type EItem = (u64, Uuid, u64);
+
+    fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
+        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
+        bytes.extend_from_slice(&global_id.to_be_bytes());
+        bytes.extend_from_slice(uuid.as_bytes());
+        bytes.extend_from_slice(&update_id.to_be_bytes());
+        Some(Cow::Owned(bytes))
+    }
+}
+
+impl<'a> BytesDecode<'a> for PendingKeyCodec {
+    type DItem = (u64, Uuid, u64);
+
+    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
+        let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
+        let global_id = u64::from_be_bytes(global_id_bytes);
+
+        let uuid_bytes = bytes
+            .get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
+            .try_into()
+            .ok()?;
+        let uuid = Uuid::from_bytes(uuid_bytes);
+
+        let update_id_bytes = bytes
+            .get((size_of::<u64>() + size_of::<Uuid>())..)?
+            .try_into()
+            .ok()?;
+        let update_id = u64::from_be_bytes(update_id_bytes);
+
+        Some((global_id, uuid, update_id))
+    }
+}
+
+struct UpdateKeyCodec;
+
+impl<'a> BytesEncode<'a> for UpdateKeyCodec {
+    type EItem = (Uuid, u64);
+
+    fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
+        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
+        bytes.extend_from_slice(uuid.as_bytes());
+        bytes.extend_from_slice(&update_id.to_be_bytes());
+        Some(Cow::Owned(bytes))
+    }
+}
+
+impl<'a> BytesDecode<'a> for UpdateKeyCodec {
+    type DItem = (Uuid, u64);
+
+    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
+        let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
+        let uuid = Uuid::from_bytes(uuid_bytes);
+
+        let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
+        let update_id = u64::from_be_bytes(update_id_bytes);
+
+        Some((uuid, update_id))
+    }
+}
+
+#[derive(Clone)]
+pub struct UpdateStore {
+    pub env: Env,
+    /// A queue containing the updates to process, ordered by arrival.
+    /// The keys are built as follows:
+    /// | global_update_id | index_uuid | update_id |
+    /// |     8-bytes      |  16-bytes  |  8-bytes  |
+    pending_queue: Database<PendingKeyCodec, SerdeJson<Enqueued>>,
+    /// Maps each index to the next available update id. If NextIdKey::Global is queried, then the
+    /// next global update id is returned.
+    next_update_id: Database<NextIdCodec, OwnedType<BEU64>>,
+    /// Contains all the performed updates meta, be they failed, aborted, or processed.
+    /// The keys are built as follows:
+    /// |    Uuid    |    id     |
+    /// |  16-bytes  |  8-bytes  |
+    updates: Database<ByteSlice, SerdeJson<UpdateStatus>>,
+    /// Indicates the current state of the update store.
+    state: Arc<StateLock>,
+    /// Wakes up the loop when a new event occurs.
+    notification_sender: mpsc::Sender<()>,
+}
+
+impl UpdateStore {
+    pub fn open(
         mut options: EnvOpenOptions,
-        path: P,
-        update_handler: U,
-    ) -> heed::Result<Arc<Self>>
-    where
-        P: AsRef<Path>,
-        U: HandleUpdate<M, N, E> + Sync + Clone + Send + 'static,
-    {
+        path: impl AsRef<Path>,
+        index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
+    ) -> anyhow::Result<Arc<Self>> {
         options.max_dbs(5);
         let env = options.open(path)?;
-        let pending_meta = env.create_database(Some("pending-meta"))?;
-        let pending = env.create_database(Some("pending"))?;
-        let processed_meta = env.create_database(Some("processed-meta"))?;
-        let aborted_meta = env.create_database(Some("aborted-meta"))?;
-        let failed_meta = env.create_database(Some("failed-meta"))?;
-        let processing = Arc::new(RwLock::new(None));
+        let pending_queue = env.create_database(Some("pending-queue"))?;
+        let next_update_id = env.create_database(Some("next-update-id"))?;
+        let updates = env.create_database(Some("updates"))?;
 
         let (notification_sender, mut notification_receiver) = mpsc::channel(10);
         // Send a first notification to trigger the process.
         let _ = notification_sender.send(());
 
-        let update_lock = Arc::new(Mutex::new(()));
+        let state = Arc::new(StateLock::from_state(State::Idle));
 
-        let update_store = Arc::new(UpdateStore {
-            env,
-            pending,
-            pending_meta,
-            processed_meta,
-            aborted_meta,
-            notification_sender,
-            failed_meta,
-            processing,
-            update_lock,
-        });
+        // Init the update loop to perform any pending updates at launch.
+        // Since we just launched the update store, and we still own the receiving end of the
+        // channel, this call is guaranteed to succeed.
+        notification_sender
+            .try_send(())
+            .expect("Failed to init update store");
+
+        let update_store = Arc::new(UpdateStore { env, pending_queue, next_update_id, updates, state, notification_sender });
 
         // We need a weak reference so we can take ownership on the arc later when we
         // want to close the index.
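Editorial aside: the 32-byte pending-queue key documented in the struct above is worth making concrete. Below is a small self-contained sketch of the round trip that PendingKeyCodec performs, written against plain byte slices instead of heed's BytesEncode/BytesDecode traits (the function names are mine, not from the patch):

    use std::convert::TryInto;
    use uuid::Uuid;

    // Pack (global_id, index_uuid, update_id) the way PendingKeyCodec does:
    // 8 big-endian bytes, then 16 uuid bytes, then 8 big-endian bytes.
    fn encode_pending_key(global_id: u64, uuid: Uuid, update_id: u64) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(32);
        bytes.extend_from_slice(&global_id.to_be_bytes());
        bytes.extend_from_slice(uuid.as_bytes());
        bytes.extend_from_slice(&update_id.to_be_bytes());
        bytes
    }

    fn decode_pending_key(bytes: &[u8]) -> Option<(u64, Uuid, u64)> {
        let global_id = u64::from_be_bytes(bytes.get(0..8)?.try_into().ok()?);
        let uuid = Uuid::from_bytes(bytes.get(8..24)?.try_into().ok()?);
        let update_id = u64::from_be_bytes(bytes.get(24..32)?.try_into().ok()?);
        Some((global_id, uuid, update_id))
    }

    fn main() {
        let uuid = Uuid::new_v4();
        let key = encode_pending_key(42, uuid, 7);
        assert_eq!(decode_pending_key(&key), Some((42, uuid, 7)));
        // Big-endian integers make byte-wise ordering match numeric ordering,
        // so iterating the queue yields updates in global arrival order.
        assert!(encode_pending_key(42, uuid, 7) < encode_pending_key(43, uuid, 0));
    }

Because the global id is the leading big-endian field, LMDB's lexicographic key order doubles as arrival order across all indexes, which is what lets this single pending queue replace the per-index stores deleted above.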
@@ -104,7 +213,7 @@ where
         loop {
             match update_store_weak.upgrade() {
                 Some(update_store) => {
-                    let handler = update_handler.clone();
+                    let handler = index_handle.clone();
                     let res = tokio::task::spawn_blocking(move || {
                         update_store.process_pending_update(handler)
                     })
@@ -126,115 +235,113 @@ where
         Ok(update_store)
     }
 
-    pub fn prepare_for_closing(self) -> heed::EnvClosingEvent {
-        self.env.prepare_for_closing()
-    }
+    /// Returns the next global update id and the next update id for a given `index_uuid`.
+    fn next_update_id(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<(u64, u64)> {
+        let global_id = self
+            .next_update_id
+            .get(txn, &NextIdKey::Global)?
+            .map(U64::get)
+            .unwrap_or_default();
+        let update_id = self
+            .next_update_id
+            .get(txn, &NextIdKey::Index(index_uuid))?
+            .map(U64::get)
+            .unwrap_or_default();
 
-    /// Returns the new biggest id to use to store the new update.
-    fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
-        let last_pending = self
-            .pending_meta
-            .remap_data_type::<DecodeIgnore>()
-            .last(txn)?
-            .map(|(k, _)| k.get());
+        self.next_update_id
+            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
+        self.next_update_id.put(
+            txn,
+            &NextIdKey::Index(index_uuid),
+            &BEU64::new(update_id + 1),
+        )?;
 
-        let last_processed = self
-            .processed_meta
-            .remap_data_type::<DecodeIgnore>()
-            .last(txn)?
-            .map(|(k, _)| k.get());
-
-        let last_aborted = self
-            .aborted_meta
-            .remap_data_type::<DecodeIgnore>()
-            .last(txn)?
-            .map(|(k, _)| k.get());
-
-        let last_update_id = [last_pending, last_processed, last_aborted]
-            .iter()
-            .copied()
-            .flatten()
-            .max();
-
-        match last_update_id {
-            Some(last_id) => Ok(last_id + 1),
-            None => Ok(0),
-        }
+        Ok((global_id, update_id))
     }
 
     /// Registers the update content in the pending store and the meta
     /// into the pending-meta store. Returns the new unique update id.
     pub fn register_update(
         &self,
-        meta: M,
-        content: impl AsRef<Path>,
+        meta: UpdateMeta,
+        content: Option<impl AsRef<Path>>,
         index_uuid: Uuid,
-    ) -> heed::Result<Enqueued<M>> {
-        let mut wtxn = self.env.write_txn()?;
+    ) -> heed::Result<Enqueued> {
+        let mut txn = self.env.write_txn()?;
 
-        // We ask the update store to give us a new update id, this is safe,
-        // no other update can have the same id because we use a write txn before
-        // asking for the id and registering it so other update registering
-        // will be forced to wait for a new write txn.
-        let update_id = self.new_update_id(&wtxn)?;
-        let update_key = BEU64::new(update_id);
+        let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
+        let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned()));
 
-        let meta = Enqueued::new(meta, update_id, index_uuid);
-        self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
-        self.pending
-            .put(&mut wtxn, &update_key, &content.as_ref().to_owned())?;
+        self.pending_queue
+            .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
 
-        wtxn.commit()?;
+        txn.commit()?;
 
         self.notification_sender
             .blocking_send(())
             .expect("Update store loop exited.");
         Ok(meta)
     }
 
+    /// Executes the user-provided function on the next pending update (the one with the lowest id).
     /// This is asynchronous as it lets the user process the update with a read-only txn and
     /// only write the result meta to the processed-meta store *after* it has been processed.
-    fn process_pending_update<U>(&self, mut handler: U) -> anyhow::Result<Option<()>>
-    where
-        U: HandleUpdate<M, N, E>,
-    {
-        let _lock = self.update_lock.lock();
+    fn process_pending_update(
+        &self,
+        index_handle: impl IndexActorHandle,
+    ) -> anyhow::Result<Option<()>> {
         // Create a read transaction to be able to retrieve the pending update in order.
         let rtxn = self.env.read_txn()?;
-        let first_meta = self.pending_meta.first(&rtxn)?;
+        let first_meta = self.pending_queue.first(&rtxn)?;
+        drop(rtxn);
 
         // If there is a pending update we process and only keep
         // a reader while processing it, not a writer.
         match first_meta {
-            Some((first_id, pending)) => {
-                let content_path = self
-                    .pending
-                    .get(&rtxn, &first_id)?
-                    .expect("associated update content");
-
-                // we change the state of the update from pending to processing before we pass it
-                // to the update handler. Processing store is non persistent to be able recover
-                // from a failure
+            Some(((global_id, index_uuid, update_id), mut pending)) => {
+                let content_path = pending.content.take();
                 let processing = pending.processing();
-                self.processing.write().replace(processing.clone());
-                let file = File::open(&content_path)?;
+
+                // Acquire the state lock and set the current state to processing.
+                let state = self.state.write();
+                state.swap(State::Processing(index_uuid, processing.clone()));
+
+                let file = match content_path {
+                    Some(ref path) => {
+                        let file = File::open(path)
+                            .with_context(|| format!("file at path: {:?}", &content_path))?;
+                        Some(file)
+                    }
+                    None => None,
+                };
 
                 // Process the pending update using the provided user function.
-                let result = handler.handle_update(processing, file)?;
-                drop(rtxn);
+                let result = Handle::current()
+                    .block_on(index_handle.update(index_uuid, processing, file))?;
 
                 // Once the pending update has been successfully processed
                 // we must remove the content from the pending and processing stores and
                 // write the *new* meta to the processed-meta store and commit.
                 let mut wtxn = self.env.write_txn()?;
-                self.processing.write().take();
-                self.pending_meta.delete(&mut wtxn, &first_id)?;
-                remove_file(&content_path)?;
-                self.pending.delete(&mut wtxn, &first_id)?;
-                match result {
-                    Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,
-                    Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?,
+                self.pending_queue
+                    .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;
+
+                if let Some(path) = content_path {
+                    remove_file(&path)?;
                 }
+
+                let result = match result {
+                    Ok(res) => res.into(),
+                    Err(res) => res.into(),
+                };
+
+                self.updates.remap_key_type::<UpdateKeyCodec>().put(
+                    &mut wtxn,
+                    &(index_uuid, update_id),
+                    &result,
+                )?;
+
                 wtxn.commit()?;
+                state.swap(State::Idle);
 
                 Ok(Some(()))
             }
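A note on the Idle / Processing / Snapshoting transitions used above: they all go through the new StateLock, which pairs an ArcSwap (wait-free reads) with a Mutex (serialized writers). Here is a stripped-down, generic sketch of the pattern; the real type is hard-wired to State, and this parametrized version exists only for illustration:

    use std::sync::Arc;

    use arc_swap::ArcSwap;
    use parking_lot::{Mutex, MutexGuard};

    // Readers load the current value wait-free; writers serialize on the mutex
    // so at most one state transition is in flight at a time.
    struct StateLock<T> {
        lock: Mutex<()>,
        data: ArcSwap<T>,
    }

    struct StateLockGuard<'a, T> {
        _lock: MutexGuard<'a, ()>,
        state: &'a StateLock<T>,
    }

    impl<T> StateLock<T> {
        fn from_state(state: T) -> Self {
            Self { lock: Mutex::new(()), data: ArcSwap::from(Arc::new(state)) }
        }

        // Cheap Arc clone of the current value; never blocks on writers.
        fn read(&self) -> Arc<T> {
            self.data.load_full()
        }

        // Holding the guard is what makes the subsequent swaps exclusive.
        fn write(&self) -> StateLockGuard<'_, T> {
            StateLockGuard { _lock: self.lock.lock(), state: self }
        }
    }

    impl<T> StateLockGuard<'_, T> {
        fn swap(&self, state: T) -> Arc<T> {
            self.state.data.swap(Arc::new(state))
        }
    }

    fn main() {
        let state = StateLock::from_state("idle");
        let guard = state.write();
        guard.swap("processing");
        assert_eq!(*state.read(), "processing"); // readers see it immediately
        drop(guard);
    }

The design choice over a plain RwLock: readers never wait on a writer mid-snapshot. They keep the last published Arc until the next swap, which is the property that lets status queries be answered while an update or snapshot is running.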
@@ -242,187 +349,308 @@ where
         }
     }
 
-    pub fn list(&self) -> anyhow::Result<Vec<UpdateStatus<M, N, E>>> {
-        let rtxn = self.env.read_txn()?;
-        let mut updates = Vec::new();
-
-        let processing = self.processing.read();
-        if let Some(ref processing) = *processing {
-            let update = UpdateStatus::from(processing.clone());
-            updates.push(update);
-        }
-
-        let pending = self
-            .pending_meta
-            .iter(&rtxn)?
-            .filter_map(Result::ok)
-            .filter_map(|(_, p)| (Some(p.id()) != processing.as_ref().map(|p| p.id())).then(|| p))
-            .map(UpdateStatus::from);
-
-        updates.extend(pending);
-
-        let aborted = self
-            .aborted_meta
-            .iter(&rtxn)?
-            .filter_map(Result::ok)
-            .map(|(_, p)| p)
-            .map(UpdateStatus::from);
-
-        updates.extend(aborted);
-
-        let processed = self
-            .processed_meta
-            .iter(&rtxn)?
-            .filter_map(Result::ok)
-            .map(|(_, p)| p)
-            .map(UpdateStatus::from);
-
-        updates.extend(processed);
-
-        let failed = self
-            .failed_meta
-            .iter(&rtxn)?
-            .filter_map(Result::ok)
-            .map(|(_, p)| p)
-            .map(UpdateStatus::from);
-
-        updates.extend(failed);
-
-        updates.sort_by_key(|u| u.id());
-
-        Ok(updates)
-    }
+    /// List the updates for `index_uuid`.
+    pub fn list(&self, index_uuid: Uuid) -> anyhow::Result<Vec<UpdateStatus>> {
+        let mut update_list = BTreeMap::<u64, UpdateStatus>::new();
+
+        let txn = self.env.read_txn()?;
+
+        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
+        for entry in pendings {
+            let ((_, uuid, id), pending) = entry?;
+            if uuid == index_uuid {
+                update_list.insert(id, pending.decode()?.into());
+            }
+        }
+
+        let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?;
+        for entry in updates {
+            let (_, update) = entry?;
+            update_list.insert(update.id(), update);
+        }
+
+        // If the currently processing update is from this index, replace the corresponding
+        // pending update with this one.
+        match *self.state.read() {
+            State::Processing(uuid, ref processing) if uuid == index_uuid => {
+                update_list.insert(processing.id(), processing.clone().into());
+            }
+            _ => (),
+        }
+
+        Ok(update_list.into_iter().map(|(_, v)| v).collect())
+    }
 
     /// Returns the update associated meta or `None` if the update doesn't exist.
-    pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<M, N, E>>> {
-        let rtxn = self.env.read_txn()?;
-        let key = BEU64::new(update_id);
-
-        if let Some(ref meta) = *self.processing.read() {
-            if meta.id() == update_id {
-                return Ok(Some(UpdateStatus::Processing(meta.clone())));
-            }
-        }
-
-        if let Some(meta) = self.pending_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Enqueued(meta)));
-        }
-
-        if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Processed(meta)));
-        }
-
-        if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Aborted(meta)));
-        }
-
-        if let Some(meta) = self.failed_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Failed(meta)));
-        }
-
+    pub fn meta(&self, index_uuid: Uuid, update_id: u64) -> heed::Result<Option<UpdateStatus>> {
+        // Check if the update is the one currently being processed.
+        match *self.state.read() {
+            State::Processing(uuid, ref processing)
+                if uuid == index_uuid && processing.id() == update_id =>
+            {
+                return Ok(Some(processing.clone().into()));
+            }
+            _ => (),
+        }
+
+        let txn = self.env.read_txn()?;
+        // Else, check if it is in the updates database:
+        let update = self
+            .updates
+            .remap_key_type::<UpdateKeyCodec>()
+            .get(&txn, &(index_uuid, update_id))?;
+
+        if let Some(update) = update {
+            return Ok(Some(update));
+        }
+
+        // If nothing was found yet, we fall back to iterating over the pending queue.
+        let pendings = self
+            .pending_queue
+            .remap_key_type::<UpdateKeyCodec>()
+            .iter(&txn)?
+            .lazily_decode_data();
+
+        for entry in pendings {
+            let ((uuid, id), pending) = entry?;
+            if uuid == index_uuid && id == update_id {
+                return Ok(Some(pending.decode()?.into()));
+            }
+        }
+
+        // No update was found.
         Ok(None)
     }
 
-    /// Aborts an update, an aborted update content is deleted and
-    /// the meta of it is moved into the aborted updates database.
-    ///
-    /// Trying to abort an update that is currently being processed, an update
-    /// that as already been processed or which doesn't actually exist, will
-    /// return `None`.
-    #[allow(dead_code)]
-    pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>> {
-        let mut wtxn = self.env.write_txn()?;
-        let key = BEU64::new(update_id);
-
-        // We cannot abort an update that is currently being processed.
-        if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) {
-            return Ok(None);
-        }
-
-        let pending = match self.pending_meta.get(&wtxn, &key)? {
-            Some(meta) => meta,
-            None => return Ok(None),
-        };
-
-        let aborted = pending.abort();
-
-        self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
-        self.pending_meta.delete(&mut wtxn, &key)?;
-        self.pending.delete(&mut wtxn, &key)?;
-
-        wtxn.commit()?;
-
-        Ok(Some(aborted))
-    }
+    /// Delete all updates for an index from the update store.
+    pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
+        let mut txn = self.env.write_txn()?;
+        // Contains all the content file paths that need to be removed if the deletion succeeds.
+        let mut paths_to_remove = Vec::new();
+
+        let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
+
+        while let Some(Ok(((_, uuid, _), pending))) = pendings.next() {
+            if uuid == index_uuid {
+                pendings.del_current()?;
+                let mut pending = pending.decode()?;
+                if let Some(path) = pending.content.take() {
+                    paths_to_remove.push(path);
+                }
+            }
+        }
+
+        drop(pendings);
+
+        let mut updates = self
+            .updates
+            .prefix_iter_mut(&mut txn, index_uuid.as_bytes())?
+            .lazily_decode_data();
+
+        while let Some(_) = updates.next() {
+            updates.del_current()?;
+        }
+
+        drop(updates);
+
+        txn.commit()?;
+
+        paths_to_remove.iter().for_each(|path| {
+            let _ = remove_file(path);
+        });
+
+        // We don't care about the currently processing update, since it will be removed by itself
+        // once it's done processing, and we can't abort a running update.
+
+        Ok(())
+    }
 
-    /// Aborts all the pending updates, and not the one being currently processed.
-    /// Returns the update metas and ids that were successfully aborted.
-    #[allow(dead_code)]
-    pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>> {
-        let mut wtxn = self.env.write_txn()?;
-        let mut aborted_updates = Vec::new();
-
-        // We skip the first pending update as it is currently being processed.
-        for result in self.pending_meta.iter(&wtxn)?.skip(1) {
-            let (key, pending) = result?;
-            let id = key.get();
-            aborted_updates.push((id, pending.abort()));
-        }
-
-        for (id, aborted) in &aborted_updates {
-            let key = BEU64::new(*id);
-            self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
-            self.pending_meta.delete(&mut wtxn, &key)?;
-            self.pending.delete(&mut wtxn, &key)?;
-        }
-
-        wtxn.commit()?;
-
-        Ok(aborted_updates)
-    }
-
-    pub fn snapshot(
-        &self,
-        txn: &mut heed::RwTxn,
-        path: impl AsRef<Path>,
-        uuid: Uuid,
-    ) -> anyhow::Result<()> {
+    pub fn snapshot(&self, uuids: &HashSet<Uuid>, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let state_lock = self.state.write();
+        state_lock.swap(State::Snapshoting);
+
+        let txn = self.env.write_txn()?;
+
         let update_path = path.as_ref().join("updates");
         create_dir_all(&update_path)?;
 
-        let mut snapshot_path = update_path.join(format!("update-{}", uuid));
         // acquire write lock to prevent further writes during snapshot
-        create_dir_all(&snapshot_path)?;
-        snapshot_path.push("data.mdb");
+        create_dir_all(&update_path)?;
+        let db_path = update_path.join("data.mdb");
 
         // create db snapshot
-        self.env
-            .copy_to_path(&snapshot_path, CompactionOption::Enabled)?;
+        self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;
 
         let update_files_path = update_path.join("update_files");
         create_dir_all(&update_files_path)?;
 
-        for path in self.pending.iter(&txn)? {
-            let (_, path) = path?;
-            let name = path.file_name().unwrap();
-            let to = update_files_path.join(name);
-            copy(path, to)?;
+        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
+
+        for entry in pendings {
+            let ((_, uuid, _), pending) = entry?;
+            if uuids.contains(&uuid) {
+                if let Some(path) = pending.decode()?.content_path() {
+                    let name = path.file_name().unwrap();
+                    let to = update_files_path.join(name);
+                    copy(path, to)?;
+                }
+            }
         }
 
         Ok(())
     }
 
-    pub fn get_size(&self, txn: &heed::RoTxn) -> anyhow::Result<u64> {
+    pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
         let mut size = self.env.size();
+        let txn = self.env.read_txn()?;
 
-        for path in self.pending.iter(txn)? {
-            let (_, path) = path?;
-
-            if let Ok(metadata) = path.metadata() {
-                size += metadata.len()
+        for entry in self.pending_queue.iter(&txn)? {
+            let (_, pending) = entry?;
+            if let Some(path) = pending.content_path() {
+                size += File::open(path)?.metadata()?.len();
             }
         }
 
-        Ok(size)
+        let processing = match *self.state.read() {
+            State::Processing(uuid, _) => Some(uuid),
+            _ => None,
+        };
+
+        Ok(UpdateStoreInfo { size, processing })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::index_controller::{index_actor::MockIndexActorHandle, UpdateResult};
+
+    use futures::future::ok;
+
+    #[actix_rt::test]
+    async fn test_next_id() {
+        let dir = tempfile::tempdir_in(".").unwrap();
+        let mut options = EnvOpenOptions::new();
+        let handle = Arc::new(MockIndexActorHandle::new());
+        options.map_size(4096 * 100);
+        let update_store = UpdateStore::open(options, dir.path(), handle).unwrap();
+
+        let index1_uuid = Uuid::new_v4();
+        let index2_uuid = Uuid::new_v4();
+
+        let mut txn = update_store.env.write_txn().unwrap();
+        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
+        txn.commit().unwrap();
+        assert_eq!((0, 0), ids);
+
+        let mut txn = update_store.env.write_txn().unwrap();
+        let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
+        txn.commit().unwrap();
+        assert_eq!((1, 0), ids);
+
+        let mut txn = update_store.env.write_txn().unwrap();
+        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
+        txn.commit().unwrap();
+        assert_eq!((2, 1), ids);
+    }
+
+    #[actix_rt::test]
+    async fn test_register_update() {
+        let dir = tempfile::tempdir_in(".").unwrap();
+        let mut options = EnvOpenOptions::new();
+        let handle = Arc::new(MockIndexActorHandle::new());
+        options.map_size(4096 * 100);
+        let update_store = UpdateStore::open(options, dir.path(), handle).unwrap();
+        let meta = UpdateMeta::ClearDocuments;
+        let uuid = Uuid::new_v4();
+        let store_clone = update_store.clone();
+        tokio::task::spawn_blocking(move || {
+            store_clone
+                .register_update(meta, Some("here"), uuid)
+                .unwrap();
+        })
+        .await
+        .unwrap();
+
+        let txn = update_store.env.read_txn().unwrap();
+        assert!(update_store
+            .pending_queue
+            .get(&txn, &(0, uuid, 0))
+            .unwrap()
+            .is_some());
+    }
+
+    #[actix_rt::test]
+    async fn test_process_update() {
+        let dir = tempfile::tempdir_in(".").unwrap();
+        let mut handle = MockIndexActorHandle::new();
+
+        handle
+            .expect_update()
+            .times(2)
+            .returning(|_index_uuid, processing, _file| {
+                if processing.id() == 0 {
+                    Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
+                } else {
+                    Box::pin(ok(Err(processing.fail(String::from("err")))))
+                }
+            });
+
+        let handle = Arc::new(handle);
+
+        let mut options = EnvOpenOptions::new();
+        options.map_size(4096 * 100);
+        let store = UpdateStore::open(options, dir.path(), handle.clone()).unwrap();
+
+        // Wait a bit for the event loop to exit.
+        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+
+        let mut txn = store.env.write_txn().unwrap();
+
+        let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
+        let uuid = Uuid::new_v4();
+
+        store
+            .pending_queue
+            .put(&mut txn, &(0, uuid, 0), &update)
+            .unwrap();
+
+        let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);
+
+        store
+            .pending_queue
+            .put(&mut txn, &(1, uuid, 1), &update)
+            .unwrap();
+
+        txn.commit().unwrap();
+
+        // Process the pending updates, and check that they have been moved to the updates
+        // database and removed from the pending queue.
+        let store_clone = store.clone();
+        tokio::task::spawn_blocking(move || {
+            store_clone.process_pending_update(handle.clone()).unwrap();
+            store_clone.process_pending_update(handle).unwrap();
+        })
+        .await
+        .unwrap();
+
+        let txn = store.env.read_txn().unwrap();
+
+        assert!(store.pending_queue.first(&txn).unwrap().is_none());
+        let update = store
+            .updates
+            .remap_key_type::<UpdateKeyCodec>()
+            .get(&txn, &(uuid, 0))
+            .unwrap()
+            .unwrap();
+
+        assert!(matches!(update, UpdateStatus::Processed(_)));
+        let update = store
+            .updates
+            .remap_key_type::<UpdateKeyCodec>()
+            .get(&txn, &(uuid, 1))
+            .unwrap()
+            .unwrap();
+
+        assert!(matches!(update, UpdateStatus::Failed(_)));
+    }
+}
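The first test above pins down the two-counter id scheme. If the round trip through heed obscures it, this is the same bookkeeping next_update_id performs, reduced to a toy in-memory model (NextIds is a made-up name for illustration):

    use std::collections::HashMap;
    use uuid::Uuid;

    // One global counter shared by all indexes, plus one counter per index;
    // both are bumped on every registration.
    #[derive(Default)]
    struct NextIds {
        global: u64,
        per_index: HashMap<Uuid, u64>,
    }

    impl NextIds {
        fn next(&mut self, index: Uuid) -> (u64, u64) {
            let global = self.global;
            self.global += 1;
            let counter = self.per_index.entry(index).or_default();
            let id = *counter;
            *counter += 1;
            (global, id)
        }
    }

    fn main() {
        let (a, b) = (Uuid::new_v4(), Uuid::new_v4());
        let mut ids = NextIds::default();
        assert_eq!(ids.next(a), (0, 0));
        assert_eq!(ids.next(b), (1, 0));
        assert_eq!(ids.next(a), (2, 1)); // the same sequence test_next_id asserts
    }

The global id gives the pending queue a total processing order across indexes, while the per-index id is the stable "updateId" a client sees for a given index.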
diff --git a/meilisearch-http/src/index_controller/update_handler.rs b/meilisearch-http/src/index_controller/update_handler.rs
index 1eb622cbf..130b85fe7 100644
--- a/meilisearch-http/src/index_controller/update_handler.rs
+++ b/meilisearch-http/src/index_controller/update_handler.rs
@@ -6,9 +6,8 @@ use grenad::CompressionType;
 use milli::update::UpdateBuilder;
 use rayon::ThreadPool;
 
-use crate::index::UpdateResult;
-use crate::index_controller::updates::{Failed, Processed, Processing};
 use crate::index_controller::UpdateMeta;
+use crate::index_controller::{Failed, Processed, Processing};
 use crate::option::IndexerOpts;
 
 pub struct UpdateHandler {
@@ -59,10 +58,10 @@ impl UpdateHandler {
 
     pub fn handle_update(
         &self,
-        meta: Processing<UpdateMeta>,
-        content: File,
+        meta: Processing,
+        content: Option<File>,
         index: Index,
-    ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
+    ) -> Result<Processed, Failed> {
         use UpdateMeta::*;
 
         let update_id = meta.id();
diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs
index 42712a396..1515f90e9 100644
--- a/meilisearch-http/src/index_controller/updates.rs
+++ b/meilisearch-http/src/index_controller/updates.rs
@@ -1,94 +1,121 @@
-use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize};
-use uuid::Uuid;
+use std::path::{Path, PathBuf};
 
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct Enqueued<M> {
-    pub update_id: u64,
-    pub meta: M,
-    pub enqueued_at: DateTime<Utc>,
-    pub index_uuid: Uuid,
+use chrono::{DateTime, Utc};
+use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
+use serde::{Deserialize, Serialize};
+
+use crate::index::{Facets, Settings};
+
+pub type UpdateError = String;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum UpdateResult {
+    DocumentsAddition(DocumentAdditionResult),
+    DocumentDeletion { deleted: u64 },
+    Other,
 }
 
-impl<M> Enqueued<M> {
-    pub fn new(meta: M, update_id: u64, index_uuid: Uuid) -> Self {
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type")]
+pub enum UpdateMeta {
+    DocumentsAddition {
+        method: IndexDocumentsMethod,
+        format: UpdateFormat,
+        primary_key: Option<String>,
+    },
+    ClearDocuments,
+    DeleteDocuments,
+    Settings(Settings),
+    Facets(Facets),
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Enqueued {
+    pub update_id: u64,
+    pub meta: UpdateMeta,
+    pub enqueued_at: DateTime<Utc>,
+    pub content: Option<PathBuf>,
+}
+
+impl Enqueued {
+    pub fn new(meta: UpdateMeta, update_id: u64, content: Option<PathBuf>) -> Self {
         Self {
             enqueued_at: Utc::now(),
             meta,
             update_id,
-            index_uuid,
+            content,
         }
     }
 
-    pub fn processing(self) -> Processing<M> {
+    pub fn processing(self) -> Processing {
         Processing {
             from: self,
             started_processing_at: Utc::now(),
         }
     }
 
-    pub fn abort(self) -> Aborted<M> {
+    pub fn abort(self) -> Aborted {
         Aborted {
             from: self,
             aborted_at: Utc::now(),
         }
     }
 
-    pub fn meta(&self) -> &M {
+    pub fn meta(&self) -> &UpdateMeta {
         &self.meta
     }
 
     pub fn id(&self) -> u64 {
         self.update_id
     }
+
+    pub fn content_path(&self) -> Option<&Path> {
+        self.content.as_deref()
+    }
 }
 
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct Processed<M, N> {
-    pub success: N,
+pub struct Processed {
+    pub success: UpdateResult,
     pub processed_at: DateTime<Utc>,
     #[serde(flatten)]
-    pub from: Processing<M>,
+    pub from: Processing,
 }
 
-impl<M, N> Processed<M, N> {
+impl Processed {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct Processing<M> {
+pub struct Processing {
     #[serde(flatten)]
-    pub from: Enqueued<M>,
+    pub from: Enqueued,
     pub started_processing_at: DateTime<Utc>,
 }
 
-impl<M> Processing<M> {
+impl Processing {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
 
-    pub fn meta(&self) -> &M {
+    pub fn meta(&self) -> &UpdateMeta {
         self.from.meta()
     }
 
-    pub fn index_uuid(&self) -> &Uuid {
-        &self.from.index_uuid
-    }
-
-    pub fn process<N>(self, meta: N) -> Processed<M, N> {
+    pub fn process(self, success: UpdateResult) -> Processed {
         Processed {
-            success: meta,
+            success,
             from: self,
             processed_at: Utc::now(),
         }
     }
 
-    pub fn fail<E>(self, error: E) -> Failed<M, E> {
+    pub fn fail(self, error: UpdateError) -> Failed {
         Failed {
             from: self,
             error,
@@ -97,46 +124,46 @@ impl<M> Processing<M> {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct Aborted<M> {
+pub struct Aborted {
     #[serde(flatten)]
-    from: Enqueued<M>,
+    from: Enqueued,
     aborted_at: DateTime<Utc>,
 }
 
-impl<M> Aborted<M> {
+impl Aborted {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct Failed<M, E> {
+pub struct Failed {
     #[serde(flatten)]
-    from: Processing<M>,
-    error: E,
+    from: Processing,
+    error: UpdateError,
     failed_at: DateTime<Utc>,
 }
 
-impl<M, E> Failed<M, E> {
+impl Failed {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Hash, Serialize)]
+#[derive(Debug, Serialize, Deserialize)]
 #[serde(tag = "status", rename_all = "camelCase")]
-pub enum UpdateStatus<M, N, E> {
-    Processing(Processing<M>),
-    Enqueued(Enqueued<M>),
-    Processed(Processed<M, N>),
-    Aborted(Aborted<M>),
-    Failed(Failed<M, E>),
+pub enum UpdateStatus {
+    Processing(Processing),
+    Enqueued(Enqueued),
+    Processed(Processed),
+    Aborted(Aborted),
+    Failed(Failed),
 }
 
-impl<M, N, E> UpdateStatus<M, N, E> {
+impl UpdateStatus {
     pub fn id(&self) -> u64 {
         match self {
             UpdateStatus::Processing(u) => u.id(),
@@ -147,7 +174,7 @@ impl<M, N, E> UpdateStatus<M, N, E> {
         }
     }
 
-    pub fn processed(&self) -> Option<&Processed<M, N>> {
+    pub fn processed(&self) -> Option<&Processed> {
         match self {
             UpdateStatus::Processed(p) => Some(p),
             _ => None,
@@ -155,32 +182,32 @@ impl<M, N, E> UpdateStatus<M, N, E> {
     }
 }
 
-impl<M, N, E> From<Enqueued<M>> for UpdateStatus<M, N, E> {
-    fn from(other: Enqueued<M>) -> Self {
+impl From<Enqueued> for UpdateStatus {
+    fn from(other: Enqueued) -> Self {
         Self::Enqueued(other)
     }
 }
 
-impl<M, N, E> From<Aborted<M>> for UpdateStatus<M, N, E> {
-    fn from(other: Aborted<M>) -> Self {
+impl From<Aborted> for UpdateStatus {
+    fn from(other: Aborted) -> Self {
         Self::Aborted(other)
     }
 }
 
-impl<M, N, E> From<Processed<M, N>> for UpdateStatus<M, N, E> {
-    fn from(other: Processed<M, N>) -> Self {
+impl From<Processed> for UpdateStatus {
+    fn from(other: Processed) -> Self {
         Self::Processed(other)
     }
 }
 
-impl<M, N, E> From<Processing<M>> for UpdateStatus<M, N, E> {
-    fn from(other: Processing<M>) -> Self {
+impl From<Processing> for UpdateStatus {
+    fn from(other: Processing) -> Self {
         Self::Processing(other)
    }
 }
 
-impl<M, N, E> From<Failed<M, E>> for UpdateStatus<M, N, E> {
-    fn from(other: Failed<M, E>) -> Self {
+impl From<Failed> for UpdateStatus {
+    fn from(other: Failed) -> Self {
        Self::Failed(other)
    }
 }
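One thing the diff itself does not show is what these #[serde(flatten)] / #[serde(tag = "status")] attributes produce on the wire, which is the shape the HTTP routes below return. Here is a cut-down mirror you can run to see it; the field set is abbreviated and these are not the real structs:

    use chrono::{DateTime, Utc};
    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct Enqueued {
        update_id: u64,
        enqueued_at: DateTime<Utc>,
    }

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct Processing {
        // flatten hoists Enqueued's fields up into Processing's JSON object.
        #[serde(flatten)]
        from: Enqueued,
        started_processing_at: DateTime<Utc>,
    }

    #[derive(Serialize)]
    #[serde(tag = "status", rename_all = "camelCase")]
    enum UpdateStatus {
        Processing(Processing),
    }

    fn main() {
        let status = UpdateStatus::Processing(Processing {
            from: Enqueued { update_id: 7, enqueued_at: Utc::now() },
            started_processing_at: Utc::now(),
        });
        // The flatten chain plus the internal tag collapse everything into one
        // flat object, along the lines of:
        // {"status":"processing","updateId":7,"enqueuedAt":"...","startedProcessingAt":"..."}
        println!("{}", serde_json::to_string(&status).unwrap());
    }

So each state wrapper adds its own timestamp while the client still sees a single flat object tagged by "status", which is why the wrappers can nest freely without changing the public payload shape.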
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs
index 27ffaa05e..253326276 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs
@@ -1,4 +1,4 @@
-use std::path::PathBuf;
+use std::{collections::HashSet, path::PathBuf};
 
 use log::{info, warn};
 use tokio::sync::mpsc;
@@ -78,7 +78,7 @@ impl<S: UuidStore> UuidResolverActor<S> {
         Ok(result)
     }
 
-    async fn handle_snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
+    async fn handle_snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
         self.store.snapshot(path).await
     }
 
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs
index c522e87e6..db4c482bd 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::path::{Path, PathBuf};
 
 use tokio::sync::{mpsc, oneshot};
@@ -67,7 +68,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
             .expect("Uuid resolver actor has been killed")?)
     }
 
-    async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
+    async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
         let (ret, receiver) = oneshot::channel();
         let msg = UuidResolveMsg::SnapshotRequest { path, ret };
         let _ = self.sender.send(msg).await;
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs
index e7d29f05f..a72bf0587 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/message.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs
@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::path::PathBuf;
 
 use tokio::sync::oneshot;
@@ -28,7 +29,7 @@ pub enum UuidResolveMsg {
     },
     SnapshotRequest {
         path: PathBuf,
-        ret: oneshot::Sender<Result<Vec<Uuid>>>,
+        ret: oneshot::Sender<Result<HashSet<Uuid>>>,
     },
     GetSize {
         ret: oneshot::Sender<Result<u64>>,
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
index 33a089ddb..ef17133ff 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
@@ -3,6 +3,7 @@ mod handle_impl;
 mod message;
 mod store;
 
+use std::collections::HashSet;
 use std::path::PathBuf;
 
 use thiserror::Error;
@@ -29,7 +30,7 @@ pub trait UuidResolverHandle {
     async fn create(&self, name: String) -> anyhow::Result<Uuid>;
     async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
     async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
-    async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
+    async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
 }
 
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
index 1f057830b..29c034c44 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
@@ -1,5 +1,6 @@
-use std::fs::create_dir_all;
 use std::path::{Path, PathBuf};
+use std::collections::HashSet;
+use std::fs::create_dir_all;
 
 use heed::{
     types::{ByteSlice, Str},
@@ -19,7 +20,7 @@ pub trait UuidStore {
     async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
     async fn list(&self) -> Result<Vec<(String, Uuid)>>;
     async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
-    async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
+    async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
 }
 
@@ -129,17 +130,17 @@ impl UuidStore for HeedUuidStore {
         .await?
     }
 
-    async fn snapshot(&self, mut path: PathBuf) -> Result<Vec<Uuid>> {
+    async fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
         let env = self.env.clone();
         let db = self.db;
         tokio::task::spawn_blocking(move || {
             // Write transaction to acquire a lock on the database.
             let txn = env.write_txn()?;
-            let mut entries = Vec::new();
+            let mut entries = HashSet::new();
             for entry in db.iter(&txn)? {
                 let (_, uuid) = entry?;
                 let uuid = Uuid::from_slice(uuid)?;
-                entries.push(uuid)
+                entries.insert(uuid);
             }
 
             // only perform snapshot if there are indexes
diff --git a/meilisearch-http/src/routes/document.rs b/meilisearch-http/src/routes/document.rs
index ed5d88230..4f211bf09 100644
--- a/meilisearch-http/src/routes/document.rs
+++ b/meilisearch-http/src/routes/document.rs
@@ -84,9 +84,9 @@ async fn delete_document(
         .delete_documents(path.index_uid.clone(), vec![path.document_id.clone()])
         .await
     {
-        Ok(update_status) => {
-            Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
-        }
+        Ok(update_status) => Ok(
+            HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
+        ),
         Err(e) => {
             Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
         }
@@ -107,14 +107,11 @@ async fn get_all_documents(
     path: web::Path<IndexParam>,
     params: web::Query<BrowseQuery>,
 ) -> Result<HttpResponse, ResponseError> {
-    let attributes_to_retrieve = params
-        .attributes_to_retrieve
-        .as_ref()
-        .and_then(|attrs| {
+    let attributes_to_retrieve = params.attributes_to_retrieve.as_ref().and_then(|attrs| {
         let mut names = Vec::new();
         for name in attrs.split(',').map(String::from) {
             if name == "*" {
-                return None
+                return None;
             }
             names.push(name);
         }
@@ -163,9 +160,9 @@ async fn add_documents(
         .await;
 
     match addition_result {
-        Ok(update_status) => {
-            Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
-        }
+        Ok(update_status) => Ok(
+            HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
+        ),
         Err(e) => {
             Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
         }
@@ -242,9 +239,9 @@ async fn delete_documents(
         .collect();
 
     match data.delete_documents(path.index_uid.clone(), ids).await {
-        Ok(update_status) => {
-            Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
-        }
+        Ok(update_status) => Ok(
+            HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
+        ),
         Err(e) => {
             Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
         }
@@ -258,9 +255,9 @@ async fn clear_all_documents(
     path: web::Path<IndexParam>,
 ) -> Result<HttpResponse, ResponseError> {
     match data.clear_documents(path.index_uid.clone()).await {
-        Ok(update_status) => {
-            Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
-        }
+        Ok(update_status) => Ok(
+            HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
+        ),
         Err(e) => {
             Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
         }
diff --git a/meilisearch-http/src/routes/settings/mod.rs b/meilisearch-http/src/routes/settings/mod.rs
index c4403d9d7..236c94fed 100644
--- a/meilisearch-http/src/routes/settings/mod.rs
+++ b/meilisearch-http/src/routes/settings/mod.rs
@@ -144,9 +144,9 @@ async fn update_all(
         .update_settings(index_uid.into_inner(), body.into_inner(), true)
         .await
     {
-        Ok(update_result) => {
-            Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() })))
-        }
+        Ok(update_result) => Ok(
+            HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() }))
+        ),
         Err(e) => {
             Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
         }
@@ -176,9 +176,9 @@ async fn delete_all(
         .update_settings(index_uid.into_inner(), settings, false)
         .await
     {
-        Ok(update_result) => {
-            Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() })))
-        }
+        Ok(update_result) => Ok(
+            HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() }))
+        ),
         Err(e) => {
             Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
         }
diff --git a/meilisearch-http/src/routes/stats.rs b/meilisearch-http/src/routes/stats.rs
index 226b62fcd..f2d1ddecc 100644
--- a/meilisearch-http/src/routes/stats.rs
+++ b/meilisearch-http/src/routes/stats.rs
@@ -1,15 +1,10 @@
-use std::collections::BTreeMap;
-
 use actix_web::get;
 use actix_web::web;
 use actix_web::HttpResponse;
-use chrono::{DateTime, Utc};
 use serde::Serialize;
 
-use crate::data::Stats;
 use crate::error::ResponseError;
 use crate::helpers::Authentication;
-use crate::index_controller::IndexStats;
 use crate::routes::IndexParam;
 use crate::Data;
 
@@ -19,59 +14,19 @@ pub fn services(cfg: &mut web::ServiceConfig) {
         .service(get_version);
 }
 
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct IndexStatsResponse {
-    number_of_documents: u64,
-    is_indexing: bool,
-    fields_distribution: BTreeMap<String, u64>,
-}
-
-impl From<IndexStats> for IndexStatsResponse {
-    fn from(stats: IndexStats) -> Self {
-        Self {
-            number_of_documents: stats.number_of_documents,
-            is_indexing: stats.is_indexing,
-            fields_distribution: stats.fields_distribution.into_iter().collect(),
-        }
-    }
-}
-
 #[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")]
 async fn get_index_stats(
     data: web::Data<Data>,
     path: web::Path<IndexParam>,
 ) -> Result<HttpResponse, ResponseError> {
-    let response: IndexStatsResponse = data.get_index_stats(path.index_uid.clone()).await?.into();
+    let response = data.get_index_stats(path.index_uid.clone()).await?;
 
     Ok(HttpResponse::Ok().json(response))
 }
 
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct StatsResponse {
-    database_size: u64,
-    last_update: Option<DateTime<Utc>>,
-    indexes: BTreeMap<String, IndexStatsResponse>,
-}
-
-impl From<Stats> for StatsResponse {
-    fn from(stats: Stats) -> Self {
-        Self {
-            database_size: stats.database_size,
-            last_update: stats.last_update,
-            indexes: stats
-                .indexes
-                .into_iter()
-                .map(|(uid, index_stats)| (uid, index_stats.into()))
-                .collect(),
-        }
-    }
-}
-
 #[get("/stats", wrap = "Authentication::Private")]
 async fn get_stats(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
-    let response: StatsResponse = data.get_stats().await?.into();
+    let response = data.get_all_stats().await?;
 
     Ok(HttpResponse::Ok().json(response))
 }
diff --git a/meilisearch-http/tests/common/index.rs b/meilisearch-http/tests/common/index.rs
index 86aa80c3d..adb7fef3e 100644
--- a/meilisearch-http/tests/common/index.rs
+++ b/meilisearch-http/tests/common/index.rs
@@ -185,12 +185,9 @@ impl Index<'_> {
         self.service.get(url).await
     }
 
-    make_settings_test_routes!(
-        distinct_attribute
-    );
+    make_settings_test_routes!(distinct_attribute);
 }
 
-
 pub struct GetDocumentOptions;
 
 #[derive(Debug, Default)]
diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs
index 87e5cecb7..9de5fe9db 100644
--- a/meilisearch-http/tests/documents/add_documents.rs
+++ b/meilisearch-http/tests/documents/add_documents.rs
@@ -77,8 +77,8 @@ async fn document_addition_with_primary_key() {
             "content": "foo",
         }
     ]);
-    let (_response, code) = index.add_documents(documents, Some("primary")).await;
-    assert_eq!(code, 202);
+    let (response, code) = index.add_documents(documents, Some("primary")).await;
+    assert_eq!(code, 202, "response: {}", response);
 
     index.wait_update_id(0).await;
 
@@ -189,8 +189,8 @@ async fn replace_document() {
         }
     ]);
 
-    let (_response, code) = index.add_documents(documents, None).await;
-    assert_eq!(code, 202);
+    let (response, code) = index.add_documents(documents, None).await;
+    assert_eq!(code, 202, "response: {}", response);
 
     index.wait_update_id(0).await;
 
@@ -260,8 +260,8 @@ async fn update_document() {
         }
     ]);
 
-    let (_response, code) = index.update_documents(documents, None).await;
-    assert_eq!(code, 202);
+    let (response, code) = index.update_documents(documents, None).await;
+    assert_eq!(code, 202, "response: {}", response);
 
     index.wait_update_id(1).await;
 
diff --git a/meilisearch-http/tests/settings/distinct.rs b/meilisearch-http/tests/settings/distinct.rs
index a3aec6baf..818f200fd 100644
--- a/meilisearch-http/tests/settings/distinct.rs
+++ b/meilisearch-http/tests/settings/distinct.rs
@@ -6,14 +6,18 @@ async fn set_and_reset_distinct_attribute() {
     let server = Server::new().await;
     let index = server.index("test");
 
-    let (_response, _code) = index.update_settings(json!({ "distinctAttribute": "test"})).await;
+    let (_response, _code) = index
+        .update_settings(json!({ "distinctAttribute": "test"}))
+        .await;
 
     index.wait_update_id(0).await;
 
     let (response, _) = index.settings().await;
 
     assert_eq!(response["distinctAttribute"], "test");
 
-    index.update_settings(json!({ "distinctAttribute": null })).await;
+    index
+        .update_settings(json!({ "distinctAttribute": null }))
+        .await;
 
     index.wait_update_id(1).await;
 
diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs
index 3412f45af..4230e19f8 100644
--- a/meilisearch-http/tests/settings/get_settings.rs
+++ b/meilisearch-http/tests/settings/get_settings.rs
@@ -23,13 +23,7 @@ async fn get_settings() {
     assert_eq!(settings["distinctAttribute"], json!(null));
     assert_eq!(
         settings["rankingRules"],
-        json!([
-            "words",
-            "typo",
-            "proximity",
-            "attribute",
-            "exactness"
-        ])
+        json!(["words", "typo", "proximity", "attribute", "exactness"])
     );
     assert_eq!(settings["stopWords"], json!([]));
 }
diff --git a/meilisearch-http/tests/settings/mod.rs b/meilisearch-http/tests/settings/mod.rs
index b7102cc5f..05339cb37 100644
--- a/meilisearch-http/tests/settings/mod.rs
+++ b/meilisearch-http/tests/settings/mod.rs
@@ -1,2 +1,2 @@
-mod get_settings;
 mod distinct;
+mod get_settings;
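A closing observation on the update-store rewrite that the route and test cleanups above lean on: UpdateStore::list merges three sources (the pending queue, the updates database, and the in-flight State::Processing entry) by keying everything on the update id in a BTreeMap, so the output comes back sorted and the processing entry shadows its stale enqueued twin. A toy model of that merge, with made-up plain-string statuses standing in for the real types:

    use std::collections::BTreeMap;

    // Later inserts win; the BTreeMap keeps ids sorted for free.
    fn merge(
        pending: Vec<(u64, &'static str)>,
        done: Vec<(u64, &'static str)>,
        processing: Option<(u64, &'static str)>,
    ) -> Vec<(u64, &'static str)> {
        let mut list = BTreeMap::new();
        list.extend(pending);
        list.extend(done);
        list.extend(processing);
        list.into_iter().collect()
    }

    fn main() {
        let merged = merge(
            vec![(2, "enqueued"), (3, "enqueued")],
            vec![(1, "processed")],
            Some((2, "processing")), // update 2 is being processed right now
        );
        assert_eq!(
            merged,
            vec![(1, "processed"), (2, "processing"), (3, "enqueued")]
        );
    }

This is why no route in this patch ever needs to sort or deduplicate statuses itself: the merge discipline in the store guarantees one entry per id, in id order.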