diff --git a/src/data/updates.rs b/src/data/updates.rs index a712f4cc6..3e3837861 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -65,9 +65,8 @@ impl Data { //self.index_controller.update_status(index, uid) } - pub fn get_updates_status(&self, index: impl AsRef) -> anyhow::Result> { - todo!() - //self.index_controller.all_update_status(index) + pub async fn get_updates_status(&self, index: impl AsRef) -> anyhow::Result> { + self.index_controller.all_update_status(index.as_ref().to_string()).await } pub fn update_index( diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index d790d1d58..8eb1684a2 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -107,7 +107,7 @@ impl IndexController { } pub async fn clear_documents(&self, index: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(index).await.unwrap().unwrap(); + let uuid = self.uuid_resolver.resolve(index).await?.unwrap(); let meta = UpdateMeta::ClearDocuments; let (_, receiver) = mpsc::channel(1); let status = self.update_handle.update(meta, receiver, uuid).await?; @@ -154,8 +154,12 @@ impl IndexController { todo!() } - fn all_update_status(&self, index: String) -> anyhow::Result> { - todo!() + pub async fn all_update_status(&self, index: String) -> anyhow::Result> { + let uuid = self.uuid_resolver + .resolve(index).await? + .context("index not found")?; + let result = self.update_handle.get_all_updates_status(uuid).await?; + Ok(result) } pub fn list_indexes(&self) -> anyhow::Result> { diff --git a/src/index_controller/update_actor.rs b/src/index_controller/update_actor.rs index 4d1ff78fb..b236df2a6 100644 --- a/src/index_controller/update_actor.rs +++ b/src/index_controller/update_actor.rs @@ -1,7 +1,7 @@ +use std::collections::{hash_map::Entry, HashMap}; use std::fs::create_dir_all; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::collections::{HashMap, hash_map::Entry}; use super::index_actor::IndexActorHandle; use log::info; @@ -10,6 +10,7 @@ use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; +use itertools::Itertools; use crate::index::UpdateResult; use crate::index_controller::{UpdateMeta, UpdateStatus}; @@ -22,6 +23,8 @@ type PayloadData = std::result::Result), + #[error("Index {0} doesn't exist.")] + UnexistingIndex(Uuid), } enum UpdateMsg { @@ -46,6 +49,7 @@ struct UpdateActor { #[async_trait::async_trait] trait UpdateStoreStore { async fn get_or_create(&self, uuid: Uuid) -> Result>; + async fn get(&self, uuid: &Uuid) -> Result>>; } impl UpdateActor @@ -53,24 +57,16 @@ where D: AsRef<[u8]> + Sized + 'static, S: UpdateStoreStore, { - fn new( - store: S, - inbox: mpsc::Receiver>, - path: impl AsRef, - ) -> Self { + fn new(store: S, inbox: mpsc::Receiver>, path: impl AsRef) -> Self { let path = path.as_ref().to_owned().join("update_files"); create_dir_all(&path).unwrap(); - Self { - store, - inbox, - path, - } + Self { store, inbox, path } } async fn run(mut self) { use UpdateMsg::*; - info!("started update actor."); + info!("Started update actor."); loop { match self.inbox.recv().await { @@ -79,8 +75,12 @@ where meta, data, ret, - }) => self.handle_update(uuid, meta, data, ret).await, - Some(ListUpdates { uuid, ret }) => self.handle_list_updates(uuid, ret).await, + }) => { + let _ = ret.send(self.handle_update(uuid, meta, data).await); + } + Some(ListUpdates { uuid, ret }) => { + let _ = ret.send(self.handle_list_updates(uuid).await); + } , None => {} } } @@ -91,45 +91,68 @@ where uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver>, - ret: oneshot::Sender>, - ) { - let update_store = self.store.get_or_create(uuid).await.unwrap(); + ) -> Result { + let update_store = self.store.get_or_create(uuid).await?; let update_file_id = uuid::Uuid::new_v4(); let path = self.path.join(format!("update_{}", update_file_id)); - let mut file = File::create(&path).await.unwrap(); + let mut file = File::create(&path).await + .map_err(|e| UpdateError::Error(Box::new(e)))?; while let Some(bytes) = payload.recv().await { match bytes { Ok(bytes) => { - file.write_all(bytes.as_ref()).await; + file.write_all(bytes.as_ref()).await + .map_err(|e| UpdateError::Error(Box::new(e)))?; } Err(e) => { - ret.send(Err(UpdateError::Error(e))); - return; + return Err(UpdateError::Error(e)); } } } - file.flush().await; + file.flush().await + .map_err(|e| UpdateError::Error(Box::new(e)))?; let file = file.into_std().await; - let result = tokio::task::spawn_blocking(move || { + tokio::task::spawn_blocking(move || { let result = update_store .register_update(meta, path, uuid) .map(|pending| UpdateStatus::Pending(pending)) .map_err(|e| UpdateError::Error(Box::new(e))); - let _ = ret.send(result); + result }) - .await; + .await + .map_err(|e| UpdateError::Error(Box::new(e)))? } async fn handle_list_updates( &self, uuid: Uuid, - ret: oneshot::Sender>>, - ) { - todo!() + ) -> Result> { + let store = self.store.get(&uuid).await?; + tokio::task::spawn_blocking(move || { + let result = match store { + Some(update_store) => { + let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| { + Ok(processing + .map(UpdateStatus::from) + .into_iter() + .chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u))) + .chain(aborted.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u))) + .chain(processed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u))) + .chain(failed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u))) + .sorted_by(|a, b| a.id().cmp(&b.id())) + .collect()) + }) + .map_err(|e| UpdateError::Error(Box::new(e)))?; + Ok(updates) + } + None => Err(UpdateError::UnexistingIndex(uuid)), + }; + result + }).await + .map_err(|e| UpdateError::Error(Box::new(e)))? } } @@ -169,6 +192,13 @@ where let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } + + pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::ListUpdates { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } } struct MapUpdateStoreStore { @@ -181,7 +211,11 @@ impl MapUpdateStoreStore { fn new(index_handle: IndexActorHandle, path: impl AsRef) -> Self { let db = Arc::new(RwLock::new(HashMap::new())); let path = path.as_ref().to_owned(); - Self { db, index_handle, path } + Self { + db, + index_handle, + path, + } } } @@ -197,13 +231,16 @@ impl UpdateStoreStore for MapUpdateStoreStore { let index_handle = self.index_handle.clone(); let store = UpdateStore::open(options, &path, move |meta, file| { futures::executor::block_on(index_handle.update(meta, file)) - }).unwrap(); + }) + .unwrap(); let store = e.insert(store); Ok(store.clone()) } - Entry::Occupied(e) => { - Ok(e.get().clone()) - } + Entry::Occupied(e) => Ok(e.get().clone()), } } + + async fn get(&self, uuid: &Uuid) -> Result>> { + Ok(self.db.read().await.get(uuid).cloned()) + } } diff --git a/src/routes/index.rs b/src/routes/index.rs index 813256517..a8618d9dd 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -154,7 +154,7 @@ async fn get_all_updates_status( data: web::Data, path: web::Path, ) -> Result { - let result = data.get_updates_status(&path.index_uid); + let result = data.get_updates_status(&path.index_uid).await; match result { Ok(metas) => { let json = serde_json::to_string(&metas).unwrap();