support error & return document count on addition

This commit is contained in:
mpostma 2020-12-30 18:44:33 +01:00
parent 54861335a0
commit d9dc2036a7
3 changed files with 44 additions and 61 deletions

41
Cargo.lock generated
View File

@ -1188,22 +1188,6 @@ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
[[package]]
name = "heed"
version = "0.10.6"
dependencies = [
"byteorder",
"heed-traits 0.7.0",
"heed-types 0.7.2",
"libc",
"lmdb-rkv-sys",
"once_cell",
"page_size",
"synchronoise",
"url",
"zerocopy",
]
[[package]] [[package]]
name = "heed" name = "heed"
version = "0.10.6" version = "0.10.6"
@ -1211,8 +1195,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcc6c911acaadad3ebe9f1ef1707d80bd71c92037566f47b6238a03b60adf1a" checksum = "afcc6c911acaadad3ebe9f1ef1707d80bd71c92037566f47b6238a03b60adf1a"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "heed-traits",
"heed-types 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "heed-types",
"libc", "libc",
"lmdb-rkv-sys", "lmdb-rkv-sys",
"once_cell", "once_cell",
@ -1223,27 +1207,12 @@ dependencies = [
"zerocopy", "zerocopy",
] ]
[[package]]
name = "heed-traits"
version = "0.7.0"
[[package]] [[package]]
name = "heed-traits" name = "heed-traits"
version = "0.7.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b328f6260a7e51bdb0ca6b68e6ea27ee3d11fba5dee930896ee7ff6ad5fc072c" checksum = "b328f6260a7e51bdb0ca6b68e6ea27ee3d11fba5dee930896ee7ff6ad5fc072c"
[[package]]
name = "heed-types"
version = "0.7.2"
dependencies = [
"bincode",
"heed-traits 0.7.0",
"serde",
"serde_json",
"zerocopy",
]
[[package]] [[package]]
name = "heed-types" name = "heed-types"
version = "0.7.2" version = "0.7.2"
@ -1251,7 +1220,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e628efb08beaee58355f80dc4adba79d644940ea9eef60175ea17dc218aab405" checksum = "e628efb08beaee58355f80dc4adba79d644940ea9eef60175ea17dc218aab405"
dependencies = [ dependencies = [
"bincode", "bincode",
"heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "heed-traits",
"serde", "serde",
"serde_json", "serde_json",
"zerocopy", "zerocopy",
@ -1646,7 +1615,7 @@ dependencies = [
"futures", "futures",
"futures-util", "futures-util",
"grenad", "grenad",
"heed 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)", "heed",
"http", "http",
"indexmap", "indexmap",
"jemallocator", "jemallocator",
@ -1738,7 +1707,7 @@ dependencies = [
"fst", "fst",
"fxhash", "fxhash",
"grenad", "grenad",
"heed 0.10.6", "heed",
"human_format", "human_format",
"itertools", "itertools",
"jemallocator", "jemallocator",

View File

@ -7,7 +7,7 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat};
use milli::update_store::UpdateStatus; use milli::update_store::UpdateStatus;
use super::Data; use super::Data;
use crate::updates::UpdateMeta; use crate::updates::{UpdateMeta, UpdateResult};
impl Data { impl Data {
pub async fn add_documents<B, E, S>( pub async fn add_documents<B, E, S>(
@ -47,7 +47,7 @@ impl Data {
#[inline] #[inline]
pub fn get_update_status(&self, _index: &str, uid: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, String, String>>> { pub fn get_update_status(&self, _index: &str, uid: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
self.update_queue.get_update_status(uid) self.update_queue.get_update_status(uid)
} }
} }

View File

@ -13,7 +13,7 @@ use flate2::read::GzDecoder;
use grenad::CompressionType; use grenad::CompressionType;
use log::info; use log::info;
use milli::Index; use milli::Index;
use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod }; use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod, DocumentAdditionResult };
use milli::update_store::{UpdateStore, UpdateHandler as Handler, UpdateStatus, Processing, Processed, Failed}; use milli::update_store::{UpdateStore, UpdateHandler as Handler, UpdateStatus, Processing, Processed, Failed};
use rayon::ThreadPool; use rayon::ThreadPool;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@ -41,13 +41,19 @@ pub enum UpdateMetaProgress {
}, },
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
DocumentsAddition(DocumentAdditionResult),
Other,
}
#[derive(Clone)] #[derive(Clone)]
pub struct UpdateQueue { pub struct UpdateQueue {
inner: Arc<UpdateStore<UpdateMeta, String, String>>, inner: Arc<UpdateStore<UpdateMeta, UpdateResult, String>>,
} }
impl Deref for UpdateQueue { impl Deref for UpdateQueue {
type Target = Arc<UpdateStore<UpdateMeta, String, String>>; type Target = Arc<UpdateStore<UpdateMeta, UpdateResult, String>>;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.inner &self.inner
@ -163,7 +169,7 @@ impl UpdateHandler {
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
content: &[u8], content: &[u8],
update_builder: UpdateBuilder, update_builder: UpdateBuilder,
) -> Result<()> { ) -> Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.indexes.write_txn()?; let mut wtxn = self.indexes.write_txn()?;
let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes); let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes);
@ -180,23 +186,29 @@ impl UpdateHandler {
let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
match result { match result {
Ok(()) => wtxn.commit().map_err(Into::into), Ok(addition_result) => wtxn
.commit()
.and(Ok(UpdateResult::DocumentsAddition(addition_result)))
.map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into())
} }
} }
fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<()> { fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.indexes.write_txn()?; let mut wtxn = self.indexes.write_txn()?;
let builder = update_builder.clear_documents(&mut wtxn, &self.indexes); let builder = update_builder.clear_documents(&mut wtxn, &self.indexes);
match builder.execute() { match builder.execute() {
Ok(_count) => wtxn.commit().map_err(Into::into), Ok(_count) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into())
} }
} }
fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> Result<()> { fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.indexes.write_txn()?; let mut wtxn = self.indexes.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, &self.indexes); let mut builder = update_builder.settings(&mut wtxn, &self.indexes);
@ -233,12 +245,15 @@ impl UpdateHandler {
let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
match result { match result {
Ok(_count) => wtxn.commit().map_err(Into::into), Ok(_count) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into())
} }
} }
fn update_facets(&self, levels: &Facets, update_builder: UpdateBuilder) -> Result<()> { fn update_facets(&self, levels: &Facets, update_builder: UpdateBuilder) -> Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.indexes.write_txn()?; let mut wtxn = self.indexes.write_txn()?;
let mut builder = update_builder.facets(&mut wtxn, &self.indexes); let mut builder = update_builder.facets(&mut wtxn, &self.indexes);
@ -249,38 +264,37 @@ impl UpdateHandler {
builder.min_level_size(value); builder.min_level_size(value);
} }
match builder.execute() { match builder.execute() {
Ok(()) => wtxn.commit().map_err(Into::into), Ok(()) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into())
} }
} }
} }
impl Handler<UpdateMeta, String, String> for UpdateHandler { impl Handler<UpdateMeta, UpdateResult, String> for UpdateHandler {
fn handle_update( fn handle_update(
&mut self, &mut self,
update_id: u64, update_id: u64,
meta: Processing<UpdateMeta>, meta: Processing<UpdateMeta>,
content: &[u8] content: &[u8]
) -> Result<Processed<UpdateMeta, String>, Failed<UpdateMeta, String>> { ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
use UpdateMeta::*; use UpdateMeta::*;
let update_builder = self.update_buidler(update_id); let update_builder = self.update_buidler(update_id);
let result: anyhow::Result<()> = match meta.meta() { let result = match meta.meta() {
DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder),
ClearDocuments => self.clear_documents(update_builder), ClearDocuments => self.clear_documents(update_builder),
Settings(settings) => self.update_settings(settings, update_builder), Settings(settings) => self.update_settings(settings, update_builder),
Facets(levels) => self.update_facets(levels, update_builder), Facets(levels) => self.update_facets(levels, update_builder),
}; };
let new_meta = match result { match result {
Ok(()) => format!("valid update content"), Ok(result) => Ok(meta.process(result)),
Err(e) => format!("error while processing update content: {:?}", e), Err(e) => Err(meta.fail(e.to_string())),
}; }
let meta = meta.process(new_meta);
Ok(meta)
} }
} }
@ -302,7 +316,7 @@ impl UpdateQueue {
} }
#[inline] #[inline]
pub fn get_update_status(&self, update_id: u64) -> Result<Option<UpdateStatus<UpdateMeta, String, String>>> { pub fn get_update_status(&self, update_id: u64) -> Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
Ok(self.inner.meta(update_id)?) Ok(self.inner.meta(update_id)?)
} }
} }