diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 446efd0c4..2bd20b6e8 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -30,7 +30,6 @@ use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::heed::CompactionOption; -use meilisearch_types::milli::update::new::indexer::document_changes::DocumentChanges; use meilisearch_types::milli::update::new::indexer::{self, retrieve_or_guess_primary_key}; use meilisearch_types::milli::update::{ IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings, @@ -1252,7 +1251,7 @@ impl IndexScheduler { mut tasks, } => { let started_processing_at = std::time::Instant::now(); - let mut primary_key_has_been_set = false; + let primary_key_has_been_set = false; let must_stop_processing = self.must_stop_processing.clone(); let indexer_config = self.index_mapper.indexer_config(); // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. diff --git a/milli/src/update/new/document.rs b/milli/src/update/new/document.rs index 96d0e9cca..335e2c327 100644 --- a/milli/src/update/new/document.rs +++ b/milli/src/update/new/document.rs @@ -7,7 +7,7 @@ use super::document_change::{Entry, Versions}; use super::{KvReaderFieldId, KvWriterFieldId}; use crate::documents::FieldIdMapper; use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME; -use crate::{DocumentId, FieldId, Index, InternalError, Result}; +use crate::{DocumentId, Index, InternalError, Result}; /// A view into a document that can represent either the current version from the DB, /// the update data from payload or other means, or the merged updated version. @@ -66,14 +66,6 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> { reader.map(|reader| Self { fields_ids_map: db_fields_ids_map, content: reader }) }) } - - fn field_from_fid(&self, fid: FieldId) -> Result> { - Ok(self - .content - .get(fid) - .map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson)) - .transpose()?) - } } #[derive(Clone, Copy)] diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index a3f05ce0e..6ae1b3124 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -55,11 +55,7 @@ impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { change: DocumentChange, context: &crate::update::new::indexer::document_changes::DocumentChangeContext, ) -> Result<()> { - FacetedDocidsExtractor::extract_document_change( - &context, - self.attributes_to_extract, - change, - ) + FacetedDocidsExtractor::extract_document_change(context, self.attributes_to_extract, change) } } @@ -162,7 +158,7 @@ impl FacetedDocidsExtractor { // key: fid buffer.push(FacetKind::Exists as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)?; + cache_fn(cached_sorter, &buffer, docid).map_err(Into::into)?; match value { // Number @@ -178,7 +174,7 @@ impl FacetedDocidsExtractor { buffer.extend_from_slice(&ordered); buffer.extend_from_slice(&n.to_be_bytes()); - cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) } else { Ok(()) } @@ -192,7 +188,7 @@ impl FacetedDocidsExtractor { buffer.extend_from_slice(&fid.to_be_bytes()); buffer.push(0); // level 0 buffer.extend_from_slice(truncated.as_bytes()); - cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) } // Null // key: fid @@ -200,7 +196,7 @@ impl FacetedDocidsExtractor { buffer.clear(); buffer.push(FacetKind::Null as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) } // Empty // key: fid @@ -208,13 +204,13 @@ impl FacetedDocidsExtractor { buffer.clear(); buffer.push(FacetKind::Empty as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) } Value::Object(o) if o.is_empty() => { buffer.clear(); buffer.push(FacetKind::Empty as u8); buffer.extend_from_slice(&fid.to_be_bytes()); - cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + cache_fn(cached_sorter, &buffer, docid).map_err(Into::into) } // Otherwise, do nothing /// TODO: What about Value::Bool? diff --git a/milli/src/update/new/extract/faceted/mod.rs b/milli/src/update/new/extract/faceted/mod.rs index 65e90cdf4..bfe8efd03 100644 --- a/milli/src/update/new/extract/faceted/mod.rs +++ b/milli/src/update/new/extract/faceted/mod.rs @@ -27,7 +27,7 @@ impl From for FacetKind { } impl FacetKind { - pub fn extract_from_key<'k>(key: &'k [u8]) -> (FacetKind, &'k [u8]) { + pub fn extract_from_key(key: &[u8]) -> (FacetKind, &[u8]) { debug_assert!(key.len() > 3); (FacetKind::from(key[0]), &key[1..]) } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 82bb0ec86..fd74cc8ce 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -3,29 +3,21 @@ use std::collections::HashMap; use std::fs::File; use std::num::NonZero; use std::ops::DerefMut as _; -use std::sync::Arc; use bumpalo::Bump; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; -use rayon::iter::IntoParallelIterator; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; -use super::SearchableExtractor; -use crate::update::new::document::Document; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::indexer::document_changes::{ for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext, ThreadLocal, }; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{ - bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, - MAX_POSITION_PER_ATTRIBUTE, -}; +use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; const MAX_COUNTED_WORDS: usize = 30; diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index d47ab606c..86ede5b14 100644 --- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -2,9 +2,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::rc::Rc; -use bumpalo::Bump; use heed::RoTxn; -use obkv::KvReader; use super::tokenize_document::DocumentTokenizer; use super::SearchableExtractor; diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 758b3b6a1..1edeec8b4 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -5,7 +5,6 @@ mod tokenize_document; use std::cell::RefCell; use std::fs::File; use std::marker::PhantomData; -use std::ops::DerefMut; use bumpalo::Bump; pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; @@ -23,7 +22,7 @@ use crate::update::new::indexer::document_changes::{ }; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; +use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE}; pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> { tokenizer: &'extractor DocumentTokenizer<'extractor>, @@ -120,7 +119,7 @@ pub trait SearchableExtractor: Sized + Sync { indexing_context, extractor_allocs, &datastore, - ); + )?; } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); diff --git a/milli/src/update/new/extract/searchable/tokenize_document.rs b/milli/src/update/new/extract/searchable/tokenize_document.rs index 71585c8d2..b8fd24f1b 100644 --- a/milli/src/update/new/extract/searchable/tokenize_document.rs +++ b/milli/src/update/new/extract/searchable/tokenize_document.rs @@ -8,7 +8,6 @@ use crate::update::new::document::Document; use crate::update::new::extract::perm_json_p::{ seek_leaf_values_in_array, seek_leaf_values_in_object, select_field, }; -use crate::update::new::KvReaderFieldId; use crate::{ FieldId, GlobalFieldsIdsMap, InternalError, LocalizedAttributesRule, Result, UserError, MAX_WORD_LENGTH, @@ -172,7 +171,7 @@ mod test { use bumpalo::Bump; use charabia::TokenizerBuilder; use meili_snap::snapshot; - use obkv::KvReader; + use raw_collections::RawMap; use serde_json::json; use serde_json::value::RawValue; diff --git a/milli/src/update/new/indexer/de.rs b/milli/src/update/new/indexer/de.rs index 749588c86..7976433b9 100644 --- a/milli/src/update/new/indexer/de.rs +++ b/milli/src/update/new/indexer/de.rs @@ -45,7 +45,7 @@ impl<'de, 'p, 'indexer: 'de, Mapper: MutFieldIdMapper> serde::de::Visitor<'de> let fid = fid.unwrap(); match self.primary_key { - PrimaryKey::Flat { name, field_id } => { + PrimaryKey::Flat { name: _, field_id } => { let value: &'de RawValue = map.next_value()?; if fid == *field_id { let value = match value @@ -145,8 +145,8 @@ impl<'de, 'indexer: 'de> serde::de::Visitor<'de> for DocumentIdVisitor<'indexer> { use std::fmt::Write as _; - let mut out = bumpalo::collections::String::new_in(&self.0); - write!(&mut out, "{v}"); + let mut out = bumpalo::collections::String::new_in(self.0); + write!(&mut out, "{v}").unwrap(); Ok(Ok(out.into_bump_str())) } diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs index 8bab9903f..1dd0832f5 100644 --- a/milli/src/update/new/indexer/document_changes.rs +++ b/milli/src/update/new/indexer/document_changes.rs @@ -76,20 +76,6 @@ impl MostlySendWrapper { Self(t) } - fn new_send(t: T) -> Self - where - T: Send, - { - Self(t) - } - - fn get(&self) -> T - where - T: Copy, - { - self.0 - } - fn as_ref(&self) -> &T { &self.0 } @@ -111,6 +97,7 @@ impl MostlySendWrapper { unsafe impl Send for MostlySendWrapper {} /// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s. +#[derive(Default)] pub struct ThreadLocal { inner: thread_local::ThreadLocal>, // FIXME: this should be necessary @@ -235,6 +222,7 @@ impl< T: MostlySend, > DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T> { + #[allow(clippy::too_many_arguments)] pub fn new( index: &'indexer Index, db_fields_ids_map: &'indexer FieldsIdsMap, @@ -252,7 +240,7 @@ impl< doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024)))); let doc_alloc = doc_alloc.0.take(); let fields_ids_map = fields_ids_map_store - .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(&new_fields_ids_map)).into()); + .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into()); let fields_ids_map = &fields_ids_map.0; let extractor_alloc = extractor_allocs.get_or_default(); diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index cafc59221..00fe6baee 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -6,10 +6,10 @@ use roaring::RoaringBitmap; use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend}; use crate::documents::PrimaryKey; use crate::index::db_name::EXTERNAL_DOCUMENTS_IDS; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, DocumentChange}; use crate::{DocumentId, InternalError, Result}; +#[derive(Default)] pub struct DocumentDeletion { pub to_delete: RoaringBitmap, } @@ -177,8 +177,5 @@ mod test { alloc.get_mut().reset(); } } - drop(deletion_tracker); - drop(changes); - drop(rtxn); } } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 673cd402e..4592feb43 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; use std::thread::{self, Builder}; use big_s::S; @@ -11,27 +11,25 @@ pub use document_deletion::DocumentDeletion; pub use document_operation::DocumentOperation; use heed::{RoTxn, RwTxn}; pub use partial_dump::PartialDump; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use rayon::ThreadPool; pub use update_by_function::UpdateByFunction; use super::channel::*; use super::document::write_to_obkv; -use super::document_change::{Deletion, DocumentChange, Insertion, Update}; +use super::document_change::DocumentChange; use super::extract::*; use super::merger::{merge_grenad_entries, FacetFieldIdsDelta}; use super::word_fst_builder::PrefixDelta; use super::words_prefix_docids::{ compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, }; -use super::{extract, StdResult, TopLevelMap}; +use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; use crate::update::new::channel::ExtractorSender; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::settings::InnerIndexSettings; use crate::update::{FacetsUpdateBulk, GrenadParameters}; -use crate::{fields_ids_map, Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; mod de; pub mod document_changes; @@ -49,7 +47,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { fn init_data( &self, - extractor_alloc: raw_collections::alloc::RefBump<'extractor>, + _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, ) -> Result { Ok(FullySend(())) } @@ -271,7 +269,7 @@ where Ok(()) as Result<_> })?; - drop(indexing_context); + // required to into_inner the new_fields_ids_map drop(fields_ids_map_store); let fields_ids_map = new_fields_ids_map.into_inner().unwrap(); diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 524608801..80556ced9 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -11,14 +11,12 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::FacetKind; use super::word_fst_builder::{PrefixData, PrefixDelta, PrefixSettings}; -use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update}; +use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId}; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::new::word_fst_builder::WordFstBuilder; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{ - CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Prefix, Result, -}; +use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result}; /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] @@ -27,7 +25,7 @@ pub fn merge_grenad_entries( sender: MergerSender, rtxn: &RoTxn, index: &Index, - mut global_fields_ids_map: GlobalFieldsIdsMap<'_>, + global_fields_ids_map: GlobalFieldsIdsMap<'_>, ) -> Result { let mut buffer: Vec = Vec::new(); let mut documents_ids = index.documents_ids(rtxn)?; @@ -386,7 +384,7 @@ impl FacetFieldIdsDelta { } } - fn extract_key_data<'a>(&self, key: &'a [u8]) -> (FacetKind, FieldId) { + fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId) { let facet_kind = FacetKind::from(key[0]); let field_id = FieldId::from_be_bytes([key[1], key[2]]); (facet_kind, field_id)