Create the final Batch type for all Document* operations

This commit is contained in:
Tamo 2022-09-16 16:31:16 +02:00 committed by Clément Renault
parent 1ea9c0b4c0
commit 925971809a
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 269 additions and 52 deletions

View File

@ -7,6 +7,7 @@ use index::{Settings, Unchecked};
use milli::{ use milli::{
heed::{RoTxn, RwTxn}, heed::{RoTxn, RwTxn},
update::IndexDocumentsMethod, update::IndexDocumentsMethod,
DocumentId,
}; };
use uuid::Uuid; use uuid::Uuid;
@ -21,6 +22,33 @@ pub(crate) enum Batch {
content_files: Vec<Uuid>, content_files: Vec<Uuid>,
tasks: Vec<Task>, tasks: Vec<Task>,
}, },
DocumentUpdate {
index_uid: String,
primary_key: Option<String>,
content_files: Vec<Uuid>,
tasks: Vec<Task>,
},
DocumentDeletion {
index_uid: String,
documents: Vec<DocumentId>,
tasks: Vec<Task>,
},
DocumentClear {
index_uid: String,
tasks: Vec<Task>,
},
Settings {
index_uid: String,
settings: Vec<(bool, Settings<Unchecked>)>,
tasks: Vec<Task>,
},
DocumentClearAndSetting {
index_uid: String,
cleared_tasks: Vec<Task>,
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<Task>,
},
SettingsAndDocumentAddition { SettingsAndDocumentAddition {
index_uid: String, index_uid: String,
@ -28,7 +56,17 @@ pub(crate) enum Batch {
content_files: Vec<Uuid>, content_files: Vec<Uuid>,
document_addition_tasks: Vec<Task>, document_addition_tasks: Vec<Task>,
settings: Vec<Settings<Unchecked>>, settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<Task>,
},
SettingsAndDocumentUpdate {
index_uid: String,
primary_key: Option<String>,
content_files: Vec<Uuid>,
document_update_tasks: Vec<Task>,
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<Task>, settings_tasks: Vec<Task>,
}, },
} }
@ -37,18 +75,28 @@ impl Batch {
pub fn ids(&self) -> Vec<TaskId> { pub fn ids(&self) -> Vec<TaskId> {
match self { match self {
Batch::Cancel(task) => vec![task.uid], Batch::Cancel(task) => vec![task.uid],
Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::DocumentAddition { tasks, .. } => { Batch::Snapshot(tasks)
tasks.iter().map(|task| task.uid).collect() | Batch::Dump(tasks)
} | Batch::DocumentAddition { tasks, .. }
| Batch::DocumentUpdate { tasks, .. }
| Batch::DocumentDeletion { tasks, .. }
| Batch::Settings { tasks, .. }
| Batch::DocumentClear { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
Batch::SettingsAndDocumentAddition { Batch::SettingsAndDocumentAddition {
document_addition_tasks, document_addition_tasks: tasks,
settings_tasks, settings_tasks: other,
.. ..
} => document_addition_tasks }
.iter() | Batch::DocumentClearAndSetting {
.chain(settings_tasks) cleared_tasks: tasks,
.map(|task| task.uid) settings_tasks: other,
.collect(), ..
}
| Batch::SettingsAndDocumentUpdate {
document_update_tasks: tasks,
settings_tasks: other,
..
} => tasks.iter().chain(other).map(|task| task.uid).collect(),
} }
} }
} }
@ -61,42 +109,17 @@ impl IndexScheduler {
batch: BatchKind, batch: BatchKind,
) -> Result<Option<Batch>> { ) -> Result<Option<Batch>> {
match batch { match batch {
BatchKind::DocumentClear { ids: _ } => todo!(), BatchKind::DocumentClear { ids } => Ok(Some(Batch::DocumentClear {
BatchKind::DocumentAddition { addition_ids: _ } => todo!(), tasks: self.get_existing_tasks(rtxn, ids)?,
BatchKind::DocumentUpdate { update_ids: _ } => todo!(), index_uid,
BatchKind::DocumentDeletion { deletion_ids: _ } => todo!(), })),
BatchKind::ClearAndSettings { BatchKind::DocumentAddition { addition_ids } => {
other: _, let tasks = self.get_existing_tasks(rtxn, addition_ids)?;
settings_ids: _, let primary_key = match &tasks[0].kind {
} => todo!(),
BatchKind::SettingsAndDocumentAddition {
addition_ids,
settings_ids,
} => {
// you're not supposed to create an empty BatchKind.
assert!(addition_ids.len() > 0);
assert!(settings_ids.len() > 0);
let document_addition_tasks = addition_ids
.iter()
.map(|tid| {
self.get_task(rtxn, *tid)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
})
.collect::<Result<Vec<_>>>()?;
let settings_tasks = settings_ids
.iter()
.map(|tid| {
self.get_task(rtxn, *tid)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
})
.collect::<Result<Vec<_>>>()?;
let primary_key = match &document_addition_tasks[0].kind {
KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(), KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(),
_ => unreachable!(), _ => unreachable!(),
}; };
let content_files = document_addition_tasks let content_files = tasks
.iter() .iter()
.map(|task| match task.kind { .map(|task| match task.kind {
KindWithContent::DocumentAddition { content_file, .. } => content_file, KindWithContent::DocumentAddition { content_file, .. } => content_file,
@ -104,14 +127,141 @@ impl IndexScheduler {
}) })
.collect(); .collect();
let settings = settings_tasks Ok(Some(Batch::DocumentAddition {
index_uid,
primary_key,
content_files,
tasks,
}))
}
BatchKind::DocumentUpdate { update_ids } => {
let tasks = self.get_existing_tasks(rtxn, update_ids)?;
let primary_key = match &tasks[0].kind {
KindWithContent::DocumentUpdate { primary_key, .. } => primary_key.clone(),
_ => unreachable!(),
};
let content_files = tasks
.iter() .iter()
.map(|task| match &task.kind { .map(|task| match task.kind {
KindWithContent::Settings { new_settings, .. } => new_settings.clone(), KindWithContent::DocumentUpdate { content_file, .. } => content_file,
_ => unreachable!(), _ => unreachable!(),
}) })
.collect(); .collect();
Ok(Some(Batch::DocumentUpdate {
index_uid,
primary_key,
content_files,
tasks,
}))
}
BatchKind::DocumentDeletion { deletion_ids } => {
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
let mut documents = Vec::new();
for task in &tasks {
match task.kind {
KindWithContent::DocumentDeletion {
ref documents_ids, ..
} => documents.extend_from_slice(documents_ids),
_ => unreachable!(),
}
}
Ok(Some(Batch::DocumentDeletion {
index_uid,
documents,
tasks,
}))
}
BatchKind::Settings { settings_ids } => {
let tasks = self.get_existing_tasks(rtxn, settings_ids)?;
let mut settings = Vec::new();
for task in &tasks {
match task.kind {
KindWithContent::Settings {
ref new_settings,
is_deletion,
..
} => settings.push((is_deletion, new_settings.clone())),
_ => unreachable!(),
}
}
Ok(Some(Batch::Settings {
index_uid,
settings,
tasks,
}))
}
BatchKind::ClearAndSettings {
other,
settings_ids,
} => {
let (index_uid, settings, settings_tasks) = match self
.create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })?
.unwrap()
{
Batch::Settings {
index_uid,
settings,
tasks,
} => (index_uid, settings, tasks),
_ => unreachable!(),
};
let (index_uid, cleared_tasks) = match self
.create_next_batch_index(
rtxn,
index_uid,
BatchKind::DocumentClear { ids: other },
)?
.unwrap()
{
Batch::DocumentClear { index_uid, tasks } => (index_uid, tasks),
_ => unreachable!(),
};
Ok(Some(Batch::DocumentClearAndSetting {
index_uid,
cleared_tasks,
settings,
settings_tasks,
}))
}
BatchKind::SettingsAndDocumentAddition {
addition_ids,
settings_ids,
} => {
let (index_uid, settings, settings_tasks) = match self
.create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })?
.unwrap()
{
Batch::Settings {
index_uid,
settings,
tasks,
} => (index_uid, settings, tasks),
_ => unreachable!(),
};
let (index_uid, primary_key, content_files, document_addition_tasks) = match self
.create_next_batch_index(
rtxn,
index_uid,
BatchKind::DocumentAddition { addition_ids },
)?
.unwrap()
{
Batch::DocumentAddition {
index_uid,
primary_key,
content_files,
tasks,
} => (index_uid, primary_key, content_files, tasks),
_ => unreachable!(),
};
Ok(Some(Batch::SettingsAndDocumentAddition { Ok(Some(Batch::SettingsAndDocumentAddition {
index_uid, index_uid,
primary_key, primary_key,
@ -122,10 +272,45 @@ impl IndexScheduler {
})) }))
} }
BatchKind::SettingsAndDocumentUpdate { BatchKind::SettingsAndDocumentUpdate {
update_ids: _, update_ids,
settings_ids: _, settings_ids,
} => todo!(), } => {
BatchKind::Settings { settings_ids: _ } => todo!(), let settings = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::Settings { settings_ids },
)?;
let document_update = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::DocumentUpdate { update_ids },
)?;
match (document_update, settings) {
(
Some(Batch::DocumentUpdate {
primary_key,
content_files,
tasks: document_update_tasks,
..
}),
Some(Batch::Settings {
settings,
tasks: settings_tasks,
..
}),
) => Ok(Some(Batch::SettingsAndDocumentUpdate {
index_uid,
primary_key,
content_files,
document_update_tasks,
settings,
settings_tasks,
})),
_ => unreachable!(),
}
}
BatchKind::IndexCreation { id: _ } => todo!(), BatchKind::IndexCreation { id: _ } => todo!(),
BatchKind::IndexDeletion { ids: _ } => todo!(), BatchKind::IndexDeletion { ids: _ } => todo!(),
BatchKind::IndexUpdate { id: _ } => todo!(), BatchKind::IndexUpdate { id: _ } => todo!(),
@ -202,6 +387,7 @@ impl IndexScheduler {
Batch::Cancel(_) => todo!(), Batch::Cancel(_) => todo!(),
Batch::Snapshot(_) => todo!(), Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(), Batch::Dump(_) => todo!(),
Batch::DocumentClear { tasks, .. } => todo!(),
Batch::DocumentAddition { Batch::DocumentAddition {
index_uid: _, index_uid: _,
primary_key: _, primary_key: _,
@ -255,6 +441,36 @@ impl IndexScheduler {
} }
Ok(updated_tasks) Ok(updated_tasks)
} }
Batch::DocumentUpdate {
index_uid,
primary_key,
content_files,
tasks,
} => todo!(),
Batch::DocumentDeletion {
index_uid,
documents,
tasks,
} => todo!(),
Batch::Settings {
index_uid,
settings,
tasks,
} => todo!(),
Batch::DocumentClearAndSetting {
index_uid,
cleared_tasks,
settings,
settings_tasks,
} => todo!(),
Batch::SettingsAndDocumentUpdate {
index_uid,
primary_key,
content_files,
document_update_tasks,
settings,
settings_tasks,
} => todo!(),
} }
} }
} }

View File

@ -1,6 +1,7 @@
use anyhow::Result; use anyhow::Result;
use index::{Settings, Unchecked}; use index::{Settings, Unchecked};
use milli::DocumentId;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use std::{fmt::Write, path::PathBuf}; use std::{fmt::Write, path::PathBuf};
use time::{Duration, OffsetDateTime}; use time::{Duration, OffsetDateTime};
@ -125,7 +126,7 @@ pub enum KindWithContent {
}, },
DocumentDeletion { DocumentDeletion {
index_uid: String, index_uid: String,
documents_ids: Vec<String>, documents_ids: Vec<DocumentId>,
}, },
DocumentClear { DocumentClear {
index_uid: String, index_uid: String,