diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs
index 0fca85493..cc0d99791 100644
--- a/index-scheduler/src/index_scheduler.rs
+++ b/index-scheduler/src/index_scheduler.rs
@@ -82,6 +82,13 @@ impl Query {
             ..self
         }
     }
+
+    pub fn with_limit(self, limit: u32) -> Self {
+        Self {
+            limit,
+            ..self
+        }
+    }
 }
 
 pub mod db_name {
@@ -197,7 +204,6 @@ impl IndexScheduler {
         };
 
         std::thread::spawn(move || loop {
-            println!("started running");
             run.wake_up.wait();
 
             match run.tick() {
diff --git a/index/src/index.rs b/index/src/index.rs
index 5292f588b..1de8dc53a 100644
--- a/index/src/index.rs
+++ b/index/src/index.rs
@@ -22,49 +22,6 @@ use super::{Checked, Settings};
 
 pub type Document = Map<String, Value>;
 
-// @kero, what is this structure? Shouldn't it move entirely to milli?
-#[derive(Debug, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct IndexMeta {
-    #[serde(with = "time::serde::rfc3339")]
-    pub created_at: OffsetDateTime,
-    #[serde(with = "time::serde::rfc3339")]
-    pub updated_at: OffsetDateTime,
-    pub primary_key: Option<String>,
-}
-
-impl IndexMeta {
-    pub fn new(index: &Index) -> Result<Self> {
-        let txn = index.read_txn()?;
-        Self::new_txn(index, &txn)
-    }
-
-    pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result<Self> {
-        let created_at = index.created_at(txn)?;
-        let updated_at = index.updated_at(txn)?;
-        let primary_key = index.primary_key(txn)?.map(String::from);
-        Ok(Self {
-            created_at,
-            updated_at,
-            primary_key,
-        })
-    }
-}
-
-// @kero Maybe this should be entirely generated somewhere else since it doesn't really concern the index?
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct IndexStats {
-    #[serde(skip)]
-    pub size: u64,
-    pub number_of_documents: u64,
-    /// Whether the current index is performing an update. It is initially `None` when the
-    /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
-    /// later set to either true or false, we we retrieve the information from the `UpdateStore`
-    pub is_indexing: Option<bool>,
-    pub field_distribution: FieldDistribution,
-}
-
 #[derive(Clone, derivative::Derivative)]
 #[derivative(Debug)]
 pub struct Index {
@@ -115,20 +72,16 @@
         Ok(())
     }
 
-    pub fn stats(&self) -> Result<IndexStats> {
+    pub fn number_of_documents(&self) -> Result<u64> {
         let rtxn = self.read_txn()?;
-
-        Ok(IndexStats {
-            size: self.size()?,
-            number_of_documents: self.number_of_documents(&rtxn)?,
-            is_indexing: None,
-            field_distribution: self.field_distribution(&rtxn)?,
-        })
+        Ok(self.inner.number_of_documents(&rtxn)?)
     }
 
-    pub fn meta(&self) -> Result<IndexMeta> {
-        IndexMeta::new(self)
+    pub fn field_distribution(&self) -> Result<FieldDistribution> {
+        let rtxn = self.read_txn()?;
+        Ok(self.inner.field_distribution(&rtxn)?)
     }
+
     pub fn settings(&self) -> Result<Settings<Checked>> {
         let txn = self.read_txn()?;
         self.settings_txn(&txn)
@@ -261,7 +214,7 @@
             };
             documents.push(document);
         }
-        let number_of_documents = self.number_of_documents(&rtxn)?;
+        let number_of_documents = self.inner.number_of_documents(&rtxn)?;
 
         Ok((number_of_documents, documents))
     }
@@ -315,6 +268,21 @@
         }))
     }
 
+    pub fn created_at(&self) -> Result<OffsetDateTime> {
+        let rtxn = self.read_txn()?;
+        Ok(self.inner.created_at(&rtxn)?)
+    }
+
+    pub fn updated_at(&self) -> Result<OffsetDateTime> {
+        let rtxn = self.read_txn()?;
+        Ok(self.inner.updated_at(&rtxn)?)
+    }
+
+    pub fn primary_key(&self) -> Result<Option<String>> {
+        let rtxn = self.read_txn()?;
+        Ok(self.inner.primary_key(&rtxn)?.map(str::to_string))
+    }
+
    pub fn size(&self) -> Result<u64> {
        Ok(self.inner.on_disk_size()?)
    }
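
A note on the two APIs touched above. Each new getter on `Index` (`number_of_documents`, `field_distribution`, `created_at`, `updated_at`, `primary_key`) opens its own read transaction, so consecutive calls may observe different snapshots of the index; a caller that needs one consistent view still has to read everything through a single transaction. `with_limit`, meanwhile, follows the same consuming-builder shape as `Query`'s other `with_*` methods, so filters chain without a mutable binding. A minimal usage sketch (`with_status` and `with_index` are taken from later hunks in this diff, only `with_limit` is new; the `"movies"` uid is made up):

```rust
// Ask the scheduler for at most one currently-processing task on one index.
let query = Query::default()
    .with_status(Status::Processing)
    .with_index("movies".to_string()) // illustrative index uid
    .with_limit(1);
let processing = index_scheduler.get_tasks(query)?;
let is_indexing = !processing.is_empty();
```
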
diff --git a/index/src/lib.rs b/index/src/lib.rs
index 37ebcec97..2662b7d05 100644
--- a/index/src/lib.rs
+++ b/index/src/lib.rs
@@ -12,7 +12,7 @@ pub mod updates;
 #[allow(clippy::module_inception)]
 mod index;
 
-pub use self::index::{Document, IndexMeta, IndexStats};
+pub use self::index::Document;
 
 #[cfg(not(test))]
 pub use self::index::Index;
@@ -30,13 +30,15 @@ pub mod test {
     use milli::update::{
         DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
     };
+    use milli::FieldDistribution;
     use nelson::Mocker;
+    use time::OffsetDateTime;
     use uuid::Uuid;
 
     use super::error::Result;
     use super::index::Index;
     use super::Document;
-    use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
+    use super::{Checked, SearchQuery, SearchResult, Settings};
     use file_store::FileStore;
 
     #[derive(Clone)]
@@ -71,19 +73,6 @@ pub mod test {
         }
         */
 
-        pub fn stats(&self) -> Result<IndexStats> {
-            match self {
-                MockIndex::Real(index) => index.stats(),
-                MockIndex::Mock(m) => unsafe { m.get("stats").call(()) },
-            }
-        }
-
-        pub fn meta(&self) -> Result<IndexMeta> {
-            match self {
-                MockIndex::Real(index) => index.meta(),
-                MockIndex::Mock(_) => todo!(),
-            }
-        }
         pub fn settings(&self) -> Result<Settings<Checked>> {
             match self {
                 MockIndex::Real(index) => index.settings(),
@@ -144,6 +133,20 @@ pub mod test {
             }
         }
 
+        pub fn number_of_documents(&self) -> Result<u64> {
+            match self {
+                MockIndex::Real(index) => index.number_of_documents(),
+                MockIndex::Mock(m) => unsafe { m.get("number_of_documents").call(()) },
+            }
+        }
+
+        pub fn field_distribution(&self) -> Result<FieldDistribution> {
+            match self {
+                MockIndex::Real(index) => index.field_distribution(),
+                MockIndex::Mock(m) => unsafe { m.get("field_distribution").call(()) },
+            }
+        }
+
         pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
             match self {
                 MockIndex::Real(index) => index.perform_search(query),
@@ -151,15 +154,6 @@ pub mod test {
             }
         }
 
-        /*
-        pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
-            match self {
-                MockIndex::Real(index) => index.dump(path),
-                MockIndex::Mock(m) => unsafe { m.get("dump").call(path.as_ref()) },
-            }
-        }
-        */
-
         pub fn update_documents(
             &self,
             method: IndexDocumentsMethod,
@@ -186,7 +180,7 @@ pub mod test {
             }
         }
 
-        pub fn update_primary_key(&self, primary_key: String) -> Result<IndexMeta> {
+        pub fn update_primary_key(&self, primary_key: String) -> Result<()> {
             match self {
                 MockIndex::Real(index) => index.update_primary_key(primary_key),
                 MockIndex::Mock(m) => unsafe { m.get("update_primary_key").call(primary_key) },
@@ -206,6 +200,27 @@ pub mod test {
                 MockIndex::Mock(m) => unsafe { m.get("clear_documents").call(()) },
             }
         }
+
+        pub fn created_at(&self) -> Result<OffsetDateTime> {
+            match self {
+                MockIndex::Real(index) => index.created_at(),
+                MockIndex::Mock(m) => unsafe { m.get("created_at").call(()) },
+            }
+        }
+
+        pub fn updated_at(&self) -> Result<OffsetDateTime> {
+            match self {
+                MockIndex::Real(index) => index.updated_at(),
+                MockIndex::Mock(m) => unsafe { m.get("updated_at").call(()) },
+            }
+        }
+
+        pub fn primary_key(&self) -> Result<Option<String>> {
+            match self {
+                MockIndex::Real(index) => index.primary_key(),
+                MockIndex::Mock(m) => unsafe { m.get("primary_key").call(()) },
+            }
+        }
     }
 
     #[test]
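
For context, the `Mock` arms above dispatch on the string key passed to `Mocker::get`, so unit tests can stub the new getters without opening a real index. A sketch, assuming nelson's `when::<Args, Ret>(name).then(closure)` registration API and a constructor that wraps a `Mocker` into `MockIndex::Mock` (named `faux` in earlier revisions of this module; both names are assumptions here):

```rust
let mocker = Mocker::default();
// Register a canned answer under the same key the wrapper looks up.
mocker
    .when::<(), Result<u64>>("number_of_documents")
    .then(|_| Ok(42));
let index = MockIndex::faux(mocker); // hypothetical constructor name
assert_eq!(index.number_of_documents().unwrap(), 42);
```

The key strings have to match the wrapper's lookup keys exactly; a mismatched key silently registers a stub that is never found.
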
diff --git a/index/src/updates.rs b/index/src/updates.rs
index 1c8858c31..be5b9d51a 100644
--- a/index/src/updates.rs
+++ b/index/src/updates.rs
@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize, Serializer};
 use uuid::Uuid;
 
 use super::error::{IndexError, Result};
-use super::index::{Index, IndexMeta};
+use super::index::Index;
 use file_store::FileStore;
 
 fn serialize_with_wildcard(
@@ -251,21 +251,18 @@ impl Index {
         &'a self,
         txn: &mut milli::heed::RwTxn<'a, 'b>,
         primary_key: String,
-    ) -> Result<IndexMeta> {
+    ) -> Result<()> {
         let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref());
         builder.set_primary_key(primary_key);
         builder.execute(|_| ())?;
-        let meta = IndexMeta::new_txn(self, txn)?;
-
-        Ok(meta)
+        Ok(())
     }
 
-    pub fn update_primary_key(&self, primary_key: String) -> Result<IndexMeta> {
+    pub fn update_primary_key(&self, primary_key: String) -> Result<()> {
         let mut txn = self.write_txn()?;
-        let res = self.update_primary_key_txn(&mut txn, primary_key)?;
+        self.update_primary_key_txn(&mut txn, primary_key)?;
         txn.commit()?;
-
-        Ok(res)
+        Ok(())
     }
 
     /// Deletes `ids` from the index, and returns how many documents were deleted.
@@ -304,7 +301,7 @@ impl Index {
         let mut txn = self.write_txn()?;
 
         if let Some(primary_key) = primary_key {
-            if self.primary_key(&txn)?.is_none() {
+            if self.inner.primary_key(&txn)?.is_none() {
                 self.update_primary_key_txn(&mut txn, primary_key)?;
             }
         }
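
Since `update_primary_key` no longer hands back an `IndexMeta`, a caller that still needs the refreshed metadata re-reads it through the getters added to `Index` above. A small sketch (`index` stands for any handle to the wrapper type):

```rust
index.update_primary_key("id".to_string())?;
// Metadata is now fetched on demand instead of being returned by the update:
let primary_key = index.primary_key()?; // Some("id".to_string())
let updated_at = index.updated_at()?;   // bumped by the settings update
```
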
diff --git a/meilisearch-http/src/routes/indexes/mod.rs b/meilisearch-http/src/routes/indexes/mod.rs
index 755e9836b..5a303c5e4 100644
--- a/meilisearch-http/src/routes/indexes/mod.rs
+++ b/meilisearch-http/src/routes/indexes/mod.rs
@@ -1,6 +1,11 @@
+use std::convert::TryFrom;
+use std::sync::Arc;
+
 use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
-use index_scheduler::{IndexScheduler, KindWithContent};
+use index::Index;
+use index_scheduler::milli::FieldDistribution;
+use index_scheduler::{IndexScheduler, KindWithContent, Query, Status};
 use log::debug;
 use meilisearch_types::error::ResponseError;
 use serde::{Deserialize, Serialize};
@@ -39,6 +44,30 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
     );
 }
 
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct IndexView {
+    pub uid: String,
+    #[serde(with = "time::serde::rfc3339")]
+    pub created_at: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    pub updated_at: OffsetDateTime,
+    pub primary_key: Option<String>,
+}
+
+impl TryFrom<&Index> for IndexView {
+    type Error = index::error::IndexError;
+
+    fn try_from(index: &Index) -> Result<Self, Self::Error> {
+        Ok(IndexView {
+            uid: index.name.clone(),
+            created_at: index.created_at()?,
+            updated_at: index.updated_at()?,
+            primary_key: index.primary_key()?,
+        })
+    }
+}
+
 pub async fn list_indexes(
     index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_GET }>, Data<IndexScheduler>>,
     paginate: web::Query<Pagination>,
@@ -46,16 +75,13 @@
     let search_rules = &index_scheduler.filters().search_rules;
     let indexes: Vec<_> = index_scheduler.indexes()?;
     let nb_indexes = indexes.len();
-    let iter = indexes
-        .into_iter()
-        .filter(|index| search_rules.is_index_authorized(&index.name));
-    /*
-    TODO: TAMO: implements me. It's missing a kind of IndexView or something
-    let ret = paginate
-        .into_inner()
-        .auto_paginate_unsized(nb_indexes, iter);
-    */
-    let ret = todo!();
+    let indexes = indexes
+        .iter()
+        .filter(|index| search_rules.is_index_authorized(&index.name))
+        .map(IndexView::try_from)
+        .collect::<Result<Vec<_>, _>>()?;
+
+    let ret = paginate.auto_paginate_sized(indexes.into_iter());
 
     debug!("returns: {:?}", ret);
     Ok(HttpResponse::Ok().json(ret))
@@ -104,29 +130,16 @@ pub struct UpdateIndexRequest {
     primary_key: Option<String>,
 }
 
-#[derive(Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct UpdateIndexResponse {
-    name: String,
-    uid: String,
-    #[serde(serialize_with = "time::serde::rfc3339::serialize")]
-    created_at: OffsetDateTime,
-    #[serde(serialize_with = "time::serde::rfc3339::serialize")]
-    updated_at: OffsetDateTime,
-    #[serde(serialize_with = "time::serde::rfc3339::serialize")]
-    primary_key: OffsetDateTime,
-}
-
 pub async fn get_index(
     index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
 ) -> Result<HttpResponse, ResponseError> {
-    let meta = index_scheduler.index(&index_uid)?;
-    debug!("returns: {:?}", meta);
+    let index = index_scheduler.index(&index_uid)?;
+    let index_view: IndexView = (&index).try_into()?;
 
-    // TODO: TAMO: do this as well
-    todo!()
-    // Ok(HttpResponse::Ok().json(meta))
+    debug!("returns: {:?}", index_view);
+
+    Ok(HttpResponse::Ok().json(index_view))
 }
 
 pub async fn update_index(
@@ -178,11 +191,40 @@
         json!({ "per_index_uid": true }),
         Some(&req),
     );
-    let index = index_scheduler.index(&index_uid)?;
 
-    // TODO: TAMO: Bring the index_stats in meilisearch-http
-    // let response = index.get_index_stats()?;
-    let response = todo!();
-    debug!("returns: {:?}", response);
-    Ok(HttpResponse::Ok().json(response))
+    let stats = IndexStats::new((*index_scheduler).clone(), index_uid.into_inner())?;
+
+    debug!("returns: {:?}", stats);
+    Ok(HttpResponse::Ok().json(stats))
 }
+
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct IndexStats {
+    pub number_of_documents: u64,
+    pub is_indexing: bool,
+    pub field_distribution: FieldDistribution,
+}
+
+impl IndexStats {
+    pub fn new(
+        index_scheduler: Data<IndexScheduler>,
+        index_uid: String,
+    ) -> Result<Self, ResponseError> {
+        // We check whether a processing task is currently associated with this index.
+        let processing_task = index_scheduler.get_tasks(
+            Query::default()
+                .with_status(Status::Processing)
+                .with_index(index_uid.clone())
+                .with_limit(1),
+        )?;
+        let is_processing = !processing_task.is_empty();
+
+        let index = index_scheduler.index(&index_uid)?;
+        Ok(IndexStats {
+            number_of_documents: index.number_of_documents()?,
+            is_indexing: is_processing,
+            field_distribution: index.field_distribution()?,
+        })
+    }
+}
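
With `rename_all = "camelCase"` and the rfc3339 serializers, `IndexView` is the payload `GET /indexes/{uid}` responds with. A sketch of the serialized shape (all values illustrative):

```rust
use time::OffsetDateTime;

let view = IndexView {
    uid: "movies".to_string(), // made-up uid
    created_at: OffsetDateTime::UNIX_EPOCH,
    updated_at: OffsetDateTime::UNIX_EPOCH,
    primary_key: Some("id".to_string()),
};
// Serializes as:
// {"uid":"movies","createdAt":"1970-01-01T00:00:00Z",
//  "updatedAt":"1970-01-01T00:00:00Z","primaryKey":"id"}
let json = serde_json::to_string(&view).unwrap();
assert!(json.contains("createdAt"));
```

Note that `IndexStats` no longer carries an `Option<bool>` for `is_indexing`: the flag is computed on demand by asking the scheduler whether a `Processing` task exists for the uid, which is exactly what the `with_limit(1)` query above does.
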
diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs
index 5022256b1..833969384 100644
--- a/meilisearch-http/src/routes/mod.rs
+++ b/meilisearch-http/src/routes/mod.rs
@@ -1,6 +1,8 @@
+use std::collections::BTreeMap;
+
 use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
-use index_scheduler::IndexScheduler;
+use index_scheduler::{IndexScheduler, Query, Status};
 use log::debug;
 use serde::{Deserialize, Serialize};
 
@@ -14,6 +16,8 @@ use meilisearch_types::star_or::StarOr;
 use crate::analytics::Analytics;
 use crate::extractors::authentication::{policies::*, GuardedData};
 
+use self::indexes::{IndexStats, IndexView};
+
 mod api_key;
 mod dump;
 pub mod indexes;
@@ -232,6 +236,15 @@ pub async fn running() -> HttpResponse {
     HttpResponse::Ok().json(serde_json::json!({ "status": "Meilisearch is running" }))
 }
 
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct Stats {
+    pub database_size: u64,
+    #[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
+    pub last_update: Option<OffsetDateTime>,
+    pub indexes: BTreeMap<String, IndexStats>,
+}
+
 async fn get_stats(
     index_scheduler: GuardedData<ActionPolicy<{ actions::STATS_GET }>, Data<IndexScheduler>>,
     req: HttpRequest,
@@ -243,11 +256,48 @@
         Some(&req),
     );
     let search_rules = &index_scheduler.filters().search_rules;
-    // let response = index_scheduler.get_all_stats(search_rules).await?;
-    let response = todo!();
 
-    debug!("returns: {:?}", response);
-    Ok(HttpResponse::Ok().json(response))
+    let mut last_task: Option<OffsetDateTime> = None;
+    let mut indexes = BTreeMap::new();
+    let mut database_size = 0;
+    let processing_task = index_scheduler.get_tasks(
+        Query::default()
+            .with_status(Status::Processing)
+            .with_limit(1),
+    )?;
+    let processing_index = processing_task
+        .first()
+        .and_then(|task| task.index_uid.clone());
+
+    for index in index_scheduler.indexes()? {
+        if !search_rules.is_index_authorized(&index.name) {
+            continue;
+        }
+
+        database_size += index.size()?;
+
+        let stats = IndexStats {
+            number_of_documents: index.number_of_documents()?,
+            is_indexing: processing_index
+                .as_deref()
+                .map_or(false, |index_name| index.name == index_name),
+            field_distribution: index.field_distribution()?,
+        };
+
+        let updated_at = index.updated_at()?;
+        last_task = last_task.map_or(Some(updated_at), |last| Some(last.max(updated_at)));
+
+        indexes.insert(index.name.clone(), stats);
+    }
+
+    let stats = Stats {
+        database_size,
+        last_update: last_task,
+        indexes,
+    };
+
+    debug!("returns: {:?}", stats);
+    Ok(HttpResponse::Ok().json(stats))
 }
 
 #[derive(Serialize)]
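
Taken together, `GET /stats` now aggregates everything client-side of the scheduler: `databaseSize` sums `Index::size` over the authorized indexes, `lastUpdate` is the most recent `updated_at` (the `last_task` binding, despite its name, tracks index update times), and `isIndexing` marks the index named by the single `Processing` task. The serialized response is shaped roughly like this (values made up):

```rust
use serde_json::json;

// Illustrative shape of a serialized `Stats` response:
let expected = json!({
    "databaseSize": 447_819_776u64,
    "lastUpdate": "2022-09-07T12:40:00Z",
    "indexes": {
        "movies": {
            "numberOfDocuments": 19_654u64,
            "isIndexing": false,
            "fieldDistribution": { "genre": 19_654u64, "title": 19_654u64 }
        }
    }
});
```
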