Implement a recursive indexation for the index-related operations

This commit is contained in:
Kerollmops 2022-09-29 18:15:50 +02:00 committed by Clément Renault
parent 3b343a930e
commit 31de33d5ee
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 339 additions and 267 deletions

View File

@ -3,8 +3,13 @@ use crate::{
task::{Details, Kind, KindWithContent, Status, Task}, task::{Details, Kind, KindWithContent, Status, Task},
Error, IndexScheduler, Result, TaskId, Error, IndexScheduler, Result, TaskId,
}; };
use index::apply_settings_to_builder;
use index::error::{IndexError, MilliError};
use index::{Settings, Unchecked}; use index::{Settings, Unchecked};
use milli::heed::RoTxn; use log::{debug, info};
use milli::documents::DocumentsBatchReader;
use milli::heed::{RoTxn, RwTxn};
use milli::update::IndexDocumentsConfig;
use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod}; use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod};
use uuid::Uuid; use uuid::Uuid;
@ -12,6 +17,24 @@ pub(crate) enum Batch {
Cancel(Task), Cancel(Task),
Snapshot(Vec<Task>), Snapshot(Vec<Task>),
Dump(Vec<Task>), Dump(Vec<Task>),
IndexOperation(IndexOperation),
IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexDeletion {
index_uid: String,
tasks: Vec<Task>,
},
}
pub(crate) enum IndexOperation {
DocumentImport { DocumentImport {
index_uid: String, index_uid: String,
primary_key: Option<String>, primary_key: Option<String>,
@ -54,20 +77,6 @@ pub(crate) enum Batch {
settings: Vec<(bool, Settings<Unchecked>)>, settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<Task>, settings_tasks: Vec<Task>,
}, },
IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexDeletion {
index_uid: String,
tasks: Vec<Task>,
},
} }
impl Batch { impl Batch {
@ -76,23 +85,27 @@ impl Batch {
Batch::Cancel(task) Batch::Cancel(task)
| Batch::IndexCreation { task, .. } | Batch::IndexCreation { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid], | Batch::IndexUpdate { task, .. } => vec![task.uid],
Batch::Snapshot(tasks) Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::IndexDeletion { tasks, .. } => {
| Batch::Dump(tasks) tasks.iter().map(|task| task.uid).collect()
| Batch::DocumentImport { tasks, .. }
| Batch::DocumentDeletion { tasks, .. }
| Batch::Settings { tasks, .. }
| Batch::DocumentClear { tasks, .. }
| Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
Batch::SettingsAndDocumentImport {
document_import_tasks: tasks,
settings_tasks: other,
..
} }
| Batch::DocumentClearAndSetting { Batch::IndexOperation(operation) => match operation {
cleared_tasks: tasks, IndexOperation::DocumentImport { tasks, .. }
settings_tasks: other, | IndexOperation::DocumentDeletion { tasks, .. }
.. | IndexOperation::Settings { tasks, .. }
} => tasks.iter().chain(other).map(|task| task.uid).collect(), | IndexOperation::DocumentClear { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
}
IndexOperation::SettingsAndDocumentImport {
document_import_tasks: tasks,
settings_tasks: other,
..
}
| IndexOperation::DocumentClearAndSetting {
cleared_tasks: tasks,
settings_tasks: other,
..
} => tasks.iter().chain(other).map(|task| task.uid).collect(),
},
} }
} }
} }
@ -105,10 +118,12 @@ impl IndexScheduler {
batch: BatchKind, batch: BatchKind,
) -> Result<Option<Batch>> { ) -> Result<Option<Batch>> {
match batch { match batch {
BatchKind::DocumentClear { ids } => Ok(Some(Batch::DocumentClear { BatchKind::DocumentClear { ids } => {
tasks: self.get_existing_tasks(rtxn, ids)?, Ok(Some(Batch::IndexOperation(IndexOperation::DocumentClear {
index_uid, tasks: self.get_existing_tasks(rtxn, ids)?,
})), index_uid,
})))
}
BatchKind::DocumentImport { method, import_ids } => { BatchKind::DocumentImport { method, import_ids } => {
let tasks = self.get_existing_tasks(rtxn, import_ids)?; let tasks = self.get_existing_tasks(rtxn, import_ids)?;
let primary_key = match &tasks[0].kind { let primary_key = match &tasks[0].kind {
@ -123,13 +138,15 @@ impl IndexScheduler {
}) })
.collect(); .collect();
Ok(Some(Batch::DocumentImport { Ok(Some(Batch::IndexOperation(
index_uid, IndexOperation::DocumentImport {
primary_key, index_uid,
method, primary_key,
content_files, method,
tasks, content_files,
})) tasks,
},
)))
} }
BatchKind::DocumentDeletion { deletion_ids } => { BatchKind::DocumentDeletion { deletion_ids } => {
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?; let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
@ -144,11 +161,13 @@ impl IndexScheduler {
} }
} }
Ok(Some(Batch::DocumentDeletion { Ok(Some(Batch::IndexOperation(
index_uid, IndexOperation::DocumentDeletion {
documents, index_uid,
tasks, documents,
})) tasks,
},
)))
} }
BatchKind::Settings { settings_ids } => { BatchKind::Settings { settings_ids } => {
let tasks = self.get_existing_tasks(rtxn, settings_ids)?; let tasks = self.get_existing_tasks(rtxn, settings_ids)?;
@ -165,11 +184,11 @@ impl IndexScheduler {
} }
} }
Ok(Some(Batch::Settings { Ok(Some(Batch::IndexOperation(IndexOperation::Settings {
index_uid, index_uid,
settings, settings,
tasks, tasks,
})) })))
} }
BatchKind::ClearAndSettings { BatchKind::ClearAndSettings {
other, other,
@ -179,11 +198,11 @@ impl IndexScheduler {
.create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })? .create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })?
.unwrap() .unwrap()
{ {
Batch::Settings { Batch::IndexOperation(IndexOperation::Settings {
index_uid, index_uid,
settings, settings,
tasks, tasks,
} => (index_uid, settings, tasks), }) => (index_uid, settings, tasks),
_ => unreachable!(), _ => unreachable!(),
}; };
let (index_uid, cleared_tasks) = match self let (index_uid, cleared_tasks) = match self
@ -194,16 +213,20 @@ impl IndexScheduler {
)? )?
.unwrap() .unwrap()
{ {
Batch::DocumentClear { index_uid, tasks } => (index_uid, tasks), Batch::IndexOperation(IndexOperation::DocumentClear { index_uid, tasks }) => {
(index_uid, tasks)
}
_ => unreachable!(), _ => unreachable!(),
}; };
Ok(Some(Batch::DocumentClearAndSetting { Ok(Some(Batch::IndexOperation(
index_uid, IndexOperation::DocumentClearAndSetting {
cleared_tasks, index_uid,
settings, cleared_tasks,
settings_tasks, settings,
})) settings_tasks,
},
)))
} }
BatchKind::SettingsAndDocumentImport { BatchKind::SettingsAndDocumentImport {
settings_ids, settings_ids,
@ -224,26 +247,28 @@ impl IndexScheduler {
match (document_import, settings) { match (document_import, settings) {
( (
Some(Batch::DocumentImport { Some(Batch::IndexOperation(IndexOperation::DocumentImport {
primary_key, primary_key,
content_files, content_files,
tasks: document_import_tasks, tasks: document_import_tasks,
.. ..
}), })),
Some(Batch::Settings { Some(Batch::IndexOperation(IndexOperation::Settings {
settings, settings,
tasks: settings_tasks, tasks: settings_tasks,
.. ..
}), })),
) => Ok(Some(Batch::SettingsAndDocumentImport { ) => Ok(Some(Batch::IndexOperation(
index_uid, IndexOperation::SettingsAndDocumentImport {
primary_key, index_uid,
method, primary_key,
content_files, method,
document_import_tasks, content_files,
settings, document_import_tasks,
settings_tasks, settings,
})), settings_tasks,
},
))),
_ => unreachable!(), _ => unreachable!(),
} }
} }
@ -351,206 +376,30 @@ impl IndexScheduler {
Batch::Cancel(_) => todo!(), Batch::Cancel(_) => todo!(),
Batch::Snapshot(_) => todo!(), Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(), Batch::Dump(_) => todo!(),
Batch::DocumentClear { Batch::IndexOperation(operation) => {
index_uid, let index = match operation {
mut tasks, IndexOperation::DocumentDeletion { ref index_uid, .. }
} => { | IndexOperation::DocumentClear { ref index_uid, .. } => {
let rtxn = self.env.read_txn()?; // only get the index, don't create it
let index = self.index_mapper.index(&rtxn, &index_uid)?; let rtxn = self.env.read_txn()?;
rtxn.abort()?; self.index_mapper.index(&rtxn, &index_uid)?
let ret = index.clear_documents();
for task in &mut tasks {
task.details = Some(Details::ClearAll {
// TODO where can I find this information of how many documents did we delete?
deleted_documents: None,
});
if let Err(ref error) = ret {
task.error = Some(error.into());
} }
} IndexOperation::DocumentImport { ref index_uid, .. }
| IndexOperation::Settings { ref index_uid, .. }
Ok(tasks) | IndexOperation::DocumentClearAndSetting { ref index_uid, .. }
} | IndexOperation::SettingsAndDocumentImport { ref index_uid, .. } => {
Batch::DocumentImport { // create the index if it doesn't already exist
index_uid, let mut wtxn = self.env.write_txn()?;
primary_key, let index = self.index_mapper.index(&mut wtxn, index_uid)?;
method, wtxn.commit()?;
content_files, index
mut tasks,
} => {
// we NEED a write transaction for the index creation.
// To avoid blocking the whole process we're going to commit asap.
let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
let ret = index.update_documents(
method,
primary_key,
self.file_store.clone(),
content_files,
)?;
for (task, ret) in tasks.iter_mut().zip(ret) {
match ret {
Ok(DocumentAdditionResult {
indexed_documents,
number_of_documents,
}) => {
task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents,
indexed_documents,
});
}
Err(error) => task.error = Some(error.into()),
} }
} };
Ok(tasks) let mut index_wtxn = index.write_txn()?;
} let tasks = self.apply_index_operation(&mut index_wtxn, &index, operation)?;
Batch::SettingsAndDocumentImport { index_wtxn.commit()?;
index_uid,
primary_key,
method,
content_files,
mut document_import_tasks,
settings,
mut settings_tasks,
} => {
// we NEED a write transaction for the index creation.
// To avoid blocking the whole process we're going to commit asap.
let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
// TODO merge the settings to only do a reindexation once.
for (task, (_, settings)) in settings_tasks.iter_mut().zip(settings) {
let checked_settings = settings.clone().check();
task.details = Some(Details::Settings { settings });
if let Err(error) = index.update_settings(&checked_settings) {
task.error = Some(error.into());
}
}
// TODO we must use the same write transaction, here!
let ret = index.update_documents(
method,
primary_key,
self.file_store.clone(),
content_files,
)?;
for (task, ret) in document_import_tasks.iter_mut().zip(ret) {
match ret {
Ok(DocumentAdditionResult {
indexed_documents,
number_of_documents,
}) => {
task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents,
indexed_documents,
});
}
Err(error) => task.error = Some(error.into()),
}
}
let mut tasks = settings_tasks;
tasks.append(&mut document_import_tasks);
Ok(tasks)
}
Batch::DocumentDeletion {
index_uid,
documents,
mut tasks,
} => {
let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?;
let ret = index.delete_documents(&documents);
for task in &mut tasks {
match ret {
Ok(DocumentDeletionResult {
deleted_documents,
remaining_documents: _,
}) => {
// TODO we are assigning the same amount of documents to
// all the tasks that are in the same batch. That's wrong!
task.details = Some(Details::DocumentDeletion {
received_document_ids: documents.len(),
deleted_documents: Some(deleted_documents),
});
}
Err(ref error) => task.error = Some(error.into()),
}
}
Ok(tasks)
}
Batch::Settings {
index_uid,
settings,
mut tasks,
} => {
// we NEED a write transaction for the index creation.
// To avoid blocking the whole process we're going to commit asap.
let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
// TODO merge the settings to only do a reindexation once.
for (task, (_, settings)) in tasks.iter_mut().zip(settings) {
let checked_settings = settings.clone().check();
task.details = Some(Details::Settings { settings });
if let Err(error) = index.update_settings(&checked_settings) {
task.error = Some(error.into());
}
}
Ok(tasks)
}
Batch::DocumentClearAndSetting {
index_uid,
mut cleared_tasks,
settings,
mut settings_tasks,
} => {
// If the settings were given before the document clear
// we must create the index first.
// we NEED a write transaction for the index creation.
// To avoid blocking the whole process we're going to commit asap.
let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
// TODO We must use the same write transaction to commit
// the clear AND the settings in one transaction.
let ret = index.clear_documents();
for task in &mut cleared_tasks {
task.details = Some(Details::ClearAll {
// TODO where can I find this information of how many documents did we delete?
deleted_documents: None,
});
if let Err(ref error) = ret {
task.error = Some(error.into());
}
}
// TODO merge the settings to only do a reindexation once.
for (task, (_, settings)) in settings_tasks.iter_mut().zip(settings) {
let checked_settings = settings.clone().check();
task.details = Some(Details::Settings { settings });
if let Err(error) = index.update_settings(&checked_settings) {
task.error = Some(error.into());
}
}
let mut tasks = cleared_tasks;
tasks.append(&mut settings_tasks);
Ok(tasks) Ok(tasks)
} }
Batch::IndexCreation { Batch::IndexCreation {
@ -566,4 +415,223 @@ impl IndexScheduler {
Batch::IndexDeletion { index_uid, tasks } => todo!(), Batch::IndexDeletion { index_uid, tasks } => todo!(),
} }
} }
fn apply_index_operation<'txn, 'i>(
&self,
index_wtxn: &'txn mut RwTxn<'i, '_>,
index: &'i milli::Index,
operation: IndexOperation,
) -> Result<Vec<Task>> {
match operation {
IndexOperation::DocumentClear {
index_uid,
mut tasks,
} => {
let result = milli::update::ClearDocuments::new(index_wtxn, index).execute();
for task in &mut tasks {
match result {
Ok(deleted_documents) => {
task.details = Some(Details::ClearAll {
deleted_documents: Some(deleted_documents),
})
}
Err(ref error) => task.error = Some(MilliError(error).into()),
}
}
Ok(tasks)
}
IndexOperation::DocumentImport {
index_uid,
primary_key,
method,
content_files,
mut tasks,
} => {
let indexer_config = self.index_mapper.indexer_config();
if let Some(primary_key) = primary_key {
if index.primary_key(index_wtxn)?.is_none() {
let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.set_primary_key(primary_key);
builder.execute(|_| ())?;
}
}
let config = IndexDocumentsConfig {
update_method: method,
..Default::default()
};
let mut builder = milli::update::IndexDocuments::new(
index_wtxn,
index,
indexer_config,
config,
|indexing_step| debug!("update: {:?}", indexing_step),
)?;
let mut results = Vec::new();
for content_uuid in content_files.into_iter() {
let content_file = self.file_store.get_update(content_uuid)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(IndexError::from)?;
let (new_builder, user_result) = builder.add_documents(reader)?;
builder = new_builder;
let user_result = match user_result {
Ok(count) => {
let addition = DocumentAdditionResult {
indexed_documents: count,
number_of_documents: count,
};
Ok(addition)
}
Err(e) => Err(IndexError::from(e)),
};
results.push(user_result);
}
if results.iter().any(|res| res.is_ok()) {
let addition = builder.execute()?;
info!("document addition done: {:?}", addition);
}
for (task, ret) in tasks.iter_mut().zip(results) {
match ret {
Ok(DocumentAdditionResult {
indexed_documents,
number_of_documents,
}) => {
task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents,
indexed_documents,
})
}
Err(error) => task.error = Some(error.into()),
}
}
Ok(tasks)
}
IndexOperation::DocumentDeletion {
index_uid,
documents,
mut tasks,
} => {
let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?;
documents.iter().for_each(|id| {
builder.delete_external_id(id);
});
let result = builder.execute();
for (task, documents) in tasks.iter_mut().zip(documents) {
match result {
Ok(DocumentDeletionResult {
deleted_documents,
remaining_documents: _,
}) => {
task.details = Some(Details::DocumentDeletion {
received_document_ids: documents.len(),
deleted_documents: Some(deleted_documents),
});
}
Err(ref error) => task.error = Some(MilliError(error).into()),
}
}
Ok(tasks)
}
IndexOperation::Settings {
index_uid,
settings,
mut tasks,
} => {
let indexer_config = self.index_mapper.indexer_config();
// TODO merge the settings to only do *one* reindexation.
for (task, (_, settings)) in tasks.iter_mut().zip(settings) {
let checked_settings = settings.clone().check();
task.details = Some(Details::Settings { settings });
let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config);
apply_settings_to_builder(&checked_settings, &mut builder);
let result = builder.execute(|indexing_step| {
debug!("update: {:?}", indexing_step);
});
if let Err(ref error) = result {
task.error = Some(MilliError(error).into());
}
}
Ok(tasks)
}
IndexOperation::SettingsAndDocumentImport {
index_uid,
primary_key,
method,
content_files,
document_import_tasks,
settings,
settings_tasks,
} => {
let settings_tasks = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::Settings {
index_uid: index_uid.clone(),
settings,
tasks: settings_tasks,
},
)?;
let mut import_tasks = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::DocumentImport {
index_uid,
primary_key,
method,
content_files,
tasks: document_import_tasks,
},
)?;
let mut tasks = settings_tasks;
tasks.append(&mut import_tasks);
Ok(tasks)
}
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks,
settings,
settings_tasks,
} => {
let mut import_tasks = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::DocumentClear {
index_uid: index_uid.clone(),
tasks: cleared_tasks,
},
)?;
let settings_tasks = self.apply_index_operation(
index_wtxn,
index,
IndexOperation::Settings {
index_uid,
settings,
tasks: settings_tasks,
},
)?;
let mut tasks = settings_tasks;
tasks.append(&mut import_tasks);
Ok(tasks)
}
}
}
} }

View File

@ -133,4 +133,8 @@ impl IndexMapper {
Ok(()) Ok(())
} }
pub fn indexer_config(&self) -> &IndexerConfig {
&self.indexer_config
}
} }