mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
implements the automatic batch deletion
This commit is contained in:
parent
e3ddf9ef9a
commit
69d5472814
@ -17,13 +17,14 @@ tasks individually, but should be much faster since we are only performing
|
|||||||
one indexing operation.
|
one indexing operation.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use std::collections::{BTreeSet, HashSet};
|
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::fs::{self, File};
|
use std::fs::{self, File};
|
||||||
use std::io::BufWriter;
|
use std::io::BufWriter;
|
||||||
|
|
||||||
use dump::IndexMetadata;
|
use dump::IndexMetadata;
|
||||||
|
use meilisearch_types::batches::BatchId;
|
||||||
use meilisearch_types::error::Code;
|
use meilisearch_types::error::Code;
|
||||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
||||||
@ -1675,6 +1676,8 @@ impl IndexScheduler {
|
|||||||
let mut affected_statuses = HashSet::new();
|
let mut affected_statuses = HashSet::new();
|
||||||
let mut affected_kinds = HashSet::new();
|
let mut affected_kinds = HashSet::new();
|
||||||
let mut affected_canceled_by = RoaringBitmap::new();
|
let mut affected_canceled_by = RoaringBitmap::new();
|
||||||
|
// The tasks that have been removed *per batches*.
|
||||||
|
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
|
||||||
|
|
||||||
for task_id in to_delete_tasks.iter() {
|
for task_id in to_delete_tasks.iter() {
|
||||||
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||||
@ -1696,18 +1699,21 @@ impl IndexScheduler {
|
|||||||
if let Some(canceled_by) = task.canceled_by {
|
if let Some(canceled_by) = task.canceled_by {
|
||||||
affected_canceled_by.insert(canceled_by);
|
affected_canceled_by.insert(canceled_by);
|
||||||
}
|
}
|
||||||
|
if let Some(batch_uid) = task.batch_uid {
|
||||||
|
affected_batches.entry(batch_uid).or_default().insert(task_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for index in affected_indexes {
|
for index in affected_indexes.iter() {
|
||||||
self.update_index(wtxn, &index, |bitmap| *bitmap -= &to_delete_tasks)?;
|
self.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for status in affected_statuses {
|
for status in affected_statuses.iter() {
|
||||||
self.update_status(wtxn, status, |bitmap| *bitmap -= &to_delete_tasks)?;
|
self.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for kind in affected_kinds {
|
for kind in affected_kinds.iter() {
|
||||||
self.update_kind(wtxn, kind, |bitmap| *bitmap -= &to_delete_tasks)?;
|
self.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for task in to_delete_tasks.iter() {
|
for task in to_delete_tasks.iter() {
|
||||||
@ -1723,6 +1729,48 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (batch_id, to_delete_tasks) in affected_batches {
|
||||||
|
if let Some(mut tasks) = self.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
|
||||||
|
tasks -= &to_delete_tasks;
|
||||||
|
// We must remove the batch entirely
|
||||||
|
if tasks.is_empty() {
|
||||||
|
self.all_batches.delete(wtxn, &batch_id)?;
|
||||||
|
self.batch_to_tasks_mapping.delete(wtxn, &batch_id)?;
|
||||||
|
}
|
||||||
|
// Anyway, we must remove the batch from all its reverse indexes.
|
||||||
|
// The only way to do that is to check
|
||||||
|
|
||||||
|
for index in affected_indexes.iter() {
|
||||||
|
let index_tasks = self.index_tasks(wtxn, index)?;
|
||||||
|
let remaining_index_tasks = index_tasks & &tasks;
|
||||||
|
if remaining_index_tasks.is_empty() {
|
||||||
|
self.update_batch_index(wtxn, index, |bitmap| {
|
||||||
|
bitmap.remove(batch_id);
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for status in affected_statuses.iter() {
|
||||||
|
let status_tasks = self.get_status(wtxn, *status)?;
|
||||||
|
let remaining_status_tasks = status_tasks & &tasks;
|
||||||
|
if remaining_status_tasks.is_empty() {
|
||||||
|
self.update_batch_status(wtxn, *status, |bitmap| {
|
||||||
|
bitmap.remove(batch_id);
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for kind in affected_kinds.iter() {
|
||||||
|
let kind_tasks = self.get_kind(wtxn, *kind)?;
|
||||||
|
let remaining_kind_tasks = kind_tasks & &tasks;
|
||||||
|
if remaining_kind_tasks.is_empty() {
|
||||||
|
self.update_batch_kind(wtxn, *kind, |bitmap| {
|
||||||
|
bitmap.remove(batch_id);
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(to_delete_tasks)
|
Ok(to_delete_tasks)
|
||||||
}
|
}
|
||||||
|
@ -41,22 +41,19 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} }
|
|||||||
[timestamp] [2,3,]
|
[timestamp] [2,3,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### All Batches:
|
### All Batches:
|
||||||
0 {uid: 0, }
|
|
||||||
1 {uid: 1, }
|
1 {uid: 1, }
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batch to tasks mapping:
|
### Batch to tasks mapping:
|
||||||
0 [0,]
|
|
||||||
1 [2,3,]
|
1 [2,3,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Status:
|
### Batches Status:
|
||||||
succeeded [0,1,]
|
succeeded [1,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Kind:
|
### Batches Kind:
|
||||||
"documentAdditionOrUpdate" [0,]
|
"documentAdditionOrUpdate" []
|
||||||
"taskDeletion" [1,]
|
"taskDeletion" [1,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Index Tasks:
|
### Batches Index Tasks:
|
||||||
catto [0,]
|
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Enqueued At:
|
### Batches Enqueued At:
|
||||||
[timestamp] [0,]
|
[timestamp] [0,]
|
||||||
|
@ -39,22 +39,19 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} }
|
|||||||
[timestamp] [2,]
|
[timestamp] [2,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### All Batches:
|
### All Batches:
|
||||||
0 {uid: 0, }
|
|
||||||
1 {uid: 1, }
|
1 {uid: 1, }
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batch to tasks mapping:
|
### Batch to tasks mapping:
|
||||||
0 [0,]
|
|
||||||
1 [2,]
|
1 [2,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Status:
|
### Batches Status:
|
||||||
succeeded [0,1,]
|
succeeded [1,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Kind:
|
### Batches Kind:
|
||||||
"documentAdditionOrUpdate" [0,]
|
"documentAdditionOrUpdate" []
|
||||||
"taskDeletion" [1,]
|
"taskDeletion" [1,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Index Tasks:
|
### Batches Index Tasks:
|
||||||
catto [0,]
|
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Batches Enqueued At:
|
### Batches Enqueued At:
|
||||||
[timestamp] [0,]
|
[timestamp] [0,]
|
||||||
|
Loading…
Reference in New Issue
Block a user