remove Clone from the IndexScheduler

Tamo 2022-09-26 20:30:26 +02:00 committed by Clément Renault
parent d8d3499aec
commit 22bfb5a7a0


@@ -96,7 +96,6 @@ pub mod db_name {
 /// This module is responsible for two things;
 /// 1. Resolve the name of the indexes.
 /// 2. Schedule the tasks.
-#[derive(Clone)]
 pub struct IndexScheduler {
     /// The list of tasks currently processing and their starting date.
     pub(crate) processing_tasks: Arc<RwLock<(OffsetDateTime, RoaringBitmap)>>,
@@ -131,7 +130,7 @@ pub struct IndexScheduler {
 #[cfg(test)]
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum Breakpoint {
+pub enum Breakpoint {
     Start,
     BatchCreated,
     BatchProcessed,
@@ -144,6 +143,7 @@ impl IndexScheduler {
         indexes_path: PathBuf,
         index_size: usize,
         indexer_config: IndexerConfig,
+        #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
     ) -> Result<Self> {
         std::fs::create_dir_all(&tasks_path)?;
         std::fs::create_dir_all(&update_file_path)?;
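For context, the test-only breakpoint sender is threaded through the constructor with a `#[cfg(test)]` attribute on the parameter, so release builds never see it. A minimal sketch of that pattern, with hypothetical names (`Worker`, `probe`) standing in for the real types:

// Sketch only: a constructor that takes an extra argument in test builds,
// mirroring the `#[cfg(test)] test_breakpoint_sdr` parameter added above.
pub struct Worker {
    #[cfg(test)]
    probe: crossbeam::channel::Sender<&'static str>,
}

impl Worker {
    pub fn new(#[cfg(test)] probe: crossbeam::channel::Sender<&'static str>) -> Self {
        Worker {
            // The field only exists (and is only filled in) when compiling tests.
            #[cfg(test)]
            probe,
        }
    }
}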
@@ -157,8 +157,7 @@ impl IndexScheduler {
         let file_store = FileStore::new(&update_file_path)?;

         // allow unreachable_code to get rids of the warning in the case of a test build.
-        #[allow(unreachable_code)]
-        Ok(Self {
+        let this = Self {
             // by default there is no processing tasks
             processing_tasks: Arc::new(RwLock::new(processing_tasks)),
             file_store,
@@ -172,10 +171,39 @@ impl IndexScheduler {
             wake_up: Arc::new(SignalEvent::auto(true)),

             #[cfg(test)]
-            test_breakpoint_sdr: panic!(
-                "Can't use `IndexScheduler::new` in the tests. See `IndexScheduler::test`."
-            ),
-        })
+            test_breakpoint_sdr,
+        };
+
+        this.run();
+        Ok(this)
+    }
+
+    /// This function will execute in a different thread and must be called only once.
+    fn run(&self) {
+        let run = Self {
+            processing_tasks: self.processing_tasks.clone(),
+            file_store: self.file_store.clone(),
+            env: self.env.clone(),
+            all_tasks: self.all_tasks.clone(),
+            status: self.status.clone(),
+            kind: self.kind.clone(),
+            index_tasks: self.index_tasks.clone(),
+            index_mapper: self.index_mapper.clone(),
+            wake_up: self.wake_up.clone(),
+
+            #[cfg(test)]
+            test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
+        };
+
+        std::thread::spawn(move || loop {
+            println!("started running");
+            run.wake_up.wait();
+
+            match run.tick() {
+                Ok(()) => (),
+                Err(e) => log::error!("{}", e),
+            }
+        });
     }

     /// Return the index corresponding to the name. If it wasn't opened before
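The shape of the change, in isolation: instead of deriving `Clone` on the whole scheduler, `run()` assembles a private copy by cloning each shared handle and moves it into a background thread spawned from the constructor. A simplified, self-contained sketch of that pattern (the `Scheduler`/`queue` names and the polling loop are illustrative, not the real `IndexScheduler` API):

use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

/// Stand-in for the scheduler: only the shared, cheaply clonable state lives here
/// (the real struct holds heed databases, a FileStore, a SignalEvent, etc.).
struct Scheduler {
    queue: Arc<RwLock<Vec<String>>>,
}

impl Scheduler {
    fn new() -> Self {
        let this = Scheduler { queue: Arc::new(RwLock::new(Vec::new())) };
        // Spawn the worker before handing the scheduler back to the caller.
        this.run();
        this
    }

    /// Build a private copy field by field (no `#[derive(Clone)]` on the public
    /// type) and move it into the worker thread.
    fn run(&self) {
        let run = Scheduler { queue: self.queue.clone() };
        thread::spawn(move || loop {
            // The real loop waits on a SignalEvent and calls `tick()`; here we just poll.
            if let Some(task) = run.queue.write().unwrap().pop() {
                println!("processing {}", task);
            }
            thread::sleep(Duration::from_millis(100));
        });
    }
}

fn main() {
    let scheduler = Scheduler::new();
    scheduler.queue.write().unwrap().push("index creation".into());
    thread::sleep(Duration::from_millis(300));
}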
@@ -313,18 +341,6 @@ impl IndexScheduler {
         Ok(self.file_store.delete(uuid)?)
     }

-    /// This worker function must be run in a different thread and must be run only once.
-    pub fn run(&self) -> ! {
-        loop {
-            self.wake_up.wait();
-
-            match self.tick() {
-                Ok(()) => (),
-                Err(e) => log::error!("{}", e),
-            }
-        }
-    }
-
     /// Create and execute and store the result of one batch of registered tasks.
     fn tick(&self) -> Result<()> {
         #[cfg(test)]
@@ -401,8 +417,6 @@ impl IndexScheduler {

 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
     use big_s::S;
     use insta::*;
     use tempfile::TempDir;
@@ -415,41 +429,18 @@ mod tests {
     impl IndexScheduler {
         pub fn test() -> (Self, IndexSchedulerHandle) {
             let tempdir = TempDir::new().unwrap();
-            let tasks_path = tempdir.path().join("db_path");
-            let update_file_path = tempdir.path().join("file_store");
-            let indexes_path = tempdir.path().join("indexes");
-            let index_size = 1024 * 1024;
-            let indexer_config = IndexerConfig::default();
-
-            std::fs::create_dir_all(&tasks_path).unwrap();
-            std::fs::create_dir_all(&update_file_path).unwrap();
-            std::fs::create_dir_all(&indexes_path).unwrap();
-
-            let mut options = heed::EnvOpenOptions::new();
-            options.max_dbs(6);
-            let env = options.open(tasks_path).unwrap();
-            let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
-            let file_store = FileStore::new(&update_file_path).unwrap();
-
             let (sender, receiver) = crossbeam::channel::bounded(0);
-            let index_scheduler = Self {
-                // by default there is no processing tasks
-                processing_tasks: Arc::new(RwLock::new(processing_tasks)),
-                file_store,
-                all_tasks: env.create_database(Some(db_name::ALL_TASKS)).unwrap(),
-                status: env.create_database(Some(db_name::STATUS)).unwrap(),
-                kind: env.create_database(Some(db_name::KIND)).unwrap(),
-                index_tasks: env.create_database(Some(db_name::INDEX_TASKS)).unwrap(),
-                index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)
-                    .unwrap(),
-                env,
-                // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
-                wake_up: Arc::new(SignalEvent::auto(true)),
-                test_breakpoint_sdr: sender,
-            };
+            let index_scheduler = Self::new(
+                tempdir.path().join("db_path"),
+                tempdir.path().join("file_store"),
+                tempdir.path().join("indexes"),
+                1024 * 1024,
+                IndexerConfig::default(),
+                sender,
+            )
+            .unwrap();

             let index_scheduler_handle = IndexSchedulerHandle {
                 _tempdir: tempdir,
                 test_breakpoint_rcv: receiver,
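The test constructor above relies on a zero-capacity crossbeam channel: `bounded(0)` creates a rendezvous channel, so the scheduler thread blocks on every breakpoint send until the test receives it. A small stand-alone sketch of that mechanism (the `Breakpoint` variants mirror the enum above; the worker loop is illustrative, not the real scheduler):

use crossbeam::channel;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Breakpoint {
    Start,
    BatchCreated,
    BatchProcessed,
}

fn main() {
    // Capacity 0: each send blocks until the receiver is ready, so the worker
    // pauses at every breakpoint until the test consumes it.
    let (sender, receiver) = channel::bounded(0);

    std::thread::spawn(move || {
        for bp in [Breakpoint::Start, Breakpoint::BatchCreated, Breakpoint::BatchProcessed] {
            sender.send(bp).unwrap();
        }
    });

    // The test drives the worker forward one breakpoint at a time.
    assert_eq!(receiver.recv().unwrap(), Breakpoint::Start);
    assert_eq!(receiver.recv().unwrap(), Breakpoint::BatchCreated);
    assert_eq!(receiver.recv().unwrap(), Breakpoint::BatchProcessed);
}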
@@ -466,25 +457,29 @@ mod tests {
     impl IndexSchedulerHandle {
         /// Wait until the provided breakpoint is reached.
-        fn test_wait_till(&self, breakpoint: Breakpoint) {
+        fn wait_till(&self, breakpoint: Breakpoint) {
             self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
         }

         /// Wait until the provided breakpoint is reached.
-        fn test_next_breakpoint(&self) -> Breakpoint {
+        fn next_breakpoint(&self) -> Breakpoint {
             self.test_breakpoint_rcv.recv().unwrap()
         }

-        /// The scheduler will not stop on breakpoints.
-        fn test_dont_block(&self) {
-            // unroll and ignore all the state the scheduler is going to send us.
-            self.test_breakpoint_rcv.iter().last();
+        /// The scheduler will not stop on breakpoints anymore.
+        fn dont_block(self) {
+            std::thread::spawn(move || loop {
+                // unroll and ignore all the state the scheduler is going to send us.
+                self.test_breakpoint_rcv.iter().last();
+            });
         }
     }

     #[test]
     fn register() {
-        let (index_scheduler, _handle) = IndexScheduler::test();
+        let (index_scheduler, handle) = IndexScheduler::test();
+        handle.dont_block();
+
         let kinds = [
             KindWithContent::IndexCreation {
                 index_uid: S("catto"),
@@ -567,13 +562,14 @@ mod tests {
         }"#;

         let (uuid, mut file) = index_scheduler.create_update_file().unwrap();
+        let documents_count =
             document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
         index_scheduler
             .register(KindWithContent::DocumentAddition {
                 index_uid: S("doggos"),
                 primary_key: Some(S("id")),
                 content_file: uuid,
-                documents_count: 100,
+                documents_count,
                 allow_index_creation: true,
             })
             .unwrap();
@@ -595,10 +591,7 @@ mod tests {
             ]
         "###);

-        let t_index_scheduler = index_scheduler.clone();
-        std::thread::spawn(move || t_index_scheduler.tick().unwrap());
-
-        handle.test_wait_till(Breakpoint::BatchCreated);
+        handle.wait_till(Breakpoint::BatchCreated);

         // Once the task has started being batched it should be marked as processing
         let task = index_scheduler.get_tasks(Query::default()).unwrap();
@@ -616,10 +609,7 @@ mod tests {
             }
         ]
         "###);

-        assert_eq!(
-            handle.test_next_breakpoint(),
-            Breakpoint::BatchProcessed
-        );
+        assert_eq!(handle.next_breakpoint(), Breakpoint::BatchProcessed);

         let task = index_scheduler.get_tasks(Query::default()).unwrap();
         assert_json_snapshot!(task,