implement retrieve documents

This commit is contained in:
mpostma 2021-03-04 14:20:19 +01:00
parent 17b84691f2
commit f3d65ec5e9
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 225 additions and 94 deletions

View File

@ -12,59 +12,22 @@ impl Data {
self.index_controller.search(index.as_ref().to_string(), search_query).await self.index_controller.search(index.as_ref().to_string(), search_query).await
} }
pub async fn retrieve_documents<S>( pub async fn retrieve_documents(
&self, &self,
_index: String, index: String,
_offset: usize, offset: usize,
_limit: usize, limit: usize,
_attributes_to_retrieve: Option<Vec<S>>, attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Vec<Map<String, Value>>> ) -> anyhow::Result<Vec<Map<String, Value>>> {
where self.index_controller.documents(index, offset, limit, attributes_to_retrieve).await
S: AsRef<str> + Send + Sync + 'static,
{
todo!()
//let index_controller = self.index_controller.clone();
//let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
//let index = index_controller
//.index(index.clone())?
//.with_context(|| format!("Index {:?} doesn't exist", index))?;
//let txn = index.read_txn()?;
//let fields_ids_map = index.fields_ids_map(&txn)?;
//let attributes_to_retrieve_ids = match attributes_to_retrieve {
//Some(attrs) => attrs
//.iter()
//.filter_map(|f| fields_ids_map.id(f.as_ref()))
//.collect::<Vec<_>>(),
//None => fields_ids_map.iter().map(|(id, _)| id).collect(),
//};
//let iter = index.documents.range(&txn, &(..))?.skip(offset).take(limit);
//let mut documents = Vec::new();
//for entry in iter {
//let (_id, obkv) = entry?;
//let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?;
//documents.push(object);
//}
//Ok(documents)
//})
//.await?;
//documents
} }
pub async fn retrieve_document<S>( pub async fn retrieve_document<S>(
&self, &self,
_index: impl AsRef<str> + Sync + Send + 'static, _index: impl AsRef<str> + Sync + Send + 'static,
_document_id: impl AsRef<str> + Sync + Send + 'static, _document_id: impl AsRef<str> + Sync + Send + 'static,
_attributes_to_retrieve: Option<Vec<S>>, _attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Map<String, Value>> ) -> anyhow::Result<Map<String, Value>>
where
S: AsRef<str> + Sync + Send + 'static,
{ {
todo!() todo!()
//let index_controller = self.index_controller.clone(); //let index_controller = self.index_controller.clone();

View File

@ -4,9 +4,14 @@ mod updates;
use std::sync::Arc; use std::sync::Arc;
use std::ops::Deref; use std::ops::Deref;
use serde_json::{Value, Map};
use milli::obkv_to_json;
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Settings, Facets, UpdateResult}; pub use updates::{Settings, Facets, UpdateResult};
pub type Document = Map<String, Value>;
#[derive(Clone)] #[derive(Clone)]
pub struct Index(pub Arc<milli::Index>); pub struct Index(pub Arc<milli::Index>);
@ -45,4 +50,38 @@ impl Index {
criteria: None, criteria: None,
}) })
} }
/// Reads a window of documents from the index and returns them as JSON objects.
///
/// * `offset` - number of entries skipped from the start of the documents range.
/// * `limit` - maximum number of documents returned.
/// * `attributes_to_retrieve` - names of the fields to keep in each document.
///   Names for which the fields map yields no id are dropped without error.
///   `None` selects every field present in the fields map.
///
/// # Errors
///
/// Propagates any failure from opening the read transaction, loading the
/// fields map, opening the documents range, reading an entry, or converting
/// an entry to JSON. The first failure aborts the whole call.
pub fn retrieve_documents<S>(
    &self,
    offset: usize,
    limit: usize,
    attributes_to_retrieve: Option<Vec<S>>,
) -> anyhow::Result<Vec<Map<String, Value>>>
where
    S: AsRef<str> + Send + Sync + 'static,
{
    let rtxn = self.read_txn()?;
    let fields = self.fields_ids_map(&rtxn)?;

    // Translate the requested attribute names into field ids; without an
    // explicit selection, fall back to every id known to the fields map.
    let wanted_ids: Vec<_> = if let Some(names) = attributes_to_retrieve {
        names
            .iter()
            .filter_map(|name| fields.id(name.as_ref()))
            .collect()
    } else {
        fields.iter().map(|(id, _)| id).collect()
    };

    // Collecting into `Result` stops at the first entry that fails to be
    // read or converted, exactly like an early `?` inside a loop would.
    let documents = self
        .documents
        .range(&rtxn, &(..))?
        .skip(offset)
        .take(limit)
        .map(|entry| -> anyhow::Result<Map<String, Value>> {
            let (_id, obkv) = entry?;
            Ok(obkv_to_json(&wanted_ids, &fields, obkv)?)
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    Ok(documents)
}
} }

