fix stats

This commit is contained in:
Marin Postma 2021-04-14 18:55:04 +02:00
parent 33830d5ecf
commit ee675eadf1
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
9 changed files with 97 additions and 117 deletions

View File

@ -1,12 +1,10 @@
use std::collections::HashMap;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, Utc};
use sha2::Digest; use sha2::Digest;
use crate::index::Settings; use crate::index::Settings;
use crate::index_controller::{IndexController, IndexStats}; use crate::index_controller::{IndexController, IndexStats, Stats};
use crate::index_controller::{IndexMetadata, IndexSettings}; use crate::index_controller::{IndexMetadata, IndexSettings};
use crate::option::Opt; use crate::option::Opt;
@ -39,13 +37,6 @@ pub struct ApiKeys {
pub master: Option<String>, pub master: Option<String>,
} }
#[derive(Default)]
pub struct Stats {
pub database_size: u64,
pub last_update: Option<DateTime<Utc>>,
pub indexes: HashMap<String, IndexStats>,
}
impl ApiKeys { impl ApiKeys {
pub fn generate_missing_api_keys(&mut self) { pub fn generate_missing_api_keys(&mut self) {
if let Some(master_key) = &self.master { if let Some(master_key) = &self.master {
@ -114,32 +105,13 @@ impl Data {
} }
pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> { pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
Ok(self.index_controller.get_stats(uid).await?) Ok(self.index_controller.get_index_stats(uid).await?)
} }
pub async fn get_stats(&self) -> anyhow::Result<Stats> { pub async fn get_all_stats(&self) -> anyhow::Result<Stats> {
let mut stats = Stats::default(); Ok(self.index_controller.get_all_stats().await?)
stats.database_size += self.index_controller.get_uuids_size().await?;
for index in self.index_controller.list_indexes().await? {
let index_stats = self.index_controller.get_stats(index.uid.clone()).await?;
stats.database_size += index_stats.size;
stats.database_size += self
.index_controller
.get_updates_size()
.await?;
stats.last_update = Some(match stats.last_update {
Some(last_update) => last_update.max(index.meta.updated_at),
None => index.meta.updated_at,
});
stats.indexes.insert(index.uid, index_stats);
} }
Ok(stats)
}
#[inline] #[inline]
pub fn http_payload_size_limit(&self) -> usize { pub fn http_payload_size_limit(&self) -> usize {

View File

@ -6,7 +6,7 @@ use async_stream::stream;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use heed::CompactionOption; use heed::CompactionOption;
use log::debug; use log::debug;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::mpsc;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use uuid::Uuid; use uuid::Uuid;
@ -24,7 +24,6 @@ pub const CONCURRENT_INDEX_MSG: usize = 10;
pub struct IndexActor<S> { pub struct IndexActor<S> {
receiver: Option<mpsc::Receiver<IndexMsg>>, receiver: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>, update_handler: Arc<UpdateHandler>,
processing: RwLock<Option<Uuid>>,
store: S, store: S,
} }
@ -38,7 +37,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
receiver, receiver,
store, store,
update_handler, update_handler,
processing: RwLock::new(None),
}) })
} }
@ -174,9 +172,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.map_err(|e| IndexError::Error(e.into())) .map_err(|e| IndexError::Error(e.into()))
}; };
*self.processing.write().await = Some(uuid);
let result = get_result().await; let result = get_result().await;
*self.processing.write().await = None;
result result
} }
@ -330,16 +326,13 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.await? .await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
let processing = self.processing.read().await;
let is_indexing = *processing == Some(uuid);
spawn_blocking(move || { spawn_blocking(move || {
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
Ok(IndexStats { Ok(IndexStats {
size: index.size(), size: index.size(),
number_of_documents: index.number_of_documents(&rtxn)?, number_of_documents: index.number_of_documents(&rtxn)?,
is_indexing, is_indexing: None,
fields_distribution: index.fields_distribution(&rtxn)?, fields_distribution: index.fields_distribution(&rtxn)?,
}) })
}) })

View File

