implement list all updates

This commit is contained in:
mpostma 2021-03-05 18:34:04 +01:00
parent c2282ab5cb
commit a9c7b73744
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
4 changed files with 80 additions and 40 deletions

View File

@ -65,9 +65,8 @@ impl Data {
//self.index_controller.update_status(index, uid) //self.index_controller.update_status(index, uid)
} }
pub fn get_updates_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>> { pub async fn get_updates_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>> {
todo!() self.index_controller.all_update_status(index.as_ref().to_string()).await
//self.index_controller.all_update_status(index)
} }
pub fn update_index( pub fn update_index(

View File

@ -107,7 +107,7 @@ impl IndexController {
} }
pub async fn clear_documents(&self, index: String) -> anyhow::Result<UpdateStatus> { pub async fn clear_documents(&self, index: String) -> anyhow::Result<UpdateStatus> {
let uuid = self.uuid_resolver.resolve(index).await.unwrap().unwrap(); let uuid = self.uuid_resolver.resolve(index).await?.unwrap();
let meta = UpdateMeta::ClearDocuments; let meta = UpdateMeta::ClearDocuments;
let (_, receiver) = mpsc::channel(1); let (_, receiver) = mpsc::channel(1);
let status = self.update_handle.update(meta, receiver, uuid).await?; let status = self.update_handle.update(meta, receiver, uuid).await?;
@ -154,8 +154,12 @@ impl IndexController {
todo!() todo!()
} }
fn all_update_status(&self, index: String) -> anyhow::Result<Vec<UpdateStatus>> { pub async fn all_update_status(&self, index: String) -> anyhow::Result<Vec<UpdateStatus>> {
todo!() let uuid = self.uuid_resolver
.resolve(index).await?
.context("index not found")?;
let result = self.update_handle.get_all_updates_status(uuid).await?;
Ok(result)
} }
pub fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> { pub fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {

View File

@ -1,7 +1,7 @@
use std::collections::{hash_map::Entry, HashMap};
use std::fs::create_dir_all; use std::fs::create_dir_all;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::collections::{HashMap, hash_map::Entry};
use super::index_actor::IndexActorHandle; use super::index_actor::IndexActorHandle;
use log::info; use log::info;
@ -10,6 +10,7 @@ use tokio::fs::File;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid; use uuid::Uuid;
use itertools::Itertools;
use crate::index::UpdateResult; use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
@ -22,6 +23,8 @@ type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync +
pub enum UpdateError { pub enum UpdateError {
#[error("error with update: {0}")] #[error("error with update: {0}")]
Error(Box<dyn std::error::Error + Sync + Send + 'static>), Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Index {0} doesn't exist.")]
UnexistingIndex(Uuid),
} }
enum UpdateMsg<D> { enum UpdateMsg<D> {
@ -46,6 +49,7 @@ struct UpdateActor<D, S> {
#[async_trait::async_trait] #[async_trait::async_trait]
trait UpdateStoreStore { trait UpdateStoreStore {
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>; async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>;
} }
impl<D, S> UpdateActor<D, S> impl<D, S> UpdateActor<D, S>
@ -53,24 +57,16 @@ where
D: AsRef<[u8]> + Sized + 'static, D: AsRef<[u8]> + Sized + 'static,
S: UpdateStoreStore, S: UpdateStoreStore,
{ {
fn new( fn new(store: S, inbox: mpsc::Receiver<UpdateMsg<D>>, path: impl AsRef<Path>) -> Self {
store: S,
inbox: mpsc::Receiver<UpdateMsg<D>>,
path: impl AsRef<Path>,
) -> Self {
let path = path.as_ref().to_owned().join("update_files"); let path = path.as_ref().to_owned().join("update_files");
create_dir_all(&path).unwrap(); create_dir_all(&path).unwrap();
Self { Self { store, inbox, path }
store,
inbox,
path,
}
} }
async fn run(mut self) { async fn run(mut self) {
use UpdateMsg::*; use UpdateMsg::*;
info!("started update actor."); info!("Started update actor.");
loop { loop {
match self.inbox.recv().await { match self.inbox.recv().await {
@ -79,8 +75,12 @@ where
meta, meta,
data, data,
ret, ret,
}) => self.handle_update(uuid, meta, data, ret).await, }) => {
Some(ListUpdates { uuid, ret }) => self.handle_list_updates(uuid, ret).await, let _ = ret.send(self.handle_update(uuid, meta, data).await);
}
Some(ListUpdates { uuid, ret }) => {
let _ = ret.send(self.handle_list_updates(uuid).await);
} ,
None => {} None => {}
} }
} }
@ -91,45 +91,68 @@ where
uuid: Uuid, uuid: Uuid,
meta: UpdateMeta, meta: UpdateMeta,
mut payload: mpsc::Receiver<PayloadData<D>>, mut payload: mpsc::Receiver<PayloadData<D>>,
ret: oneshot::Sender<Result<UpdateStatus>>, ) -> Result<UpdateStatus> {
) { let update_store = self.store.get_or_create(uuid).await?;
let update_store = self.store.get_or_create(uuid).await.unwrap();
let update_file_id = uuid::Uuid::new_v4(); let update_file_id = uuid::Uuid::new_v4();
let path = self.path.join(format!("update_{}", update_file_id)); let path = self.path.join(format!("update_{}", update_file_id));
let mut file = File::create(&path).await.unwrap(); let mut file = File::create(&path).await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
while let Some(bytes) = payload.recv().await { while let Some(bytes) = payload.recv().await {
match bytes { match bytes {
Ok(bytes) => { Ok(bytes) => {
file.write_all(bytes.as_ref()).await; file.write_all(bytes.as_ref()).await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
} }
Err(e) => { Err(e) => {
ret.send(Err(UpdateError::Error(e))); return Err(UpdateError::Error(e));
return;
} }
} }
} }
file.flush().await; file.flush().await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let file = file.into_std().await; let file = file.into_std().await;
let result = tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = update_store let result = update_store
.register_update(meta, path, uuid) .register_update(meta, path, uuid)
.map(|pending| UpdateStatus::Pending(pending)) .map(|pending| UpdateStatus::Pending(pending))
.map_err(|e| UpdateError::Error(Box::new(e))); .map_err(|e| UpdateError::Error(Box::new(e)));
let _ = ret.send(result); result
}) })
.await; .await
.map_err(|e| UpdateError::Error(Box::new(e)))?
} }
async fn handle_list_updates( async fn handle_list_updates(
&self, &self,
uuid: Uuid, uuid: Uuid,
ret: oneshot::Sender<Result<Vec<UpdateStatus>>>, ) -> Result<Vec<UpdateStatus>> {
) { let store = self.store.get(&uuid).await?;
todo!() tokio::task::spawn_blocking(move || {
let result = match store {
Some(update_store) => {
let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| {
Ok(processing
.map(UpdateStatus::from)
.into_iter()
.chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u)))
.chain(aborted.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.chain(processed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.chain(failed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.sorted_by(|a, b| a.id().cmp(&b.id()))
.collect())
})
.map_err(|e| UpdateError::Error(Box::new(e)))?;
Ok(updates)
}
None => Err(UpdateError::UnexistingIndex(uuid)),
};
result
}).await
.map_err(|e| UpdateError::Error(Box::new(e)))?
} }
} }
@ -169,6 +192,13 @@ where
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }
pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
} }
struct MapUpdateStoreStore { struct MapUpdateStoreStore {
@ -181,7 +211,11 @@ impl MapUpdateStoreStore {
fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self { fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self {
let db = Arc::new(RwLock::new(HashMap::new())); let db = Arc::new(RwLock::new(HashMap::new()));
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
Self { db, index_handle, path } Self {
db,
index_handle,
path,
}
} }
} }
@ -197,13 +231,16 @@ impl UpdateStoreStore for MapUpdateStoreStore {
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
let store = UpdateStore::open(options, &path, move |meta, file| { let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file)) futures::executor::block_on(index_handle.update(meta, file))
}).unwrap(); })
.unwrap();
let store = e.insert(store); let store = e.insert(store);
Ok(store.clone()) Ok(store.clone())
} }
Entry::Occupied(e) => { Entry::Occupied(e) => Ok(e.get().clone()),
Ok(e.get().clone())
}
} }
} }
async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> {
Ok(self.db.read().await.get(uuid).cloned())
}
} }

View File

@ -154,7 +154,7 @@ async fn get_all_updates_status(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let result = data.get_updates_status(&path.index_uid); let result = data.get_updates_status(&path.index_uid).await;
match result { match result {
Ok(metas) => { Ok(metas) => {
let json = serde_json::to_string(&metas).unwrap(); let json = serde_json::to_string(&metas).unwrap();