From fd5c48941a8c8a9ad0d31270a54be2f90440efb2 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Thu, 23 Feb 2023 19:31:57 +0100
Subject: [PATCH] Add cache on the indexes stats

---
 index-scheduler/src/batch.rs            | 38 +++++++++++++++++++--
 index-scheduler/src/index_mapper/mod.rs | 42 +++++++++++++++++++++--
 index-scheduler/src/lib.rs              | 19 +++++++++--
 meilisearch/src/routes/indexes/mod.rs   | 45 +++++++++++--------------
 meilisearch/src/routes/mod.rs           | 38 +++++++--------------
 5 files changed, 122 insertions(+), 60 deletions(-)

diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 66c516d9b..03287d7ae 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -828,20 +828,36 @@ impl IndexScheduler {
                 Ok(vec![task])
             }
             Batch::IndexOperation { op, must_create_index } => {
-                let index_uid = op.index_uid();
+                let index_uid = op.index_uid().to_string();
                 let index = if must_create_index {
                     // create the index if it doesn't already exist
                     let wtxn = self.env.write_txn()?;
-                    self.index_mapper.create_index(wtxn, index_uid, None)?
+                    self.index_mapper.create_index(wtxn, &index_uid, None)?
                 } else {
                     let rtxn = self.env.read_txn()?;
-                    self.index_mapper.index(&rtxn, index_uid)?
+                    self.index_mapper.index(&rtxn, &index_uid)?
                 };
 
                 let mut index_wtxn = index.write_txn()?;
                 let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
                 index_wtxn.commit()?;
 
+                // If the update was processed successfully, we store the new
+                // stats of the index. Since the tasks have already been
+                // processed, this is a non-critical operation: if it fails, we
+                // should not fail the entire batch.
+                let res = || -> Result<()> {
+                    let mut wtxn = self.env.write_txn()?;
+                    self.index_mapper.compute_and_store_stats_of(&mut wtxn, &index_uid)?;
+                    wtxn.commit()?;
+                    Ok(())
+                }();
+
+                match res {
+                    Ok(_) => (),
+                    Err(e) => error!("Could not write the stats of the index: {}", e),
+                }
+
                 Ok(tasks)
             }
             Batch::IndexCreation { index_uid, primary_key, task } => {
@@ -875,6 +891,22 @@ impl IndexScheduler {
                 task.status = Status::Succeeded;
                 task.details = Some(Details::IndexInfo { primary_key });
 
+                // If the update was processed successfully, we store the new
+                // stats of the index. Since the tasks have already been
+                // processed, this is a non-critical operation: if it fails, we
+                // should not fail the entire batch.
+                let res = || -> Result<()> {
+                    let mut wtxn = self.env.write_txn()?;
+                    self.index_mapper.compute_and_store_stats_of(&mut wtxn, &index_uid)?;
+                    wtxn.commit()?;
+                    Ok(())
+                }();
+
+                match res {
+                    Ok(_) => (),
+                    Err(e) => error!("Could not write the stats of the index: {}", e),
+                }
+
                 Ok(vec![task])
             }
             Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
diff --git a/index-scheduler/src/index_mapper/mod.rs b/index-scheduler/src/index_mapper/mod.rs
index 1693d12d7..9e1de438a 100644
--- a/index-scheduler/src/index_mapper/mod.rs
+++ b/index-scheduler/src/index_mapper/mod.rs
@@ -4,10 +4,11 @@ use std::time::Duration;
 use std::{fs, thread};
 
 use log::error;
-use meilisearch_types::heed::types::Str;
+use meilisearch_types::heed::types::{SerdeJson, Str};
 use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
 use meilisearch_types::milli::update::IndexerConfig;
-use meilisearch_types::milli::Index;
+use meilisearch_types::milli::{FieldDistribution, Index};
+use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 use uuid::Uuid;
 
@@ -19,6 +20,7 @@ use crate::{Error, Result};
 mod index_map;
 
 const INDEX_MAPPING: &str = "index-mapping";
+const INDEX_STATS: &str = "index-stats";
 
 /// Structure managing meilisearch's indexes.
 ///
@@ -52,6 +54,8 @@ pub struct IndexMapper {
     /// Map an index name with an index uuid currently available on disk.
     pub(crate) index_mapping: Database<Str, UuidCodec>,
+    /// Map an index name with the cached stats associated with the index.
+    pub(crate) index_stats: Database<Str, SerdeJson<IndexStats>>,
 
     /// Path to the folder where the LMDB environments of each index are.
     base_path: PathBuf,
 
@@ -76,6 +80,15 @@ pub enum IndexStatus {
     Available(Index),
 }
 
+#[derive(Serialize, Deserialize, Debug)]
+pub struct IndexStats {
+    pub number_of_documents: u64,
+    pub database_size: u64,
+    pub field_distribution: FieldDistribution,
+    pub created_at: OffsetDateTime,
+    pub updated_at: OffsetDateTime,
+}
+
 impl IndexMapper {
     pub fn new(
         env: &Env,
@@ -88,6 +101,7 @@ impl IndexMapper {
         Ok(Self {
             index_map: Arc::new(RwLock::new(IndexMap::new(index_count))),
             index_mapping: env.create_database(Some(INDEX_MAPPING))?,
+            index_stats: env.create_database(Some(INDEX_STATS))?,
             base_path,
             index_base_map_size,
             index_growth_amount,
@@ -135,6 +149,7 @@ impl IndexMapper {
     /// Removes the index from the mapping table and the in-memory index map
     /// but keeps the associated tasks.
     pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> {
+        self.index_stats.delete(&mut wtxn, name)?;
         let uuid = self
             .index_mapping
             .get(&wtxn, name)?
@@ -360,6 +375,29 @@ impl IndexMapper {
         Ok(())
     }
 
+    /// Return the stored stats of an index.
+    pub fn stats_of(&self, rtxn: &RoTxn, index_uid: &str) -> Result<IndexStats> {
+        self.index_stats
+            .get(rtxn, index_uid)?
+            .ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))
+    }
+
+    /// Compute the stats of an index and store them in the index-mapper database.
+    pub fn compute_and_store_stats_of(&self, wtxn: &mut RwTxn, index_uid: &str) -> Result<()> {
+        let index = self.index(wtxn, index_uid)?;
+        let database_size = index.on_disk_size()?;
+        let rtxn = index.read_txn()?;
+        let stats = IndexStats {
+            number_of_documents: index.number_of_documents(&rtxn)?,
+            database_size,
+            field_distribution: index.field_distribution(&rtxn)?,
+            created_at: index.created_at(&rtxn)?,
+            updated_at: index.updated_at(&rtxn)?,
+        };
+        self.index_stats.put(wtxn, index_uid, &stats)?;
+        Ok(())
+    }
+
     pub fn index_exists(&self, rtxn: &RoTxn, name: &str) -> Result<bool> {
         Ok(self.index_mapping.get(rtxn, name)?.is_some())
     }
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index e23e4ff8b..4f875eaca 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -44,10 +44,9 @@ use file_store::FileStore;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{self, Database, Env, RoTxn};
-use meilisearch_types::milli;
 use meilisearch_types::milli::documents::DocumentsBatchBuilder;
 use meilisearch_types::milli::update::IndexerConfig;
-use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
+use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use roaring::RoaringBitmap;
 use synchronoise::SignalEvent;
@@ -566,7 +565,7 @@ impl IndexScheduler {
     }
 
     /// Return the name of all indexes without opening them.
-    pub fn index_names(self) -> Result<Vec<String>> {
+    pub fn index_names(&self) -> Result<Vec<String>> {
         let rtxn = self.env.read_txn()?;
         self.index_mapper.index_names(&rtxn)
     }
@@ -1186,6 +1185,14 @@
         Ok(TickOutcome::TickAgain(processed_tasks))
     }
 
+    pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
+        let is_indexing = self.is_index_processing(index_uid)?;
+        let rtxn = self.read_txn()?;
+        let index_stats = self.index_mapper.stats_of(&rtxn, index_uid)?;
+
+        Ok(IndexStats { is_indexing, inner_stats: index_stats })
+    }
+
     pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
         match task.content_uuid() {
             Some(content_file) => self.delete_update_file(content_file),
@@ -1238,6 +1245,12 @@ struct IndexBudget {
     task_db_size: usize,
 }
 
+#[derive(Debug)]
+pub struct IndexStats {
+    pub is_indexing: bool,
+    pub inner_stats: index_mapper::IndexStats,
+}
+
 #[cfg(test)]
 mod tests {
     use std::io::{BufWriter, Seek, Write};
diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs
index c5c168786..28988e30b 100644
--- a/meilisearch/src/routes/indexes/mod.rs
+++ b/meilisearch/src/routes/indexes/mod.rs
@@ -220,6 +220,24 @@ pub async fn delete_index(
     Ok(HttpResponse::Accepted().json(task))
 }
 
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct IndexStats {
+    pub number_of_documents: u64,
+    pub is_indexing: bool,
+    pub field_distribution: FieldDistribution,
+}
+
+impl From<index_scheduler::IndexStats> for IndexStats {
+    fn from(stats: index_scheduler::IndexStats) -> Self {
+        IndexStats {
+            number_of_documents: stats.inner_stats.number_of_documents,
+            is_indexing: stats.is_indexing,
+            field_distribution: stats.inner_stats.field_distribution,
+        }
+    }
+}
+
 pub async fn get_index_stats(
     index_scheduler: GuardedData<ActionPolicy<{ actions::STATS_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
@@ -229,33 +247,8 @@ pub async fn get_index_stats(
     let index_uid = IndexUid::try_from(index_uid.into_inner())?;
     analytics.publish("Stats Seen".to_string(), json!({ "per_index_uid": true }), Some(&req));
 
-    let stats = IndexStats::new((*index_scheduler).clone(), index_uid.into_inner())?;
+    let stats = IndexStats::from(index_scheduler.index_stats(&index_uid)?);
 
     debug!("returns: {:?}", stats);
     Ok(HttpResponse::Ok().json(stats))
 }
-
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct IndexStats {
-    pub number_of_documents: u64,
-    pub is_indexing: bool,
-    pub field_distribution: FieldDistribution,
-}
-
-impl IndexStats {
-    pub fn new(
-        index_scheduler: Data<IndexScheduler>,
-        index_uid: String,
-    ) -> Result<Self, ResponseError> {
-        // we check if there is currently a task processing associated with this index.
-        let is_processing = index_scheduler.is_index_processing(&index_uid)?;
-        let index = index_scheduler.index(&index_uid)?;
-        let rtxn = index.read_txn()?;
-        Ok(IndexStats {
-            number_of_documents: index.number_of_documents(&rtxn)?,
-            is_indexing: is_processing,
-            field_distribution: index.field_distribution(&rtxn)?,
-        })
-    }
-}
diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs
index a4523e53f..f54c8ee38 100644
--- a/meilisearch/src/routes/mod.rs
+++ b/meilisearch/src/routes/mod.rs
@@ -2,7 +2,7 @@ use std::collections::BTreeMap;
 
 use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
-use index_scheduler::{IndexScheduler, Query};
+use index_scheduler::IndexScheduler;
 use log::debug;
 use meilisearch_auth::AuthController;
 use meilisearch_types::error::ResponseError;
@@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize};
 use serde_json::json;
 use time::OffsetDateTime;
 
-use self::indexes::IndexStats;
 use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
@@ -234,7 +233,7 @@ pub struct Stats {
     pub database_size: u64,
     #[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
     pub last_update: Option<OffsetDateTime>,
-    pub indexes: BTreeMap<String, IndexStats>,
+    pub indexes: BTreeMap<String, indexes::IndexStats>,
 }
 
 async fn get_stats(
@@ -260,32 +259,19 @@ pub fn create_all_stats(
     let mut last_task: Option<OffsetDateTime> = None;
     let mut indexes = BTreeMap::new();
     let mut database_size = 0;
-    let processing_task = index_scheduler.get_tasks_from_authorized_indexes(
-        Query { statuses: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() },
-        filters,
-    )?;
+    // accumulate the stats of each index
-    let processing_index = processing_task.first().and_then(|task| task.index_uid());
-    index_scheduler.try_for_each_index(|name, index| {
-        if !filters.is_index_authorized(name) {
-            return Ok(());
+    for index_uid in index_scheduler.index_names()? {
+        if !filters.is_index_authorized(&index_uid) {
+            continue;
         }
-        database_size += index.on_disk_size()?;
-
-        let rtxn = index.read_txn()?;
-        let stats = IndexStats {
-            number_of_documents: index.number_of_documents(&rtxn)?,
-            is_indexing: processing_index.map_or(false, |index_name| name == index_name),
-            field_distribution: index.field_distribution(&rtxn)?,
-        };
-
-        let updated_at = index.updated_at(&rtxn)?;
-        last_task = last_task.map_or(Some(updated_at), |last| Some(last.max(updated_at)));
-
-        indexes.insert(name.to_string(), stats);
-        Ok(())
-    })?;
+        let stats = index_scheduler.index_stats(&index_uid)?;
+        last_task = last_task.map_or(Some(stats.inner_stats.updated_at), |last| {
+            Some(last.max(stats.inner_stats.updated_at))
+        });
+        indexes.insert(index_uid.to_string(), stats.into());
+    }
 
     database_size += index_scheduler.size()?;
     database_size += auth_controller.size()?;
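
A minimal usage sketch (illustrative, not part of the patch): reading the cached stats through the new IndexScheduler::index_stats API. The scheduler handle and the "movies" index name are assumptions made for this example, not taken from the diff.

    // Illustrative sketch only: `scheduler` and the "movies" index are assumed to exist.
    fn print_cached_stats(
        scheduler: &index_scheduler::IndexScheduler,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // `index_stats` merges the stats persisted by the index-mapper with the
        // live "is this index currently being processed?" flag.
        let stats = scheduler.index_stats("movies")?;
        println!(
            "movies: {} documents (indexing in progress: {})",
            stats.inner_stats.number_of_documents, stats.is_indexing
        );
        Ok(())
    }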