View File

@ -1,6 +1,6 @@
use std::collections::{HashMap, hash_map::Entry}; use std::collections::{hash_map::Entry, HashMap};
use std::fs::{File, create_dir_all}; use std::fs::{create_dir_all, File};
use std::path::{PathBuf, Path}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use async_stream::stream; use async_stream::stream;
@ -8,25 +8,51 @@ use chrono::Utc;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::info; use log::info;
use serde_json::{Map, Value};
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid; use uuid::Uuid;
use super::update_handler::UpdateHandler; use super::update_handler::UpdateHandler;
use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}};
use crate::index::UpdateResult as UResult; use crate::index::UpdateResult as UResult;
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{
updates::{Failed, Processed, Processing},
IndexMetadata, UpdateMeta,
};
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use crate::index::{Index, SearchQuery, SearchResult, Settings};
pub type Result<T> = std::result::Result<T, IndexError>; pub type Result<T> = std::result::Result<T, IndexError>;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>; type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;
enum IndexMsg { enum IndexMsg {
CreateIndex { uuid: Uuid, primary_key: Option<String>, ret: oneshot::Sender<Result<IndexMetadata>> }, CreateIndex {
Update { meta: Processing<UpdateMeta>, data: std::fs::File, ret: oneshot::Sender<UpdateResult>}, uuid: Uuid,
Search { uuid: Uuid, query: SearchQuery, ret: oneshot::Sender<anyhow::Result<SearchResult>> }, primary_key: Option<String>,
Settings { uuid: Uuid, ret: oneshot::Sender<Result<Settings>> }, ret: oneshot::Sender<Result<IndexMetadata>>,
},
Update {
meta: Processing<UpdateMeta>,
data: std::fs::File,
ret: oneshot::Sender<UpdateResult>,
},
Search {
uuid: Uuid,
query: SearchQuery,
ret: oneshot::Sender<anyhow::Result<SearchResult>>,
},
Settings {
uuid: Uuid,
ret: oneshot::Sender<Result<Settings>>,
},
Documents {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
offset: usize,
limit: usize,
ret: oneshot::Sender<Result<Vec<Map<String, Value>>>>,
},
} }
struct IndexActor<S> { struct IndexActor<S> {
@ -56,11 +82,20 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
let update_handler = UpdateHandler::new(&options).unwrap(); let update_handler = UpdateHandler::new(&options).unwrap();
let update_handler = Arc::new(update_handler); let update_handler = Arc::new(update_handler);
let inbox = Some(inbox); let inbox = Some(inbox);
Self { inbox, store, update_handler } Self {
inbox,
store,
update_handler,
}
} }
async fn run(mut self) { async fn run(mut self) {
let mut inbox = self.inbox.take().expect("Index Actor must have a inbox at this point."); use IndexMsg::*;
let mut inbox = self
.inbox
.take()
.expect("Index Actor must have a inbox at this point.");
let stream = stream! { let stream = stream! {
loop { loop {
@ -73,31 +108,59 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
let fut = stream.for_each_concurrent(Some(10), |msg| async { let fut = stream.for_each_concurrent(Some(10), |msg| async {
match msg { match msg {
IndexMsg::CreateIndex { uuid, primary_key, ret } => self.handle_create_index(uuid, primary_key, ret).await, CreateIndex {
IndexMsg::Update { ret, meta, data } => self.handle_update(meta, data, ret).await, uuid,
IndexMsg::Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, primary_key,
IndexMsg::Settings { ret, uuid } => self.handle_settings(uuid, ret).await, ret,
} => self.handle_create_index(uuid, primary_key, ret).await,
Update { ret, meta, data } => self.handle_update(meta, data, ret).await,
Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await,
Settings { ret, uuid } => self.handle_settings(uuid, ret).await,
Documents {
ret,
uuid,
attributes_to_retrieve,
offset,
limit,
} => {
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret)
.await
}
} }
}); });
fut.await; fut.await;
} }
async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender<anyhow::Result<SearchResult>>) { async fn handle_search(
&self,
uuid: Uuid,
query: SearchQuery,
ret: oneshot::Sender<anyhow::Result<SearchResult>>,
) {
let index = self.store.get(uuid).await.unwrap().unwrap(); let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = index.perform_search(query); let result = index.perform_search(query);
ret.send(result) ret.send(result)
}); });
} }
async fn handle_create_index(&self, uuid: Uuid, primary_key: Option<String>, ret: oneshot::Sender<Result<IndexMetadata>>) { async fn handle_create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
ret: oneshot::Sender<Result<IndexMetadata>>,
) {
let result = self.store.create_index(uuid, primary_key).await; let result = self.store.create_index(uuid, primary_key).await;
let _ = ret.send(result); let _ = ret.send(result);
} }
async fn handle_update(&self, meta: Processing<UpdateMeta>, data: File, ret: oneshot::Sender<UpdateResult>) { async fn handle_update(
&self,
meta: Processing<UpdateMeta>,
data: File,
ret: oneshot::Sender<UpdateResult>,
) {
info!("Processing update {}", meta.id()); info!("Processing update {}", meta.id());
let uuid = meta.index_uuid().clone(); let uuid = meta.index_uuid().clone();
let index = self.store.get_or_create(uuid).await.unwrap(); let index = self.store.get_or_create(uuid).await.unwrap();
@ -105,13 +168,30 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = update_handler.handle_update(meta, data, index); let result = update_handler.handle_update(meta, data, index);
let _ = ret.send(result); let _ = ret.send(result);
}).await; })
.await;
} }
async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender<Result<Settings>>) { async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender<Result<Settings>>) {
let index = self.store.get(uuid).await.unwrap().unwrap(); let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = index.settings() let result = index.settings().map_err(|e| IndexError::Error(e));
let _ = ret.send(result);
})
.await;
}
async fn handle_fetch_documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
ret: oneshot::Sender<Result<Vec<Document>>>,
) {
let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || {
let result = index.retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e)); .map_err(|e| IndexError::Error(e));
let _ = ret.send(result); let _ = ret.send(result);
}).await; }).await;
@ -133,9 +213,17 @@ impl IndexActorHandle {
Self { sender } Self { sender }
} }
pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMetadata> { pub async fn create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMetadata> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex { ret, uuid, primary_key }; let msg = IndexMsg::CreateIndex {
ret,
uuid,
primary_key,
};
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed") receiver.await.expect("IndexActor has been killed")
} }
@ -160,6 +248,25 @@ impl IndexActorHandle {
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
/// Asks the index actor for a window of documents of the index `uuid`.
///
/// Sends an `IndexMsg::Documents` request carrying a oneshot sender and
/// returns whatever the actor answers on the matching receiver.
///
/// # Panics
///
/// Panics with "IndexActor has been killed" if the reply channel is closed
/// before an answer arrives. A failure to enqueue the request is ignored
/// here, so it also surfaces through that panic.
pub async fn documents(
    &self,
    uuid: Uuid,
    offset: usize,
    limit: usize,
    attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let _ = self
        .sender
        .send(IndexMsg::Documents {
            uuid,
            attributes_to_retrieve,
            offset,
            limit,
            ret: reply_tx,
        })
        .await;
    // The actor's reply already has this function's return type, so it is
    // handed back as-is.
    reply_rx.await.expect("IndexActor has been killed")
}
} }
struct MapIndexStore { struct MapIndexStore {
@ -190,29 +297,31 @@ impl IndexStore for MapIndexStore {
create_dir_all(&db_path).expect("can't create db"); create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100_000); options.map_size(4096 * 100_000);
let index = milli::Index::new(options, &db_path) let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?;
.map_err(|e| IndexError::Error(e))?;
let index = Index(Arc::new(index)); let index = Index(Arc::new(index));
Ok(index) Ok(index)
}).await.expect("thread died"); })
.await
.expect("thread died");
self.index_store.write().await.insert(meta.uuid.clone(), index?); self.index_store
.write()
.await
.insert(meta.uuid.clone(), index?);
Ok(meta) Ok(meta)
} }
async fn get_or_create(&self, uuid: Uuid) -> Result<Index> { async fn get_or_create(&self, uuid: Uuid) -> Result<Index> {
match self.index_store.write().await.entry(uuid.clone()) { match self.index_store.write().await.entry(uuid.clone()) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => match self.meta_store.write().await.entry(uuid.clone()) {
match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(_) => { Entry::Vacant(_) => {
todo!() todo!()
} }
Entry::Occupied(entry) => { Entry::Occupied(entry) => {
todo!() todo!()
} }
} },
}
Entry::Occupied(entry) => Ok(entry.get().clone()), Entry::Occupied(entry) => Ok(entry.get().clone()),
} }
} }
@ -228,6 +337,10 @@ impl MapIndexStore {
root.push("indexes/"); root.push("indexes/");
let meta_store = Arc::new(RwLock::new(HashMap::new())); let meta_store = Arc::new(RwLock::new(HashMap::new()));
let index_store = Arc::new(RwLock::new(HashMap::new())); let index_store = Arc::new(RwLock::new(HashMap::new()));
Self { meta_store, index_store, root } Self {
meta_store,
index_store,
root,
}
} }
} }

