Do not create too many rayon tasks

This commit is contained in:
Kerollmops 2025-01-28 16:53:34 +01:00
parent 97e17f52a1
commit aaefbfae1f
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
4 changed files with 85 additions and 43 deletions

View File

@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use rayon::{ThreadPool, ThreadPoolBuilder};
@ -9,6 +9,8 @@ use thiserror::Error;
#[derive(Debug)]
pub struct ThreadPoolNoAbort {
thread_pool: ThreadPool,
/// The number of active operations.
active_operations: AtomicUsize,
/// Set to true if the thread pool catched a panic.
pool_catched_panic: Arc<AtomicBool>,
}
@ -19,7 +21,9 @@ impl ThreadPoolNoAbort {
OP: FnOnce() -> R + Send,
R: Send,
{
self.active_operations.fetch_add(1, Ordering::Relaxed);
let output = self.thread_pool.install(op);
self.active_operations.fetch_sub(1, Ordering::Relaxed);
// While reseting the pool panic catcher we return an error if we catched one.
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
Err(PanicCatched)
@ -31,6 +35,11 @@ impl ThreadPoolNoAbort {
pub fn current_num_threads(&self) -> usize {
self.thread_pool.current_num_threads()
}
/// The number of active operations.
pub fn active_operations(&self) -> usize {
self.active_operations.load(Ordering::Relaxed)
}
}
#[derive(Error, Debug)]
@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder {
let catched_panic = pool_catched_panic.clone();
move |_result| catched_panic.store(true, Ordering::SeqCst)
});
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
Ok(ThreadPoolNoAbort {
thread_pool: self.0.build()?,
active_operations: AtomicUsize::new(0),
pool_catched_panic,
})
}
}

View File

@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift;
use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource;
use crate::vector::Embedding;
use crate::ThreadPoolNoAbort;
@ -133,6 +133,15 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
@ -148,6 +157,7 @@ impl Embedder {
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {
self.rest_embedder.chunk_count_hint()

View File

@ -7,7 +7,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, NewEmbedderError};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift;
use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource;
use crate::vector::error::EmbedErrorKind;
use crate::vector::Embedding;
@ -270,6 +270,14 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
@ -285,6 +293,7 @@ impl Embedder {
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {
self.rest_embedder.chunk_count_hint()

View File

@ -203,6 +203,15 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Embedding>, EmbedError> {
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
@ -218,6 +227,7 @@ impl Embedder {
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {
super::REQUEST_PARALLELISM