diff --git a/Cargo.lock b/Cargo.lock index 575f03d98..3620a6885 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1822,6 +1822,7 @@ dependencies = [ "milli", "mime", "once_cell", + "parking_lot", "rand 0.7.3", "rayon", "regex", diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index 15319dea6..7b09b3042 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -62,6 +62,7 @@ either = "1.6.1" async-trait = "0.1.42" thiserror = "1.0.24" async-stream = "0.3.0" +parking_lot = "0.11.1" [dependencies.sentry] default-features = false diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs index 708e8fd19..8614e297b 100644 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ b/meilisearch-http/src/index_controller/update_actor.rs @@ -3,18 +3,17 @@ use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use itertools::Itertools; -use log::info; use super::index_actor::IndexActorHandle; +use log::info; use thiserror::Error; use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; +use super::get_arc_ownership_blocking; use crate::index::UpdateResult; use crate::index_controller::{UpdateMeta, UpdateStatus}; -use super::get_arc_ownership_blocking; pub type Result = std::result::Result; type UpdateStore = super::update_store::UpdateStore; @@ -68,7 +67,11 @@ where D: AsRef<[u8]> + Sized + 'static, S: UpdateStoreStore, { - fn new(store: S, inbox: mpsc::Receiver>, path: impl AsRef) -> anyhow::Result { + fn new( + store: S, + inbox: mpsc::Receiver>, + path: impl AsRef, + ) -> anyhow::Result { let path = path.as_ref().to_owned().join("update_files"); create_dir_all(&path)?; assert!(path.exists()); @@ -87,12 +90,12 @@ where meta, data, ret, - }) => { + }) => { let _ = ret.send(self.handle_update(uuid, meta, data).await); } Some(ListUpdates { uuid, ret }) => { let _ = ret.send(self.handle_list_updates(uuid).await); - } , + } Some(GetUpdate { uuid, ret, id }) => { let _ = ret.send(self.handle_get_update(uuid, id).await); } @@ -113,13 +116,15 @@ where let update_store = self.store.get_or_create(uuid).await?; let update_file_id = uuid::Uuid::new_v4(); let path = self.path.join(format!("update_{}", update_file_id)); - let mut file = File::create(&path).await + let mut file = File::create(&path) + .await .map_err(|e| UpdateError::Error(Box::new(e)))?; while let Some(bytes) = payload.recv().await { match bytes { Ok(bytes) => { - file.write_all(bytes.as_ref()).await + file.write_all(bytes.as_ref()) + .await .map_err(|e| UpdateError::Error(Box::new(e)))?; } Err(e) => { @@ -128,7 +133,8 @@ where } } - file.flush().await + file.flush() + .await .map_err(|e| UpdateError::Error(Box::new(e)))?; let file = file.into_std().await; @@ -144,50 +150,33 @@ where .map_err(|e| UpdateError::Error(Box::new(e)))? } - async fn handle_list_updates( - &self, - uuid: Uuid, - ) -> Result> { - let store = self.store.get(&uuid).await?; + async fn handle_list_updates(&self, uuid: Uuid) -> Result> { + let update_store = self.store.get(&uuid).await?; tokio::task::spawn_blocking(move || { - let result = match store { - Some(update_store) => { - let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| { - Ok(processing - .map(UpdateStatus::from) - .into_iter() - .chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u))) - .chain(aborted.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u))) - .chain(processed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u))) - .chain(failed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u))) - .sorted_by(|a, b| a.id().cmp(&b.id())) - .collect()) - }) - .map_err(|e| UpdateError::Error(Box::new(e)))?; - Ok(updates) - } - None => Err(UpdateError::UnexistingIndex(uuid)), - }; - result - }).await + let result = update_store + .ok_or(UpdateError::UnexistingIndex(uuid))? + .list() + .map_err(|e| UpdateError::Error(e.into()))?; + Ok(result) + }) + .await .map_err(|e| UpdateError::Error(Box::new(e)))? } - async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result> { - let store = self.store + let store = self + .store .get(&uuid) .await? .ok_or(UpdateError::UnexistingIndex(uuid))?; - let result = store.meta(id) + let result = store + .meta(id) .map_err(|e| UpdateError::Error(Box::new(e)))?; Ok(result) } async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let store = self.store - .delete(&uuid) - .await?; + let store = self.store.delete(&uuid).await?; if let Some(store) = store { tokio::task::spawn(async move { diff --git a/meilisearch-http/src/index_controller/update_store.rs b/meilisearch-http/src/index_controller/update_store.rs index fe04b205d..a7a65e494 100644 --- a/meilisearch-http/src/index_controller/update_store.rs +++ b/meilisearch-http/src/index_controller/update_store.rs @@ -1,6 +1,6 @@ use std::fs::remove_file; use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; use heed::{Database, Env, EnvOpenOptions}; @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize}; use std::fs::File; use tokio::sync::mpsc; use uuid::Uuid; +use parking_lot::RwLock; use crate::index_controller::updates::*; @@ -206,7 +207,7 @@ where // to the update handler. Processing store is non persistent to be able recover // from a failure let processing = pending.processing(); - self.processing.write().unwrap().replace(processing.clone()); + self.processing.write().replace(processing.clone()); let file = File::open(&content_path)?; // Process the pending update using the provided user function. let result = handler.handle_update(processing, file)?; @@ -216,7 +217,7 @@ where // we must remove the content from the pending and processing stores and // write the *new* meta to the processed-meta store and commit. let mut wtxn = self.env.write_txn()?; - self.processing.write().unwrap().take(); + self.processing.write().take(); self.pending_meta.delete(&mut wtxn, &first_id)?; remove_file(&content_path)?; self.pending.delete(&mut wtxn, &first_id)?; @@ -232,36 +233,52 @@ where } } - /// Execute the user defined function with the meta-store iterators, the first - /// iterator is the *processed* meta one, the second the *aborted* meta one - /// and, the last is the *pending* meta one. - pub fn iter_metas(&self, mut f: F) -> heed::Result - where - F: for<'a> FnMut( - Option>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - ) -> heed::Result, - { + pub fn list(&self) -> anyhow::Result>> { let rtxn = self.env.read_txn()?; + let mut updates = Vec::new(); - // We get the pending, processed and aborted meta iterators. - let processed_iter = self.processed_meta.iter(&rtxn)?; - let aborted_iter = self.aborted_meta.iter(&rtxn)?; - let pending_iter = self.pending_meta.iter(&rtxn)?; - let processing = self.processing.read().unwrap().clone(); - let failed_iter = self.failed_meta.iter(&rtxn)?; + let processing = self.processing.read(); + if let Some(ref processing) = *processing { + let update = UpdateStatus::from(processing.clone()); + updates.push(update); + } - // We execute the user defined function with both iterators. - (f)( - processing, - processed_iter, - aborted_iter, - pending_iter, - failed_iter, - ) + let pending = self + .pending_meta + .iter(&rtxn)? + .filter_map(Result::ok) + .filter_map(|(_, p)| (Some(p.id()) != processing.as_ref().map(|p| p.id())).then(|| p)) + .map(UpdateStatus::from); + + updates.extend(pending); + + let aborted = + self.aborted_meta.iter(&rtxn)? + .filter_map(Result::ok) + .map(|(_, p)| p) + .map(UpdateStatus::from); + + updates.extend(aborted); + + let processed = + self.processed_meta.iter(&rtxn)? + .filter_map(Result::ok) + .map(|(_, p)| p) + .map(UpdateStatus::from); + + updates.extend(processed); + + let failed = + self.failed_meta.iter(&rtxn)? + .filter_map(Result::ok) + .map(|(_, p)| p) + .map(UpdateStatus::from); + + updates.extend(failed); + + updates.sort_unstable_by(|a, b| a.id().cmp(&b.id())); + + Ok(updates) } /// Returns the update associated meta or `None` if the update doesn't exist. @@ -269,7 +286,7 @@ where let rtxn = self.env.read_txn()?; let key = BEU64::new(update_id); - if let Some(ref meta) = *self.processing.read().unwrap() { + if let Some(ref meta) = *self.processing.read() { if meta.id() == update_id { return Ok(Some(UpdateStatus::Processing(meta.clone()))); }