diff --git a/Cargo.lock b/Cargo.lock index 13f74b82d..f717be694 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1188,22 +1188,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "heed" -version = "0.10.6" -dependencies = [ - "byteorder", - "heed-traits 0.7.0", - "heed-types 0.7.2", - "libc", - "lmdb-rkv-sys", - "once_cell", - "page_size", - "synchronoise", - "url", - "zerocopy", -] - [[package]] name = "heed" version = "0.10.6" @@ -1211,8 +1195,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afcc6c911acaadad3ebe9f1ef1707d80bd71c92037566f47b6238a03b60adf1a" dependencies = [ "byteorder", - "heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "heed-types 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "heed-traits", + "heed-types", "libc", "lmdb-rkv-sys", "once_cell", @@ -1223,27 +1207,12 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "heed-traits" -version = "0.7.0" - [[package]] name = "heed-traits" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b328f6260a7e51bdb0ca6b68e6ea27ee3d11fba5dee930896ee7ff6ad5fc072c" -[[package]] -name = "heed-types" -version = "0.7.2" -dependencies = [ - "bincode", - "heed-traits 0.7.0", - "serde", - "serde_json", - "zerocopy", -] - [[package]] name = "heed-types" version = "0.7.2" @@ -1251,7 +1220,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e628efb08beaee58355f80dc4adba79d644940ea9eef60175ea17dc218aab405" dependencies = [ "bincode", - "heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "heed-traits", "serde", "serde_json", "zerocopy", @@ -1646,7 +1615,7 @@ dependencies = [ "futures", "futures-util", "grenad", - "heed 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)", + "heed", "http", "indexmap", "jemallocator", @@ -1738,7 +1707,7 @@ dependencies = [ "fst", "fxhash", "grenad", - "heed 0.10.6", + "heed", "human_format", "itertools", "jemallocator", diff --git a/src/data/updates.rs b/src/data/updates.rs index d12f271c8..6bb9b20f2 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -7,7 +7,7 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update_store::UpdateStatus; use super::Data; -use crate::updates::UpdateMeta; +use crate::updates::{UpdateMeta, UpdateResult}; impl Data { pub async fn add_documents( @@ -47,7 +47,7 @@ impl Data { #[inline] - pub fn get_update_status(&self, _index: &str, uid: u64) -> anyhow::Result>> { + pub fn get_update_status(&self, _index: &str, uid: u64) -> anyhow::Result>> { self.update_queue.get_update_status(uid) } } diff --git a/src/updates/mod.rs b/src/updates/mod.rs index cd8052108..93cbcc008 100644 --- a/src/updates/mod.rs +++ b/src/updates/mod.rs @@ -13,7 +13,7 @@ use flate2::read::GzDecoder; use grenad::CompressionType; use log::info; use milli::Index; -use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod }; +use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod, DocumentAdditionResult }; use milli::update_store::{UpdateStore, UpdateHandler as Handler, UpdateStatus, Processing, Processed, Failed}; use rayon::ThreadPool; use serde::{Serialize, Deserialize}; @@ -41,13 +41,19 @@ pub enum UpdateMetaProgress { }, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum UpdateResult { + DocumentsAddition(DocumentAdditionResult), + Other, +} + #[derive(Clone)] pub struct UpdateQueue { - inner: Arc>, + inner: Arc>, } impl Deref for UpdateQueue { - type Target = Arc>; + type Target = Arc>; fn deref(&self) -> &Self::Target { &self.inner @@ -163,7 +169,7 @@ impl UpdateHandler { method: IndexDocumentsMethod, content: &[u8], update_builder: UpdateBuilder, - ) -> Result<()> { + ) -> Result { // We must use the write transaction of the update here. let mut wtxn = self.indexes.write_txn()?; let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes); @@ -180,23 +186,29 @@ impl UpdateHandler { let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); match result { - Ok(()) => wtxn.commit().map_err(Into::into), + Ok(addition_result) => wtxn + .commit() + .and(Ok(UpdateResult::DocumentsAddition(addition_result))) + .map_err(Into::into), Err(e) => Err(e.into()) } } - fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<()> { + fn clear_documents(&self, update_builder: UpdateBuilder) -> Result { // We must use the write transaction of the update here. let mut wtxn = self.indexes.write_txn()?; let builder = update_builder.clear_documents(&mut wtxn, &self.indexes); match builder.execute() { - Ok(_count) => wtxn.commit().map_err(Into::into), + Ok(_count) => wtxn + .commit() + .and(Ok(UpdateResult::Other)) + .map_err(Into::into), Err(e) => Err(e.into()) } } - fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> Result<()> { + fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> Result { // We must use the write transaction of the update here. let mut wtxn = self.indexes.write_txn()?; let mut builder = update_builder.settings(&mut wtxn, &self.indexes); @@ -233,12 +245,15 @@ impl UpdateHandler { let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); match result { - Ok(_count) => wtxn.commit().map_err(Into::into), + Ok(_count) => wtxn + .commit() + .and(Ok(UpdateResult::Other)) + .map_err(Into::into), Err(e) => Err(e.into()) } } - fn update_facets(&self, levels: &Facets, update_builder: UpdateBuilder) -> Result<()> { + fn update_facets(&self, levels: &Facets, update_builder: UpdateBuilder) -> Result { // We must use the write transaction of the update here. let mut wtxn = self.indexes.write_txn()?; let mut builder = update_builder.facets(&mut wtxn, &self.indexes); @@ -249,38 +264,37 @@ impl UpdateHandler { builder.min_level_size(value); } match builder.execute() { - Ok(()) => wtxn.commit().map_err(Into::into), + Ok(()) => wtxn + .commit() + .and(Ok(UpdateResult::Other)) + .map_err(Into::into), Err(e) => Err(e.into()) } } } -impl Handler for UpdateHandler { +impl Handler for UpdateHandler { fn handle_update( &mut self, update_id: u64, meta: Processing, content: &[u8] - ) -> Result, Failed> { + ) -> Result, Failed> { use UpdateMeta::*; let update_builder = self.update_buidler(update_id); - let result: anyhow::Result<()> = match meta.meta() { + let result = match meta.meta() { DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), ClearDocuments => self.clear_documents(update_builder), Settings(settings) => self.update_settings(settings, update_builder), Facets(levels) => self.update_facets(levels, update_builder), }; - let new_meta = match result { - Ok(()) => format!("valid update content"), - Err(e) => format!("error while processing update content: {:?}", e), - }; - - let meta = meta.process(new_meta); - - Ok(meta) + match result { + Ok(result) => Ok(meta.process(result)), + Err(e) => Err(meta.fail(e.to_string())), + } } } @@ -302,7 +316,7 @@ impl UpdateQueue { } #[inline] - pub fn get_update_status(&self, update_id: u64) -> Result>> { + pub fn get_update_status(&self, update_id: u64) -> Result>> { Ok(self.inner.meta(update_id)?) } }