mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 20:15:07 +08:00
Merge #133
133: Implement stats route r=MarinPostma a=shekhirin Resolves https://github.com/meilisearch/transplant/issues/73 Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
commit
8df5f73706
21
Cargo.lock
generated
21
Cargo.lock
generated
@ -1875,6 +1875,7 @@ dependencies = [
|
|||||||
"urlencoding",
|
"urlencoding",
|
||||||
"uuid",
|
"uuid",
|
||||||
"vergen",
|
"vergen",
|
||||||
|
"walkdir",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -2840,6 +2841,15 @@ version = "1.0.5"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
|
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "same-file"
|
||||||
|
version = "1.0.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
|
||||||
|
dependencies = [
|
||||||
|
"winapi-util",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "scopeguard"
|
name = "scopeguard"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
@ -3717,6 +3727,17 @@ version = "0.9.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
|
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "walkdir"
|
||||||
|
version = "2.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
|
||||||
|
dependencies = [
|
||||||
|
"same-file",
|
||||||
|
"winapi 0.3.9",
|
||||||
|
"winapi-util",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "want"
|
name = "want"
|
||||||
version = "0.3.0"
|
version = "0.3.0"
|
||||||
|
16
meilisearch-http/Cargo.lock
generated
16
meilisearch-http/Cargo.lock
generated
@ -1827,22 +1827,6 @@ dependencies = [
|
|||||||
"vergen",
|
"vergen",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "meilisearch-tokenizer"
|
|
||||||
version = "0.1.1"
|
|
||||||
source = "git+https://github.com/meilisearch/Tokenizer.git?branch=main#147b6154b1b34cb8f5da2df6a416b7da191bc850"
|
|
||||||
dependencies = [
|
|
||||||
"character_converter",
|
|
||||||
"cow-utils",
|
|
||||||
"deunicode",
|
|
||||||
"fst",
|
|
||||||
"jieba-rs",
|
|
||||||
"once_cell",
|
|
||||||
"slice-group-by",
|
|
||||||
"unicode-segmentation",
|
|
||||||
"whatlang",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memchr"
|
name = "memchr"
|
||||||
version = "2.3.4"
|
version = "2.3.4"
|
||||||
|
@ -62,6 +62,7 @@ thiserror = "1.0.24"
|
|||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
uuid = "0.8.2"
|
uuid = "0.8.2"
|
||||||
oxidized-json-checker = "0.3.2"
|
oxidized-json-checker = "0.3.2"
|
||||||
|
walkdir = "2.3.2"
|
||||||
|
|
||||||
[dependencies.sentry]
|
[dependencies.sentry]
|
||||||
default-features = false
|
default-features = false
|
||||||
|
@ -8,6 +8,7 @@ use serde_qs as qs;
|
|||||||
use siphasher::sip::SipHasher;
|
use siphasher::sip::SipHasher;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
|
use crate::helpers::EnvSizer;
|
||||||
use crate::Data;
|
use crate::Data;
|
||||||
use crate::Opt;
|
use crate::Opt;
|
||||||
|
|
||||||
@ -33,12 +34,7 @@ impl EventProperties {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let database_size = WalkDir::new(&data.db_path)
|
let database_size = data.env.size();
|
||||||
.into_iter()
|
|
||||||
.filter_map(|entry| entry.ok())
|
|
||||||
.filter_map(|entry| entry.metadata().ok())
|
|
||||||
.filter(|metadata| metadata.is_file())
|
|
||||||
.fold(0, |acc, m| acc + m.len());
|
|
||||||
|
|
||||||
let last_update_timestamp = data.db.last_update(&reader)?.map(|u| u.timestamp());
|
let last_update_timestamp = data.db.last_update(&reader)?.map(|u| u.timestamp());
|
||||||
|
|
||||||
@ -116,7 +112,7 @@ pub fn analytics_sender(data: Data, opt: Opt) {
|
|||||||
time,
|
time,
|
||||||
app_version,
|
app_version,
|
||||||
user_properties,
|
user_properties,
|
||||||
event_properties
|
event_properties,
|
||||||
};
|
};
|
||||||
let event = serde_json::to_string(&event).unwrap();
|
let event = serde_json::to_string(&event).unwrap();
|
||||||
|
|
||||||
|
@ -1,16 +1,18 @@
|
|||||||
pub mod search;
|
use std::collections::HashMap;
|
||||||
mod updates;
|
|
||||||
|
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use sha2::Digest;
|
use sha2::Digest;
|
||||||
|
|
||||||
use crate::index::Settings;
|
use crate::index::Settings;
|
||||||
use crate::index_controller::IndexController;
|
use crate::index_controller::{IndexController, IndexStats};
|
||||||
use crate::index_controller::{IndexMetadata, IndexSettings};
|
use crate::index_controller::{IndexMetadata, IndexSettings};
|
||||||
use crate::option::Opt;
|
use crate::option::Opt;
|
||||||
|
|
||||||
|
pub mod search;
|
||||||
|
mod updates;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Data {
|
pub struct Data {
|
||||||
inner: Arc<DataInner>,
|
inner: Arc<DataInner>,
|
||||||
@ -37,6 +39,13 @@ pub struct ApiKeys {
|
|||||||
pub master: Option<String>,
|
pub master: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct Stats {
|
||||||
|
pub database_size: u64,
|
||||||
|
pub last_update: Option<DateTime<Utc>>,
|
||||||
|
pub indexes: HashMap<String, IndexStats>,
|
||||||
|
}
|
||||||
|
|
||||||
impl ApiKeys {
|
impl ApiKeys {
|
||||||
pub fn generate_missing_api_keys(&mut self) {
|
pub fn generate_missing_api_keys(&mut self) {
|
||||||
if let Some(master_key) = &self.master {
|
if let Some(master_key) = &self.master {
|
||||||
@ -104,6 +113,34 @@ impl Data {
|
|||||||
Ok(meta)
|
Ok(meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
|
||||||
|
Ok(self.index_controller.get_stats(uid).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_stats(&self) -> anyhow::Result<Stats> {
|
||||||
|
let mut stats = Stats::default();
|
||||||
|
stats.database_size += self.index_controller.get_uuids_size().await?;
|
||||||
|
|
||||||
|
for index in self.index_controller.list_indexes().await? {
|
||||||
|
let index_stats = self.index_controller.get_stats(index.uid.clone()).await?;
|
||||||
|
|
||||||
|
stats.database_size += index_stats.size;
|
||||||
|
stats.database_size += self
|
||||||
|
.index_controller
|
||||||
|
.get_updates_size(index.uid.clone())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
stats.last_update = Some(match stats.last_update {
|
||||||
|
Some(last_update) => last_update.max(index.meta.updated_at),
|
||||||
|
None => index.meta.updated_at,
|
||||||
|
});
|
||||||
|
|
||||||
|
stats.indexes.insert(index.uid, index_stats);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(stats)
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn http_payload_size_limit(&self) -> usize {
|
pub fn http_payload_size_limit(&self) -> usize {
|
||||||
self.options.http_payload_size_limit.get_bytes() as usize
|
self.options.http_payload_size_limit.get_bytes() as usize
|
||||||
|
16
meilisearch-http/src/helpers/env.rs
Normal file
16
meilisearch-http/src/helpers/env.rs
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
|
pub trait EnvSizer {
|
||||||
|
fn size(&self) -> u64;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EnvSizer for heed::Env {
|
||||||
|
fn size(&self) -> u64 {
|
||||||
|
WalkDir::new(self.path())
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|entry| entry.ok())
|
||||||
|
.filter_map(|entry| entry.metadata().ok())
|
||||||
|
.filter(|metadata| metadata.is_file())
|
||||||
|
.fold(0, |acc, m| acc + m.len())
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,6 @@
|
|||||||
pub mod authentication;
|
pub mod authentication;
|
||||||
pub mod compression;
|
pub mod compression;
|
||||||
|
mod env;
|
||||||
|
|
||||||
pub use authentication::Authentication;
|
pub use authentication::Authentication;
|
||||||
|
pub use env::EnvSizer;
|
||||||
|
@ -1,6 +1,3 @@
|
|||||||
mod search;
|
|
||||||
mod updates;
|
|
||||||
|
|
||||||
use std::collections::{BTreeSet, HashSet};
|
use std::collections::{BTreeSet, HashSet};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -11,6 +8,10 @@ use serde_json::{Map, Value};
|
|||||||
|
|
||||||
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
|
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
|
||||||
pub use updates::{Facets, Settings, UpdateResult};
|
pub use updates::{Facets, Settings, UpdateResult};
|
||||||
|
use crate::helpers::EnvSizer;
|
||||||
|
|
||||||
|
mod search;
|
||||||
|
mod updates;
|
||||||
|
|
||||||
pub type Document = Map<String, Value>;
|
pub type Document = Map<String, Value>;
|
||||||
|
|
||||||
@ -54,11 +55,7 @@ impl Index {
|
|||||||
let stop_words = self
|
let stop_words = self
|
||||||
.stop_words(&txn)?
|
.stop_words(&txn)?
|
||||||
.map(|stop_words| -> anyhow::Result<BTreeSet<_>> {
|
.map(|stop_words| -> anyhow::Result<BTreeSet<_>> {
|
||||||
Ok(stop_words
|
Ok(stop_words.stream().into_strs()?.into_iter().collect())
|
||||||
.stream()
|
|
||||||
.into_strs()?
|
|
||||||
.into_iter()
|
|
||||||
.collect())
|
|
||||||
})
|
})
|
||||||
.transpose()?
|
.transpose()?
|
||||||
.unwrap_or_else(BTreeSet::new);
|
.unwrap_or_else(BTreeSet::new);
|
||||||
@ -126,6 +123,10 @@ impl Index {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn size(&self) -> u64 {
|
||||||
|
self.env.size()
|
||||||
|
}
|
||||||
|
|
||||||
fn fields_to_display<S: AsRef<str>>(
|
fn fields_to_display<S: AsRef<str>>(
|
||||||
&self,
|
&self,
|
||||||
txn: &heed::RoTxn,
|
txn: &heed::RoTxn,
|
||||||
|
@ -8,20 +8,24 @@ use futures::pin_mut;
|
|||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
use heed::CompactionOption;
|
use heed::CompactionOption;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::{mpsc, RwLock};
|
||||||
use tokio::task::spawn_blocking;
|
use tokio::task::spawn_blocking;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult};
|
|
||||||
use crate::index::{Document, SearchQuery, SearchResult, Settings};
|
use crate::index::{Document, SearchQuery, SearchResult, Settings};
|
||||||
use crate::index_controller::update_handler::UpdateHandler;
|
use crate::index_controller::update_handler::UpdateHandler;
|
||||||
use crate::index_controller::{get_arc_ownership_blocking, updates::Processing, UpdateMeta};
|
use crate::index_controller::{
|
||||||
|
get_arc_ownership_blocking, updates::Processing, IndexStats, UpdateMeta,
|
||||||
|
};
|
||||||
use crate::option::IndexerOpts;
|
use crate::option::IndexerOpts;
|
||||||
|
|
||||||
|
use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult};
|
||||||
|
|
||||||
pub struct IndexActor<S> {
|
pub struct IndexActor<S> {
|
||||||
read_receiver: Option<mpsc::Receiver<IndexMsg>>,
|
read_receiver: Option<mpsc::Receiver<IndexMsg>>,
|
||||||
write_receiver: Option<mpsc::Receiver<IndexMsg>>,
|
write_receiver: Option<mpsc::Receiver<IndexMsg>>,
|
||||||
update_handler: Arc<UpdateHandler>,
|
update_handler: Arc<UpdateHandler>,
|
||||||
|
processing: RwLock<Option<Uuid>>,
|
||||||
store: S,
|
store: S,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,8 +43,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
read_receiver,
|
read_receiver,
|
||||||
write_receiver,
|
write_receiver,
|
||||||
store,
|
|
||||||
update_handler,
|
update_handler,
|
||||||
|
processing: RwLock::new(None),
|
||||||
|
store,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,6 +151,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
|
|||||||
Snapshot { uuid, path, ret } => {
|
Snapshot { uuid, path, ret } => {
|
||||||
let _ = ret.send(self.handle_snapshot(uuid, path).await);
|
let _ = ret.send(self.handle_snapshot(uuid, path).await);
|
||||||
}
|
}
|
||||||
|
GetStats { uuid, ret } => {
|
||||||
|
let _ = ret.send(self.handle_get_stats(uuid).await);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,16 +183,25 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
|
|||||||
meta: Processing<UpdateMeta>,
|
meta: Processing<UpdateMeta>,
|
||||||
data: File,
|
data: File,
|
||||||
) -> Result<UpdateResult> {
|
) -> Result<UpdateResult> {
|
||||||
debug!("Processing update {}", meta.id());
|
async fn get_result<S: IndexStore>(actor: &IndexActor<S>, meta: Processing<UpdateMeta>, data: File) -> Result<UpdateResult> {
|
||||||
let uuid = meta.index_uuid();
|
debug!("Processing update {}", meta.id());
|
||||||
let update_handler = self.update_handler.clone();
|
let uuid = *meta.index_uuid();
|
||||||
let index = match self.store.get(*uuid).await? {
|
let update_handler = actor.update_handler.clone();
|
||||||
Some(index) => index,
|
let index = match actor.store.get(uuid).await? {
|
||||||
None => self.store.create(*uuid, None).await?,
|
Some(index) => index,
|
||||||
};
|
None => actor.store.create(uuid, None).await?,
|
||||||
spawn_blocking(move || update_handler.handle_update(meta, data, index))
|
};
|
||||||
.await
|
|
||||||
.map_err(|e| IndexError::Error(e.into()))
|
spawn_blocking(move || update_handler.handle_update(meta, data, index))
|
||||||
|
.await
|
||||||
|
.map_err(|e| IndexError::Error(e.into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
*self.processing.write().await = Some(meta.index_uuid().clone());
|
||||||
|
let result = get_result(self, meta, data).await;
|
||||||
|
*self.processing.write().await = None;
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
|
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
|
||||||
@ -328,4 +345,28 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_get_stats(&self, uuid: Uuid) -> Result<IndexStats> {
|
||||||
|
let index = self
|
||||||
|
.store
|
||||||
|
.get(uuid)
|
||||||
|
.await?
|
||||||
|
.ok_or(IndexError::UnexistingIndex)?;
|
||||||
|
|
||||||
|
let processing = self.processing.read().await;
|
||||||
|
let is_indexing = *processing == Some(uuid);
|
||||||
|
|
||||||
|
spawn_blocking(move || {
|
||||||
|
let rtxn = index.read_txn()?;
|
||||||
|
|
||||||
|
Ok(IndexStats {
|
||||||
|
size: index.size(),
|
||||||
|
number_of_documents: index.number_of_documents(&rtxn)?,
|
||||||
|
is_indexing,
|
||||||
|
fields_distribution: index.fields_distribution(&rtxn)?,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| IndexError::Error(e.into()))?
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,12 +3,13 @@ use std::path::{Path, PathBuf};
|
|||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::index::{Document, SearchQuery, SearchResult, Settings};
|
||||||
|
use crate::index_controller::{updates::Processing, UpdateMeta};
|
||||||
|
use crate::index_controller::{IndexSettings, IndexStats};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
|
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
|
||||||
};
|
};
|
||||||
use crate::index::{Document, SearchQuery, SearchResult, Settings};
|
|
||||||
use crate::index_controller::IndexSettings;
|
|
||||||
use crate::index_controller::{updates::Processing, UpdateMeta};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct IndexActorHandleImpl {
|
pub struct IndexActorHandleImpl {
|
||||||
@ -121,6 +122,13 @@ impl IndexActorHandle for IndexActorHandleImpl {
|
|||||||
let _ = self.read_sender.send(msg).await;
|
let _ = self.read_sender.send(msg).await;
|
||||||
Ok(receiver.await.expect("IndexActor has been killed")?)
|
Ok(receiver.await.expect("IndexActor has been killed")?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
|
||||||
|
let (ret, receiver) = oneshot::channel();
|
||||||
|
let msg = IndexMsg::GetStats { uuid, ret };
|
||||||
|
let _ = self.read_sender.send(msg).await;
|
||||||
|
Ok(receiver.await.expect("IndexActor has been killed")?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IndexActorHandleImpl {
|
impl IndexActorHandleImpl {
|
||||||
|
@ -3,9 +3,10 @@ use std::path::PathBuf;
|
|||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::{IndexMeta, IndexSettings, Result, UpdateResult};
|
|
||||||
use crate::index::{Document, SearchQuery, SearchResult, Settings};
|
use crate::index::{Document, SearchQuery, SearchResult, Settings};
|
||||||
use crate::index_controller::{updates::Processing, UpdateMeta};
|
use crate::index_controller::{updates::Processing, IndexStats, UpdateMeta};
|
||||||
|
|
||||||
|
use super::{IndexMeta, IndexSettings, Result, UpdateResult};
|
||||||
|
|
||||||
pub enum IndexMsg {
|
pub enum IndexMsg {
|
||||||
CreateIndex {
|
CreateIndex {
|
||||||
@ -58,4 +59,8 @@ pub enum IndexMsg {
|
|||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
ret: oneshot::Sender<Result<()>>,
|
ret: oneshot::Sender<Result<()>>,
|
||||||
},
|
},
|
||||||
|
GetStats {
|
||||||
|
uuid: Uuid,
|
||||||
|
ret: oneshot::Sender<Result<IndexStats>>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
@ -1,30 +1,30 @@
|
|||||||
mod actor;
|
|
||||||
mod handle_impl;
|
|
||||||
mod message;
|
|
||||||
mod store;
|
|
||||||
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
#[cfg(test)]
|
||||||
|
use mockall::automock;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::IndexSettings;
|
use actor::IndexActor;
|
||||||
|
pub use handle_impl::IndexActorHandleImpl;
|
||||||
|
use message::IndexMsg;
|
||||||
|
use store::{IndexStore, MapIndexStore};
|
||||||
|
|
||||||
use crate::index::UpdateResult as UResult;
|
use crate::index::UpdateResult as UResult;
|
||||||
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
|
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
|
||||||
use crate::index_controller::{
|
use crate::index_controller::{
|
||||||
updates::{Failed, Processed, Processing},
|
updates::{Failed, Processed, Processing},
|
||||||
UpdateMeta,
|
IndexStats, UpdateMeta,
|
||||||
};
|
};
|
||||||
use actor::IndexActor;
|
|
||||||
use message::IndexMsg;
|
|
||||||
use store::{IndexStore, MapIndexStore};
|
|
||||||
|
|
||||||
pub use handle_impl::IndexActorHandleImpl;
|
use super::IndexSettings;
|
||||||
|
|
||||||
#[cfg(test)]
|
mod actor;
|
||||||
use mockall::automock;
|
mod handle_impl;
|
||||||
|
mod message;
|
||||||
|
mod store;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, IndexError>;
|
pub type Result<T> = std::result::Result<T, IndexError>;
|
||||||
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;
|
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;
|
||||||
@ -33,7 +33,7 @@ type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<U
|
|||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct IndexMeta {
|
pub struct IndexMeta {
|
||||||
created_at: DateTime<Utc>,
|
created_at: DateTime<Utc>,
|
||||||
updated_at: DateTime<Utc>,
|
pub updated_at: DateTime<Utc>,
|
||||||
primary_key: Option<String>,
|
primary_key: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,4 +98,5 @@ pub trait IndexActorHandle {
|
|||||||
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>;
|
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>;
|
||||||
async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta>;
|
async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta>;
|
||||||
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
|
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
|
||||||
|
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats>;
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,3 @@
|
|||||||
mod index_actor;
|
|
||||||
mod snapshot;
|
|
||||||
mod update_actor;
|
|
||||||
mod update_handler;
|
|
||||||
mod updates;
|
|
||||||
mod uuid_resolver;
|
|
||||||
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -14,33 +7,40 @@ use anyhow::bail;
|
|||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
use log::info;
|
use log::info;
|
||||||
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
||||||
|
use milli::FieldsDistribution;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use index_actor::IndexActorHandle;
|
||||||
|
use snapshot::load_snapshot;
|
||||||
|
use snapshot::SnapshotService;
|
||||||
|
use update_actor::UpdateActorHandle;
|
||||||
|
pub use updates::{Failed, Processed, Processing};
|
||||||
|
use uuid_resolver::UuidError;
|
||||||
|
use uuid_resolver::UuidResolverHandle;
|
||||||
|
|
||||||
use crate::index::{Document, SearchQuery, SearchResult};
|
use crate::index::{Document, SearchQuery, SearchResult};
|
||||||
use crate::index::{Facets, Settings, UpdateResult};
|
use crate::index::{Facets, Settings, UpdateResult};
|
||||||
use crate::option::Opt;
|
use crate::option::Opt;
|
||||||
|
|
||||||
use index_actor::IndexActorHandle;
|
mod index_actor;
|
||||||
use snapshot::load_snapshot;
|
mod snapshot;
|
||||||
use update_actor::UpdateActorHandle;
|
mod update_actor;
|
||||||
use uuid_resolver::UuidResolverHandle;
|
mod update_handler;
|
||||||
|
mod updates;
|
||||||
use snapshot::SnapshotService;
|
mod uuid_resolver;
|
||||||
pub use updates::{Failed, Processed, Processing};
|
|
||||||
use uuid_resolver::UuidError;
|
|
||||||
|
|
||||||
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
|
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct IndexMetadata {
|
pub struct IndexMetadata {
|
||||||
uid: String,
|
pub uid: String,
|
||||||
name: String,
|
name: String,
|
||||||
#[serde(flatten)]
|
#[serde(flatten)]
|
||||||
meta: index_actor::IndexMeta,
|
pub meta: index_actor::IndexMeta,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
@ -63,6 +63,14 @@ pub struct IndexSettings {
|
|||||||
pub primary_key: Option<String>,
|
pub primary_key: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct IndexStats {
|
||||||
|
pub size: u64,
|
||||||
|
pub number_of_documents: u64,
|
||||||
|
pub is_indexing: bool,
|
||||||
|
pub fields_distribution: FieldsDistribution,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct IndexController {
|
pub struct IndexController {
|
||||||
uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
|
uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
|
||||||
index_handle: index_actor::IndexActorHandleImpl,
|
index_handle: index_actor::IndexActorHandleImpl,
|
||||||
@ -100,10 +108,11 @@ impl IndexController {
|
|||||||
update_handle.clone(),
|
update_handle.clone(),
|
||||||
Duration::from_secs(options.snapshot_interval_sec),
|
Duration::from_secs(options.snapshot_interval_sec),
|
||||||
options.snapshot_dir.clone(),
|
options.snapshot_dir.clone(),
|
||||||
options.db_path
|
options
|
||||||
.file_name()
|
.db_path
|
||||||
.map(|n| n.to_owned().into_string().expect("invalid path"))
|
.file_name()
|
||||||
.unwrap_or_else(|| String::from("data.ms")),
|
.map(|n| n.to_owned().into_string().expect("invalid path"))
|
||||||
|
.unwrap_or_else(|| String::from("data.ms")),
|
||||||
);
|
);
|
||||||
|
|
||||||
tokio::task::spawn(snapshot_service.run());
|
tokio::task::spawn(snapshot_service.run());
|
||||||
@ -341,6 +350,22 @@ impl IndexController {
|
|||||||
};
|
};
|
||||||
Ok(meta)
|
Ok(meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
|
||||||
|
let uuid = self.uuid_resolver.get(uid.clone()).await?;
|
||||||
|
|
||||||
|
Ok(self.index_handle.get_index_stats(uuid).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_updates_size(&self, uid: String) -> anyhow::Result<u64> {
|
||||||
|
let uuid = self.uuid_resolver.get(uid.clone()).await?;
|
||||||
|
|
||||||
|
Ok(self.update_handle.get_size(uuid).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_uuids_size(&self) -> anyhow::Result<u64> {
|
||||||
|
Ok(self.uuid_resolver.get_size().await?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
|
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
|
||||||
|
@ -8,10 +8,11 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt};
|
|||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStoreStore};
|
|
||||||
use crate::index_controller::index_actor::IndexActorHandle;
|
use crate::index_controller::index_actor::IndexActorHandle;
|
||||||
use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus};
|
use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus};
|
||||||
|
|
||||||
|
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStoreStore};
|
||||||
|
|
||||||
pub struct UpdateActor<D, S, I> {
|
pub struct UpdateActor<D, S, I> {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
store: S,
|
store: S,
|
||||||
@ -72,6 +73,9 @@ where
|
|||||||
Some(Snapshot { uuid, path, ret }) => {
|
Some(Snapshot { uuid, path, ret }) => {
|
||||||
let _ = ret.send(self.handle_snapshot(uuid, path).await);
|
let _ = ret.send(self.handle_snapshot(uuid, path).await);
|
||||||
}
|
}
|
||||||
|
Some(GetSize { uuid, ret }) => {
|
||||||
|
let _ = ret.send(self.handle_get_size(uuid).await);
|
||||||
|
}
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -223,4 +227,20 @@ where
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_get_size(&self, uuid: Uuid) -> Result<u64> {
|
||||||
|
let size = match self.store.get(uuid).await? {
|
||||||
|
Some(update_store) => tokio::task::spawn_blocking(move || -> anyhow::Result<u64> {
|
||||||
|
let txn = update_store.env.read_txn()?;
|
||||||
|
|
||||||
|
update_store.get_size(&txn)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| UpdateError::Error(e.into()))?
|
||||||
|
.map_err(|e| UpdateError::Error(e.into()))?,
|
||||||
|
None => 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(size)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,11 +3,12 @@ use std::path::{Path, PathBuf};
|
|||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::index_controller::IndexActorHandle;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta,
|
MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta,
|
||||||
UpdateMsg, UpdateStatus,
|
UpdateMsg, UpdateStatus,
|
||||||
};
|
};
|
||||||
use crate::index_controller::IndexActorHandle;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct UpdateActorHandleImpl<D> {
|
pub struct UpdateActorHandleImpl<D> {
|
||||||
@ -36,6 +37,7 @@ where
|
|||||||
Ok(Self { sender })
|
Ok(Self { sender })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<D> UpdateActorHandle for UpdateActorHandleImpl<D>
|
impl<D> UpdateActorHandle for UpdateActorHandleImpl<D>
|
||||||
where
|
where
|
||||||
@ -43,29 +45,12 @@ where
|
|||||||
{
|
{
|
||||||
type Data = D;
|
type Data = D;
|
||||||
|
|
||||||
async fn update(
|
|
||||||
&self,
|
|
||||||
meta: UpdateMeta,
|
|
||||||
data: mpsc::Receiver<PayloadData<Self::Data>>,
|
|
||||||
uuid: Uuid,
|
|
||||||
) -> Result<UpdateStatus> {
|
|
||||||
let (ret, receiver) = oneshot::channel();
|
|
||||||
let msg = UpdateMsg::Update {
|
|
||||||
uuid,
|
|
||||||
data,
|
|
||||||
meta,
|
|
||||||
ret,
|
|
||||||
};
|
|
||||||
let _ = self.sender.send(msg).await;
|
|
||||||
receiver.await.expect("update actor killed.")
|
|
||||||
}
|
|
||||||
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
|
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
|
||||||
let (ret, receiver) = oneshot::channel();
|
let (ret, receiver) = oneshot::channel();
|
||||||
let msg = UpdateMsg::ListUpdates { uuid, ret };
|
let msg = UpdateMsg::ListUpdates { uuid, ret };
|
||||||
let _ = self.sender.send(msg).await;
|
let _ = self.sender.send(msg).await;
|
||||||
receiver.await.expect("update actor killed.")
|
receiver.await.expect("update actor killed.")
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
|
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
|
||||||
let (ret, receiver) = oneshot::channel();
|
let (ret, receiver) = oneshot::channel();
|
||||||
let msg = UpdateMsg::GetUpdate { uuid, id, ret };
|
let msg = UpdateMsg::GetUpdate { uuid, id, ret };
|
||||||
@ -93,4 +78,28 @@ where
|
|||||||
let _ = self.sender.send(msg).await;
|
let _ = self.sender.send(msg).await;
|
||||||
receiver.await.expect("update actor killed.")
|
receiver.await.expect("update actor killed.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_size(&self, uuid: Uuid) -> Result<u64> {
|
||||||
|
let (ret, receiver) = oneshot::channel();
|
||||||
|
let msg = UpdateMsg::GetSize { uuid, ret };
|
||||||
|
let _ = self.sender.send(msg).await;
|
||||||
|
receiver.await.expect("update actor killed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update(
|
||||||
|
&self,
|
||||||
|
meta: UpdateMeta,
|
||||||
|
data: mpsc::Receiver<PayloadData<Self::Data>>,
|
||||||
|
uuid: Uuid,
|
||||||
|
) -> Result<UpdateStatus> {
|
||||||
|
let (ret, receiver) = oneshot::channel();
|
||||||
|
let msg = UpdateMsg::Update {
|
||||||
|
uuid,
|
||||||
|
data,
|
||||||
|
meta,
|
||||||
|
ret,
|
||||||
|
};
|
||||||
|
let _ = self.sender.send(msg).await;
|
||||||
|
receiver.await.expect("update actor killed.")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,4 +34,8 @@ pub enum UpdateMsg<D> {
|
|||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
ret: oneshot::Sender<Result<()>>,
|
ret: oneshot::Sender<Result<()>>,
|
||||||
},
|
},
|
||||||
|
GetSize {
|
||||||
|
uuid: Uuid,
|
||||||
|
ret: oneshot::Sender<Result<u64>>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
@ -46,6 +46,7 @@ pub trait UpdateActorHandle {
|
|||||||
async fn delete(&self, uuid: Uuid) -> Result<()>;
|
async fn delete(&self, uuid: Uuid) -> Result<()>;
|
||||||
async fn create(&self, uuid: Uuid) -> Result<()>;
|
async fn create(&self, uuid: Uuid) -> Result<()>;
|
||||||
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
|
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
|
||||||
|
async fn get_size(&self, uuid: Uuid) -> Result<u64>;
|
||||||
async fn update(
|
async fn update(
|
||||||
&self,
|
&self,
|
||||||
meta: UpdateMeta,
|
meta: UpdateMeta,
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use std::fs::File;
|
||||||
use std::fs::{copy, create_dir_all, remove_file};
|
use std::fs::{copy, create_dir_all, remove_file};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -6,10 +7,10 @@ use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
|
|||||||
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
|
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::fs::File;
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::helpers::EnvSizer;
|
||||||
use crate::index_controller::updates::*;
|
use crate::index_controller::updates::*;
|
||||||
|
|
||||||
type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
|
type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
|
||||||
@ -409,4 +410,18 @@ where
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_size(&self, txn: &heed::RoTxn) -> anyhow::Result<u64> {
|
||||||
|
let mut size = self.env.size();
|
||||||
|
|
||||||
|
for path in self.pending.iter(txn)? {
|
||||||
|
let (_, path) = path?;
|
||||||
|
|
||||||
|
if let Ok(metadata) = path.metadata() {
|
||||||
|
size += metadata.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(size)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ impl UpdateHandler {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_buidler(&self, update_id: u64) -> UpdateBuilder {
|
fn update_builder(&self, update_id: u64) -> UpdateBuilder {
|
||||||
// We prepare the update by using the update builder.
|
// We prepare the update by using the update builder.
|
||||||
let mut update_builder = UpdateBuilder::new(update_id);
|
let mut update_builder = UpdateBuilder::new(update_id);
|
||||||
if let Some(max_nb_chunks) = self.max_nb_chunks {
|
if let Some(max_nb_chunks) = self.max_nb_chunks {
|
||||||
@ -67,7 +67,7 @@ impl UpdateHandler {
|
|||||||
|
|
||||||
let update_id = meta.id();
|
let update_id = meta.id();
|
||||||
|
|
||||||
let update_builder = self.update_buidler(update_id);
|
let update_builder = self.update_builder(update_id);
|
||||||
|
|
||||||
let result = match meta.meta() {
|
let result = match meta.meta() {
|
||||||
DocumentsAddition {
|
DocumentsAddition {
|
||||||
|
@ -41,6 +41,9 @@ impl<S: UuidStore> UuidResolverActor<S> {
|
|||||||
Some(SnapshotRequest { path, ret }) => {
|
Some(SnapshotRequest { path, ret }) => {
|
||||||
let _ = ret.send(self.handle_snapshot(path).await);
|
let _ = ret.send(self.handle_snapshot(path).await);
|
||||||
}
|
}
|
||||||
|
Some(GetSize { ret }) => {
|
||||||
|
let _ = ret.send(self.handle_get_size().await);
|
||||||
|
}
|
||||||
// all senders have been dropped, need to quit.
|
// all senders have been dropped, need to quit.
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
@ -86,6 +89,10 @@ impl<S: UuidStore> UuidResolverActor<S> {
|
|||||||
self.store.insert(uid, uuid).await?;
|
self.store.insert(uid, uuid).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_get_size(&self) -> Result<u64> {
|
||||||
|
self.store.get_size().await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_index_uid_valid(uid: &str) -> bool {
|
fn is_index_uid_valid(uid: &str) -> bool {
|
||||||
|
@ -75,4 +75,13 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
|
|||||||
.await
|
.await
|
||||||
.expect("Uuid resolver actor has been killed")?)
|
.expect("Uuid resolver actor has been killed")?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_size(&self) -> Result<u64> {
|
||||||
|
let (ret, receiver) = oneshot::channel();
|
||||||
|
let msg = UuidResolveMsg::GetSize { ret };
|
||||||
|
let _ = self.sender.send(msg).await;
|
||||||
|
Ok(receiver
|
||||||
|
.await
|
||||||
|
.expect("Uuid resolver actor has been killed")?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ use tokio::sync::oneshot;
|
|||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::Result;
|
use super::Result;
|
||||||
|
|
||||||
pub enum UuidResolveMsg {
|
pub enum UuidResolveMsg {
|
||||||
Get {
|
Get {
|
||||||
uid: String,
|
uid: String,
|
||||||
@ -29,4 +30,7 @@ pub enum UuidResolveMsg {
|
|||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
ret: oneshot::Sender<Result<Vec<Uuid>>>,
|
ret: oneshot::Sender<Result<Vec<Uuid>>>,
|
||||||
},
|
},
|
||||||
|
GetSize {
|
||||||
|
ret: oneshot::Sender<Result<u64>>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@ pub trait UuidResolverHandle {
|
|||||||
async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
|
async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
|
||||||
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
|
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
|
||||||
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
|
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
|
||||||
|
async fn get_size(&self) -> Result<u64>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
|
@ -8,6 +8,7 @@ use heed::{
|
|||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::{Result, UuidError, UUID_STORE_SIZE};
|
use super::{Result, UuidError, UUID_STORE_SIZE};
|
||||||
|
use crate::helpers::EnvSizer;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait UuidStore {
|
pub trait UuidStore {
|
||||||
@ -19,6 +20,7 @@ pub trait UuidStore {
|
|||||||
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
|
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
|
||||||
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
|
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
|
||||||
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
|
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
|
||||||
|
async fn get_size(&self) -> Result<u64>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct HeedUuidStore {
|
pub struct HeedUuidStore {
|
||||||
@ -151,4 +153,8 @@ impl UuidStore for HeedUuidStore {
|
|||||||
})
|
})
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_size(&self) -> Result<u64> {
|
||||||
|
Ok(self.env.size())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::BTreeMap;
|
||||||
|
use std::iter::FromIterator;
|
||||||
|
|
||||||
use actix_web::get;
|
use actix_web::get;
|
||||||
use actix_web::web;
|
use actix_web::web;
|
||||||
@ -6,13 +7,15 @@ use actix_web::HttpResponse;
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
|
use crate::data::Stats;
|
||||||
use crate::error::ResponseError;
|
use crate::error::ResponseError;
|
||||||
use crate::helpers::Authentication;
|
use crate::helpers::Authentication;
|
||||||
|
use crate::index_controller::IndexStats;
|
||||||
use crate::routes::IndexParam;
|
use crate::routes::IndexParam;
|
||||||
use crate::Data;
|
use crate::Data;
|
||||||
|
|
||||||
pub fn services(cfg: &mut web::ServiceConfig) {
|
pub fn services(cfg: &mut web::ServiceConfig) {
|
||||||
cfg.service(index_stats)
|
cfg.service(get_index_stats)
|
||||||
.service(get_stats)
|
.service(get_stats)
|
||||||
.service(get_version);
|
.service(get_version);
|
||||||
}
|
}
|
||||||
@ -22,28 +25,56 @@ pub fn services(cfg: &mut web::ServiceConfig) {
|
|||||||
struct IndexStatsResponse {
|
struct IndexStatsResponse {
|
||||||
number_of_documents: u64,
|
number_of_documents: u64,
|
||||||
is_indexing: bool,
|
is_indexing: bool,
|
||||||
fields_distribution: BTreeMap<String, usize>,
|
fields_distribution: BTreeMap<String, u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<IndexStats> for IndexStatsResponse {
|
||||||
|
fn from(stats: IndexStats) -> Self {
|
||||||
|
Self {
|
||||||
|
number_of_documents: stats.number_of_documents,
|
||||||
|
is_indexing: stats.is_indexing,
|
||||||
|
fields_distribution: BTreeMap::from_iter(stats.fields_distribution.into_iter()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")]
|
#[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")]
|
||||||
async fn index_stats(
|
async fn get_index_stats(
|
||||||
_data: web::Data<Data>,
|
data: web::Data<Data>,
|
||||||
_path: web::Path<IndexParam>,
|
path: web::Path<IndexParam>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
todo!()
|
let response: IndexStatsResponse = data.get_index_stats(path.index_uid.clone()).await?.into();
|
||||||
|
|
||||||
|
Ok(HttpResponse::Ok().json(response))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
struct StatsResult {
|
struct StatsResponse {
|
||||||
database_size: u64,
|
database_size: u64,
|
||||||
last_update: Option<DateTime<Utc>>,
|
last_update: Option<DateTime<Utc>>,
|
||||||
indexes: HashMap<String, IndexStatsResponse>,
|
indexes: BTreeMap<String, IndexStatsResponse>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Stats> for StatsResponse {
|
||||||
|
fn from(stats: Stats) -> Self {
|
||||||
|
Self {
|
||||||
|
database_size: stats.database_size,
|
||||||
|
last_update: stats.last_update,
|
||||||
|
indexes: stats
|
||||||
|
.indexes
|
||||||
|
.into_iter()
|
||||||
|
.map(|(uid, index_stats)| (uid, index_stats.into()))
|
||||||
|
.collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/stats", wrap = "Authentication::Private")]
|
#[get("/stats", wrap = "Authentication::Private")]
|
||||||
async fn get_stats(_data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
|
async fn get_stats(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
|
||||||
todo!()
|
let response: StatsResponse = data.get_stats().await?.into();
|
||||||
|
|
||||||
|
Ok(HttpResponse::Ok().json(response))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
@ -58,11 +89,11 @@ struct VersionResponse {
|
|||||||
async fn get_version() -> HttpResponse {
|
async fn get_version() -> HttpResponse {
|
||||||
let commit_sha = match option_env!("COMMIT_SHA") {
|
let commit_sha = match option_env!("COMMIT_SHA") {
|
||||||
Some("") | None => env!("VERGEN_SHA"),
|
Some("") | None => env!("VERGEN_SHA"),
|
||||||
Some(commit_sha) => commit_sha
|
Some(commit_sha) => commit_sha,
|
||||||
};
|
};
|
||||||
let commit_date = match option_env!("COMMIT_DATE") {
|
let commit_date = match option_env!("COMMIT_DATE") {
|
||||||
Some("") | None => env!("VERGEN_COMMIT_DATE"),
|
Some("") | None => env!("VERGEN_COMMIT_DATE"),
|
||||||
Some(commit_date) => commit_date
|
Some(commit_date) => commit_date,
|
||||||
};
|
};
|
||||||
|
|
||||||
HttpResponse::Ok().json(VersionResponse {
|
HttpResponse::Ok().json(VersionResponse {
|
||||||
|
@ -161,6 +161,11 @@ impl Index<'_> {
|
|||||||
let url = format!("/indexes/{}/settings", self.uid);
|
let url = format!("/indexes/{}/settings", self.uid);
|
||||||
self.service.delete(url).await
|
self.service.delete(url).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn stats(&self) -> (Value, StatusCode) {
|
||||||
|
let url = format!("/indexes/{}/stats", self.uid);
|
||||||
|
self.service.get(url).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct GetDocumentOptions;
|
pub struct GetDocumentOptions;
|
||||||
|
@ -58,6 +58,10 @@ impl Server {
|
|||||||
pub async fn version(&self) -> (Value, StatusCode) {
|
pub async fn version(&self) -> (Value, StatusCode) {
|
||||||
self.service.get("/version").await
|
self.service.get("/version").await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn stats(&self) -> (Value, StatusCode) {
|
||||||
|
self.service.get("/stats").await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
|
pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
mod create_index;
|
mod create_index;
|
||||||
mod delete_index;
|
mod delete_index;
|
||||||
mod get_index;
|
mod get_index;
|
||||||
|
mod stats;
|
||||||
mod update_index;
|
mod update_index;
|
||||||
|
53
meilisearch-http/tests/index/stats.rs
Normal file
53
meilisearch-http/tests/index/stats.rs
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use crate::common::Server;
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn stats() {
|
||||||
|
let server = Server::new().await;
|
||||||
|
let index = server.index("test");
|
||||||
|
let (_, code) = index.create(Some("id")).await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
|
||||||
|
let (response, code) = index.stats().await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
assert_eq!(response["numberOfDocuments"], 0);
|
||||||
|
assert_eq!(response["isIndexing"], false);
|
||||||
|
assert!(response["fieldsDistribution"]
|
||||||
|
.as_object()
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
|
||||||
|
let documents = json!([
|
||||||
|
{
|
||||||
|
"id": 1,
|
||||||
|
"name": "Alexey",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": 2,
|
||||||
|
"age": 45,
|
||||||
|
}
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (response, code) = index.add_documents(documents, None).await;
|
||||||
|
assert_eq!(code, 202);
|
||||||
|
assert_eq!(response["updateId"], 0);
|
||||||
|
|
||||||
|
let (response, code) = index.stats().await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
assert_eq!(response["isIndexing"], true);
|
||||||
|
|
||||||
|
index.wait_update_id(0).await;
|
||||||
|
|
||||||
|
let (response, code) = index.stats().await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
assert_eq!(response["numberOfDocuments"], 2);
|
||||||
|
assert_eq!(response["isIndexing"], false);
|
||||||
|
assert_eq!(response["fieldsDistribution"]["id"], 2);
|
||||||
|
assert_eq!(response["fieldsDistribution"]["name"], 1);
|
||||||
|
assert_eq!(response["fieldsDistribution"]["age"], 1);
|
||||||
|
}
|
@ -1,3 +1,5 @@
|
|||||||
|
use serde_json::json;
|
||||||
|
|
||||||
use crate::common::Server;
|
use crate::common::Server;
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
@ -19,3 +21,56 @@ async fn test_healthyness() {
|
|||||||
assert_eq!(status_code, 200);
|
assert_eq!(status_code, 200);
|
||||||
assert_eq!(response["status"], "available");
|
assert_eq!(response["status"], "available");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn stats() {
|
||||||
|
let server = Server::new().await;
|
||||||
|
let index = server.index("test");
|
||||||
|
let (_, code) = index.create(Some("id")).await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
|
||||||
|
let (response, code) = server.stats().await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
assert!(response.get("databaseSize").is_some());
|
||||||
|
assert!(response.get("lastUpdate").is_some());
|
||||||
|
assert!(response["indexes"].get("test").is_some());
|
||||||
|
assert_eq!(response["indexes"]["test"]["numberOfDocuments"], 0);
|
||||||
|
assert_eq!(response["indexes"]["test"]["isIndexing"], false);
|
||||||
|
|
||||||
|
let last_update = response["lastUpdate"].as_str().unwrap();
|
||||||
|
|
||||||
|
let documents = json!([
|
||||||
|
{
|
||||||
|
"id": 1,
|
||||||
|
"name": "Alexey",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": 2,
|
||||||
|
"age": 45,
|
||||||
|
}
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (response, code) = index.add_documents(documents, None).await;
|
||||||
|
assert_eq!(code, 202);
|
||||||
|
assert_eq!(response["updateId"], 0);
|
||||||
|
|
||||||
|
let (response, code) = server.stats().await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
assert_eq!(response["indexes"]["test"]["isIndexing"], true);
|
||||||
|
|
||||||
|
index.wait_update_id(0).await;
|
||||||
|
|
||||||
|
let (response, code) = server.stats().await;
|
||||||
|
|
||||||
|
assert_eq!(code, 200);
|
||||||
|
assert!(response["databaseSize"].as_u64().unwrap() > 0);
|
||||||
|
assert!(response["lastUpdate"].as_str().unwrap() > last_update);
|
||||||
|
assert_eq!(response["indexes"]["test"]["numberOfDocuments"], 2);
|
||||||
|
assert_eq!(response["indexes"]["test"]["isIndexing"], false);
|
||||||
|
assert_eq!(response["indexes"]["test"]["fieldsDistribution"]["id"], 2);
|
||||||
|
assert_eq!(response["indexes"]["test"]["fieldsDistribution"]["name"], 1);
|
||||||
|
assert_eq!(response["indexes"]["test"]["fieldsDistribution"]["age"], 1);
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user