clean index actor unwraps

This commit is contained in:
mpostma 2021-03-08 15:53:16 +01:00
parent 2f93cce7aa
commit 06403a5708
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
4 changed files with 113 additions and 97 deletions

View File

@ -59,10 +59,8 @@ impl Data {
pub fn new(options: Opt) -> anyhow::Result<Data> { pub fn new(options: Opt) -> anyhow::Result<Data> {
let path = options.db_path.clone(); let path = options.db_path.clone();
//let indexer_opts = options.indexer_options.clone();
create_dir_all(&path)?; create_dir_all(&path)?;
let index_controller = IndexController::new(&path); let index_controller = IndexController::new(&path)?;
let mut api_keys = ApiKeys { let mut api_keys = ApiKeys {
master: options.clone().master_key, master: options.clone().master_key,

View File

@ -5,15 +5,15 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use async_stream::stream; use async_stream::stream;
use chrono::{Utc, DateTime}; use chrono::{DateTime, Utc};
use futures::pin_mut; use futures::pin_mut;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::info; use log::info;
use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid; use uuid::Uuid;
use serde::{Serialize, Deserialize};
use super::get_arc_ownership_blocking; use super::get_arc_ownership_blocking;
use super::update_handler::UpdateHandler; use super::update_handler::UpdateHandler;
@ -31,7 +31,7 @@ type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<U
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct IndexMeta{ pub struct IndexMeta {
uuid: Uuid, uuid: Uuid,
created_at: DateTime<Utc>, created_at: DateTime<Utc>,
updated_at: DateTime<Utc>, updated_at: DateTime<Utc>,
@ -47,7 +47,7 @@ enum IndexMsg {
Update { Update {
meta: Processing<UpdateMeta>, meta: Processing<UpdateMeta>,
data: std::fs::File, data: std::fs::File,
ret: oneshot::Sender<UpdateResult>, ret: oneshot::Sender<Result<UpdateResult>>,
}, },
Search { Search {
uuid: Uuid, uuid: Uuid,
@ -102,7 +102,8 @@ pub enum IndexError {
trait IndexStore { trait IndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>; async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R> async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
where F: FnOnce(Index) -> Result<R> + Send + Sync + 'static, where
F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static; R: Sync + Send + 'static;
async fn get_or_create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>; async fn get_or_create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>; async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
@ -115,18 +116,19 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
read_receiver: mpsc::Receiver<IndexMsg>, read_receiver: mpsc::Receiver<IndexMsg>,
write_receiver: mpsc::Receiver<IndexMsg>, write_receiver: mpsc::Receiver<IndexMsg>,
store: S, store: S,
) -> Self { ) -> Result<Self> {
let options = IndexerOpts::default(); let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).unwrap(); let update_handler = UpdateHandler::new(&options)
.map_err(|e| IndexError::Error(e.into()))?;
let update_handler = Arc::new(update_handler); let update_handler = Arc::new(update_handler);
let read_receiver = Some(read_receiver); let read_receiver = Some(read_receiver);
let write_receiver = Some(write_receiver); let write_receiver = Some(write_receiver);
Self { Ok(Self {
read_receiver, read_receiver,
write_receiver, write_receiver,
store, store,
update_handler, update_handler,
} })
} }
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send /// `run` poll the write_receiver and read_receiver concurrently, but while messages send
@ -180,10 +182,18 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
uuid, uuid,
primary_key, primary_key,
ret, ret,
} => self.handle_create_index(uuid, primary_key, ret).await, } => {
Update { ret, meta, data } => self.handle_update(meta, data, ret).await, let _ = ret.send(self.handle_create_index(uuid, primary_key).await);
Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await, }
Settings { ret, uuid } => self.handle_settings(uuid, ret).await, Update { ret, meta, data } => {
let _ = ret.send(self.handle_update(meta, data).await);
}
Search { ret, query, uuid } => {
let _ = ret.send(self.handle_search(uuid, query).await);
}
Settings { ret, uuid } => {
let _ = ret.send(self.handle_settings(uuid).await);
}
Documents { Documents {
ret, ret,
uuid, uuid,
@ -191,8 +201,10 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
offset, offset,
limit, limit,
} => { } => {
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) let _ = ret.send(
.await self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve)
.await,
);
} }
Document { Document {
uuid, uuid,
@ -200,8 +212,10 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
doc_id, doc_id,
ret, ret,
} => { } => {
self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret) let _ = ret.send(
.await self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve)
.await,
);
} }
Delete { uuid, ret } => { Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await); let _ = ret.send(self.handle_delete(uuid).await);
@ -212,56 +226,51 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
} }
} }
async fn handle_search( async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result<SearchResult> {
&self, let index = self.store
uuid: Uuid, .get(uuid)
query: SearchQuery, .await?
ret: oneshot::Sender<anyhow::Result<SearchResult>>, .ok_or(IndexError::UnexistingIndex)?;
) { tokio::task::spawn_blocking(move || index.perform_search(query)).await?
let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || {
let result = index.perform_search(query);
ret.send(result)
});
} }
async fn handle_create_index( async fn handle_create_index(
&self, &self,
uuid: Uuid, uuid: Uuid,
primary_key: Option<String>, primary_key: Option<String>,
ret: oneshot::Sender<Result<IndexMeta>>, ) -> Result<IndexMeta> {
) { self.store.create_index(uuid, primary_key).await
let result = self.store.create_index(uuid, primary_key).await;
let _ = ret.send(result);
} }
async fn handle_update( async fn handle_update(
&self, &self,
meta: Processing<UpdateMeta>, meta: Processing<UpdateMeta>,
data: File, data: File,
ret: oneshot::Sender<UpdateResult>, ) -> Result<UpdateResult> {
) {
info!("Processing update {}", meta.id()); info!("Processing update {}", meta.id());
let uuid = meta.index_uuid().clone(); let uuid = meta.index_uuid().clone();
let update_handler = self.update_handler.clone(); let update_handler = self.update_handler.clone();
let handle = self.store.update_index(uuid, |index| { let handle = self
.store
.update_index(uuid, |index| {
let handle = tokio::task::spawn_blocking(move || { let handle = tokio::task::spawn_blocking(move || {
let result = update_handler.handle_update(meta, data, index); update_handler.handle_update(meta, data, index)
let _ = ret.send(result);
}); });
Ok(handle) Ok(handle)
}); })
.await?;
handle.await; handle.await.map_err(|e| IndexError::Error(e.into()))
} }
async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender<Result<Settings>>) { async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
let index = self.store.get(uuid).await.unwrap().unwrap(); let index = self.store
tokio::task::spawn_blocking(move || { .get(uuid)
let result = index.settings().map_err(|e| IndexError::Error(e)); .await?
let _ = ret.send(result); .ok_or(IndexError::UnexistingIndex)?;
}) tokio::task::spawn_blocking(move || index.settings().map_err(|e| IndexError::Error(e)))
.await; .await
.map_err(|e| IndexError::Error(e.into()))?
} }
async fn handle_fetch_documents( async fn handle_fetch_documents(
@ -270,16 +279,17 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
offset: usize, offset: usize,
limit: usize, limit: usize,
attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
ret: oneshot::Sender<Result<Vec<Document>>>, ) -> Result<Vec<Document>> {
) { let index = self.store.get(uuid)
let index = self.store.get(uuid).await.unwrap().unwrap(); .await?
.ok_or(IndexError::UnexistingIndex)?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = index index
.retrieve_documents(offset, limit, attributes_to_retrieve) .retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e)); .map_err(|e| IndexError::Error(e))
let _ = ret.send(result);
}) })
.await; .await
.map_err(|e| IndexError::Error(e.into()))?
} }
async fn handle_fetch_document( async fn handle_fetch_document(
@ -287,16 +297,19 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
uuid: Uuid, uuid: Uuid,
doc_id: String, doc_id: String,
attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
ret: oneshot::Sender<Result<Document>>, ) -> Result<Document> {
) { let index = self
let index = self.store.get(uuid).await.unwrap().unwrap(); .store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = index index
.retrieve_document(doc_id, attributes_to_retrieve) .retrieve_document(doc_id, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e)); .map_err(|e| IndexError::Error(e))
let _ = ret.send(result);
}) })
.await; .await
.map_err(|e| IndexError::Error(e.into()))?
} }
async fn handle_delete(&self, uuid: Uuid) -> Result<()> { async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
@ -329,24 +342,20 @@ pub struct IndexActorHandle {
} }
impl IndexActorHandle { impl IndexActorHandle {
pub fn new(path: impl AsRef<Path>) -> Self { pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let (read_sender, read_receiver) = mpsc::channel(100); let (read_sender, read_receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100); let (write_sender, write_receiver) = mpsc::channel(100);
let store = MapIndexStore::new(path); let store = MapIndexStore::new(path);
let actor = IndexActor::new(read_receiver, write_receiver, store); let actor = IndexActor::new(read_receiver, write_receiver, store)?;
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
Self { Ok(Self {
read_sender, read_sender,
write_sender, write_sender,
} })
} }
pub async fn create_index( pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex { let msg = IndexMsg::CreateIndex {
ret, ret,
@ -357,11 +366,15 @@ impl IndexActorHandle {
receiver.await.expect("IndexActor has been killed") receiver.await.expect("IndexActor has been killed")
} }
pub async fn update(&self, meta: Processing<UpdateMeta>, data: std::fs::File) -> UpdateResult { pub async fn update(
&self,
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update { ret, meta, data }; let msg = IndexMsg::Update { ret, meta, data };
let _ = self.read_sender.send(msg).await; let _ = self.read_sender.send(msg).await;
receiver.await.expect("IndexActor has been killed") Ok(receiver.await.expect("IndexActor has been killed")?)
} }
pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> { pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
@ -441,7 +454,7 @@ impl IndexStore for MapIndexStore {
let meta = match self.meta_store.write().await.entry(uuid.clone()) { let meta = match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let now = Utc::now(); let now = Utc::now();
let meta = IndexMeta{ let meta = IndexMeta {
uuid, uuid,
created_at: now.clone(), created_at: now.clone(),
updated_at: now, updated_at: now,
@ -478,7 +491,7 @@ impl IndexStore for MapIndexStore {
Entry::Vacant(index_entry) => match self.meta_store.write().await.entry(uuid.clone()) { Entry::Vacant(index_entry) => match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(meta_entry) => { Entry::Vacant(meta_entry) => {
let now = Utc::now(); let now = Utc::now();
let meta = IndexMeta{ let meta = IndexMeta {
uuid, uuid,
created_at: now.clone(), created_at: now.clone(),
updated_at: now, updated_at: now,
@ -491,7 +504,8 @@ impl IndexStore for MapIndexStore {
create_dir_all(&db_path).expect("can't create db"); create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100_000); options.map_size(4096 * 100_000);
let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?; let index = milli::Index::new(options, &db_path)
.map_err(|e| IndexError::Error(e))?;
let index = Index(Arc::new(index)); let index = Index(Arc::new(index));
Ok(index) Ok(index)
}) })
@ -508,7 +522,8 @@ impl IndexStore for MapIndexStore {
create_dir_all(&db_path).expect("can't create db"); create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100_000); options.map_size(4096 * 100_000);
let index = milli::Index::new(options, &db_path).map_err(|e| IndexError::Error(e))?; let index = milli::Index::new(options, &db_path)
.map_err(|e| IndexError::Error(e))?;
let index = Index(Arc::new(index)); let index = Index(Arc::new(index));
Ok(index) Ok(index)
}) })
@ -540,11 +555,14 @@ impl IndexStore for MapIndexStore {
} }
async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R> async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
where F: FnOnce(Index) -> Result<R> + Send + Sync + 'static, where
F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static, R: Sync + Send + 'static,
{ {
let index = self.get_or_create(uuid.clone(), None).await?; let index = self.get_or_create(uuid.clone(), None).await?;
let mut meta = self.get_meta(&uuid).await? let mut meta = self
.get_meta(&uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
match f(index) { match f(index) {
Ok(r) => { Ok(r) => {
@ -552,7 +570,7 @@ impl IndexStore for MapIndexStore {
self.meta_store.write().await.insert(uuid, meta); self.meta_store.write().await.insert(uuid, meta);
Ok(r) Ok(r)
} }
Err(e) => Err(e) Err(e) => Err(e),
} }
} }
} }

View File

@ -69,11 +69,11 @@ enum IndexControllerMsg {
} }
impl IndexController { impl IndexController {
pub fn new(path: impl AsRef<Path>) -> Self { pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); let uuid_resolver = uuid_resolver::UuidResolverHandle::new();
let index_actor = index_actor::IndexActorHandle::new(&path); let index_actor = index_actor::IndexActorHandle::new(&path)?;
let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path); let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path);
Self { uuid_resolver, index_handle: index_actor, update_handle } Ok(Self { uuid_resolver, index_handle: index_actor, update_handle })
} }
pub async fn add_documents( pub async fn add_documents(

View File

@ -30,18 +30,18 @@ pub trait HandleUpdate<M, N, E> {
&mut self, &mut self,
meta: Processing<M>, meta: Processing<M>,
content: File, content: File,
) -> Result<Processed<M, N>, Failed<M, E>>; ) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>;
} }
impl<M, N, E, F> HandleUpdate<M, N, E> for F impl<M, N, E, F> HandleUpdate<M, N, E> for F
where where
F: FnMut(Processing<M>, File) -> Result<Processed<M, N>, Failed<M, E>>, F: FnMut(Processing<M>, File) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>,
{ {
fn handle_update( fn handle_update(
&mut self, &mut self,
meta: Processing<M>, meta: Processing<M>,
content: File, content: File,
) -> Result<Processed<M, N>, Failed<M, E>> { ) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>> {
self(meta, content) self(meta, content)
} }
} }
@ -100,7 +100,7 @@ where
update_store.process_pending_update(handler) update_store.process_pending_update(handler)
}) })
.await .await
.unwrap(); .expect("Fatal error processing update.");
match res { match res {
Ok(Some(_)) => (), Ok(Some(_)) => (),
Ok(None) => break, Ok(None) => break,
@ -185,7 +185,7 @@ where
/// Executes the user provided function on the next pending update (the one with the lowest id). /// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and /// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed. /// only writing the result meta to the processed-meta store *after* it has been processed.
fn process_pending_update<U>(&self, mut handler: U) -> heed::Result<Option<()>> fn process_pending_update<U>(&self, mut handler: U) -> anyhow::Result<Option<()>>
where where
U: HandleUpdate<M, N, E>, U: HandleUpdate<M, N, E>,
{ {
@ -209,7 +209,7 @@ where
self.processing.write().unwrap().replace(processing.clone()); self.processing.write().unwrap().replace(processing.clone());
let file = File::open(&content_path)?; let file = File::open(&content_path)?;
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let result = handler.handle_update(processing, file); let result = handler.handle_update(processing, file)?;
drop(rtxn); drop(rtxn);
// Once the pending update have been successfully processed // Once the pending update have been successfully processed