View File

@ -11,7 +11,6 @@ use actix_web::web::Bytes;
use actix_web::web::Payload; use actix_web::web::Payload;
use anyhow::Context; use anyhow::Context;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use crate::index::{SearchResult, SearchQuery};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@ -19,6 +18,7 @@ use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
pub use updates::{Processed, Processing, Failed}; pub use updates::{Processed, Processing, Failed};
use crate::index::{SearchResult, SearchQuery, Document};
use crate::index::{UpdateResult, Settings, Facets}; use crate::index::{UpdateResult, Settings, Facets};
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>; pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
@ -157,6 +157,21 @@ impl IndexController {
Ok(settings) Ok(settings)
} }
pub async fn documents(
&self,
index: String,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Vec<Document>> {
let uuid = self.uuid_resolver
.resolve(index.clone())
.await?
.with_context(|| format!("Index {:?} doesn't exist", index))?;
let documents = self.index_handle.documents(uuid, offset, limit, attributes_to_retrieve).await?;
Ok(documents)
}
fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> { fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
todo!() todo!()
} }

View File

@ -55,17 +55,18 @@ async fn get_document(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<DocumentParam>, path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index = path.index_uid.clone(); todo!()
let id = path.document_id.clone(); //let index = path.index_uid.clone();
match data.retrieve_document(index, id, None as Option<Vec<String>>).await { //let id = path.document_id.clone();
Ok(document) => { //match data.retrieve_document(index, id, None as Option<Vec<String>>).await {
let json = serde_json::to_string(&document).unwrap(); //Ok(document) => {
Ok(HttpResponse::Ok().body(json)) //let json = serde_json::to_string(&document).unwrap();
} //Ok(HttpResponse::Ok().body(json))
Err(e) => { //}
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) //Err(e) => {
} //Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
} //}
//}
} }
#[delete( #[delete(