diff --git a/Cargo.lock b/Cargo.lock index b20d19ae2..6cd41aae7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1940,6 +1940,7 @@ dependencies = [ "meilisearch-types", "nelson", "page_size 0.5.0", + "parking_lot", "puffin", "roaring", "serde", diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index 6beea99eb..a327b4386 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -33,6 +33,7 @@ time = { version = "0.3.20", features = ["serde-well-known", "formatting", "pars uuid = { version = "1.3.1", features = ["serde", "v4"] } tokio = { version = "1.27.0", features = ["full"] } zookeeper = "0.8.0" +parking_lot = "0.12.1" [dev-dependencies] big_s = "1.0.2" diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 45a1eb481..57f45b5c3 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -37,7 +37,7 @@ use std::ops::{Bound, RangeBounds}; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread; use std::time::Duration; @@ -53,6 +53,7 @@ use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use roaring::RoaringBitmap; use synchronoise::SignalEvent; use time::format_description::well_known::Rfc3339; @@ -279,7 +280,11 @@ pub struct IndexSchedulerOptions { /// Structure which holds meilisearch's indexes and schedules the tasks /// to be performed on them. -pub struct IndexScheduler { +#[derive(Clone)] +pub struct IndexScheduler(Arc>>); + +/// This is the internal structure that keeps the indexes alive. +struct IndexSchedulerInner { /// The LMDB environment which the DBs are associated with. pub(crate) env: Env, @@ -365,41 +370,6 @@ pub struct IndexScheduler { run_loop_iteration: Arc>, } -impl IndexScheduler { - fn private_clone(&self) -> IndexScheduler { - IndexScheduler { - env: self.env.clone(), - must_stop_processing: self.must_stop_processing.clone(), - processing_tasks: self.processing_tasks.clone(), - file_store: self.file_store.clone(), - all_tasks: self.all_tasks, - status: self.status, - kind: self.kind, - index_tasks: self.index_tasks, - canceled_by: self.canceled_by, - enqueued_at: self.enqueued_at, - started_at: self.started_at, - finished_at: self.finished_at, - index_mapper: self.index_mapper.clone(), - wake_up: self.wake_up.clone(), - autobatching_enabled: self.autobatching_enabled, - max_number_of_tasks: self.max_number_of_tasks, - snapshots_path: self.snapshots_path.clone(), - dumps_path: self.dumps_path.clone(), - auth_path: self.auth_path.clone(), - zookeeper: self.zookeeper.clone(), - version_file_path: self.version_file_path.clone(), - #[cfg(test)] - test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), - #[cfg(test)] - planned_failures: self.planned_failures.clone(), - #[cfg(test)] - run_loop_iteration: self.run_loop_iteration.clone(), - features: self.features.clone(), - } - } -} - impl IndexScheduler { /// Create an index scheduler and start its run loop. pub fn new( @@ -454,7 +424,7 @@ impl IndexScheduler { wtxn.commit()?; // allow unreachable_code to get rids of the warning in the case of a test build. - let this = Self { + let inner = IndexSchedulerInner { must_stop_processing: MustStopProcessing::default(), processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())), file_store, @@ -495,7 +465,7 @@ impl IndexScheduler { }; // initialize the directories we need to process batches. - if let Some(zookeeper) = &this.zookeeper { + if let Some(zookeeper) = &inner.zookeeper { match zookeeper.create( "/election", vec![], @@ -517,11 +487,16 @@ impl IndexScheduler { } } + let this = IndexScheduler(Arc::new(RwLock::new(inner))); this.run(); - Ok(this) } + /// Returns a read lock of the `IndexScheduler` inner structure. + pub fn inner(&self) -> MappedRwLockReadGuard { + RwLockReadGuard::map(self.0.read(), |opt| opt.unwrap()) + } + /// Return `Ok(())` if the index scheduler is able to access one of its database. pub fn health(&self) -> Result<()> { let rtxn = self.env.read_txn()?;