diff --git a/http-ui/Cargo.toml b/http-ui/Cargo.toml index 9dd269970..e7ed8455a 100644 --- a/http-ui/Cargo.toml +++ b/http-ui/Cargo.toml @@ -27,6 +27,7 @@ futures = "0.3.21" serde = { version = "1.0.136", features = ["derive"] } serde_json = { version = "1.0.79", features = ["preserve_order"] } tokio = { version = "1.17.0", features = ["full"] } +tokio-stream = { version = "0.1.8", default-features = false, features = ["sync"] } warp = "0.3.2" # logging diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index b608e79ec..26c1034eb 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -3,7 +3,7 @@ mod update_store; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt::Display; use std::fs::{create_dir_all, File}; -use std::io::{BufRead, BufReader, Cursor}; +use std::io::{BufRead, BufReader, Cursor, Read}; use std::net::SocketAddr; use std::num::{NonZeroU32, NonZeroUsize}; use std::path::PathBuf; @@ -35,6 +35,7 @@ use structopt::StructOpt; use tokio::fs::File as TFile; use tokio::io::AsyncWriteExt; use tokio::sync::broadcast; +use tokio_stream::wrappers::BroadcastStream; use warp::filters::ws::Message; use warp::http::Response; use warp::Filter; @@ -885,7 +886,8 @@ async fn main() -> anyhow::Result<()> { let mut file = TFile::from_std(file); while let Some(result) = stream.next().await { - let bytes = result.unwrap().to_bytes(); + let mut bytes = Vec::new(); + result.unwrap().reader().read_to_end(&mut bytes).unwrap(); file.write_all(&bytes[..]).await.unwrap(); } @@ -1004,8 +1006,7 @@ async fn main() -> anyhow::Result<()> { let update_status_receiver = update_status_sender.subscribe(); ws.on_upgrade(|websocket| { // Just echo all updates messages... - update_status_receiver - .into_stream() + BroadcastStream::new(update_status_receiver) .flat_map(|result| match result { Ok(status) => { let msg = serde_json::to_string(&status).unwrap(); diff --git a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs index 519997274..96aee6855 100644 --- a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs +++ b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs @@ -82,7 +82,8 @@ impl CboRoaringBitmapCodec { buffer.extend_from_slice(&integer.to_ne_bytes()); } } else { - let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter()); + // Integers *must* be ordered here, no matter what. + let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter()).unwrap(); roaring.serialize_into(buffer)?; } } else { @@ -152,25 +153,25 @@ mod tests { let mut buffer = Vec::new(); let small_data = vec![ - RoaringBitmap::from_sorted_iter(1..4), - RoaringBitmap::from_sorted_iter(2..5), - RoaringBitmap::from_sorted_iter(4..6), - RoaringBitmap::from_sorted_iter(1..3), + RoaringBitmap::from_sorted_iter(1..4).unwrap(), + RoaringBitmap::from_sorted_iter(2..5).unwrap(), + RoaringBitmap::from_sorted_iter(4..6).unwrap(), + RoaringBitmap::from_sorted_iter(1..3).unwrap(), ]; let small_data: Vec<_> = small_data.iter().map(|b| CboRoaringBitmapCodec::bytes_encode(b).unwrap()).collect(); CboRoaringBitmapCodec::merge_into(small_data.as_slice(), &mut buffer).unwrap(); let bitmap = CboRoaringBitmapCodec::deserialize_from(&buffer).unwrap(); - let expected = RoaringBitmap::from_sorted_iter(1..6); + let expected = RoaringBitmap::from_sorted_iter(1..6).unwrap(); assert_eq!(bitmap, expected); let medium_data = vec![ - RoaringBitmap::from_sorted_iter(1..4), - RoaringBitmap::from_sorted_iter(2..5), - RoaringBitmap::from_sorted_iter(4..8), - RoaringBitmap::from_sorted_iter(0..3), - RoaringBitmap::from_sorted_iter(7..23), + RoaringBitmap::from_sorted_iter(1..4).unwrap(), + RoaringBitmap::from_sorted_iter(2..5).unwrap(), + RoaringBitmap::from_sorted_iter(4..8).unwrap(), + RoaringBitmap::from_sorted_iter(0..3).unwrap(), + RoaringBitmap::from_sorted_iter(7..23).unwrap(), ]; let medium_data: Vec<_> = @@ -179,7 +180,7 @@ mod tests { CboRoaringBitmapCodec::merge_into(medium_data.as_slice(), &mut buffer).unwrap(); let bitmap = CboRoaringBitmapCodec::deserialize_from(&buffer).unwrap(); - let expected = RoaringBitmap::from_sorted_iter(0..23); + let expected = RoaringBitmap::from_sorted_iter(0..23).unwrap(); assert_eq!(bitmap, expected); } } diff --git a/milli/src/search/criteria/mod.rs b/milli/src/search/criteria/mod.rs index 8306f5d0e..1dbfd2524 100644 --- a/milli/src/search/criteria/mod.rs +++ b/milli/src/search/criteria/mod.rs @@ -498,6 +498,7 @@ fn query_pair_proximity_docids( #[cfg(test)] pub mod test { use std::collections::HashMap; + use std::iter; use maplit::hashmap; use rand::rngs::StdRng; @@ -567,7 +568,8 @@ pub mod test { .iter() .enumerate() .map(|(i, w)| { - (w.clone(), RoaringBitmap::from_sorted_iter(std::iter::once(i as u32))) + let bitmap = RoaringBitmap::from_sorted_iter(iter::once(i as u32)).unwrap(); + (w.clone(), bitmap) }) .collect()) } else { @@ -622,7 +624,7 @@ pub mod test { } values.sort_unstable(); - RoaringBitmap::from_sorted_iter(values.into_iter()) + RoaringBitmap::from_sorted_iter(values.into_iter()).unwrap() } let word_docids = hashmap! { diff --git a/milli/src/search/query_tree.rs b/milli/src/search/query_tree.rs index 0744231ae..237bb9be2 100644 --- a/milli/src/search/query_tree.rs +++ b/milli/src/search/query_tree.rs @@ -587,8 +587,7 @@ mod test { values.push(rng.gen()); } values.sort_unstable(); - - RoaringBitmap::from_sorted_iter(values.into_iter()) + RoaringBitmap::from_sorted_iter(values.into_iter()).unwrap() } TestContext { diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs index 2391bd0e4..402cc61dd 100644 --- a/milli/src/update/delete_documents.rs +++ b/milli/src/update/delete_documents.rs @@ -186,7 +186,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { // We create the FST map of the external ids that we must delete. external_ids.sort_unstable(); - let external_ids_to_delete = fst::Set::from_iter(external_ids.iter().map(AsRef::as_ref))?; + let external_ids_to_delete = fst::Set::from_iter(external_ids)?; // We acquire the current external documents ids map... let mut new_external_documents_ids = self.index.external_documents_ids(self.wtxn)?; @@ -209,7 +209,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { // the LMDB B-Tree two times but only once. let mut iter = word_docids.prefix_iter_mut(self.wtxn, &word)?; if let Some((key, mut docids)) = iter.next().transpose()? { - if key == word.as_ref() { + if key == word.as_str() { let previous_len = docids.len(); docids -= &self.documents_ids; if docids.is_empty() { @@ -230,7 +230,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { words.iter().filter_map( |(word, must_remove)| { if *must_remove { - Some(word.as_ref()) + Some(word.as_str()) } else { None }