From cde847838882037a2ef49ed3ab5526c66c33a17d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 20 Oct 2020 15:14:06 +0200 Subject: [PATCH] Replace the panic in the merge function by actual errors --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/indexing/merge_function.rs | 29 +++++++++++++++-------------- src/indexing/mod.rs | 9 +++------ src/subcommand/serve.rs | 2 +- 5 files changed, 21 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd5335894..d32dd684b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -611,7 +611,7 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "grenad" version = "0.1.0" -source = "git+https://github.com/Kerollmops/grenad.git?rev=1094409#1094409c59f41d3896d487f9869c33343f59c233" +source = "git+https://github.com/Kerollmops/grenad.git?rev=00099b5#00099b58092c67f7ec492a6b37de465289f3110b" dependencies = [ "byteorder", "flate2", diff --git a/Cargo.toml b/Cargo.toml index 4589341d4..a829d781d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ csv = "1.1.3" flate2 = "1.0.17" fst = "0.4.4" fxhash = "0.2.1" -grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "1094409" } +grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "00099b5" } heed = { version = "0.8.1", default-features = false, features = ["lmdb"] } human_format = "1.0.3" jemallocator = "0.3.2" diff --git a/src/indexing/merge_function.rs b/src/indexing/merge_function.rs index f9506db1c..29a9c9125 100644 --- a/src/indexing/merge_function.rs +++ b/src/indexing/merge_function.rs @@ -1,3 +1,4 @@ +use anyhow::bail; use bstr::ByteSlice as _; use fst::IntoStreamer; use roaring::RoaringBitmap; @@ -8,7 +9,7 @@ const WORDS_FST_KEY: &[u8] = crate::WORDS_FST_KEY.as_bytes(); const HEADERS_KEY: &[u8] = crate::HEADERS_KEY.as_bytes(); const DOCUMENTS_IDS_KEY: &[u8] = crate::DOCUMENTS_IDS_KEY.as_bytes(); -pub fn main_merge(key: &[u8], values: &[Vec]) -> Result, ()> { +pub fn main_merge(key: &[u8], values: &[Vec]) -> anyhow::Result> { match key { WORDS_FST_KEY => { let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect(); @@ -27,42 +28,42 @@ pub fn main_merge(key: &[u8], values: &[Vec]) -> Result, ()> { Ok(values[0].to_vec()) }, DOCUMENTS_IDS_KEY => word_docids_merge(&[], values), - otherwise => panic!("wut {:?}", otherwise), + otherwise => bail!("wut {:?}", otherwise), } } -pub fn word_docids_merge(_key: &[u8], values: &[Vec]) -> Result, ()> { +pub fn word_docids_merge(_key: &[u8], values: &[Vec]) -> anyhow::Result> { let (head, tail) = values.split_first().unwrap(); - let mut head = RoaringBitmap::deserialize_from(head.as_slice()).unwrap(); + let mut head = RoaringBitmap::deserialize_from(head.as_slice())?; for value in tail { - let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap(); + let bitmap = RoaringBitmap::deserialize_from(value.as_slice())?; head.union_with(&bitmap); } let mut vec = Vec::with_capacity(head.serialized_size()); - head.serialize_into(&mut vec).unwrap(); + head.serialize_into(&mut vec)?; Ok(vec) } -pub fn docid_word_positions_merge(key: &[u8], _values: &[Vec]) -> Result, ()> { - panic!("merging docid word positions is an error ({:?})", key.as_bstr()) +pub fn docid_word_positions_merge(key: &[u8], _values: &[Vec]) -> anyhow::Result> { + bail!("merging docid word positions is an error ({:?})", key.as_bstr()) } -pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Vec]) -> Result, ()> { +pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Vec]) -> anyhow::Result> { let (head, tail) = values.split_first().unwrap(); - let mut head = CboRoaringBitmapCodec::deserialize_from(head.as_slice()).unwrap(); + let mut head = CboRoaringBitmapCodec::deserialize_from(head.as_slice())?; for value in tail { - let bitmap = CboRoaringBitmapCodec::deserialize_from(value.as_slice()).unwrap(); + let bitmap = CboRoaringBitmapCodec::deserialize_from(value.as_slice())?; head.union_with(&bitmap); } let mut vec = Vec::new(); - CboRoaringBitmapCodec::serialize_into(&head, &mut vec).unwrap(); + CboRoaringBitmapCodec::serialize_into(&head, &mut vec)?; Ok(vec) } -pub fn documents_merge(key: &[u8], _values: &[Vec]) -> Result, ()> { - panic!("merging documents is an error ({:?})", key.as_bstr()) +pub fn documents_merge(key: &[u8], _values: &[Vec]) -> anyhow::Result> { + bail!("merging documents is an error ({:?})", key.as_bstr()) } diff --git a/src/indexing/mod.rs b/src/indexing/mod.rs index 0fa4a7374..14bfae991 100644 --- a/src/indexing/mod.rs +++ b/src/indexing/mod.rs @@ -81,7 +81,7 @@ enum WriteMethod { GetMergePut, } -type MergeFn = fn(&[u8], &[Vec]) -> Result, ()>; +type MergeFn = fn(&[u8], &[Vec]) -> anyhow::Result>; fn create_writer(typ: CompressionType, level: Option, file: File) -> io::Result> { let mut builder = Writer::builder(); @@ -159,7 +159,7 @@ fn merge_into_lmdb_database( while let Some((k, v)) = in_iter.next()? { match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { Some(old_val) => { - // TODO improve the function signature and avoid alocating here! + // TODO improve the function signature and avoid allocating here! let vals = vec![old_val.to_vec(), v.to_vec()]; let val = merge(k, &vals).expect("merge failed"); database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? @@ -386,12 +386,9 @@ fn run_intern<'a>( } } - debug!("Retrieving the number of documents..."); - let count = index.number_of_documents(&wtxn)?; - wtxn.commit()?; - info!("Wrote {} documents in {:.02?}", count, before_indexing.elapsed()); + info!("Update processed in {:.02?}", before_indexing.elapsed()); Ok(()) } diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 7e57aa49a..851691083 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -377,7 +377,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let meta = String::from("I am the metadata"); let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); - update_status_sender.send(UpdateStatus::Pending { update_id, meta }).unwrap(); + let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); eprintln!("update {} registered", update_id); Ok(warp::reply())