diff --git a/meilisearch-http/src/index/search.rs b/meilisearch-http/src/index/search.rs index a0de632be..2002266a1 100644 --- a/meilisearch-http/src/index/search.rs +++ b/meilisearch-http/src/index/search.rs @@ -41,6 +41,7 @@ pub struct SearchQuery { pub struct SearchResult { pub hits: Vec>, pub nb_hits: u64, + pub exhaustive_nb_hits: bool, pub query: String, pub limit: usize, pub offset: usize, @@ -107,6 +108,7 @@ impl Index { }; let result = SearchResult { + exhaustive_nb_hits: false, // not implemented yet hits: documents, nb_hits, query: query.q.clone().unwrap_or_default(), diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index a0b119239..b26ab8828 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -17,6 +17,7 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::sleep; +use uuid::Uuid; use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Facets, Settings, UpdateResult}; @@ -29,6 +30,7 @@ use uuid_resolver::UuidResolverHandle; use snapshot::SnapshotService; pub use updates::{Failed, Processed, Processing}; +use uuid_resolver::UuidError; pub type UpdateStatus = updates::UpdateStatus; @@ -119,41 +121,51 @@ impl IndexController { uid: String, method: milli::update::IndexDocumentsMethod, format: milli::update::UpdateFormat, - mut payload: Payload, + payload: Payload, primary_key: Option, ) -> anyhow::Result { - let uuid = self.uuid_resolver.get_or_create(uid).await?; - let meta = UpdateMeta::DocumentsAddition { - method, - format, - primary_key, + let perform_update = |uuid| async move { + let meta = UpdateMeta::DocumentsAddition { + method, + format, + primary_key, + }; + let (sender, receiver) = mpsc::channel(10); + + // It is necessary to spawn a local task to send the payload to the update handle to + // prevent dead_locking between the update_handle::update that waits for the update to be + // registered and the update_actor that waits for the the payload to be sent to it. + tokio::task::spawn_local(async move { + payload + .map(|bytes| { + bytes.map_err(|e| { + Box::new(e) as Box + }) + }) + .for_each(|r| async { + let _ = sender.send(r).await; + }) + .await + }); + + // This must be done *AFTER* spawning the task. + self.update_handle.update(meta, receiver, uuid).await }; - let (sender, receiver) = mpsc::channel(10); - // It is necessary to spawn a local task to senf the payload to the update handle to - // prevent dead_locking between the update_handle::update that waits for the update to be - // registered and the update_actor that waits for the the payload to be sent to it. - tokio::task::spawn_local(async move { - while let Some(bytes) = payload.next().await { - match bytes { - Ok(bytes) => { - let _ = sender.send(Ok(bytes)).await; - } - Err(e) => { - let error: Box = Box::new(e); - let _ = sender.send(Err(error)).await; - } - } + match self.uuid_resolver.get(uid).await { + Ok(uuid) => Ok(perform_update(uuid).await?), + Err(UuidError::UnexistingIndex(name)) => { + let uuid = Uuid::new_v4(); + let status = perform_update(uuid).await?; + self.uuid_resolver.insert(name, uuid).await?; + Ok(status) } - }); - - // This must be done *AFTER* spawning the task. - let status = self.update_handle.update(meta, receiver, uuid).await?; - Ok(status) + Err(e) => Err(e.into()), + } } pub async fn clear_documents(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let meta = UpdateMeta::ClearDocuments; let (_, receiver) = mpsc::channel(1); let status = self.update_handle.update(meta, receiver, uuid).await?; @@ -165,7 +177,7 @@ impl IndexController { uid: String, document_ids: Vec, ) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let meta = UpdateMeta::DeleteDocuments; let (sender, receiver) = mpsc::channel(10); @@ -185,26 +197,23 @@ impl IndexController { settings: Settings, create: bool, ) -> anyhow::Result { - let uuid = if create { - let uuid = self.uuid_resolver.get_or_create(uid).await?; - // We need to create the index upfront, since it would otherwise only be created when - // the update is processed. This would make calls to GET index to fail until the update - // is complete. Since this is get or create, we ignore the error when the index already - // exists. - match self.index_handle.create_index(uuid, None).await { - Ok(_) | Err(index_actor::IndexError::IndexAlreadyExists) => (), - Err(e) => return Err(e.into()), - } - uuid - } else { - self.uuid_resolver.resolve(uid).await? + let perform_udpate = |uuid| async move { + let meta = UpdateMeta::Settings(settings); + // Nothing so send, drop the sender right away, as not to block the update actor. + let (_, receiver) = mpsc::channel(1); + self.update_handle.update(meta, receiver, uuid).await }; - let meta = UpdateMeta::Settings(settings); - // Nothing so send, drop the sender right away, as not to block the update actor. - let (_, receiver) = mpsc::channel(1); - let status = self.update_handle.update(meta, receiver, uuid).await?; - Ok(status) + match self.uuid_resolver.get(uid).await { + Ok(uuid) => Ok(perform_udpate(uuid).await?), + Err(UuidError::UnexistingIndex(name)) if create => { + let uuid = Uuid::new_v4(); + let status = perform_udpate(uuid).await?; + self.uuid_resolver.insert(name, uuid).await?; + Ok(status) + } + Err(e) => Err(e.into()), + } } pub async fn create_index( @@ -233,13 +242,13 @@ impl IndexController { } pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let result = self.update_handle.update_status(uuid, id).await?; Ok(result) } pub async fn all_update_status(&self, uid: String) -> anyhow::Result> { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let result = self.update_handle.get_all_updates_status(uuid).await?; Ok(result) } @@ -263,7 +272,7 @@ impl IndexController { } pub async fn settings(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let settings = self.index_handle.settings(uuid).await?; Ok(settings) } @@ -275,7 +284,7 @@ impl IndexController { limit: usize, attributes_to_retrieve: Option>, ) -> anyhow::Result> { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let documents = self .index_handle .documents(uuid, offset, limit, attributes_to_retrieve) @@ -289,7 +298,7 @@ impl IndexController { doc_id: String, attributes_to_retrieve: Option>, ) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let document = self .index_handle .document(uuid, doc_id, attributes_to_retrieve) @@ -306,7 +315,7 @@ impl IndexController { bail!("Can't change the index uid.") } - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?; let meta = IndexMetadata { name: uid.clone(), @@ -317,13 +326,13 @@ impl IndexController { } pub async fn search(&self, uid: String, query: SearchQuery) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let result = self.index_handle.search(uuid, query).await?; Ok(result) } pub async fn get_index(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { name: uid.clone(),