@ -1,9 +1,11 @@
use std::collections::BTreeMap;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use actix_web::web::{Bytes, Payload}; use actix_web::web::{Bytes, Payload};
use anyhow::bail; use anyhow::bail;
use chrono::{DateTime, Utc};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::info; use log::info;
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
@ -37,6 +39,8 @@ pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct IndexMetadata { pub struct IndexMetadata {
#[serde(skip)]
pub uuid: Uuid,
pub uid: String, pub uid: String,
name: String, name: String,
#[serde(flatten)] #[serde(flatten)]
@ -63,11 +67,12 @@ pub struct IndexSettings {
pub primary_key: Option<String>, pub primary_key: Option<String>,
} }
#[derive(Clone, Debug)] #[derive(Serialize)]
pub struct IndexStats { pub struct IndexStats {
#[serde(skip)]
pub size: u64, pub size: u64,
pub number_of_documents: u64, pub number_of_documents: u64,
pub is_indexing: bool, pub is_indexing: Option<bool>,
pub fields_distribution: FieldsDistribution, pub fields_distribution: FieldsDistribution,
} }
@ -77,6 +82,13 @@ pub struct IndexController {
update_handle: update_actor::UpdateActorHandleImpl<Bytes>, update_handle: update_actor::UpdateActorHandleImpl<Bytes>,
} }
#[derive(Serialize)]
pub struct Stats {
pub database_size: u64,
pub last_update: Option<DateTime<Utc>>,
pub indexes: BTreeMap<String, IndexStats>,
}
impl IndexController { impl IndexController {
pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> { pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
let index_size = options.max_mdb_size.get_bytes() as usize; let index_size = options.max_mdb_size.get_bytes() as usize;
@ -166,6 +178,7 @@ impl IndexController {
Err(UuidError::UnexistingIndex(name)) => { Err(UuidError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let status = perform_update(uuid).await?; let status = perform_update(uuid).await?;
self.index_handle.create_index(uuid, None).await?;
self.uuid_resolver.insert(name, uuid).await?; self.uuid_resolver.insert(name, uuid).await?;
Ok(status) Ok(status)
} }
@ -218,6 +231,7 @@ impl IndexController {
Err(UuidError::UnexistingIndex(name)) if create => { Err(UuidError::UnexistingIndex(name)) if create => {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let status = perform_udpate(uuid).await?; let status = perform_udpate(uuid).await?;
self.index_handle.create_index(uuid, None).await?;
self.uuid_resolver.insert(name, uuid).await?; self.uuid_resolver.insert(name, uuid).await?;
Ok(status) Ok(status)
} }
@ -234,6 +248,7 @@ impl IndexController {
let uuid = self.uuid_resolver.create(uid.clone()).await?; let uuid = self.uuid_resolver.create(uid.clone()).await?;
let meta = self.index_handle.create_index(uuid, primary_key).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid,
name: uid.clone(), name: uid.clone(),
uid, uid,
meta, meta,
@ -269,6 +284,7 @@ impl IndexController {
for (uid, uuid) in uuids { for (uid, uuid) in uuids {
let meta = self.index_handle.get_index_meta(uuid).await?; let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid,
name: uid.clone(), name: uid.clone(),
uid, uid,
meta, meta,
@ -326,6 +342,7 @@ impl IndexController {
let uuid = self.uuid_resolver.get(uid.clone()).await?; let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.update_index(uuid, index_settings).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid,
name: uid.clone(), name: uid.clone(),
uid, uid,
meta, meta,
@ -343,6 +360,7 @@ impl IndexController {
let uuid = self.uuid_resolver.get(uid.clone()).await?; let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.get_index_meta(uuid).await?; let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid,
name: uid.clone(), name: uid.clone(),
uid, uid,
meta, meta,
@ -350,19 +368,43 @@ impl IndexController {
Ok(meta) Ok(meta)
} }
pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
Ok(self.index_handle.get_index_stats(uuid).await?)
}
pub async fn get_updates_size(&self) -> anyhow::Result<u64> {
Ok(self.update_handle.get_size().await?)
}
pub async fn get_uuids_size(&self) -> anyhow::Result<u64> { pub async fn get_uuids_size(&self) -> anyhow::Result<u64> {
Ok(self.uuid_resolver.get_size().await?) Ok(self.uuid_resolver.get_size().await?)
} }
pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
let uuid = self.uuid_resolver.get(uid).await?;
let update_infos = self.update_handle.get_info().await?;
let mut stats = self.index_handle.get_index_stats(uuid).await?;
stats.is_indexing = (Some(uuid) == update_infos.processing).into();
Ok(stats)
}
pub async fn get_all_stats(&self) -> anyhow::Result<Stats> {
let update_infos = self.update_handle.get_info().await?;
let mut database_size = self.get_uuids_size().await? + update_infos.size;
let mut last_update: Option<DateTime<_>> = None;
let mut indexes = BTreeMap::new();
for index in self.list_indexes().await? {
let mut index_stats = self.index_handle.get_index_stats(index.uuid).await?;
database_size += index_stats.size;
last_update = last_update.map_or(Some(index.meta.updated_at), |last| {
Some(last.max(index.meta.updated_at))
});
index_stats.is_indexing = (Some(index.uuid) == update_infos.processing).into();
indexes.insert(index.uid, index_stats);
}
Ok(Stats {
database_size,
last_update,
indexes,
})
}
} }
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T { pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {

View File

@ -10,7 +10,7 @@ use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use futures::StreamExt; use futures::StreamExt;
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore}; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}; use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG};
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
@ -81,8 +81,8 @@ where
Some(Snapshot { uuids, path, ret }) => { Some(Snapshot { uuids, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuids, path).await); let _ = ret.send(self.handle_snapshot(uuids, path).await);
} }
Some(GetSize { ret }) => { Some(GetInfo { ret }) => {
let _ = ret.send(self.handle_get_size().await); let _ = ret.send(self.handle_get_info().await);
} }
None => break, None => break,
} }
@ -232,17 +232,27 @@ where
Ok(()) Ok(())
} }
async fn handle_get_size(&self) -> Result<u64> { async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
let update_store = self.store.clone(); let update_store = self.store.clone();
let size = tokio::task::spawn_blocking(move || -> anyhow::Result<u64> { let processing = self.store.processing.clone();
let info = tokio::task::spawn_blocking(move || -> anyhow::Result<UpdateStoreInfo> {
let txn = update_store.env.read_txn()?; let txn = update_store.env.read_txn()?;
let size = update_store.get_size(&txn)?;
update_store.get_size(&txn) let processing = processing
.read()
.as_ref()
.map(|(uuid, _)| uuid)
.cloned();
let info = UpdateStoreInfo {
size, processing
};
Ok(info)
}) })
.await .await
.map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?; .map_err(|e| UpdateError::Error(e.into()))?;
Ok(size)
Ok(info)
} }
} }

