From 8bb1b6146f4aa97b520aea7effe3a28dd32d3a12 Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 15 Feb 2021 23:02:20 +0100 Subject: [PATCH] make retrieval non blocking --- src/data/search.rs | 126 ++++++++++++++++++++++------------------- src/routes/document.rs | 11 ++-- 2 files changed, 75 insertions(+), 62 deletions(-) diff --git a/src/data/search.rs b/src/data/search.rs index 33e48e4fd..b4be47c35 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -166,80 +166,92 @@ impl Data { } } - pub fn retrieve_documents( + pub async fn retrieve_documents( &self, - index: impl AsRef, + index: impl AsRef + Send + Sync + 'static, offset: usize, limit: usize, - attributes_to_retrieve: Option<&[&str]>, - ) -> anyhow::Result>> { - let index = self.index_controller - .index(&index)? - .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; + attributes_to_retrieve: Option>, + ) -> anyhow::Result>> + where + S: AsRef + Send + Sync + 'static + { + let index_controller = self.index_controller.clone(); + let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || { + let index = index_controller + .index(&index)? + .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; - let txn = index.read_txn()?; + let txn = index.read_txn()?; - let fields_ids_map = index.fields_ids_map(&txn)?; + let fields_ids_map = index.fields_ids_map(&txn)?; - let attributes_to_retrieve_ids = match attributes_to_retrieve { - Some(attrs) => attrs - .as_ref() - .iter() - .filter_map(|f| fields_ids_map.id(f)) - .collect::>(), - None => fields_ids_map.iter().map(|(id, _)| id).collect(), - }; + let attributes_to_retrieve_ids = match attributes_to_retrieve { + Some(attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f.as_ref())) + .collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; - let iter = index.documents.range(&txn, &(..))? - .skip(offset) - .take(limit); + let iter = index.documents.range(&txn, &(..))? + .skip(offset) + .take(limit); - let mut documents = Vec::new(); + let mut documents = Vec::new(); - for entry in iter { - let (_id, obkv) = entry?; - let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?; - documents.push(object); - } + for entry in iter { + let (_id, obkv) = entry?; + let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?; + documents.push(object); + } - Ok(documents) + Ok(documents) + }).await?; + documents } - pub fn retrieve_document( + pub async fn retrieve_document( &self, - index: impl AsRef, - document_id: impl AsRef, - attributes_to_retrieve: Option<&[&str]>, - ) -> anyhow::Result> { - let index = self.index_controller - .index(&index)? - .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; - let txn = index.read_txn()?; + index: impl AsRef + Sync + Send + 'static, + document_id: impl AsRef + Sync + Send + 'static, + attributes_to_retrieve: Option>, + ) -> anyhow::Result> + where + S: AsRef + Sync + Send + 'static, + { + let index_controller = self.index_controller.clone(); + let document: anyhow::Result<_> = tokio::task::spawn_blocking(move || { + let index = index_controller + .index(&index)? + .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; + let txn = index.read_txn()?; - let fields_ids_map = index.fields_ids_map(&txn)?; + let fields_ids_map = index.fields_ids_map(&txn)?; - let attributes_to_retrieve_ids = match attributes_to_retrieve { - Some(attrs) => attrs - .as_ref() - .iter() - .filter_map(|f| fields_ids_map.id(f)) - .collect::>(), - None => fields_ids_map.iter().map(|(id, _)| id).collect(), - }; + let attributes_to_retrieve_ids = match attributes_to_retrieve { + Some(attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f.as_ref())) + .collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; - let internal_id = index - .external_documents_ids(&txn)? - .get(document_id.as_ref().as_bytes()) - .with_context(|| format!("Document with id {} not found", document_id.as_ref()))?; + let internal_id = index + .external_documents_ids(&txn)? + .get(document_id.as_ref().as_bytes()) + .with_context(|| format!("Document with id {} not found", document_id.as_ref()))?; - let document = index.documents(&txn, std::iter::once(internal_id))? - .into_iter() - .next() - .map(|(_, d)| d); + let document = index.documents(&txn, std::iter::once(internal_id))? + .into_iter() + .next() + .map(|(_, d)| d); - match document { - Some(document) => Ok(obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, document)?), - None => bail!("Document with id {} not found", document_id.as_ref()), - } + match document { + Some(document) => Ok(obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, document)?), + None => bail!("Document with id {} not found", document_id.as_ref()), + } + }).await?; + document } } diff --git a/src/routes/document.rs b/src/routes/document.rs index ac20cfff0..a240867dd 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -55,9 +55,9 @@ async fn get_document( data: web::Data, path: web::Path, ) -> Result { - let index = &path.index_uid; - let id = &path.document_id; - match data.retrieve_document(index, id, None) { + let index = path.index_uid.clone(); + let id = path.document_id.clone(); + match data.retrieve_document(index, id, None as Option>).await { Ok(document) => { let json = serde_json::to_string(&document).unwrap(); Ok(HttpResponse::Ok().body(json)) @@ -99,13 +99,14 @@ async fn get_all_documents( .as_ref() .map(|attrs| attrs .split(",") + .map(String::from) .collect::>()); match data.retrieve_documents( - &path.index_uid, + path.index_uid.clone(), params.offset.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_OFFSET), params.limit.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_LIMIT), - attributes_to_retrieve.as_deref()) { + attributes_to_retrieve).await { Ok(docs) => { let json = serde_json::to_string(&docs).unwrap(); Ok(HttpResponse::Ok().body(json))