remove Clone from the IndexScheduler

This commit is contained in:
Tamo 2022-09-26 20:30:26 +02:00 committed by Clément Renault
parent 7f1a85d443
commit 972dd6bcef
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -96,7 +96,6 @@ pub mod db_name {
/// This module is responsible for two things;
/// 1. Resolve the name of the indexes.
/// 2. Schedule the tasks.
#[derive(Clone)]
pub struct IndexScheduler {
/// The list of tasks currently processing and their starting date.
pub(crate) processing_tasks: Arc<RwLock<(OffsetDateTime, RoaringBitmap)>>,
@ -131,7 +130,7 @@ pub struct IndexScheduler {
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Breakpoint {
pub enum Breakpoint {
Start,
BatchCreated,
BatchProcessed,
@ -144,6 +143,7 @@ impl IndexScheduler {
indexes_path: PathBuf,
index_size: usize,
indexer_config: IndexerConfig,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
) -> Result<Self> {
std::fs::create_dir_all(&tasks_path)?;
std::fs::create_dir_all(&update_file_path)?;
@ -157,8 +157,7 @@ impl IndexScheduler {
let file_store = FileStore::new(&update_file_path)?;
// allow unreachable_code to get rids of the warning in the case of a test build.
#[allow(unreachable_code)]
Ok(Self {
let this = Self {
// by default there is no processing tasks
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store,
@ -172,10 +171,39 @@ impl IndexScheduler {
wake_up: Arc::new(SignalEvent::auto(true)),
#[cfg(test)]
test_breakpoint_sdr: panic!(
"Can't use `IndexScheduler::new` in the tests. See `IndexScheduler::test`."
),
})
test_breakpoint_sdr,
};
this.run();
Ok(this)
}
/// This function will execute in a different thread and must be called only once.
fn run(&self) {
let run = Self {
processing_tasks: self.processing_tasks.clone(),
file_store: self.file_store.clone(),
env: self.env.clone(),
all_tasks: self.all_tasks.clone(),
status: self.status.clone(),
kind: self.kind.clone(),
index_tasks: self.index_tasks.clone(),
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
};
std::thread::spawn(move || loop {
println!("started running");
run.wake_up.wait();
match run.tick() {
Ok(()) => (),
Err(e) => log::error!("{}", e),
}
});
}
/// Return the index corresponding to the name. If it wasn't opened before
@ -313,18 +341,6 @@ impl IndexScheduler {
Ok(self.file_store.delete(uuid)?)
}
/// This worker function must be run in a different thread and must be run only once.
pub fn run(&self) -> ! {
loop {
self.wake_up.wait();
match self.tick() {
Ok(()) => (),
Err(e) => log::error!("{}", e),
}
}
}
/// Create and execute and store the result of one batch of registered tasks.
fn tick(&self) -> Result<()> {
#[cfg(test)]
@ -401,8 +417,6 @@ impl IndexScheduler {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use big_s::S;
use insta::*;
use tempfile::TempDir;
@ -415,41 +429,18 @@ mod tests {
impl IndexScheduler {
pub fn test() -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap();
let tasks_path = tempdir.path().join("db_path");
let update_file_path = tempdir.path().join("file_store");
let indexes_path = tempdir.path().join("indexes");
let index_size = 1024 * 1024;
let indexer_config = IndexerConfig::default();
std::fs::create_dir_all(&tasks_path).unwrap();
std::fs::create_dir_all(&update_file_path).unwrap();
std::fs::create_dir_all(&indexes_path).unwrap();
let mut options = heed::EnvOpenOptions::new();
options.max_dbs(6);
let env = options.open(tasks_path).unwrap();
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
let file_store = FileStore::new(&update_file_path).unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
let index_scheduler = Self {
// by default there is no processing tasks
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store,
all_tasks: env.create_database(Some(db_name::ALL_TASKS)).unwrap(),
status: env.create_database(Some(db_name::STATUS)).unwrap(),
kind: env.create_database(Some(db_name::KIND)).unwrap(),
index_tasks: env.create_database(Some(db_name::INDEX_TASKS)).unwrap(),
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)
.unwrap(),
env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
let index_scheduler = Self::new(
tempdir.path().join("db_path"),
tempdir.path().join("file_store"),
tempdir.path().join("indexes"),
1024 * 1024,
IndexerConfig::default(),
sender,
)
.unwrap();
test_breakpoint_sdr: sender,
};
let index_scheduler_handle = IndexSchedulerHandle {
_tempdir: tempdir,
test_breakpoint_rcv: receiver,
@ -466,25 +457,29 @@ mod tests {
impl IndexSchedulerHandle {
/// Wait until the provided breakpoint is reached.
fn test_wait_till(&self, breakpoint: Breakpoint) {
fn wait_till(&self, breakpoint: Breakpoint) {
self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
}
/// Wait until the provided breakpoint is reached.
fn test_next_breakpoint(&self) -> Breakpoint {
fn next_breakpoint(&self) -> Breakpoint {
self.test_breakpoint_rcv.recv().unwrap()
}
/// The scheduler will not stop on breakpoints.
fn test_dont_block(&self) {
// unroll and ignore all the state the scheduler is going to send us.
self.test_breakpoint_rcv.iter().last();
/// The scheduler will not stop on breakpoints anymore.
fn dont_block(self) {
std::thread::spawn(move || loop {
// unroll and ignore all the state the scheduler is going to send us.
self.test_breakpoint_rcv.iter().last();
});
}
}
#[test]
fn register() {
let (index_scheduler, _handle) = IndexScheduler::test();
let (index_scheduler, handle) = IndexScheduler::test();
handle.dont_block();
let kinds = [
KindWithContent::IndexCreation {
index_uid: S("catto"),
@ -567,13 +562,14 @@ mod tests {
}"#;
let (uuid, mut file) = index_scheduler.create_update_file().unwrap();
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count =
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
index_scheduler
.register(KindWithContent::DocumentAddition {
index_uid: S("doggos"),
primary_key: Some(S("id")),
content_file: uuid,
documents_count: 100,
documents_count,
allow_index_creation: true,
})
.unwrap();
@ -595,10 +591,7 @@ mod tests {
]
"###);
let t_index_scheduler = index_scheduler.clone();
std::thread::spawn(move || t_index_scheduler.tick().unwrap());
handle.test_wait_till(Breakpoint::BatchCreated);
handle.wait_till(Breakpoint::BatchCreated);
// Once the task has started being batched it should be marked as processing
let task = index_scheduler.get_tasks(Query::default()).unwrap();
@ -616,10 +609,7 @@ mod tests {
}
]
"###);
assert_eq!(
handle.test_next_breakpoint(),
Breakpoint::BatchProcessed
);
assert_eq!(handle.next_breakpoint(), Breakpoint::BatchProcessed);
let task = index_scheduler.get_tasks(Query::default()).unwrap();
assert_json_snapshot!(task,