diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 6d7f0e088..10d1ebe2f 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -7,6 +7,7 @@ use index::{Settings, Unchecked}; use milli::{ heed::{RoTxn, RwTxn}, update::IndexDocumentsMethod, + DocumentId, }; use uuid::Uuid; @@ -21,6 +22,33 @@ pub(crate) enum Batch { content_files: Vec, tasks: Vec, }, + DocumentUpdate { + index_uid: String, + primary_key: Option, + content_files: Vec, + tasks: Vec, + }, + DocumentDeletion { + index_uid: String, + documents: Vec, + tasks: Vec, + }, + DocumentClear { + index_uid: String, + tasks: Vec, + }, + Settings { + index_uid: String, + settings: Vec<(bool, Settings)>, + tasks: Vec, + }, + DocumentClearAndSetting { + index_uid: String, + cleared_tasks: Vec, + + settings: Vec<(bool, Settings)>, + settings_tasks: Vec, + }, SettingsAndDocumentAddition { index_uid: String, @@ -28,7 +56,17 @@ pub(crate) enum Batch { content_files: Vec, document_addition_tasks: Vec, - settings: Vec>, + settings: Vec<(bool, Settings)>, + settings_tasks: Vec, + }, + SettingsAndDocumentUpdate { + index_uid: String, + + primary_key: Option, + content_files: Vec, + document_update_tasks: Vec, + + settings: Vec<(bool, Settings)>, settings_tasks: Vec, }, } @@ -37,18 +75,28 @@ impl Batch { pub fn ids(&self) -> Vec { match self { Batch::Cancel(task) => vec![task.uid], - Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::DocumentAddition { tasks, .. } => { - tasks.iter().map(|task| task.uid).collect() - } + Batch::Snapshot(tasks) + | Batch::Dump(tasks) + | Batch::DocumentAddition { tasks, .. } + | Batch::DocumentUpdate { tasks, .. } + | Batch::DocumentDeletion { tasks, .. } + | Batch::Settings { tasks, .. } + | Batch::DocumentClear { tasks, .. } => tasks.iter().map(|task| task.uid).collect(), Batch::SettingsAndDocumentAddition { - document_addition_tasks, - settings_tasks, + document_addition_tasks: tasks, + settings_tasks: other, .. - } => document_addition_tasks - .iter() - .chain(settings_tasks) - .map(|task| task.uid) - .collect(), + } + | Batch::DocumentClearAndSetting { + cleared_tasks: tasks, + settings_tasks: other, + .. + } + | Batch::SettingsAndDocumentUpdate { + document_update_tasks: tasks, + settings_tasks: other, + .. + } => tasks.iter().chain(other).map(|task| task.uid).collect(), } } } @@ -61,42 +109,17 @@ impl IndexScheduler { batch: BatchKind, ) -> Result> { match batch { - BatchKind::DocumentClear { ids: _ } => todo!(), - BatchKind::DocumentAddition { addition_ids: _ } => todo!(), - BatchKind::DocumentUpdate { update_ids: _ } => todo!(), - BatchKind::DocumentDeletion { deletion_ids: _ } => todo!(), - BatchKind::ClearAndSettings { - other: _, - settings_ids: _, - } => todo!(), - BatchKind::SettingsAndDocumentAddition { - addition_ids, - settings_ids, - } => { - // you're not supposed to create an empty BatchKind. - assert!(addition_ids.len() > 0); - assert!(settings_ids.len() > 0); - - let document_addition_tasks = addition_ids - .iter() - .map(|tid| { - self.get_task(rtxn, *tid) - .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - }) - .collect::>>()?; - let settings_tasks = settings_ids - .iter() - .map(|tid| { - self.get_task(rtxn, *tid) - .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - }) - .collect::>>()?; - - let primary_key = match &document_addition_tasks[0].kind { + BatchKind::DocumentClear { ids } => Ok(Some(Batch::DocumentClear { + tasks: self.get_existing_tasks(rtxn, ids)?, + index_uid, + })), + BatchKind::DocumentAddition { addition_ids } => { + let tasks = self.get_existing_tasks(rtxn, addition_ids)?; + let primary_key = match &tasks[0].kind { KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(), _ => unreachable!(), }; - let content_files = document_addition_tasks + let content_files = tasks .iter() .map(|task| match task.kind { KindWithContent::DocumentAddition { content_file, .. } => content_file, @@ -104,14 +127,141 @@ impl IndexScheduler { }) .collect(); - let settings = settings_tasks + Ok(Some(Batch::DocumentAddition { + index_uid, + primary_key, + content_files, + tasks, + })) + } + BatchKind::DocumentUpdate { update_ids } => { + let tasks = self.get_existing_tasks(rtxn, update_ids)?; + let primary_key = match &tasks[0].kind { + KindWithContent::DocumentUpdate { primary_key, .. } => primary_key.clone(), + _ => unreachable!(), + }; + let content_files = tasks .iter() - .map(|task| match &task.kind { - KindWithContent::Settings { new_settings, .. } => new_settings.clone(), + .map(|task| match task.kind { + KindWithContent::DocumentUpdate { content_file, .. } => content_file, _ => unreachable!(), }) .collect(); + Ok(Some(Batch::DocumentUpdate { + index_uid, + primary_key, + content_files, + tasks, + })) + } + BatchKind::DocumentDeletion { deletion_ids } => { + let tasks = self.get_existing_tasks(rtxn, deletion_ids)?; + + let mut documents = Vec::new(); + for task in &tasks { + match task.kind { + KindWithContent::DocumentDeletion { + ref documents_ids, .. + } => documents.extend_from_slice(documents_ids), + _ => unreachable!(), + } + } + + Ok(Some(Batch::DocumentDeletion { + index_uid, + documents, + tasks, + })) + } + BatchKind::Settings { settings_ids } => { + let tasks = self.get_existing_tasks(rtxn, settings_ids)?; + + let mut settings = Vec::new(); + for task in &tasks { + match task.kind { + KindWithContent::Settings { + ref new_settings, + is_deletion, + .. + } => settings.push((is_deletion, new_settings.clone())), + _ => unreachable!(), + } + } + + Ok(Some(Batch::Settings { + index_uid, + settings, + tasks, + })) + } + BatchKind::ClearAndSettings { + other, + settings_ids, + } => { + let (index_uid, settings, settings_tasks) = match self + .create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })? + .unwrap() + { + Batch::Settings { + index_uid, + settings, + tasks, + } => (index_uid, settings, tasks), + _ => unreachable!(), + }; + let (index_uid, cleared_tasks) = match self + .create_next_batch_index( + rtxn, + index_uid, + BatchKind::DocumentClear { ids: other }, + )? + .unwrap() + { + Batch::DocumentClear { index_uid, tasks } => (index_uid, tasks), + _ => unreachable!(), + }; + + Ok(Some(Batch::DocumentClearAndSetting { + index_uid, + cleared_tasks, + settings, + settings_tasks, + })) + } + BatchKind::SettingsAndDocumentAddition { + addition_ids, + settings_ids, + } => { + let (index_uid, settings, settings_tasks) = match self + .create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })? + .unwrap() + { + Batch::Settings { + index_uid, + settings, + tasks, + } => (index_uid, settings, tasks), + _ => unreachable!(), + }; + + let (index_uid, primary_key, content_files, document_addition_tasks) = match self + .create_next_batch_index( + rtxn, + index_uid, + BatchKind::DocumentAddition { addition_ids }, + )? + .unwrap() + { + Batch::DocumentAddition { + index_uid, + primary_key, + content_files, + tasks, + } => (index_uid, primary_key, content_files, tasks), + _ => unreachable!(), + }; + Ok(Some(Batch::SettingsAndDocumentAddition { index_uid, primary_key, @@ -122,10 +272,45 @@ impl IndexScheduler { })) } BatchKind::SettingsAndDocumentUpdate { - update_ids: _, - settings_ids: _, - } => todo!(), - BatchKind::Settings { settings_ids: _ } => todo!(), + update_ids, + settings_ids, + } => { + let settings = self.create_next_batch_index( + rtxn, + index_uid.clone(), + BatchKind::Settings { settings_ids }, + )?; + + let document_update = self.create_next_batch_index( + rtxn, + index_uid.clone(), + BatchKind::DocumentUpdate { update_ids }, + )?; + + match (document_update, settings) { + ( + Some(Batch::DocumentUpdate { + primary_key, + content_files, + tasks: document_update_tasks, + .. + }), + Some(Batch::Settings { + settings, + tasks: settings_tasks, + .. + }), + ) => Ok(Some(Batch::SettingsAndDocumentUpdate { + index_uid, + primary_key, + content_files, + document_update_tasks, + settings, + settings_tasks, + })), + _ => unreachable!(), + } + } BatchKind::IndexCreation { id: _ } => todo!(), BatchKind::IndexDeletion { ids: _ } => todo!(), BatchKind::IndexUpdate { id: _ } => todo!(), @@ -202,6 +387,7 @@ impl IndexScheduler { Batch::Cancel(_) => todo!(), Batch::Snapshot(_) => todo!(), Batch::Dump(_) => todo!(), + Batch::DocumentClear { tasks, .. } => todo!(), Batch::DocumentAddition { index_uid: _, primary_key: _, @@ -255,6 +441,36 @@ impl IndexScheduler { } Ok(updated_tasks) } + Batch::DocumentUpdate { + index_uid, + primary_key, + content_files, + tasks, + } => todo!(), + Batch::DocumentDeletion { + index_uid, + documents, + tasks, + } => todo!(), + Batch::Settings { + index_uid, + settings, + tasks, + } => todo!(), + Batch::DocumentClearAndSetting { + index_uid, + cleared_tasks, + settings, + settings_tasks, + } => todo!(), + Batch::SettingsAndDocumentUpdate { + index_uid, + primary_key, + content_files, + document_update_tasks, + settings, + settings_tasks, + } => todo!(), } } } diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index e2c0253f2..200a2f58b 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -1,6 +1,7 @@ use anyhow::Result; use index::{Settings, Unchecked}; +use milli::DocumentId; use serde::{Deserialize, Serialize, Serializer}; use std::{fmt::Write, path::PathBuf}; use time::{Duration, OffsetDateTime}; @@ -125,7 +126,7 @@ pub enum KindWithContent { }, DocumentDeletion { index_uid: String, - documents_ids: Vec, + documents_ids: Vec, }, DocumentClear { index_uid: String,