mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-02-23 19:15:31 +08:00
Compare commits
No commits in common. "876084d48004e04ac0720cb064915d53c4743a1a" and "9bcb271f0021dd4f8651458b6f573df1068bc396" have entirely different histories.
876084d480
...
9bcb271f00
@ -1,7 +1,5 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::env::VarError;
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::str::FromStr;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
|
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
|
||||||
@ -304,18 +302,7 @@ fn create_or_open_index(
|
|||||||
) -> Result<Index> {
|
) -> Result<Index> {
|
||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(clamp_to_page_size(map_size));
|
options.map_size(clamp_to_page_size(map_size));
|
||||||
|
options.max_readers(1024);
|
||||||
// You can find more details about this experimental
|
|
||||||
// environment variable on the following GitHub discussion:
|
|
||||||
// <https://github.com/orgs/meilisearch/discussions/806>
|
|
||||||
let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
|
|
||||||
Ok(value) => u32::from_str(&value).unwrap(),
|
|
||||||
Err(VarError::NotPresent) => 1024,
|
|
||||||
Err(VarError::NotUnicode(value)) => panic!(
|
|
||||||
"Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}"
|
|
||||||
),
|
|
||||||
};
|
|
||||||
options.max_readers(max_readers);
|
|
||||||
if enable_mdb_writemap {
|
if enable_mdb_writemap {
|
||||||
unsafe { options.flags(EnvFlags::WRITE_MAP) };
|
unsafe { options.flags(EnvFlags::WRITE_MAP) };
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||||
@ -9,8 +9,6 @@ use thiserror::Error;
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ThreadPoolNoAbort {
|
pub struct ThreadPoolNoAbort {
|
||||||
thread_pool: ThreadPool,
|
thread_pool: ThreadPool,
|
||||||
/// The number of active operations.
|
|
||||||
active_operations: AtomicUsize,
|
|
||||||
/// Set to true if the thread pool catched a panic.
|
/// Set to true if the thread pool catched a panic.
|
||||||
pool_catched_panic: Arc<AtomicBool>,
|
pool_catched_panic: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
@ -21,9 +19,7 @@ impl ThreadPoolNoAbort {
|
|||||||
OP: FnOnce() -> R + Send,
|
OP: FnOnce() -> R + Send,
|
||||||
R: Send,
|
R: Send,
|
||||||
{
|
{
|
||||||
self.active_operations.fetch_add(1, Ordering::Relaxed);
|
|
||||||
let output = self.thread_pool.install(op);
|
let output = self.thread_pool.install(op);
|
||||||
self.active_operations.fetch_sub(1, Ordering::Relaxed);
|
|
||||||
// While reseting the pool panic catcher we return an error if we catched one.
|
// While reseting the pool panic catcher we return an error if we catched one.
|
||||||
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
|
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
|
||||||
Err(PanicCatched)
|
Err(PanicCatched)
|
||||||
@ -35,11 +31,6 @@ impl ThreadPoolNoAbort {
|
|||||||
pub fn current_num_threads(&self) -> usize {
|
pub fn current_num_threads(&self) -> usize {
|
||||||
self.thread_pool.current_num_threads()
|
self.thread_pool.current_num_threads()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The number of active operations.
|
|
||||||
pub fn active_operations(&self) -> usize {
|
|
||||||
self.active_operations.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
@ -73,10 +64,6 @@ impl ThreadPoolNoAbortBuilder {
|
|||||||
let catched_panic = pool_catched_panic.clone();
|
let catched_panic = pool_catched_panic.clone();
|
||||||
move |_result| catched_panic.store(true, Ordering::SeqCst)
|
move |_result| catched_panic.store(true, Ordering::SeqCst)
|
||||||
});
|
});
|
||||||
Ok(ThreadPoolNoAbort {
|
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
|
||||||
thread_pool: self.0.build()?,
|
|
||||||
active_operations: AtomicUsize::new(0),
|
|
||||||
pool_catched_panic,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
|
|||||||
|
|
||||||
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
|
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
|
||||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||||
use super::{DistributionShift, REQUEST_PARALLELISM};
|
use super::DistributionShift;
|
||||||
use crate::error::FaultSource;
|
use crate::error::FaultSource;
|
||||||
use crate::vector::Embedding;
|
use crate::vector::Embedding;
|
||||||
use crate::ThreadPoolNoAbort;
|
use crate::ThreadPoolNoAbort;
|
||||||
@ -98,20 +98,14 @@ impl Embedder {
|
|||||||
text_chunks: Vec<Vec<String>>,
|
text_chunks: Vec<Vec<String>>,
|
||||||
threads: &ThreadPoolNoAbort,
|
threads: &ThreadPoolNoAbort,
|
||||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||||
// This condition helps reduce the number of active rayon jobs
|
threads
|
||||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
.install(move || {
|
||||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
||||||
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
})
|
||||||
} else {
|
.map_err(|error| EmbedError {
|
||||||
threads
|
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||||
.install(move || {
|
fault: FaultSource::Bug,
|
||||||
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
})?
|
||||||
})
|
|
||||||
.map_err(|error| EmbedError {
|
|
||||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
|
||||||
fault: FaultSource::Bug,
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn embed_chunks_ref(
|
pub(crate) fn embed_chunks_ref(
|
||||||
@ -119,32 +113,20 @@ impl Embedder {
|
|||||||
texts: &[&str],
|
texts: &[&str],
|
||||||
threads: &ThreadPoolNoAbort,
|
threads: &ThreadPoolNoAbort,
|
||||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||||
// This condition helps reduce the number of active rayon jobs
|
threads
|
||||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
.install(move || {
|
||||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||||
.chunks(self.prompt_count_in_chunk_hint())
|
.map(move |chunk| self.embed(chunk, None))
|
||||||
.map(move |chunk| self.embed(chunk, None))
|
.collect();
|
||||||
.collect();
|
|
||||||
|
|
||||||
let embeddings = embeddings?;
|
let embeddings = embeddings?;
|
||||||
Ok(embeddings.into_iter().flatten().collect())
|
Ok(embeddings.into_iter().flatten().collect())
|
||||||
} else {
|
})
|
||||||
threads
|
.map_err(|error| EmbedError {
|
||||||
.install(move || {
|
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
fault: FaultSource::Bug,
|
||||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
})?
|
||||||
.map(move |chunk| self.embed(chunk, None))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let embeddings = embeddings?;
|
|
||||||
Ok(embeddings.into_iter().flatten().collect())
|
|
||||||
})
|
|
||||||
.map_err(|error| EmbedError {
|
|
||||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
|
||||||
fault: FaultSource::Bug,
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn chunk_count_hint(&self) -> usize {
|
pub fn chunk_count_hint(&self) -> usize {
|
||||||
|
@ -6,7 +6,7 @@ use rayon::slice::ParallelSlice as _;
|
|||||||
|
|
||||||
use super::error::{EmbedError, NewEmbedderError};
|
use super::error::{EmbedError, NewEmbedderError};
|
||||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||||
use super::{DistributionShift, REQUEST_PARALLELISM};
|
use super::DistributionShift;
|
||||||
use crate::error::FaultSource;
|
use crate::error::FaultSource;
|
||||||
use crate::vector::error::EmbedErrorKind;
|
use crate::vector::error::EmbedErrorKind;
|
||||||
use crate::vector::Embedding;
|
use crate::vector::Embedding;
|
||||||
@ -255,20 +255,14 @@ impl Embedder {
|
|||||||
text_chunks: Vec<Vec<String>>,
|
text_chunks: Vec<Vec<String>>,
|
||||||
threads: &ThreadPoolNoAbort,
|
threads: &ThreadPoolNoAbort,
|
||||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||||
// This condition helps reduce the number of active rayon jobs
|
threads
|
||||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
.install(move || {
|
||||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
||||||
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
})
|
||||||
} else {
|
.map_err(|error| EmbedError {
|
||||||
threads
|
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||||
.install(move || {
|
fault: FaultSource::Bug,
|
||||||
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
})?
|
||||||
})
|
|
||||||
.map_err(|error| EmbedError {
|
|
||||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
|
||||||
fault: FaultSource::Bug,
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn embed_chunks_ref(
|
pub(crate) fn embed_chunks_ref(
|
||||||
@ -276,31 +270,20 @@ impl Embedder {
|
|||||||
texts: &[&str],
|
texts: &[&str],
|
||||||
threads: &ThreadPoolNoAbort,
|
threads: &ThreadPoolNoAbort,
|
||||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||||
// This condition helps reduce the number of active rayon jobs
|
threads
|
||||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
.install(move || {
|
||||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||||
.chunks(self.prompt_count_in_chunk_hint())
|
.map(move |chunk| self.embed(chunk, None))
|
||||||
.map(move |chunk| self.embed(chunk, None))
|
.collect();
|
||||||
.collect();
|
|
||||||
let embeddings = embeddings?;
|
|
||||||
Ok(embeddings.into_iter().flatten().collect())
|
|
||||||
} else {
|
|
||||||
threads
|
|
||||||
.install(move || {
|
|
||||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
|
||||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
|
||||||
.map(move |chunk| self.embed(chunk, None))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let embeddings = embeddings?;
|
let embeddings = embeddings?;
|
||||||
Ok(embeddings.into_iter().flatten().collect())
|
Ok(embeddings.into_iter().flatten().collect())
|
||||||
})
|
})
|
||||||
.map_err(|error| EmbedError {
|
.map_err(|error| EmbedError {
|
||||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||||
fault: FaultSource::Bug,
|
fault: FaultSource::Bug,
|
||||||
})?
|
})?
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn chunk_count_hint(&self) -> usize {
|
pub fn chunk_count_hint(&self) -> usize {
|
||||||
|
@ -188,20 +188,14 @@ impl Embedder {
|
|||||||
text_chunks: Vec<Vec<String>>,
|
text_chunks: Vec<Vec<String>>,
|
||||||
threads: &ThreadPoolNoAbort,
|
threads: &ThreadPoolNoAbort,
|
||||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||||
// This condition helps reduce the number of active rayon jobs
|
threads
|
||||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
.install(move || {
|
||||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
|
||||||
text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
|
})
|
||||||
} else {
|
.map_err(|error| EmbedError {
|
||||||
threads
|
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||||
.install(move || {
|
fault: FaultSource::Bug,
|
||||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
|
})?
|
||||||
})
|
|
||||||
.map_err(|error| EmbedError {
|
|
||||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
|
||||||
fault: FaultSource::Bug,
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn embed_chunks_ref(
|
pub(crate) fn embed_chunks_ref(
|
||||||
@ -209,32 +203,20 @@ impl Embedder {
|
|||||||
texts: &[&str],
|
texts: &[&str],
|
||||||
threads: &ThreadPoolNoAbort,
|
threads: &ThreadPoolNoAbort,
|
||||||
) -> Result<Vec<Embedding>, EmbedError> {
|
) -> Result<Vec<Embedding>, EmbedError> {
|
||||||
// This condition helps reduce the number of active rayon jobs
|
threads
|
||||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
.install(move || {
|
||||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||||
.chunks(self.prompt_count_in_chunk_hint())
|
.map(move |chunk| self.embed_ref(chunk, None))
|
||||||
.map(move |chunk| self.embed_ref(chunk, None))
|
.collect();
|
||||||
.collect();
|
|
||||||
|
|
||||||
let embeddings = embeddings?;
|
let embeddings = embeddings?;
|
||||||
Ok(embeddings.into_iter().flatten().collect())
|
Ok(embeddings.into_iter().flatten().collect())
|
||||||
} else {
|
})
|
||||||
threads
|
.map_err(|error| EmbedError {
|
||||||
.install(move || {
|
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
fault: FaultSource::Bug,
|
||||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
})?
|
||||||
.map(move |chunk| self.embed_ref(chunk, None))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let embeddings = embeddings?;
|
|
||||||
Ok(embeddings.into_iter().flatten().collect())
|
|
||||||
})
|
|
||||||
.map_err(|error| EmbedError {
|
|
||||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
|
||||||
fault: FaultSource::Bug,
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn chunk_count_hint(&self) -> usize {
|
pub fn chunk_count_hint(&self) -> usize {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user