From 7b4a91370499715b4204359a9e313e50aa6fda05 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 29 Sep 2022 12:04:58 +0200 Subject: [PATCH] Implement the DocumentUpdate batch operation --- index-scheduler/src/batch.rs | 44 ++++++++++++++++++++++++++++++++---- index-scheduler/src/task.rs | 2 +- index/src/error.rs | 11 +++++++++ index/src/index.rs | 1 - 4 files changed, 51 insertions(+), 7 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 975a2468c..3b002587c 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -429,6 +429,7 @@ impl IndexScheduler { Batch::Snapshot(_) => todo!(), Batch::Dump(_) => todo!(), Batch::DocumentClear { tasks, .. } => todo!(), + // TODO we should merge both document import with a method field Batch::DocumentAddition { index_uid, primary_key, @@ -477,22 +478,55 @@ impl IndexScheduler { } => { todo!(); } + // TODO we should merge both document import with a method field Batch::DocumentUpdate { index_uid, primary_key, content_files, - tasks, - } => todo!(), + mut tasks, + } => { + // we NEED a write transaction for the index creation. + // To avoid blocking the whole process we're going to commit asap. + let mut wtxn = self.env.write_txn()?; + let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; + wtxn.commit()?; + + let ret = index.update_documents( + IndexDocumentsMethod::UpdateDocuments, + primary_key, + self.file_store.clone(), + content_files, + )?; + + for (task, ret) in tasks.iter_mut().zip(ret) { + match ret { + Ok(DocumentAdditionResult { + indexed_documents, + number_of_documents, + }) => { + task.details = Some(Details::DocumentAddition { + received_documents: number_of_documents, + indexed_documents, + }); + } + Err(error) => { + task.error = Some(error.into()); + } + } + } + + Ok(tasks) + } Batch::DocumentDeletion { index_uid, documents, - tasks, + mut tasks, } => { let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; let ret = index.delete_documents(&documents); - for task in tasks { + for task in &mut tasks { match ret { Ok(DocumentDeletionResult { deleted_documents, @@ -505,7 +539,7 @@ impl IndexScheduler { deleted_documents: Some(deleted_documents), }); } - Err(error) => { + Err(ref error) => { task.error = Some(error.into()); } } diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index c8156e329..894a214ac 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -9,7 +9,7 @@ use uuid::Uuid; use crate::{Error, TaskId}; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct TaskView { pub uid: TaskId, diff --git a/index/src/error.rs b/index/src/error.rs index b2ecfea0f..c960d6925 100644 --- a/index/src/error.rs +++ b/index/src/error.rs @@ -40,6 +40,17 @@ impl ErrorCode for IndexError { } } +impl ErrorCode for &IndexError { + fn error_code(&self) -> Code { + match self { + IndexError::Internal(_) => Code::Internal, + IndexError::DocumentNotFound(_) => Code::DocumentNotFound, + IndexError::Facet(e) => e.error_code(), + IndexError::Milli(e) => MilliError(e).error_code(), + } + } +} + impl From for IndexError { fn from(error: milli::UserError) -> IndexError { IndexError::Milli(error.into()) diff --git a/index/src/index.rs b/index/src/index.rs index 1de8dc53a..8a76b130f 100644 --- a/index/src/index.rs +++ b/index/src/index.rs @@ -9,7 +9,6 @@ use fst::IntoStreamer; use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn}; use milli::update::{IndexerConfig, Setting}; use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET}; -use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use time::OffsetDateTime;