Implement the DocumentUpdate batch operation

This commit is contained in:
Kerollmops 2022-09-29 12:04:58 +02:00 committed by Clément Renault
parent a6a1043abb
commit 7b4a913704
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 51 additions and 7 deletions

View File

@ -429,6 +429,7 @@ impl IndexScheduler {
Batch::Snapshot(_) => todo!(), Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(), Batch::Dump(_) => todo!(),
Batch::DocumentClear { tasks, .. } => todo!(), Batch::DocumentClear { tasks, .. } => todo!(),
// TODO we should merge both document import with a method field
Batch::DocumentAddition { Batch::DocumentAddition {
index_uid, index_uid,
primary_key, primary_key,
@ -477,22 +478,55 @@ impl IndexScheduler {
} => { } => {
todo!(); todo!();
} }
// TODO we should merge both document import with a method field
Batch::DocumentUpdate { Batch::DocumentUpdate {
index_uid, index_uid,
primary_key, primary_key,
content_files, content_files,
tasks, mut tasks,
} => todo!(), } => {
// we NEED a write transaction for the index creation.
// To avoid blocking the whole process we're going to commit asap.
let mut wtxn = self.env.write_txn()?;
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
wtxn.commit()?;
let ret = index.update_documents(
IndexDocumentsMethod::UpdateDocuments,
primary_key,
self.file_store.clone(),
content_files,
)?;
for (task, ret) in tasks.iter_mut().zip(ret) {
match ret {
Ok(DocumentAdditionResult {
indexed_documents,
number_of_documents,
}) => {
task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents,
indexed_documents,
});
}
Err(error) => {
task.error = Some(error.into());
}
}
}
Ok(tasks)
}
Batch::DocumentDeletion { Batch::DocumentDeletion {
index_uid, index_uid,
documents, documents,
tasks, mut tasks,
} => { } => {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?; let index = self.index_mapper.index(&rtxn, &index_uid)?;
let ret = index.delete_documents(&documents); let ret = index.delete_documents(&documents);
for task in tasks { for task in &mut tasks {
match ret { match ret {
Ok(DocumentDeletionResult { Ok(DocumentDeletionResult {
deleted_documents, deleted_documents,
@ -505,7 +539,7 @@ impl IndexScheduler {
deleted_documents: Some(deleted_documents), deleted_documents: Some(deleted_documents),
}); });
} }
Err(error) => { Err(ref error) => {
task.error = Some(error.into()); task.error = Some(error.into());
} }
} }

View File

@ -9,7 +9,7 @@ use uuid::Uuid;
use crate::{Error, TaskId}; use crate::{Error, TaskId};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct TaskView { pub struct TaskView {
pub uid: TaskId, pub uid: TaskId,

View File

@ -40,6 +40,17 @@ impl ErrorCode for IndexError {
} }
} }
// Allow computing an error code from a borrowed `IndexError` (used where the
// error cannot be moved out, e.g. `Err(ref error)` match arms).
impl ErrorCode for &IndexError {
    fn error_code(&self) -> Code {
        // Delegate to the owned impl so the variant -> `Code` mapping is
        // maintained in exactly one place. `**self` is the underlying
        // `IndexError`; method resolution auto-refs it back to `&IndexError`,
        // which selects the `impl ErrorCode for IndexError` above — this does
        // not recurse into this impl (that would require `&&IndexError`).
        (**self).error_code()
    }
}
impl From<milli::UserError> for IndexError { impl From<milli::UserError> for IndexError {
fn from(error: milli::UserError) -> IndexError { fn from(error: milli::UserError) -> IndexError {
IndexError::Milli(error.into()) IndexError::Milli(error.into())

View File

@ -9,7 +9,6 @@ use fst::IntoStreamer;
use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn}; use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
use milli::update::{IndexerConfig, Setting}; use milli::update::{IndexerConfig, Setting};
use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET}; use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use time::OffsetDateTime; use time::OffsetDateTime;