mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-02-22 02:25:32 +08:00
Fix most issues with the lifetimes
This commit is contained in:
parent
6ac5b3b136
commit
70802eb7c7
@ -93,6 +93,7 @@ pub struct WriterBbqueueReceiver<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The action to perform on the receiver/writer side.
|
/// The action to perform on the receiver/writer side.
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum ReceiverAction {
|
pub enum ReceiverAction {
|
||||||
/// Wake up, you have frames to read for the BBQueue buffers.
|
/// Wake up, you have frames to read for the BBQueue buffers.
|
||||||
WakeUp,
|
WakeUp,
|
||||||
@ -599,6 +600,7 @@ impl DatabaseType for WordPositionDocids {
|
|||||||
const DATABASE: Database = Database::WordPositionDocids;
|
const DATABASE: Database = Database::WordPositionDocids;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
pub struct WordDocidsSender<'a, 'b, D> {
|
pub struct WordDocidsSender<'a, 'b, D> {
|
||||||
sender: &'a ExtractorBbqueueSender<'b>,
|
sender: &'a ExtractorBbqueueSender<'b>,
|
||||||
_marker: PhantomData<D>,
|
_marker: PhantomData<D>,
|
||||||
@ -621,6 +623,7 @@ impl<D: DatabaseType> WordDocidsSender<'_, '_, D> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
pub struct FacetDocidsSender<'a, 'b> {
|
pub struct FacetDocidsSender<'a, 'b> {
|
||||||
sender: &'a ExtractorBbqueueSender<'b>,
|
sender: &'a ExtractorBbqueueSender<'b>,
|
||||||
}
|
}
|
||||||
@ -667,6 +670,7 @@ impl FacetDocidsSender<'_, '_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
pub struct FieldIdDocidFacetSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
pub struct FieldIdDocidFacetSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
||||||
|
|
||||||
impl FieldIdDocidFacetSender<'_, '_> {
|
impl FieldIdDocidFacetSender<'_, '_> {
|
||||||
@ -691,6 +695,7 @@ impl FieldIdDocidFacetSender<'_, '_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
||||||
|
|
||||||
impl DocumentsSender<'_, '_> {
|
impl DocumentsSender<'_, '_> {
|
||||||
@ -716,6 +721,7 @@ impl DocumentsSender<'_, '_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
pub struct EmbeddingSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
pub struct EmbeddingSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
||||||
|
|
||||||
impl EmbeddingSender<'_, '_> {
|
impl EmbeddingSender<'_, '_> {
|
||||||
@ -741,6 +747,7 @@ impl EmbeddingSender<'_, '_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
pub struct GeoSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
pub struct GeoSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
||||||
|
|
||||||
impl GeoSender<'_, '_> {
|
impl GeoSender<'_, '_> {
|
||||||
|
@ -25,14 +25,14 @@ use crate::update::new::DocumentChange;
|
|||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
|
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
|
||||||
|
|
||||||
pub struct FacetedExtractorData<'a> {
|
pub struct FacetedExtractorData<'a, 'b> {
|
||||||
attributes_to_extract: &'a [&'a str],
|
attributes_to_extract: &'a [&'a str],
|
||||||
sender: &'a FieldIdDocidFacetSender<'a>,
|
sender: &'a FieldIdDocidFacetSender<'a, 'b>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: GrenadParameters,
|
||||||
buckets: usize,
|
buckets: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
|
impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b> {
|
||||||
type Data = RefCell<BalancedCaches<'extractor>>;
|
type Data = RefCell<BalancedCaches<'extractor>>;
|
||||||
|
|
||||||
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||||
|
@ -18,17 +18,17 @@ use crate::vector::error::{
|
|||||||
use crate::vector::{Embedder, Embedding, EmbeddingConfigs};
|
use crate::vector::{Embedder, Embedding, EmbeddingConfigs};
|
||||||
use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
|
use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
|
||||||
|
|
||||||
pub struct EmbeddingExtractor<'a> {
|
pub struct EmbeddingExtractor<'a, 'b> {
|
||||||
embedders: &'a EmbeddingConfigs,
|
embedders: &'a EmbeddingConfigs,
|
||||||
sender: EmbeddingSender<'a>,
|
sender: EmbeddingSender<'a, 'b>,
|
||||||
possible_embedding_mistakes: PossibleEmbeddingMistakes,
|
possible_embedding_mistakes: PossibleEmbeddingMistakes,
|
||||||
threads: &'a ThreadPoolNoAbort,
|
threads: &'a ThreadPoolNoAbort,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> EmbeddingExtractor<'a> {
|
impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
embedders: &'a EmbeddingConfigs,
|
embedders: &'a EmbeddingConfigs,
|
||||||
sender: EmbeddingSender<'a>,
|
sender: EmbeddingSender<'a, 'b>,
|
||||||
field_distribution: &'a FieldDistribution,
|
field_distribution: &'a FieldDistribution,
|
||||||
threads: &'a ThreadPoolNoAbort,
|
threads: &'a ThreadPoolNoAbort,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
@ -43,7 +43,7 @@ pub struct EmbeddingExtractorData<'extractor>(
|
|||||||
|
|
||||||
unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
|
unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
|
||||||
|
|
||||||
impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
|
||||||
type Data = RefCell<EmbeddingExtractorData<'extractor>>;
|
type Data = RefCell<EmbeddingExtractorData<'extractor>>;
|
||||||
|
|
||||||
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
|
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
|
||||||
@ -76,7 +76,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
|||||||
context.data,
|
context.data,
|
||||||
&self.possible_embedding_mistakes,
|
&self.possible_embedding_mistakes,
|
||||||
self.threads,
|
self.threads,
|
||||||
&self.sender,
|
self.sender,
|
||||||
&context.doc_alloc,
|
&context.doc_alloc,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
@ -259,7 +259,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
|||||||
// Currently this is the case as:
|
// Currently this is the case as:
|
||||||
// 1. BVec are inside of the bumpalo
|
// 1. BVec are inside of the bumpalo
|
||||||
// 2. All other fields are either trivial (u8) or references.
|
// 2. All other fields are either trivial (u8) or references.
|
||||||
struct Chunks<'a, 'extractor> {
|
struct Chunks<'a, 'b, 'extractor> {
|
||||||
texts: BVec<'a, &'a str>,
|
texts: BVec<'a, &'a str>,
|
||||||
ids: BVec<'a, DocumentId>,
|
ids: BVec<'a, DocumentId>,
|
||||||
|
|
||||||
@ -270,11 +270,11 @@ struct Chunks<'a, 'extractor> {
|
|||||||
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
|
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
|
||||||
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
|
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
|
||||||
threads: &'a ThreadPoolNoAbort,
|
threads: &'a ThreadPoolNoAbort,
|
||||||
sender: &'a EmbeddingSender<'a>,
|
sender: EmbeddingSender<'a, 'b>,
|
||||||
has_manual_generation: Option<&'a str>,
|
has_manual_generation: Option<&'a str>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 'extractor> Chunks<'a, 'extractor> {
|
impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
embedder: &'a Embedder,
|
embedder: &'a Embedder,
|
||||||
@ -284,7 +284,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
|
|||||||
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
|
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
|
||||||
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
|
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
|
||||||
threads: &'a ThreadPoolNoAbort,
|
threads: &'a ThreadPoolNoAbort,
|
||||||
sender: &'a EmbeddingSender<'a>,
|
sender: EmbeddingSender<'a, 'b>,
|
||||||
doc_alloc: &'a Bump,
|
doc_alloc: &'a Bump,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
|
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
|
||||||
@ -368,7 +368,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
|
|||||||
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
|
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
|
||||||
unused_vectors_distribution: &UnusedVectorsDistributionBump,
|
unused_vectors_distribution: &UnusedVectorsDistributionBump,
|
||||||
threads: &ThreadPoolNoAbort,
|
threads: &ThreadPoolNoAbort,
|
||||||
sender: EmbeddingSender<'a>,
|
sender: EmbeddingSender<'a, 'b>,
|
||||||
has_manual_generation: Option<&'a str>,
|
has_manual_generation: Option<&'a str>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if let Some(external_docid) = has_manual_generation {
|
if let Some(external_docid) = has_manual_generation {
|
||||||
|
@ -80,7 +80,7 @@ where
|
|||||||
let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
|
let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
|
||||||
.map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB per thread
|
.map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB per thread
|
||||||
.collect();
|
.collect();
|
||||||
let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000);
|
let (extractor_sender, mut writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000);
|
||||||
let finished_extraction = AtomicBool::new(false);
|
let finished_extraction = AtomicBool::new(false);
|
||||||
|
|
||||||
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
|
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
|
||||||
@ -302,7 +302,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let embedding_sender = extractor_sender.embeddings();
|
let embedding_sender = extractor_sender.embeddings();
|
||||||
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
|
let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads());
|
||||||
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
{
|
{
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
|
||||||
@ -363,7 +363,6 @@ where
|
|||||||
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
|
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
|
||||||
|
|
||||||
let vector_arroy = index.vector_arroy;
|
let vector_arroy = index.vector_arroy;
|
||||||
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
|
||||||
let indexer_span = tracing::Span::current();
|
let indexer_span = tracing::Span::current();
|
||||||
let arroy_writers: Result<HashMap<_, _>> = embedders
|
let arroy_writers: Result<HashMap<_, _>> = embedders
|
||||||
.inner_as_ref()
|
.inner_as_ref()
|
||||||
@ -490,6 +489,7 @@ where
|
|||||||
Step::WritingEmbeddingsToDatabase,
|
Step::WritingEmbeddingsToDatabase,
|
||||||
));
|
));
|
||||||
|
|
||||||
|
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
||||||
for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers {
|
for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers {
|
||||||
let dimensions = *dimensions;
|
let dimensions = *dimensions;
|
||||||
writer.build_and_quantize(
|
writer.build_and_quantize(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user