mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-31 23:41:42 +08:00
Do not create too many rayon tasks
This commit is contained in:
parent
6a1062edf5
commit
b605549bf2
@ -1,4 +1,4 @@
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||
@ -9,6 +9,8 @@ use thiserror::Error;
|
||||
#[derive(Debug)]
|
||||
pub struct ThreadPoolNoAbort {
|
||||
thread_pool: ThreadPool,
|
||||
/// The number of active operations.
|
||||
active_operations: AtomicUsize,
|
||||
/// Set to true if the thread pool catched a panic.
|
||||
pool_catched_panic: Arc<AtomicBool>,
|
||||
}
|
||||
@ -19,7 +21,9 @@ impl ThreadPoolNoAbort {
|
||||
OP: FnOnce() -> R + Send,
|
||||
R: Send,
|
||||
{
|
||||
self.active_operations.fetch_add(1, Ordering::Relaxed);
|
||||
let output = self.thread_pool.install(op);
|
||||
self.active_operations.fetch_sub(1, Ordering::Relaxed);
|
||||
// While reseting the pool panic catcher we return an error if we catched one.
|
||||
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
|
||||
Err(PanicCatched)
|
||||
@ -31,6 +35,11 @@ impl ThreadPoolNoAbort {
|
||||
pub fn current_num_threads(&self) -> usize {
|
||||
self.thread_pool.current_num_threads()
|
||||
}
|
||||
|
||||
/// The number of active operations.
|
||||
pub fn active_operations(&self) -> usize {
|
||||
self.active_operations.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder {
|
||||
let catched_panic = pool_catched_panic.clone();
|
||||
move |_result| catched_panic.store(true, Ordering::SeqCst)
|
||||
});
|
||||
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
|
||||
Ok(ThreadPoolNoAbort {
|
||||
thread_pool: self.0.build()?,
|
||||
active_operations: AtomicUsize::new(0),
|
||||
pool_catched_panic,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
|
||||
|
||||
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
|
||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||
use super::DistributionShift;
|
||||
use super::{DistributionShift, REQUEST_PARALLELISM};
|
||||
use crate::error::FaultSource;
|
||||
use crate::vector::Embedding;
|
||||
use crate::ThreadPoolNoAbort;
|
||||
@ -113,20 +113,30 @@ impl Embedder {
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
@ -6,7 +6,7 @@ use rayon::slice::ParallelSlice as _;
|
||||
|
||||
use super::error::{EmbedError, NewEmbedderError};
|
||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||
use super::DistributionShift;
|
||||
use super::{DistributionShift, REQUEST_PARALLELISM};
|
||||
use crate::error::FaultSource;
|
||||
use crate::vector::error::EmbedErrorKind;
|
||||
use crate::vector::Embedding;
|
||||
@ -270,20 +270,29 @@ impl Embedder {
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
@ -203,20 +203,30 @@ impl Embedder {
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None))
|
||||
.collect();
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
Loading…
x
Reference in New Issue
Block a user