diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index ab50e83cc..60fb5ae20 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -1,12 +1,10 @@ -use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; -use chrono::{DateTime, Utc}; use sha2::Digest; use crate::index::Settings; -use crate::index_controller::{IndexController, IndexStats}; +use crate::index_controller::{IndexController, IndexStats, Stats}; use crate::index_controller::{IndexMetadata, IndexSettings}; use crate::option::Opt; @@ -39,13 +37,6 @@ pub struct ApiKeys { pub master: Option, } -#[derive(Default)] -pub struct Stats { - pub database_size: u64, - pub last_update: Option>, - pub indexes: HashMap, -} - impl ApiKeys { pub fn generate_missing_api_keys(&mut self) { if let Some(master_key) = &self.master { @@ -114,33 +105,14 @@ impl Data { } pub async fn get_index_stats(&self, uid: String) -> anyhow::Result { - Ok(self.index_controller.get_stats(uid).await?) + Ok(self.index_controller.get_index_stats(uid).await?) } - pub async fn get_stats(&self) -> anyhow::Result { - let mut stats = Stats::default(); - stats.database_size += self.index_controller.get_uuids_size().await?; - - for index in self.index_controller.list_indexes().await? { - let index_stats = self.index_controller.get_stats(index.uid.clone()).await?; - - stats.database_size += index_stats.size; - stats.database_size += self - .index_controller - .get_updates_size() - .await?; - - stats.last_update = Some(match stats.last_update { - Some(last_update) => last_update.max(index.meta.updated_at), - None => index.meta.updated_at, - }); - - stats.indexes.insert(index.uid, index_stats); - } - - Ok(stats) + pub async fn get_all_stats(&self) -> anyhow::Result { + Ok(self.index_controller.get_all_stats().await?) } + #[inline] pub fn http_payload_size_limit(&self) -> usize { self.options.http_payload_size_limit.get_bytes() as usize diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 06aa90562..ff7064600 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -6,7 +6,7 @@ use async_stream::stream; use futures::stream::StreamExt; use heed::CompactionOption; use log::debug; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::mpsc; use tokio::task::spawn_blocking; use uuid::Uuid; @@ -24,7 +24,6 @@ pub const CONCURRENT_INDEX_MSG: usize = 10; pub struct IndexActor { receiver: Option>, update_handler: Arc, - processing: RwLock>, store: S, } @@ -38,7 +37,6 @@ impl IndexActor { receiver, store, update_handler, - processing: RwLock::new(None), }) } @@ -174,9 +172,7 @@ impl IndexActor { .map_err(|e| IndexError::Error(e.into())) }; - *self.processing.write().await = Some(uuid); let result = get_result().await; - *self.processing.write().await = None; result } @@ -330,16 +326,13 @@ impl IndexActor { .await? .ok_or(IndexError::UnexistingIndex)?; - let processing = self.processing.read().await; - let is_indexing = *processing == Some(uuid); - spawn_blocking(move || { let rtxn = index.read_txn()?; Ok(IndexStats { size: index.size(), number_of_documents: index.number_of_documents(&rtxn)?, - is_indexing, + is_indexing: None, fields_distribution: index.fields_distribution(&rtxn)?, }) }) diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 9d9ed02c1..2e93a2f7e 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -1,9 +1,11 @@ +use std::collections::BTreeMap; use std::path::Path; use std::sync::Arc; use std::time::Duration; use actix_web::web::{Bytes, Payload}; use anyhow::bail; +use chrono::{DateTime, Utc}; use futures::stream::StreamExt; use log::info; use milli::update::{IndexDocumentsMethod, UpdateFormat}; @@ -37,6 +39,8 @@ pub type UpdateStatus = updates::UpdateStatus; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct IndexMetadata { + #[serde(skip)] + pub uuid: Uuid, pub uid: String, name: String, #[serde(flatten)] @@ -63,11 +67,12 @@ pub struct IndexSettings { pub primary_key: Option, } -#[derive(Clone, Debug)] +#[derive(Serialize)] pub struct IndexStats { + #[serde(skip)] pub size: u64, pub number_of_documents: u64, - pub is_indexing: bool, + pub is_indexing: Option, pub fields_distribution: FieldsDistribution, } @@ -77,6 +82,13 @@ pub struct IndexController { update_handle: update_actor::UpdateActorHandleImpl, } +#[derive(Serialize)] +pub struct Stats { + pub database_size: u64, + pub last_update: Option>, + pub indexes: BTreeMap, +} + impl IndexController { pub fn new(path: impl AsRef, options: &Opt) -> anyhow::Result { let index_size = options.max_mdb_size.get_bytes() as usize; @@ -166,6 +178,7 @@ impl IndexController { Err(UuidError::UnexistingIndex(name)) => { let uuid = Uuid::new_v4(); let status = perform_update(uuid).await?; + self.index_handle.create_index(uuid, None).await?; self.uuid_resolver.insert(name, uuid).await?; Ok(status) } @@ -218,6 +231,7 @@ impl IndexController { Err(UuidError::UnexistingIndex(name)) if create => { let uuid = Uuid::new_v4(); let status = perform_udpate(uuid).await?; + self.index_handle.create_index(uuid, None).await?; self.uuid_resolver.insert(name, uuid).await?; Ok(status) } @@ -234,6 +248,7 @@ impl IndexController { let uuid = self.uuid_resolver.create(uid.clone()).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -269,6 +284,7 @@ impl IndexController { for (uid, uuid) in uuids { let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -326,6 +342,7 @@ impl IndexController { let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -343,6 +360,7 @@ impl IndexController { let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.get_index_meta(uuid).await?; let meta = IndexMetadata { + uuid, name: uid.clone(), uid, meta, @@ -350,19 +368,43 @@ impl IndexController { Ok(meta) } - pub async fn get_stats(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.get(uid.clone()).await?; - - Ok(self.index_handle.get_index_stats(uuid).await?) - } - - pub async fn get_updates_size(&self) -> anyhow::Result { - Ok(self.update_handle.get_size().await?) - } - pub async fn get_uuids_size(&self) -> anyhow::Result { Ok(self.uuid_resolver.get_size().await?) } + + pub async fn get_index_stats(&self, uid: String) -> anyhow::Result { + let uuid = self.uuid_resolver.get(uid).await?; + let update_infos = self.update_handle.get_info().await?; + let mut stats = self.index_handle.get_index_stats(uuid).await?; + stats.is_indexing = (Some(uuid) == update_infos.processing).into(); + Ok(stats) + } + + pub async fn get_all_stats(&self) -> anyhow::Result { + let update_infos = self.update_handle.get_info().await?; + let mut database_size = self.get_uuids_size().await? + update_infos.size; + let mut last_update: Option> = None; + let mut indexes = BTreeMap::new(); + + for index in self.list_indexes().await? { + let mut index_stats = self.index_handle.get_index_stats(index.uuid).await?; + database_size += index_stats.size; + + last_update = last_update.map_or(Some(index.meta.updated_at), |last| { + Some(last.max(index.meta.updated_at)) + }); + + index_stats.is_indexing = (Some(index.uuid) == update_infos.processing).into(); + + indexes.insert(index.uid, index_stats); + } + + Ok(Stats { + database_size, + last_update, + indexes, + }) + } } pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 2a752d0b3..0b5e88270 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -10,7 +10,7 @@ use tokio::sync::mpsc; use uuid::Uuid; use futures::StreamExt; -use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore}; +use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}; use crate::index_controller::{UpdateMeta, UpdateStatus}; @@ -81,8 +81,8 @@ where Some(Snapshot { uuids, path, ret }) => { let _ = ret.send(self.handle_snapshot(uuids, path).await); } - Some(GetSize { ret }) => { - let _ = ret.send(self.handle_get_size().await); + Some(GetInfo { ret }) => { + let _ = ret.send(self.handle_get_info().await); } None => break, } @@ -232,17 +232,27 @@ where Ok(()) } - async fn handle_get_size(&self) -> Result { + async fn handle_get_info(&self) -> Result { let update_store = self.store.clone(); - let size = tokio::task::spawn_blocking(move || -> anyhow::Result { + let processing = self.store.processing.clone(); + let info = tokio::task::spawn_blocking(move || -> anyhow::Result { let txn = update_store.env.read_txn()?; - - update_store.get_size(&txn) + let size = update_store.get_size(&txn)?; + let processing = processing + .read() + .as_ref() + .map(|(uuid, _)| uuid) + .cloned(); + let info = UpdateStoreInfo { + size, processing + }; + Ok(info) }) .await .map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?; - Ok(size) + + Ok(info) } } diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index 5c5a3e051..f79ef0e4e 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -6,7 +6,7 @@ use uuid::Uuid; use crate::index_controller::IndexActorHandle; use super::{ - PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus, + PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus, UpdateStoreInfo }; #[derive(Clone)] @@ -70,9 +70,9 @@ where receiver.await.expect("update actor killed.") } - async fn get_size(&self) -> Result { + async fn get_info(&self) -> Result { let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::GetSize { ret }; + let msg = UpdateMsg::GetInfo { ret }; let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index 0f0005862..6082ad280 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use super::{PayloadData, Result, UpdateMeta, UpdateStatus}; +use super::{PayloadData, Result, UpdateMeta, UpdateStatus, UpdateStoreInfo}; pub enum UpdateMsg { Update { @@ -30,7 +30,7 @@ pub enum UpdateMsg { path: PathBuf, ret: oneshot::Sender>, }, - GetSize { - ret: oneshot::Sender>, + GetInfo { + ret: oneshot::Sender>, }, } diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 0f815dbf3..faeb140a6 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -32,6 +32,14 @@ pub enum UpdateError { UnexistingUpdate(u64), } +pub struct UpdateStoreInfo { + /// Size of the update store in bytes. + pub size: u64, + /// Uuid of the currently processing update if it exists + pub processing: Option, + +} + #[async_trait::async_trait] #[cfg_attr(test, automock(type Data=Vec;))] pub trait UpdateActorHandle { @@ -41,7 +49,7 @@ pub trait UpdateActorHandle { async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()>; - async fn get_size(&self) -> Result; + async fn get_info(&self) -> Result; async fn update( &self, meta: UpdateMeta, diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index 79693ba1f..2e5350193 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -67,7 +67,7 @@ pub struct UpdateStore { processed_meta: Database>>, failed_meta: Database>>, aborted_meta: Database>>, - processing: Arc)>>>, + pub processing: Arc)>>>, notification_sender: mpsc::Sender<()>, /// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is /// processing, while not preventing writes all together during an update diff --git a/meilisearch-http/src/routes/stats.rs b/meilisearch-http/src/routes/stats.rs index 226b62fcd..f2d1ddecc 100644 --- a/meilisearch-http/src/routes/stats.rs +++ b/meilisearch-http/src/routes/stats.rs @@ -1,15 +1,10 @@ -use std::collections::BTreeMap; - use actix_web::get; use actix_web::web; use actix_web::HttpResponse; -use chrono::{DateTime, Utc}; use serde::Serialize; -use crate::data::Stats; use crate::error::ResponseError; use crate::helpers::Authentication; -use crate::index_controller::IndexStats; use crate::routes::IndexParam; use crate::Data; @@ -19,59 +14,19 @@ pub fn services(cfg: &mut web::ServiceConfig) { .service(get_version); } -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct IndexStatsResponse { - number_of_documents: u64, - is_indexing: bool, - fields_distribution: BTreeMap, -} - -impl From for IndexStatsResponse { - fn from(stats: IndexStats) -> Self { - Self { - number_of_documents: stats.number_of_documents, - is_indexing: stats.is_indexing, - fields_distribution: stats.fields_distribution.into_iter().collect(), - } - } -} - #[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")] async fn get_index_stats( data: web::Data, path: web::Path, ) -> Result { - let response: IndexStatsResponse = data.get_index_stats(path.index_uid.clone()).await?.into(); + let response = data.get_index_stats(path.index_uid.clone()).await?; Ok(HttpResponse::Ok().json(response)) } -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct StatsResponse { - database_size: u64, - last_update: Option>, - indexes: BTreeMap, -} - -impl From for StatsResponse { - fn from(stats: Stats) -> Self { - Self { - database_size: stats.database_size, - last_update: stats.last_update, - indexes: stats - .indexes - .into_iter() - .map(|(uid, index_stats)| (uid, index_stats.into())) - .collect(), - } - } -} - #[get("/stats", wrap = "Authentication::Private")] async fn get_stats(data: web::Data) -> Result { - let response: StatsResponse = data.get_stats().await?.into(); + let response = data.get_all_stats().await?; Ok(HttpResponse::Ok().json(response)) }