Fix crash when batching an index swap task containing 0 swaps

This commit is contained in:
Loïc Lecrenier 2022-10-25 10:26:51 +02:00 committed by Clément Renault
parent 0aca5e84b9
commit 16fac10074
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 120 additions and 51 deletions

View File

@ -37,7 +37,7 @@ use roaring::RoaringBitmap;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use crate::autobatcher::BatchKind; use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task}; use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, Query, Result, TaskId}; use crate::{Error, IndexScheduler, Query, Result, TaskId};
@ -419,41 +419,47 @@ impl IndexScheduler {
))); )));
} }
// 5. We take the next task and try to batch all the tasks associated with this index. // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
if let Some(task_id) = enqueued.min() { let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
// This is safe because all the remaining task are associated with // If the task is not associated with any index, verify that it is an index swap and
// AT LEAST one index. We can use the right or left one it doesn't // create the batch directly. Otherwise, get the index name associated with the task
// matter. // and use the autobatcher to batch the enqueued tasks associated with it
let index_name = task.indexes().unwrap()[0];
let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued; let index_name = if let Some(&index_name) = task.indexes().first() {
index_name
} else {
assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
return Ok(Some(Batch::IndexSwap { task }));
};
// If autobatching is disabled we only take one task at a time. let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
let enqueued = index_tasks let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
.into_iter()
.take(tasks_limit)
.map(|task_id| {
self.get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
.map(|task| (task.uid, task.kind))
})
.collect::<Result<Vec<_>>>()?;
if let Some((batchkind, create_index)) = // If autobatching is disabled we only take one task at a time.
crate::autobatcher::autobatch(enqueued, index_already_exists) let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
{
return self.create_next_batch_index( let enqueued = index_tasks
rtxn, .into_iter()
index_name.to_string(), .take(tasks_limit)
batchkind, .map(|task_id| {
create_index, self.get_task(rtxn, task_id)
); .and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
} .map(|task| (task.uid, task.kind))
})
.collect::<Result<Vec<_>>>()?;
if let Some((batchkind, create_index)) =
autobatcher::autobatch(enqueued, index_already_exists)
{
return self.create_next_batch_index(
rtxn,
index_name.to_string(),
batchkind,
create_index,
);
} }
// If we found no tasks then we were notified for something that got autobatched // If we found no tasks then we were notified for something that got autobatched
@ -1042,9 +1048,8 @@ impl IndexScheduler {
for task_id in to_delete_tasks.iter() { for task_id in to_delete_tasks.iter() {
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(task_indexes) = task.indexes() {
affected_indexes.extend(task_indexes.into_iter().map(|x| x.to_owned())); affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
}
affected_statuses.insert(task.status); affected_statuses.insert(task.status);
affected_kinds.insert(task.kind.as_kind()); affected_kinds.insert(task.kind.as_kind());
// Note: don't delete the persisted task data since // Note: don't delete the persisted task data since

View File

@ -577,12 +577,10 @@ impl IndexScheduler {
}; };
self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?; self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() { for index in task.indexes() {
for index in indexes { self.update_index(&mut wtxn, index, |bitmap| {
self.update_index(&mut wtxn, index, |bitmap| { bitmap.insert(task.uid);
bitmap.insert(task.uid); })?;
})?;
}
} }
self.update_status(&mut wtxn, Status::Enqueued, |bitmap| { self.update_status(&mut wtxn, Status::Enqueued, |bitmap| {
@ -709,12 +707,10 @@ impl IndexScheduler {
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?; self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() { for index in task.indexes() {
for index in indexes { self.update_index(&mut wtxn, index, |bitmap| {
self.update_index(&mut wtxn, index, |bitmap| { bitmap.insert(task.uid);
bitmap.insert(task.uid); })?;
})?;
}
} }
self.update_status(&mut wtxn, task.status, |bitmap| { self.update_status(&mut wtxn, task.status, |bitmap| {
@ -1506,6 +1502,10 @@ mod tests {
handle.wait_till(Breakpoint::AfterProcessing); handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent(); index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed"); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed");
index_scheduler.register(KindWithContent::IndexSwap { swaps: vec![] }).unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_empty_swap_processed");
} }
#[test] #[test]

View File

@ -0,0 +1,64 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }}
1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }}
2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }}
3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }}
4 {uid: 4, status: succeeded, details: { indexes: [("a", "b"), ("c", "d")] }, kind: IndexSwap { swaps: [("c", "b"), ("a", "d")] }}
5 {uid: 5, status: succeeded, details: { indexes: [("a", "c")] }, kind: IndexSwap { swaps: [("a", "c")] }}
6 {uid: 6, status: succeeded, details: { indexes: [] }, kind: IndexSwap { swaps: [] }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,3,4,5,6,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,3,]
"indexSwap" [4,5,6,]
----------------------------------------------------------------------
### Index Tasks:
a [3,4,5,]
b [0,4,]
c [1,4,5,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -57,7 +57,7 @@ impl Task {
} }
/// Return the list of indexes updated by this tasks. /// Return the list of indexes updated by this tasks.
pub fn indexes(&self) -> Option<Vec<&str>> { pub fn indexes(&self) -> Vec<&str> {
self.kind.indexes() self.kind.indexes()
} }
@ -154,25 +154,25 @@ impl KindWithContent {
} }
} }
pub fn indexes(&self) -> Option<Vec<&str>> { pub fn indexes(&self) -> Vec<&str> {
use KindWithContent::*; use KindWithContent::*;
match self { match self {
DumpCreation { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None, DumpCreation { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => vec![],
DocumentAdditionOrUpdate { index_uid, .. } DocumentAdditionOrUpdate { index_uid, .. }
| DocumentDeletion { index_uid, .. } | DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid } | DocumentClear { index_uid }
| SettingsUpdate { index_uid, .. } | SettingsUpdate { index_uid, .. }
| IndexCreation { index_uid, .. } | IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. } | IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid } => Some(vec![index_uid]), | IndexDeletion { index_uid } => vec![index_uid],
IndexSwap { swaps } => { IndexSwap { swaps } => {
let mut indexes = HashSet::<&str>::default(); let mut indexes = HashSet::<&str>::default();
for (lhs, rhs) in swaps { for (lhs, rhs) in swaps {
indexes.insert(lhs.as_str()); indexes.insert(lhs.as_str());
indexes.insert(rhs.as_str()); indexes.insert(rhs.as_str());
} }
Some(indexes.into_iter().collect()) indexes.into_iter().collect()
} }
} }
} }