From 0ba1c46e196e3f965ebc3b2db006e5c63301d624 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 26 Sep 2022 22:26:30 +0200 Subject: [PATCH] fix a deadlock --- index-scheduler/src/batch.rs | 13 +++++-- index-scheduler/src/index_scheduler.rs | 52 +++++++++++++++++++++----- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index e647fc120..cbf176523 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -427,7 +427,7 @@ impl IndexScheduler { Ok(None) } - pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result> { + pub(crate) fn process_batch(&self, batch: Batch) -> Result> { match batch { Batch::Cancel(_) => todo!(), Batch::Snapshot(_) => todo!(), @@ -439,7 +439,12 @@ impl IndexScheduler { content_files, mut tasks, } => { - let index = self.index_mapper.create_index(wtxn, &index_uid)?; + // we NEED a write transaction for the index creation. + // To avoid blocking the whole process we're going to commit asap. + let mut wtxn = self.env.write_txn()?; + let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; + wtxn.commit()?; + let ret = index.update_documents( IndexDocumentsMethod::ReplaceDocuments, primary_key, @@ -474,7 +479,9 @@ impl IndexScheduler { settings: _, settings_tasks: _, } => { - let index = self.index_mapper.create_index(wtxn, &index_uid)?; + let mut wtxn = self.env.write_txn()?; + let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; + wtxn.commit()?; let mut updated_tasks = Vec::new(); /* diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index 63ad8abbc..0fca85493 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -133,7 +133,8 @@ pub struct IndexScheduler { pub enum Breakpoint { Start, BatchCreated, - BatchProcessed, + BeforeProcessing, + AfterProcessing, } impl IndexScheduler { @@ -346,11 +347,13 @@ impl IndexScheduler { #[cfg(test)] self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap(); - let mut wtxn = self.env.write_txn()?; - let batch = match self.create_next_batch(&wtxn)? { + let rtxn = self.env.read_txn()?; + let batch = match self.create_next_batch(&rtxn)? { Some(batch) => batch, None => return Ok(()), }; + // we don't need this transaction any longer. + drop(rtxn); // 1. store the starting date with the bitmap of processing tasks. let mut ids = batch.ids(); @@ -360,12 +363,19 @@ impl IndexScheduler { *self.processing_tasks.write().unwrap() = (started_at, processing_tasks); #[cfg(test)] - self.test_breakpoint_sdr - .send(Breakpoint::BatchCreated) - .unwrap(); + { + self.test_breakpoint_sdr + .send(Breakpoint::BatchCreated) + .unwrap(); + self.test_breakpoint_sdr + .send(Breakpoint::BeforeProcessing) + .unwrap(); + } - // 2. process the tasks - let res = self.process_batch(&mut wtxn, batch); + // 2. Process the tasks + let res = self.process_batch(batch); + + let mut wtxn = self.env.write_txn()?; let finished_at = OffsetDateTime::now_utc(); match res { @@ -403,7 +413,7 @@ impl IndexScheduler { #[cfg(test)] self.test_breakpoint_sdr - .send(Breakpoint::BatchProcessed) + .send(Breakpoint::AfterProcessing) .unwrap(); Ok(()) @@ -551,6 +561,28 @@ mod tests { assert_smol_debug_snapshot!(index_tasks, @r###"[("catto", RoaringBitmap<[0, 1, 3]>), ("doggo", RoaringBitmap<[4]>)]"###); } + #[test] + fn insert_task_while_another_task_is_processing() { + let (index_scheduler, handle) = IndexScheduler::test(); + + index_scheduler.register(KindWithContent::Snapshot).unwrap(); + handle.wait_till(Breakpoint::BatchCreated); + // while the task is processing can we register another task? + index_scheduler.register(KindWithContent::Snapshot).unwrap(); + index_scheduler + .register(KindWithContent::IndexDeletion { + index_uid: S("doggos"), + }) + .unwrap(); + + let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); + tasks.reverse(); + assert_eq!(tasks.len(), 3); + assert_eq!(tasks[0].status, Status::Processing); + assert_eq!(tasks[1].status, Status::Enqueued); + assert_eq!(tasks[2].status, Status::Enqueued); + } + #[test] fn document_addition() { let (index_scheduler, handle) = IndexScheduler::test(); @@ -609,7 +641,7 @@ mod tests { } ] "###); - assert_eq!(handle.next_breakpoint(), Breakpoint::BatchProcessed); + handle.wait_till(Breakpoint::AfterProcessing); let task = index_scheduler.get_tasks(Query::default()).unwrap(); assert_json_snapshot!(task,