Wrap the IndexScheduler fields into an inner struct

This commit is contained in:
Clément Renault 2023-08-31 10:36:33 +02:00
parent e257710961
commit 95a011af13
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 17 additions and 40 deletions

1
Cargo.lock generated
View File

@ -1940,6 +1940,7 @@ dependencies = [
"meilisearch-types", "meilisearch-types",
"nelson", "nelson",
"page_size 0.5.0", "page_size 0.5.0",
"parking_lot",
"puffin", "puffin",
"roaring", "roaring",
"serde", "serde",

View File

@ -33,6 +33,7 @@ time = { version = "0.3.20", features = ["serde-well-known", "formatting", "pars
uuid = { version = "1.3.1", features = ["serde", "v4"] } uuid = { version = "1.3.1", features = ["serde", "v4"] }
tokio = { version = "1.27.0", features = ["full"] } tokio = { version = "1.27.0", features = ["full"] }
zookeeper = "0.8.0" zookeeper = "0.8.0"
parking_lot = "0.12.1"
[dev-dependencies] [dev-dependencies]
big_s = "1.0.2" big_s = "1.0.2"

View File

@ -37,7 +37,7 @@ use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@ -53,6 +53,7 @@ use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
@ -279,7 +280,11 @@ pub struct IndexSchedulerOptions {
/// Structure which holds meilisearch's indexes and schedules the tasks /// Structure which holds meilisearch's indexes and schedules the tasks
/// to be performed on them. /// to be performed on them.
pub struct IndexScheduler { #[derive(Clone)]
pub struct IndexScheduler(Arc<RwLock<Option<IndexScheduler>>>);
/// This is the internal structure that keeps the indexes alive.
struct IndexSchedulerInner {
/// The LMDB environment which the DBs are associated with. /// The LMDB environment which the DBs are associated with.
pub(crate) env: Env, pub(crate) env: Env,
@ -365,41 +370,6 @@ pub struct IndexScheduler {
run_loop_iteration: Arc<RwLock<usize>>, run_loop_iteration: Arc<RwLock<usize>>,
} }
impl IndexScheduler {
fn private_clone(&self) -> IndexScheduler {
IndexScheduler {
env: self.env.clone(),
must_stop_processing: self.must_stop_processing.clone(),
processing_tasks: self.processing_tasks.clone(),
file_store: self.file_store.clone(),
all_tasks: self.all_tasks,
status: self.status,
kind: self.kind,
index_tasks: self.index_tasks,
canceled_by: self.canceled_by,
enqueued_at: self.enqueued_at,
started_at: self.started_at,
finished_at: self.finished_at,
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
max_number_of_tasks: self.max_number_of_tasks,
snapshots_path: self.snapshots_path.clone(),
dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(),
zookeeper: self.zookeeper.clone(),
version_file_path: self.version_file_path.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
#[cfg(test)]
planned_failures: self.planned_failures.clone(),
#[cfg(test)]
run_loop_iteration: self.run_loop_iteration.clone(),
features: self.features.clone(),
}
}
}
impl IndexScheduler { impl IndexScheduler {
/// Create an index scheduler and start its run loop. /// Create an index scheduler and start its run loop.
pub fn new( pub fn new(
@ -454,7 +424,7 @@ impl IndexScheduler {
wtxn.commit()?; wtxn.commit()?;
// allow unreachable_code to get rids of the warning in the case of a test build. // allow unreachable_code to get rids of the warning in the case of a test build.
let this = Self { let inner = IndexSchedulerInner {
must_stop_processing: MustStopProcessing::default(), must_stop_processing: MustStopProcessing::default(),
processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())), processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
file_store, file_store,
@ -495,7 +465,7 @@ impl IndexScheduler {
}; };
// initialize the directories we need to process batches. // initialize the directories we need to process batches.
if let Some(zookeeper) = &this.zookeeper { if let Some(zookeeper) = &inner.zookeeper {
match zookeeper.create( match zookeeper.create(
"/election", "/election",
vec![], vec![],
@ -517,11 +487,16 @@ impl IndexScheduler {
} }
} }
let this = IndexScheduler(Arc::new(RwLock::new(inner)));
this.run(); this.run();
Ok(this) Ok(this)
} }
/// Returns a read lock of the `IndexScheduler` inner structure.
pub fn inner(&self) -> MappedRwLockReadGuard<IndexSchedulerInner> {
RwLockReadGuard::map(self.0.read(), |opt| opt.unwrap())
}
/// Return `Ok(())` if the index scheduler is able to access one of its database. /// Return `Ok(())` if the index scheduler is able to access one of its database.
pub fn health(&self) -> Result<()> { pub fn health(&self) -> Result<()> {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;