View File

@ -6,7 +6,7 @@ use uuid::Uuid;
use crate::index_controller::IndexActorHandle; use crate::index_controller::IndexActorHandle;
use super::{ use super::{
PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus, UpdateStoreInfo
}; };
#[derive(Clone)] #[derive(Clone)]
@ -70,9 +70,9 @@ where
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }
async fn get_size(&self) -> Result<u64> { async fn get_info(&self) -> Result<UpdateStoreInfo> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetSize { ret }; let msg = UpdateMsg::GetInfo { ret };
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }

View File

@ -3,7 +3,7 @@ use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use super::{PayloadData, Result, UpdateMeta, UpdateStatus}; use super::{PayloadData, Result, UpdateMeta, UpdateStatus, UpdateStoreInfo};
pub enum UpdateMsg<D> { pub enum UpdateMsg<D> {
Update { Update {
@ -30,7 +30,7 @@ pub enum UpdateMsg<D> {
path: PathBuf, path: PathBuf,
ret: oneshot::Sender<Result<()>>, ret: oneshot::Sender<Result<()>>,
}, },
GetSize { GetInfo {
ret: oneshot::Sender<Result<u64>>, ret: oneshot::Sender<Result<UpdateStoreInfo>>,
}, },
} }

View File

@ -32,6 +32,14 @@ pub enum UpdateError {
UnexistingUpdate(u64), UnexistingUpdate(u64),
} }
pub struct UpdateStoreInfo {
/// Size of the update store in bytes.
pub size: u64,
/// Uuid of the currently processing update if it exists
pub processing: Option<Uuid>,
}
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, automock(type Data=Vec<u8>;))] #[cfg_attr(test, automock(type Data=Vec<u8>;))]
pub trait UpdateActorHandle { pub trait UpdateActorHandle {
@ -41,7 +49,7 @@ pub trait UpdateActorHandle {
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>; async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()>; async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()>;
async fn get_size(&self) -> Result<u64>; async fn get_info(&self) -> Result<UpdateStoreInfo>;
async fn update( async fn update(
&self, &self,
meta: UpdateMeta, meta: UpdateMeta,

View File

@ -67,7 +67,7 @@ pub struct UpdateStore<M, N, E> {
processed_meta: Database<ByteSlice, SerdeJson<Processed<M, N>>>, processed_meta: Database<ByteSlice, SerdeJson<Processed<M, N>>>,
failed_meta: Database<ByteSlice, SerdeJson<Failed<M, E>>>, failed_meta: Database<ByteSlice, SerdeJson<Failed<M, E>>>,
aborted_meta: Database<ByteSlice, SerdeJson<Aborted<M>>>, aborted_meta: Database<ByteSlice, SerdeJson<Aborted<M>>>,
processing: Arc<RwLock<Option<(Uuid, Processing<M>)>>>, pub processing: Arc<RwLock<Option<(Uuid, Processing<M>)>>>,
notification_sender: mpsc::Sender<()>, notification_sender: mpsc::Sender<()>,
/// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is /// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is
/// processing, while not preventing writes all together during an update /// processing, while not preventing writes all together during an update

View File

@ -1,15 +1,10 @@
use std::collections::BTreeMap;
use actix_web::get; use actix_web::get;
use actix_web::web; use actix_web::web;
use actix_web::HttpResponse; use actix_web::HttpResponse;
use chrono::{DateTime, Utc};
use serde::Serialize; use serde::Serialize;
use crate::data::Stats;
use crate::error::ResponseError; use crate::error::ResponseError;
use crate::helpers::Authentication; use crate::helpers::Authentication;
use crate::index_controller::IndexStats;
use crate::routes::IndexParam; use crate::routes::IndexParam;
use crate::Data; use crate::Data;
@ -19,59 +14,19 @@ pub fn services(cfg: &mut web::ServiceConfig) {
.service(get_version); .service(get_version);
} }
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct IndexStatsResponse {
number_of_documents: u64,
is_indexing: bool,
fields_distribution: BTreeMap<String, u64>,
}
impl From<IndexStats> for IndexStatsResponse {
fn from(stats: IndexStats) -> Self {
Self {
number_of_documents: stats.number_of_documents,
is_indexing: stats.is_indexing,
fields_distribution: stats.fields_distribution.into_iter().collect(),
}
}
}
#[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")] #[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")]
async fn get_index_stats( async fn get_index_stats(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let response: IndexStatsResponse = data.get_index_stats(path.index_uid.clone()).await?.into(); let response = data.get_index_stats(path.index_uid.clone()).await?;
Ok(HttpResponse::Ok().json(response)) Ok(HttpResponse::Ok().json(response))
} }
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct StatsResponse {
database_size: u64,
last_update: Option<DateTime<Utc>>,
indexes: BTreeMap<String, IndexStatsResponse>,
}
impl From<Stats> for StatsResponse {
fn from(stats: Stats) -> Self {
Self {
database_size: stats.database_size,
last_update: stats.last_update,
indexes: stats
.indexes
.into_iter()
.map(|(uid, index_stats)| (uid, index_stats.into()))
.collect(),
}
}
}
#[get("/stats", wrap = "Authentication::Private")] #[get("/stats", wrap = "Authentication::Private")]
async fn get_stats(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> { async fn get_stats(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
let response: StatsResponse = data.get_stats().await?.into(); let response = data.get_all_stats().await?;
Ok(HttpResponse::Ok().json(response)) Ok(HttpResponse::Ok().json(response))
} }