feat(http): implement is_indexing for stats

This commit is contained in:
Alexey Shekhirin 2021-04-02 14:44:35 +03:00
parent 09d9a29176
commit 87412f63ef
No known key found for this signature in database
GPG Key ID: AF9A26AA133B5B98
9 changed files with 65 additions and 22 deletions

View File

@ -348,7 +348,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(IndexStats { Ok(IndexStats {
size: index.size()?, size: index.size()?,
number_of_documents: index.number_of_documents(&rtxn)?, number_of_documents: index.number_of_documents(&rtxn)?,
is_indexing: false, // TODO check actual is_indexing is_indexing: false, // We set this field in src/index_controller/mod.rs get_stats
fields_distribution: index.fields_distribution(&rtxn)?, fields_distribution: index.fields_distribution(&rtxn)?,
}) })
}) })

View File

@ -354,7 +354,13 @@ impl IndexController {
pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> { pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
let uuid = self.uuid_resolver.get(uid.clone()).await?; let uuid = self.uuid_resolver.get(uid.clone()).await?;
Ok(self.index_handle.get_index_stats(uuid).await?) let stats = self.index_handle.get_index_stats(uuid);
let is_indexing = self.update_handle.is_locked(uuid);
Ok(IndexStats {
is_indexing: is_indexing.await?,
..stats.await?
})
} }
} }

View File

@ -72,6 +72,9 @@ where
Some(Snapshot { uuid, path, ret }) => { Some(Snapshot { uuid, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuid, path).await); let _ = ret.send(self.handle_snapshot(uuid, path).await);
} }
Some(IsLocked { uuid, ret }) => {
let _ = ret.send(self.handle_is_locked(uuid).await);
}
None => break, None => break,
} }
} }
@ -223,4 +226,14 @@ where
Ok(()) Ok(())
} }
async fn handle_is_locked(&self, uuid: Uuid) -> Result<bool> {
let store = self
.store
.get(uuid)
.await?
.ok_or(UpdateError::UnexistingIndex(uuid))?;
Ok(store.update_lock.is_locked())
}
} }

View File

@ -3,11 +3,12 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use crate::index_controller::IndexActorHandle;
use super::{ use super::{
MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta,
UpdateMsg, UpdateStatus, UpdateMsg, UpdateStatus,
}; };
use crate::index_controller::IndexActorHandle;
#[derive(Clone)] #[derive(Clone)]
pub struct UpdateActorHandleImpl<D> { pub struct UpdateActorHandleImpl<D> {
@ -36,6 +37,7 @@ where
Ok(Self { sender }) Ok(Self { sender })
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D> UpdateActorHandle for UpdateActorHandleImpl<D> impl<D> UpdateActorHandle for UpdateActorHandleImpl<D>
where where
@ -43,29 +45,12 @@ where
{ {
type Data = D; type Data = D;
async fn update(
&self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid,
) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update {
uuid,
data,
meta,
ret,
};
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret }; let msg = UpdateMsg::ListUpdates { uuid, ret };
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> { async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetUpdate { uuid, id, ret }; let msg = UpdateMsg::GetUpdate { uuid, id, ret };
@ -93,4 +78,28 @@ where
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }
async fn update(
&self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid,
) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update {
uuid,
data,
meta,
ret,
};
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn is_locked(&self, uuid: Uuid) -> Result<bool> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::IsLocked { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
} }

View File

@ -34,4 +34,8 @@ pub enum UpdateMsg<D> {
path: PathBuf, path: PathBuf,
ret: oneshot::Sender<Result<()>>, ret: oneshot::Sender<Result<()>>,
}, },
IsLocked {
uuid: Uuid,
ret: oneshot::Sender<Result<bool>>,
},
} }

View File

@ -52,4 +52,5 @@ pub trait UpdateActorHandle {
data: mpsc::Receiver<PayloadData<Self::Data>>, data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid, uuid: Uuid,
) -> Result<UpdateStatus>; ) -> Result<UpdateStatus>;
async fn is_locked(&self, uuid: Uuid) -> Result<bool>;
} }

View File

@ -39,7 +39,7 @@ impl UpdateHandler {
}) })
} }
fn update_buidler(&self, update_id: u64) -> UpdateBuilder { fn update_builder(&self, update_id: u64) -> UpdateBuilder {
// We prepare the update by using the update builder. // We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(update_id); let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = self.max_nb_chunks { if let Some(max_nb_chunks) = self.max_nb_chunks {
@ -67,7 +67,7 @@ impl UpdateHandler {
let update_id = meta.id(); let update_id = meta.id();
let update_builder = self.update_buidler(update_id); let update_builder = self.update_builder(update_id);
let result = match meta.meta() { let result = match meta.meta() {
DocumentsAddition { DocumentsAddition {

View File

@ -35,6 +35,11 @@ async fn stats() {
assert_eq!(code, 202); assert_eq!(code, 202);
assert_eq!(response["updateId"], 0); assert_eq!(response["updateId"], 0);
let (response, code) = index.stats().await;
assert_eq!(code, 200);
assert_eq!(response["isIndexing"], true);
index.wait_update_id(0).await; index.wait_update_id(0).await;
let (response, code) = index.stats().await; let (response, code) = index.stats().await;

View File

@ -56,6 +56,11 @@ async fn stats() {
assert_eq!(code, 202); assert_eq!(code, 202);
assert_eq!(response["updateId"], 0); assert_eq!(response["updateId"], 0);
let (response, code) = server.stats().await;
assert_eq!(code, 200);
assert_eq!(response["indexes"]["test"]["isIndexing"], true);
index.wait_update_id(0).await; index.wait_update_id(0).await;
let (response, code) = server.stats().await; let (response, code) = server.stats().await;