diff --git a/src/data/search.rs b/src/data/search.rs index d6bf8438e..319f8b973 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -12,59 +12,22 @@ impl Data { self.index_controller.search(index.as_ref().to_string(), search_query).await } - pub async fn retrieve_documents( + pub async fn retrieve_documents( &self, - _index: String, - _offset: usize, - _limit: usize, - _attributes_to_retrieve: Option>, - ) -> anyhow::Result>> - where - S: AsRef + Send + Sync + 'static, - { - todo!() - //let index_controller = self.index_controller.clone(); - //let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || { - //let index = index_controller - //.index(index.clone())? - //.with_context(|| format!("Index {:?} doesn't exist", index))?; - - //let txn = index.read_txn()?; - - //let fields_ids_map = index.fields_ids_map(&txn)?; - - //let attributes_to_retrieve_ids = match attributes_to_retrieve { - //Some(attrs) => attrs - //.iter() - //.filter_map(|f| fields_ids_map.id(f.as_ref())) - //.collect::>(), - //None => fields_ids_map.iter().map(|(id, _)| id).collect(), - //}; - - //let iter = index.documents.range(&txn, &(..))?.skip(offset).take(limit); - - //let mut documents = Vec::new(); - - //for entry in iter { - //let (_id, obkv) = entry?; - //let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?; - //documents.push(object); - //} - - //Ok(documents) - //}) - //.await?; - //documents + index: String, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> anyhow::Result>> { + self.index_controller.documents(index, offset, limit, attributes_to_retrieve).await } pub async fn retrieve_document( &self, _index: impl AsRef + Sync + Send + 'static, _document_id: impl AsRef + Sync + Send + 'static, - _attributes_to_retrieve: Option>, + _attributes_to_retrieve: Option>, ) -> anyhow::Result> - where - S: AsRef + Sync + Send + 'static, { todo!() //let index_controller = self.index_controller.clone(); diff --git a/src/index/mod.rs b/src/index/mod.rs index a68f983e9..8ae2ba6a2 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -4,9 +4,14 @@ mod updates; use std::sync::Arc; use std::ops::Deref; +use serde_json::{Value, Map}; +use milli::obkv_to_json; + pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; pub use updates::{Settings, Facets, UpdateResult}; +pub type Document = Map; + #[derive(Clone)] pub struct Index(pub Arc); @@ -45,4 +50,38 @@ impl Index { criteria: None, }) } + + pub fn retrieve_documents( + &self, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> anyhow::Result>> + where + S: AsRef + Send + Sync + 'static, + { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + + let attributes_to_retrieve_ids = match attributes_to_retrieve { + Some(attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f.as_ref())) + .collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit); + + let mut documents = Vec::new(); + + for entry in iter { + let (_id, obkv) = entry?; + let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?; + documents.push(object); + } + + Ok(documents) + } } diff --git a/src/index_controller/index_actor.rs b/src/index_controller/index_actor.rs index 655a1e2d5..8e59227ac 100644 --- a/src/index_controller/index_actor.rs +++ b/src/index_controller/index_actor.rs @@ -1,6 +1,6 @@ -use std::collections::{HashMap, hash_map::Entry}; -use std::fs::{File, create_dir_all}; -use std::path::{PathBuf, Path}; +use std::collections::{hash_map::Entry, HashMap}; +use std::fs::{create_dir_all, File}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use async_stream::stream; @@ -8,25 +8,51 @@ use chrono::Utc; use futures::stream::StreamExt; use heed::EnvOpenOptions; use log::info; +use serde_json::{Map, Value}; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; use super::update_handler::UpdateHandler; -use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}}; use crate::index::UpdateResult as UResult; +use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; +use crate::index_controller::{ + updates::{Failed, Processed, Processing}, + IndexMetadata, UpdateMeta, +}; use crate::option::IndexerOpts; -use crate::index::{Index, SearchQuery, SearchResult, Settings}; pub type Result = std::result::Result; type AsyncMap = Arc>>; type UpdateResult = std::result::Result, Failed>; enum IndexMsg { - CreateIndex { uuid: Uuid, primary_key: Option, ret: oneshot::Sender> }, - Update { meta: Processing, data: std::fs::File, ret: oneshot::Sender}, - Search { uuid: Uuid, query: SearchQuery, ret: oneshot::Sender> }, - Settings { uuid: Uuid, ret: oneshot::Sender> }, + CreateIndex { + uuid: Uuid, + primary_key: Option, + ret: oneshot::Sender>, + }, + Update { + meta: Processing, + data: std::fs::File, + ret: oneshot::Sender, + }, + Search { + uuid: Uuid, + query: SearchQuery, + ret: oneshot::Sender>, + }, + Settings { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Documents { + uuid: Uuid, + attributes_to_retrieve: Option>, + offset: usize, + limit: usize, + ret: oneshot::Sender>>>, + }, } struct IndexActor { @@ -56,11 +82,20 @@ impl IndexActor { let update_handler = UpdateHandler::new(&options).unwrap(); let update_handler = Arc::new(update_handler); let inbox = Some(inbox); - Self { inbox, store, update_handler } + Self { + inbox, + store, + update_handler, + } } async fn run(mut self) { - let mut inbox = self.inbox.take().expect("Index Actor must have a inbox at this point."); + use IndexMsg::*; + + let mut inbox = self + .inbox + .take() + .expect("Index Actor must have a inbox at this point."); let stream = stream! { loop { @@ -73,31 +108,59 @@ impl IndexActor { let fut = stream.for_each_concurrent(Some(10), |msg| async { match msg { - IndexMsg::CreateIndex { uuid, primary_key, ret } => self.handle_create_index(uuid, primary_key, ret).await, - IndexMsg::Update { ret, meta, data } => self.handle_update(meta, data, ret).await, - IndexMsg::Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, - IndexMsg::Settings { ret, uuid } => self.handle_settings(uuid, ret).await, + CreateIndex { + uuid, + primary_key, + ret, + } => self.handle_create_index(uuid, primary_key, ret).await, + Update { ret, meta, data } => self.handle_update(meta, data, ret).await, + Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, + Settings { ret, uuid } => self.handle_settings(uuid, ret).await, + Documents { + ret, + uuid, + attributes_to_retrieve, + offset, + limit, + } => { + self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) + .await + } } }); fut.await; } - async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender>) { + async fn handle_search( + &self, + uuid: Uuid, + query: SearchQuery, + ret: oneshot::Sender>, + ) { let index = self.store.get(uuid).await.unwrap().unwrap(); tokio::task::spawn_blocking(move || { let result = index.perform_search(query); ret.send(result) }); - } - async fn handle_create_index(&self, uuid: Uuid, primary_key: Option, ret: oneshot::Sender>) { + async fn handle_create_index( + &self, + uuid: Uuid, + primary_key: Option, + ret: oneshot::Sender>, + ) { let result = self.store.create_index(uuid, primary_key).await; let _ = ret.send(result); } - async fn handle_update(&self, meta: Processing, data: File, ret: oneshot::Sender) { + async fn handle_update( + &self, + meta: Processing, + data: File, + ret: oneshot::Sender, + ) { info!("Processing update {}", meta.id()); let uuid = meta.index_uuid().clone(); let index = self.store.get_or_create(uuid).await.unwrap(); @@ -105,13 +168,30 @@ impl IndexActor { tokio::task::spawn_blocking(move || { let result = update_handler.handle_update(meta, data, index); let _ = ret.send(result); - }).await; + }) + .await; } async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender>) { let index = self.store.get(uuid).await.unwrap().unwrap(); tokio::task::spawn_blocking(move || { - let result = index.settings() + let result = index.settings().map_err(|e| IndexError::Error(e)); + let _ = ret.send(result); + }) + .await; + } + + async fn handle_fetch_documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ret: oneshot::Sender>>, + ) { + let index = self.store.get(uuid).await.unwrap().unwrap(); + tokio::task::spawn_blocking(move || { + let result = index.retrieve_documents(offset, limit, attributes_to_retrieve) .map_err(|e| IndexError::Error(e)); let _ = ret.send(result); }).await; @@ -133,9 +213,17 @@ impl IndexActorHandle { Self { sender } } - pub async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + pub async fn create_index( + &self, + uuid: Uuid, + primary_key: Option, + ) -> Result { let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::CreateIndex { ret, uuid, primary_key }; + let msg = IndexMsg::CreateIndex { + ret, + uuid, + primary_key, + }; let _ = self.sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } @@ -160,6 +248,25 @@ impl IndexActorHandle { let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } + + pub async fn documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Documents { + uuid, + ret, + offset, + attributes_to_retrieve, + limit, + }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } } struct MapIndexStore { @@ -190,29 +297,31 @@ impl IndexStore for MapIndexStore { create_dir_all(&db_path).expect("can't create db"); let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100_000); - let index = milli::Index::new(options, &db_path) - .map_err(|e| IndexError::Error(e))?; + let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?; let index = Index(Arc::new(index)); Ok(index) - }).await.expect("thread died"); + }) + .await + .expect("thread died"); - self.index_store.write().await.insert(meta.uuid.clone(), index?); + self.index_store + .write() + .await + .insert(meta.uuid.clone(), index?); Ok(meta) } async fn get_or_create(&self, uuid: Uuid) -> Result { match self.index_store.write().await.entry(uuid.clone()) { - Entry::Vacant(entry) => { - match self.meta_store.write().await.entry(uuid.clone()) { - Entry::Vacant(_) => { - todo!() - } - Entry::Occupied(entry) => { - todo!() - } + Entry::Vacant(entry) => match self.meta_store.write().await.entry(uuid.clone()) { + Entry::Vacant(_) => { + todo!() } - } + Entry::Occupied(entry) => { + todo!() + } + }, Entry::Occupied(entry) => Ok(entry.get().clone()), } } @@ -228,6 +337,10 @@ impl MapIndexStore { root.push("indexes/"); let meta_store = Arc::new(RwLock::new(HashMap::new())); let index_store = Arc::new(RwLock::new(HashMap::new())); - Self { meta_store, index_store, root } + Self { + meta_store, + index_store, + root, + } } } diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index f1bc922d6..47ce04b96 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -11,7 +11,6 @@ use actix_web::web::Bytes; use actix_web::web::Payload; use anyhow::Context; use chrono::{DateTime, Utc}; -use crate::index::{SearchResult, SearchQuery}; use futures::stream::StreamExt; use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Serialize, Deserialize}; @@ -19,6 +18,7 @@ use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; pub use updates::{Processed, Processing, Failed}; +use crate::index::{SearchResult, SearchQuery, Document}; use crate::index::{UpdateResult, Settings, Facets}; pub type UpdateStatus = updates::UpdateStatus; @@ -157,6 +157,21 @@ impl IndexController { Ok(settings) } + pub async fn documents( + &self, + index: String, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> anyhow::Result> { + let uuid = self.uuid_resolver + .resolve(index.clone()) + .await? + .with_context(|| format!("Index {:?} doesn't exist", index))?; + let documents = self.index_handle.documents(uuid, offset, limit, attributes_to_retrieve).await?; + Ok(documents) + } + fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result { todo!() } diff --git a/src/routes/document.rs b/src/routes/document.rs index 00d037359..af9efc701 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -55,17 +55,18 @@ async fn get_document( data: web::Data, path: web::Path, ) -> Result { - let index = path.index_uid.clone(); - let id = path.document_id.clone(); - match data.retrieve_document(index, id, None as Option>).await { - Ok(document) => { - let json = serde_json::to_string(&document).unwrap(); - Ok(HttpResponse::Ok().body(json)) - } - Err(e) => { - Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) - } - } + todo!() + //let index = path.index_uid.clone(); + //let id = path.document_id.clone(); + //match data.retrieve_document(index, id, None as Option>).await { + //Ok(document) => { + //let json = serde_json::to_string(&document).unwrap(); + //Ok(HttpResponse::Ok().body(json)) + //} + //Err(e) => { + //Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) + //} + //} } #[delete(