diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index c43be62d0..63ad8abbc 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -96,7 +96,6 @@ pub mod db_name { /// This module is responsible for two things; /// 1. Resolve the name of the indexes. /// 2. Schedule the tasks. -#[derive(Clone)] pub struct IndexScheduler { /// The list of tasks currently processing and their starting date. pub(crate) processing_tasks: Arc>, @@ -131,7 +130,7 @@ pub struct IndexScheduler { #[cfg(test)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum Breakpoint { +pub enum Breakpoint { Start, BatchCreated, BatchProcessed, @@ -144,6 +143,7 @@ impl IndexScheduler { indexes_path: PathBuf, index_size: usize, indexer_config: IndexerConfig, + #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender, ) -> Result { std::fs::create_dir_all(&tasks_path)?; std::fs::create_dir_all(&update_file_path)?; @@ -157,8 +157,7 @@ impl IndexScheduler { let file_store = FileStore::new(&update_file_path)?; // allow unreachable_code to get rids of the warning in the case of a test build. - #[allow(unreachable_code)] - Ok(Self { + let this = Self { // by default there is no processing tasks processing_tasks: Arc::new(RwLock::new(processing_tasks)), file_store, @@ -172,10 +171,39 @@ impl IndexScheduler { wake_up: Arc::new(SignalEvent::auto(true)), #[cfg(test)] - test_breakpoint_sdr: panic!( - "Can't use `IndexScheduler::new` in the tests. See `IndexScheduler::test`." - ), - }) + test_breakpoint_sdr, + }; + + this.run(); + Ok(this) + } + + /// This function will execute in a different thread and must be called only once. + fn run(&self) { + let run = Self { + processing_tasks: self.processing_tasks.clone(), + file_store: self.file_store.clone(), + env: self.env.clone(), + all_tasks: self.all_tasks.clone(), + status: self.status.clone(), + kind: self.kind.clone(), + index_tasks: self.index_tasks.clone(), + index_mapper: self.index_mapper.clone(), + wake_up: self.wake_up.clone(), + + #[cfg(test)] + test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), + }; + + std::thread::spawn(move || loop { + println!("started running"); + run.wake_up.wait(); + + match run.tick() { + Ok(()) => (), + Err(e) => log::error!("{}", e), + } + }); } /// Return the index corresponding to the name. If it wasn't opened before @@ -313,18 +341,6 @@ impl IndexScheduler { Ok(self.file_store.delete(uuid)?) } - /// This worker function must be run in a different thread and must be run only once. - pub fn run(&self) -> ! { - loop { - self.wake_up.wait(); - - match self.tick() { - Ok(()) => (), - Err(e) => log::error!("{}", e), - } - } - } - /// Create and execute and store the result of one batch of registered tasks. fn tick(&self) -> Result<()> { #[cfg(test)] @@ -401,8 +417,6 @@ impl IndexScheduler { #[cfg(test)] mod tests { - use std::sync::Arc; - use big_s::S; use insta::*; use tempfile::TempDir; @@ -415,41 +429,18 @@ mod tests { impl IndexScheduler { pub fn test() -> (Self, IndexSchedulerHandle) { let tempdir = TempDir::new().unwrap(); - let tasks_path = tempdir.path().join("db_path"); - let update_file_path = tempdir.path().join("file_store"); - let indexes_path = tempdir.path().join("indexes"); - let index_size = 1024 * 1024; - let indexer_config = IndexerConfig::default(); - - std::fs::create_dir_all(&tasks_path).unwrap(); - std::fs::create_dir_all(&update_file_path).unwrap(); - std::fs::create_dir_all(&indexes_path).unwrap(); - - let mut options = heed::EnvOpenOptions::new(); - options.max_dbs(6); - - let env = options.open(tasks_path).unwrap(); - let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); - let file_store = FileStore::new(&update_file_path).unwrap(); - let (sender, receiver) = crossbeam::channel::bounded(0); - let index_scheduler = Self { - // by default there is no processing tasks - processing_tasks: Arc::new(RwLock::new(processing_tasks)), - file_store, - all_tasks: env.create_database(Some(db_name::ALL_TASKS)).unwrap(), - status: env.create_database(Some(db_name::STATUS)).unwrap(), - kind: env.create_database(Some(db_name::KIND)).unwrap(), - index_tasks: env.create_database(Some(db_name::INDEX_TASKS)).unwrap(), - index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config) - .unwrap(), - env, - // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things - wake_up: Arc::new(SignalEvent::auto(true)), + let index_scheduler = Self::new( + tempdir.path().join("db_path"), + tempdir.path().join("file_store"), + tempdir.path().join("indexes"), + 1024 * 1024, + IndexerConfig::default(), + sender, + ) + .unwrap(); - test_breakpoint_sdr: sender, - }; let index_scheduler_handle = IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver, @@ -466,25 +457,29 @@ mod tests { impl IndexSchedulerHandle { /// Wait until the provided breakpoint is reached. - fn test_wait_till(&self, breakpoint: Breakpoint) { + fn wait_till(&self, breakpoint: Breakpoint) { self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint); } /// Wait until the provided breakpoint is reached. - fn test_next_breakpoint(&self) -> Breakpoint { + fn next_breakpoint(&self) -> Breakpoint { self.test_breakpoint_rcv.recv().unwrap() } - /// The scheduler will not stop on breakpoints. - fn test_dont_block(&self) { - // unroll and ignore all the state the scheduler is going to send us. - self.test_breakpoint_rcv.iter().last(); + /// The scheduler will not stop on breakpoints anymore. + fn dont_block(self) { + std::thread::spawn(move || loop { + // unroll and ignore all the state the scheduler is going to send us. + self.test_breakpoint_rcv.iter().last(); + }); } } #[test] fn register() { - let (index_scheduler, _handle) = IndexScheduler::test(); + let (index_scheduler, handle) = IndexScheduler::test(); + handle.dont_block(); + let kinds = [ KindWithContent::IndexCreation { index_uid: S("catto"), @@ -567,13 +562,14 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file().unwrap(); - document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = + document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap(); index_scheduler .register(KindWithContent::DocumentAddition { index_uid: S("doggos"), primary_key: Some(S("id")), content_file: uuid, - documents_count: 100, + documents_count, allow_index_creation: true, }) .unwrap(); @@ -595,10 +591,7 @@ mod tests { ] "###); - let t_index_scheduler = index_scheduler.clone(); - std::thread::spawn(move || t_index_scheduler.tick().unwrap()); - - handle.test_wait_till(Breakpoint::BatchCreated); + handle.wait_till(Breakpoint::BatchCreated); // Once the task has started being batched it should be marked as processing let task = index_scheduler.get_tasks(Query::default()).unwrap(); @@ -616,10 +609,7 @@ mod tests { } ] "###); - assert_eq!( - handle.test_next_breakpoint(), - Breakpoint::BatchProcessed - ); + assert_eq!(handle.next_breakpoint(), Breakpoint::BatchProcessed); let task = index_scheduler.get_tasks(Query::default()).unwrap(); assert_json_snapshot!(task,