Adds new metrics to prometheus: SearchQueue size, SearchQueue searches running, and Search Queue searches waiting.

This commit is contained in:
Pedro Turik Firmino 2024-11-06 15:37:16 -03:00
parent 6b67f9fc4c
commit 8b95f5ccc6
3 changed files with 59 additions and 2 deletions

View File

@ -49,4 +49,18 @@ lazy_static! {
pub static ref MEILISEARCH_IS_INDEXING: IntGauge = pub static ref MEILISEARCH_IS_INDEXING: IntGauge =
register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing")) register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing"))
.expect("Can't create a metric"); .expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCH_QUEUE_SIZE: IntGauge = register_int_gauge!(opts!(
"meilisearch_search_queue_size",
"Meilisearch Search Queue Size"
))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCHES_RUNNING: IntGauge =
register_int_gauge!(opts!("meilisearch_searches_running", "Meilisearch Searches Running"))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED: IntGauge =
register_int_gauge!(opts!(
"meilisearch_searches_waiting_to_be_processed",
"Meilisearch Searches Being Processed"
))
.expect("Can't create a metric");
} }

View File

@ -10,6 +10,7 @@ use prometheus::{Encoder, TextEncoder};
use crate::extractors::authentication::policies::ActionPolicy; use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::routes::create_all_stats; use crate::routes::create_all_stats;
use crate::search_queue::SearchQueue;
pub fn configure(config: &mut web::ServiceConfig) { pub fn configure(config: &mut web::ServiceConfig) {
config.service(web::resource("").route(web::get().to(get_metrics))); config.service(web::resource("").route(web::get().to(get_metrics)));
@ -18,6 +19,7 @@ pub fn configure(config: &mut web::ServiceConfig) {
pub async fn get_metrics( pub async fn get_metrics(
index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
auth_controller: Data<AuthController>, auth_controller: Data<AuthController>,
search_queue: web::Data<SearchQueue>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_metrics()?; index_scheduler.features().check_metrics()?;
let auth_filters = index_scheduler.filters(); let auth_filters = index_scheduler.filters();
@ -35,6 +37,11 @@ pub async fn get_metrics(
crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64); crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64);
crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64); crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64);
crate::metrics::MEILISEARCH_SEARCH_QUEUE_SIZE.set(search_queue.capacity() as i64);
crate::metrics::MEILISEARCH_SEARCHES_RUNNING.set(search_queue.searches_running() as i64);
crate::metrics::MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED
.set(search_queue.searches_waiting() as i64);
for (index, value) in response.indexes.iter() { for (index, value) in response.indexes.iter() {
crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT
.with_label_values(&[index]) .with_label_values(&[index])

View File

@ -18,6 +18,8 @@
//! And should drop the Permit only once you have freed all the RAM consumed by the method. //! And should drop the Permit only once you have freed all the RAM consumed by the method.
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use rand::rngs::StdRng; use rand::rngs::StdRng;
@ -33,6 +35,8 @@ pub struct SearchQueue {
/// If we have waited longer than this to get a permit, we should abort the search request entirely. /// If we have waited longer than this to get a permit, we should abort the search request entirely.
/// The client probably already closed the connection, but we have no way to find out. /// The client probably already closed the connection, but we have no way to find out.
time_to_abort: Duration, time_to_abort: Duration,
searches_running: Arc<AtomicUsize>,
searches_waiting_to_be_processed: Arc<AtomicUsize>,
} }
/// You should only run search requests while holding this permit. /// You should only run search requests while holding this permit.
@ -68,14 +72,41 @@ impl SearchQueue {
// so let's not allocate any RAM and keep a capacity of 1. // so let's not allocate any RAM and keep a capacity of 1.
let (sender, receiver) = mpsc::channel(1); let (sender, receiver) = mpsc::channel(1);
tokio::task::spawn(Self::run(capacity, paralellism, receiver)); let instance = Self {
Self { sender, capacity, time_to_abort: Duration::from_secs(60) } sender,
capacity,
time_to_abort: Duration::from_secs(60),
searches_running: Default::default(),
searches_waiting_to_be_processed: Default::default(),
};
tokio::task::spawn(Self::run(
capacity,
paralellism,
receiver,
Arc::clone(&instance.searches_running),
Arc::clone(&instance.searches_waiting_to_be_processed),
));
instance
} }
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self { pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
Self { time_to_abort, ..self } Self { time_to_abort, ..self }
} }
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn searches_running(&self) -> usize {
self.searches_running.load(Ordering::Relaxed)
}
pub fn searches_waiting(&self) -> usize {
self.searches_waiting_to_be_processed.load(Ordering::Relaxed)
}
/// This function is the main loop, it's in charge on scheduling which search request should execute first and /// This function is the main loop, it's in charge on scheduling which search request should execute first and
/// how many should executes at the same time. /// how many should executes at the same time.
/// ///
@ -84,6 +115,8 @@ impl SearchQueue {
capacity: usize, capacity: usize,
parallelism: NonZeroUsize, parallelism: NonZeroUsize,
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>, mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
metric_searches_running: Arc<AtomicUsize>,
metric_searches_waiting: Arc<AtomicUsize>,
) { ) {
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default(); let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
let mut rng: StdRng = StdRng::from_entropy(); let mut rng: StdRng = StdRng::from_entropy();
@ -133,6 +166,9 @@ impl SearchQueue {
queue.push(search_request); queue.push(search_request);
}, },
} }
metric_searches_running.store(searches_running, Ordering::Relaxed);
metric_searches_waiting.store(queue.len(), Ordering::Relaxed);
} }
} }