Add a test to check that it works without autobatching

This commit is contained in:
Kerollmops 2022-10-10 17:00:56 +02:00 committed by Clément Renault
parent db9d1b18ca
commit e2a766acb5
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -138,6 +138,9 @@ pub struct IndexScheduler {
/// Get a signal when a batch needs to be processed. /// Get a signal when a batch needs to be processed.
pub(crate) wake_up: Arc<SignalEvent>, pub(crate) wake_up: Arc<SignalEvent>,
/// Weither autobatching is enabled or not.
pub(crate) autobatching_enabled: bool,
// ================= test // ================= test
/// The next entry is dedicated to the tests. /// The next entry is dedicated to the tests.
/// It provide a way to break in multiple part of the scheduler. /// It provide a way to break in multiple part of the scheduler.
@ -161,6 +164,7 @@ impl IndexScheduler {
indexes_path: PathBuf, indexes_path: PathBuf,
index_size: usize, index_size: usize,
indexer_config: IndexerConfig, indexer_config: IndexerConfig,
autobatching_enabled: bool,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
) -> Result<Self> { ) -> Result<Self> {
std::fs::create_dir_all(&tasks_path)?; std::fs::create_dir_all(&tasks_path)?;
@ -187,6 +191,7 @@ impl IndexScheduler {
env, env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)), wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled,
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr, test_breakpoint_sdr,
@ -208,6 +213,7 @@ impl IndexScheduler {
index_tasks: self.index_tasks, index_tasks: self.index_tasks,
index_mapper: self.index_mapper.clone(), index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(), wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@ -454,7 +460,7 @@ mod tests {
use super::*; use super::*;
impl IndexScheduler { impl IndexScheduler {
pub fn test() -> (Self, IndexSchedulerHandle) { pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap(); let tempdir = TempDir::new().unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0); let (sender, receiver) = crossbeam::channel::bounded(0);
@ -464,6 +470,7 @@ mod tests {
tempdir.path().join("indexes"), tempdir.path().join("indexes"),
1024 * 1024, 1024 * 1024,
IndexerConfig::default(), IndexerConfig::default(),
autobatching, // enable autobatching
sender, sender,
) )
.unwrap(); .unwrap();
@ -504,7 +511,7 @@ mod tests {
#[test] #[test]
fn register() { fn register() {
let (index_scheduler, handle) = IndexScheduler::test(); let (index_scheduler, handle) = IndexScheduler::test(true);
handle.dont_block(); handle.dont_block();
let kinds = [ let kinds = [
@ -583,7 +590,7 @@ mod tests {
#[test] #[test]
fn insert_task_while_another_task_is_processing() { fn insert_task_while_another_task_is_processing() {
let (index_scheduler, handle) = IndexScheduler::test(); let (index_scheduler, handle) = IndexScheduler::test(true);
index_scheduler.register(KindWithContent::Snapshot).unwrap(); index_scheduler.register(KindWithContent::Snapshot).unwrap();
handle.wait_till(Breakpoint::BatchCreated); handle.wait_till(Breakpoint::BatchCreated);
@ -607,7 +614,7 @@ mod tests {
/// we send them very fast, we must make sure that they are all processed. /// we send them very fast, we must make sure that they are all processed.
#[test] #[test]
fn process_tasks_inserted_without_new_signal() { fn process_tasks_inserted_without_new_signal() {
let (index_scheduler, handle) = IndexScheduler::test(); let (index_scheduler, handle) = IndexScheduler::test(true);
index_scheduler index_scheduler
.register(KindWithContent::IndexCreation { .register(KindWithContent::IndexCreation {
@ -640,9 +647,50 @@ mod tests {
assert_eq!(tasks[2].status, Status::Succeeded); assert_eq!(tasks[2].status, Status::Succeeded);
} }
#[test]
fn process_tasks_without_autobatching() {
let (index_scheduler, handle) = IndexScheduler::test(false);
index_scheduler
.register(KindWithContent::IndexCreation {
index_uid: S("doggos"),
primary_key: None,
})
.unwrap();
index_scheduler
.register(KindWithContent::DocumentClear {
index_uid: S("doggos"),
})
.unwrap();
index_scheduler
.register(KindWithContent::DocumentClear {
index_uid: S("doggos"),
})
.unwrap();
index_scheduler
.register(KindWithContent::DocumentClear {
index_uid: S("doggos"),
})
.unwrap();
handle.wait_till(Breakpoint::Start);
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
tasks.reverse();
assert_eq!(tasks.len(), 4);
assert_eq!(tasks[0].status, Status::Succeeded);
assert_eq!(tasks[1].status, Status::Succeeded);
assert_eq!(tasks[2].status, Status::Succeeded);
assert_eq!(tasks[3].status, Status::Succeeded);
}
#[test] #[test]
fn document_addition() { fn document_addition() {
let (index_scheduler, handle) = IndexScheduler::test(); let (index_scheduler, handle) = IndexScheduler::test(true);
let content = r#" let content = r#"
{ {
@ -753,6 +801,6 @@ mod tests {
#[test] #[test]
fn simple_new() { fn simple_new() {
crate::IndexScheduler::test(); crate::IndexScheduler::test(true);
} }
} }