make retrieval non-blocking

This commit is contained in:
mpostma 2021-02-15 23:02:20 +01:00
parent 6766de437f
commit 8bb1b6146f
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
2 changed files with 75 additions and 62 deletions

View File

@ -166,80 +166,92 @@ impl Data {
} }
} }
pub fn retrieve_documents( pub async fn retrieve_documents<S>(
&self, &self,
index: impl AsRef<str>, index: impl AsRef<str> + Send + Sync + 'static,
offset: usize, offset: usize,
limit: usize, limit: usize,
attributes_to_retrieve: Option<&[&str]>, attributes_to_retrieve: Option<Vec<S>>,
) -> anyhow::Result<Vec<Map<String, Value>>> { ) -> anyhow::Result<Vec<Map<String, Value>>>
let index = self.index_controller where
.index(&index)? S: AsRef<str> + Send + Sync + 'static
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; {
let index_controller = self.index_controller.clone();
let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
let index = index_controller
.index(&index)?
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
let txn = index.read_txn()?; let txn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&txn)?; let fields_ids_map = index.fields_ids_map(&txn)?;
let attributes_to_retrieve_ids = match attributes_to_retrieve { let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs Some(attrs) => attrs
.as_ref() .iter()
.iter() .filter_map(|f| fields_ids_map.id(f.as_ref()))
.filter_map(|f| fields_ids_map.id(f)) .collect::<Vec<_>>(),
.collect::<Vec<_>>(), None => fields_ids_map.iter().map(|(id, _)| id).collect(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(), };
};
let iter = index.documents.range(&txn, &(..))? let iter = index.documents.range(&txn, &(..))?
.skip(offset) .skip(offset)
.take(limit); .take(limit);
let mut documents = Vec::new(); let mut documents = Vec::new();
for entry in iter { for entry in iter {
let (_id, obkv) = entry?; let (_id, obkv) = entry?;
let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?; let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?;
documents.push(object); documents.push(object);
} }
Ok(documents) Ok(documents)
}).await?;
documents
} }
pub fn retrieve_document( pub async fn retrieve_document<S>(
&self, &self,
index: impl AsRef<str>, index: impl AsRef<str> + Sync + Send + 'static,
document_id: impl AsRef<str>, document_id: impl AsRef<str> + Sync + Send + 'static,
attributes_to_retrieve: Option<&[&str]>, attributes_to_retrieve: Option<Vec<S>>,
) -> anyhow::Result<Map<String, Value>> { ) -> anyhow::Result<Map<String, Value>>
let index = self.index_controller where
.index(&index)? S: AsRef<str> + Sync + Send + 'static,
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; {
let txn = index.read_txn()?; let index_controller = self.index_controller.clone();
let document: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
let index = index_controller
.index(&index)?
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
let txn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&txn)?; let fields_ids_map = index.fields_ids_map(&txn)?;
let attributes_to_retrieve_ids = match attributes_to_retrieve { let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs Some(attrs) => attrs
.as_ref() .iter()
.iter() .filter_map(|f| fields_ids_map.id(f.as_ref()))
.filter_map(|f| fields_ids_map.id(f)) .collect::<Vec<_>>(),
.collect::<Vec<_>>(), None => fields_ids_map.iter().map(|(id, _)| id).collect(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(), };
};
let internal_id = index let internal_id = index
.external_documents_ids(&txn)? .external_documents_ids(&txn)?
.get(document_id.as_ref().as_bytes()) .get(document_id.as_ref().as_bytes())
.with_context(|| format!("Document with id {} not found", document_id.as_ref()))?; .with_context(|| format!("Document with id {} not found", document_id.as_ref()))?;
let document = index.documents(&txn, std::iter::once(internal_id))? let document = index.documents(&txn, std::iter::once(internal_id))?
.into_iter() .into_iter()
.next() .next()
.map(|(_, d)| d); .map(|(_, d)| d);
match document { match document {
Some(document) => Ok(obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, document)?), Some(document) => Ok(obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, document)?),
None => bail!("Document with id {} not found", document_id.as_ref()), None => bail!("Document with id {} not found", document_id.as_ref()),
} }
}).await?;
document
} }
} }

View File

@ -55,9 +55,9 @@ async fn get_document(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<DocumentParam>, path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index = &path.index_uid; let index = path.index_uid.clone();
let id = &path.document_id; let id = path.document_id.clone();
match data.retrieve_document(index, id, None) { match data.retrieve_document(index, id, None as Option<Vec<String>>).await {
Ok(document) => { Ok(document) => {
let json = serde_json::to_string(&document).unwrap(); let json = serde_json::to_string(&document).unwrap();
Ok(HttpResponse::Ok().body(json)) Ok(HttpResponse::Ok().body(json))
@ -99,13 +99,14 @@ async fn get_all_documents(
.as_ref() .as_ref()
.map(|attrs| attrs .map(|attrs| attrs
.split(",") .split(",")
.map(String::from)
.collect::<Vec<_>>()); .collect::<Vec<_>>());
match data.retrieve_documents( match data.retrieve_documents(
&path.index_uid, path.index_uid.clone(),
params.offset.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_OFFSET), params.offset.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_OFFSET),
params.limit.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_LIMIT), params.limit.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_LIMIT),
attributes_to_retrieve.as_deref()) { attributes_to_retrieve).await {
Ok(docs) => { Ok(docs) => {
let json = serde_json::to_string(&docs).unwrap(); let json = serde_json::to_string(&docs).unwrap();
Ok(HttpResponse::Ok().body(json)) Ok(HttpResponse::Ok().body(json))