From 81ec0abad12310f46bf4b3da2ef2d8dedf595a08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 12 May 2024 14:45:25 +0200 Subject: [PATCH] Use the new rayon-par-bridge library --- Cargo.lock | 10 ++++++++++ milli/Cargo.toml | 1 + milli/src/update/index_documents/mod.rs | 11 +++-------- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 85a4c30ab..9247b9a80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3510,6 +3510,7 @@ dependencies = [ "rand", "rand_pcg", "rayon", + "rayon-par-bridge", "rhai", "roaring", "rstar", @@ -4289,6 +4290,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rayon-par-bridge" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb6a14d8f65834aca6b0fe4cbbd7a27e639cd3efb1f2a32de9942368f1991de8" +dependencies = [ + "rayon", +] + [[package]] name = "reborrow" version = "0.5.5" diff --git a/milli/Cargo.toml b/milli/Cargo.toml index d0513706f..3c4ee1135 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -84,6 +84,7 @@ rand = "0.8.5" tracing = "0.1.40" ureq = { version = "2.10.0", features = ["json"] } url = "2.5.2" +rayon-par-bridge = "0.1.0" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 3ed0d2db9..7994abff3 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -44,7 +44,7 @@ use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, }; use crate::vector::EmbeddingConfigs; -use crate::{CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result}; +use crate::{CboRoaringBitmapCodec, Index, Object, Result}; static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 4; @@ -262,11 +262,8 @@ where Ok(DocumentEdition::Nothing) as Result<_> }); - std::thread::scope(|s| { - let (send, recv) = std::sync::mpsc::sync_channel(100); - s.spawn(move || processing.for_each(|el| drop(send.send(el)))); - - for result in recv { + rayon_par_bridge::par_bridge(100, processing, |iterator| { + for result in iterator { if (self.should_abort)() { return Err(Error::InternalError(InternalError::AbortedIndexation)); } @@ -285,8 +282,6 @@ where Ok(()) })?; - drop(immutable_obkvs); - let file = documents_batch_builder.into_inner()?; let reader = DocumentsBatchReader::from_reader(file)?;