Fix crash when batching an index swap task containing 0 swaps

This commit is contained in:
Loïc Lecrenier 2022-10-25 10:26:51 +02:00
parent 3e45fdf7c5
commit 8d5d92f927
4 changed files with 120 additions and 51 deletions

View File

@ -37,7 +37,7 @@ use roaring::RoaringBitmap;
use time::OffsetDateTime;
use uuid::Uuid;
use crate::autobatcher::BatchKind;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, Query, Result, TaskId};
@ -416,41 +416,47 @@ impl IndexScheduler {
)));
}
// 5. We take the next task and try to batch all the tasks associated with this index.
if let Some(task_id) = enqueued.min() {
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
// 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
// This is safe because all the remaining task are associated with
// AT LEAST one index. We can use the right or left one it doesn't
// matter.
let index_name = task.indexes().unwrap()[0];
let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
// If the task is not associated with any index, verify that it is an index swap and
// create the batch directly. Otherwise, get the index name associated with the task
// and use the autobatcher to batch the enqueued tasks associated with it
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
let index_name = if let Some(&index_name) = task.indexes().first() {
index_name
} else {
assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
return Ok(Some(Batch::IndexSwap { task }));
};
// If autobatching is disabled we only take one task at a time.
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
let enqueued = index_tasks
.into_iter()
.take(tasks_limit)
.map(|task_id| {
self.get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
.map(|task| (task.uid, task.kind))
})
.collect::<Result<Vec<_>>>()?;
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
if let Some((batchkind, create_index)) =
crate::autobatcher::autobatch(enqueued, index_already_exists)
{
return self.create_next_batch_index(
rtxn,
index_name.to_string(),
batchkind,
create_index,
);
}
// If autobatching is disabled we only take one task at a time.
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
let enqueued = index_tasks
.into_iter()
.take(tasks_limit)
.map(|task_id| {
self.get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
.map(|task| (task.uid, task.kind))
})
.collect::<Result<Vec<_>>>()?;
if let Some((batchkind, create_index)) =
autobatcher::autobatch(enqueued, index_already_exists)
{
return self.create_next_batch_index(
rtxn,
index_name.to_string(),
batchkind,
create_index,
);
}
// If we found no tasks then we were notified for something that got autobatched
@ -1034,9 +1040,8 @@ impl IndexScheduler {
for task_id in to_delete_tasks.iter() {
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(task_indexes) = task.indexes() {
affected_indexes.extend(task_indexes.into_iter().map(|x| x.to_owned()));
}
affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
affected_statuses.insert(task.status);
affected_kinds.insert(task.kind.as_kind());
// Note: don't delete the persisted task data since

View File

@ -483,12 +483,10 @@ impl IndexScheduler {
};
self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
self.update_status(&mut wtxn, Status::Enqueued, |bitmap| {
@ -615,12 +613,10 @@ impl IndexScheduler {
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
self.update_status(&mut wtxn, task.status, |bitmap| {
@ -1233,6 +1229,10 @@ mod tests {
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed");
index_scheduler.register(KindWithContent::IndexSwap { swaps: vec![] }).unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_empty_swap_processed");
}
#[test]

View File

@ -0,0 +1,64 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }}
1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }}
2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }}
3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }}
4 {uid: 4, status: succeeded, details: { indexes: [("a", "b"), ("c", "d")] }, kind: IndexSwap { swaps: [("c", "b"), ("a", "d")] }}
5 {uid: 5, status: succeeded, details: { indexes: [("a", "c")] }, kind: IndexSwap { swaps: [("a", "c")] }}
6 {uid: 6, status: succeeded, details: { indexes: [] }, kind: IndexSwap { swaps: [] }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,3,4,5,6,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,3,]
"indexSwap" [4,5,6,]
----------------------------------------------------------------------
### Index Tasks:
a [3,4,5,]
b [0,4,]
c [1,4,5,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c", "d"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
[timestamp] [6,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -57,7 +57,7 @@ impl Task {
}
/// Return the list of indexes updated by this tasks.
pub fn indexes(&self) -> Option<Vec<&str>> {
pub fn indexes(&self) -> Vec<&str> {
self.kind.indexes()
}
@ -154,25 +154,25 @@ impl KindWithContent {
}
}
pub fn indexes(&self) -> Option<Vec<&str>> {
pub fn indexes(&self) -> Vec<&str> {
use KindWithContent::*;
match self {
DumpCreation { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None,
DumpCreation { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => vec![],
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid }
| SettingsUpdate { index_uid, .. }
| IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid } => Some(vec![index_uid]),
| IndexDeletion { index_uid } => vec![index_uid],
IndexSwap { swaps } => {
let mut indexes = HashSet::<&str>::default();
for (lhs, rhs) in swaps {
indexes.insert(lhs.as_str());
indexes.insert(rhs.as_str());
}
Some(indexes.into_iter().collect())
indexes.into_iter().collect()
}
}
}