diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index b3261cbf5..083f39320 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -5,13 +5,11 @@ use file_store::{File, FileStore}; use index::Index; use milli::update::IndexerConfig; use synchronoise::SignalEvent; -use tempfile::TempDir; use time::OffsetDateTime; use uuid::Uuid; use std::path::PathBuf; -use std::sync::Arc; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use milli::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use milli::heed::{self, Database, Env}; @@ -124,18 +122,11 @@ pub struct IndexScheduler { /// Get a signal when a batch needs to be processed. pub(crate) wake_up: Arc, - // ================= tests - // The next entries are dedicated to the tests. - // It helps us to stop the scheduler and check what it is doing efficiently - /// Provide a way to break in multiple part of the scheduler. - #[cfg(test)] - test_breakpoint_rcv: crossbeam::channel::Receiver, + // ================= test + /// The next entry is dedicated to the tests. + /// It provide a way to break in multiple part of the scheduler. #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender, - - /// Hold a reference to its own tempdir to delete itself once dropped. - #[cfg(test)] - test_tempdir: Arc, } #[cfg(test)] @@ -147,7 +138,6 @@ enum Breakpoint { } impl IndexScheduler { - #[cfg(not(test))] pub fn new( tasks_path: PathBuf, update_file_path: PathBuf, @@ -166,6 +156,8 @@ impl IndexScheduler { let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); let file_store = FileStore::new(&update_file_path)?; + // allow unreachable_code to get rids of the warning in the case of a test build. + #[allow(unreachable_code)] Ok(Self { // by default there is no processing tasks processing_tasks: Arc::new(RwLock::new(processing_tasks)), @@ -178,6 +170,11 @@ impl IndexScheduler { env, // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things wake_up: Arc::new(SignalEvent::auto(true)), + + #[cfg(test)] + test_breakpoint_sdr: panic!( + "Can't use `IndexScheduler::new` in the tests. See `IndexScheduler::test`." + ), }) } @@ -403,34 +400,15 @@ impl IndexScheduler { pub fn notify(&self) { self.wake_up.signal() } - - /// /!\ Used only for tests purposes. - /// Wait until the provided breakpoint is reached. - #[cfg(test)] - fn test_wait_till(&self, breakpoint: Breakpoint) { - self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint); - } - - /// /!\ Used only for tests purposes. - /// Wait until the provided breakpoint is reached. - #[cfg(test)] - fn test_next_breakpoint(&self) -> Breakpoint { - self.test_breakpoint_rcv.recv().unwrap() - } - - /// /!\ Used only for tests purposes. - /// The scheduler will not stop on breakpoints. - #[cfg(test)] - fn test_dont_block(&self) { - // unroll and ignore all the state the scheduler is going to send us. - self.test_breakpoint_rcv.iter().last(); - } } #[cfg(test)] mod tests { + use std::sync::Arc; + use big_s::S; use insta::*; + use tempfile::TempDir; use uuid::Uuid; use crate::assert_smol_debug_snapshot; @@ -438,11 +416,11 @@ mod tests { use super::*; impl IndexScheduler { - pub fn test() -> Self { - let dir = TempDir::new().unwrap(); - let tasks_path = dir.path().join("db_path"); - let update_file_path = dir.path().join("file_store"); - let indexes_path = dir.path().join("indexes"); + pub fn test() -> (Self, IndexSchedulerHandle) { + let tempdir = TempDir::new().unwrap(); + let tasks_path = tempdir.path().join("db_path"); + let update_file_path = tempdir.path().join("file_store"); + let indexes_path = tempdir.path().join("indexes"); let index_size = 1024 * 1024; let indexer_config = IndexerConfig::default(); @@ -459,7 +437,7 @@ mod tests { let (sender, receiver) = crossbeam::channel::bounded(0); - Self { + let index_scheduler = Self { // by default there is no processing tasks processing_tasks: Arc::new(RwLock::new(processing_tasks)), file_store, @@ -473,16 +451,43 @@ mod tests { // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things wake_up: Arc::new(SignalEvent::auto(true)), - test_breakpoint_rcv: receiver, test_breakpoint_sdr: sender, - test_tempdir: Arc::new(dir), - } + }; + let index_scheduler_handle = IndexSchedulerHandle { + _tempdir: tempdir, + test_breakpoint_rcv: receiver, + }; + + (index_scheduler, index_scheduler_handle) + } + } + + pub struct IndexSchedulerHandle { + _tempdir: TempDir, + test_breakpoint_rcv: crossbeam::channel::Receiver, + } + + impl IndexSchedulerHandle { + /// Wait until the provided breakpoint is reached. + fn test_wait_till(&self, breakpoint: Breakpoint) { + self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint); + } + + /// Wait until the provided breakpoint is reached. + fn test_next_breakpoint(&self) -> Breakpoint { + self.test_breakpoint_rcv.recv().unwrap() + } + + /// The scheduler will not stop on breakpoints. + fn test_dont_block(&self) { + // unroll and ignore all the state the scheduler is going to send us. + self.test_breakpoint_rcv.iter().last(); } } #[test] fn register() { - let index_scheduler = IndexScheduler::test(); + let (index_scheduler, _handle) = IndexScheduler::test(); let kinds = [ KindWithContent::IndexCreation { index_uid: S("catto"), @@ -556,7 +561,7 @@ mod tests { #[test] fn document_addition() { - let index_scheduler = IndexScheduler::test(); + let (index_scheduler, handle) = IndexScheduler::test(); let content = r#" { @@ -596,7 +601,7 @@ mod tests { let t_index_scheduler = index_scheduler.clone(); std::thread::spawn(move || t_index_scheduler.tick().unwrap()); - index_scheduler.test_wait_till(Breakpoint::BatchCreated); + handle.test_wait_till(Breakpoint::BatchCreated); // Once the task has started being batched it should be marked as processing let task = index_scheduler.get_tasks(Query::default()).unwrap(); @@ -615,7 +620,7 @@ mod tests { ] "###); assert_eq!( - index_scheduler.test_next_breakpoint(), + handle.test_next_breakpoint(), Breakpoint::BatchProcessed );