diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index f9463a137..5cffb92a8 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -18,7 +18,6 @@ one indexing operation. */ use std::collections::{BTreeSet, HashSet}; -use std::env::VarError; use std::ffi::OsStr; use std::fmt; use std::fs::{self, File}; @@ -27,19 +26,18 @@ use std::io::BufWriter; use dump::IndexMetadata; use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; -use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; +use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::update::new::indexer::{ self, retrieve_or_guess_primary_key, DocumentChanges, }; -use meilisearch_types::milli::update::new::TopLevelMap; use meilisearch_types::milli::update::{ IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings, }; use meilisearch_types::milli::vector::parsed_vectors::{ ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, }; -use meilisearch_types::milli::{self, Filter, InternalError, Object}; +use meilisearch_types::milli::{self, Filter, Object}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 055685151..85cf33c54 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -1,4 +1,4 @@ -use std::io::{BufReader, ErrorKind}; +use std::io::ErrorKind; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; diff --git a/meilisearch/src/search/mod.rs b/meilisearch/src/search/mod.rs index 54d0c4823..8cdbb31ee 100644 --- a/meilisearch/src/search/mod.rs +++ b/meilisearch/src/search/mod.rs @@ -1247,7 +1247,7 @@ impl<'a> HitMaker<'a> { self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?; // First generate a document with all the displayed fields - let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, &obkv)?; + let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, obkv)?; let add_vectors_fid = self.vectors_fid.filter(|_fid| self.retrieve_vectors == RetrieveVectors::Retrieve); diff --git a/milli/src/documents/builder.rs b/milli/src/documents/builder.rs index ec4d634aa..1cf90447e 100644 --- a/milli/src/documents/builder.rs +++ b/milli/src/documents/builder.rs @@ -292,7 +292,7 @@ mod test { .unwrap() .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -321,7 +321,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -348,7 +348,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -375,7 +375,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -402,7 +402,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -429,7 +429,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -456,7 +456,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -483,7 +483,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -510,7 +510,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, @@ -555,7 +555,7 @@ mod test { .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); - let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); + let val = obkv_to_object(doc, &index).map(Value::from).unwrap(); assert_eq!( val, diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 49bada8e7..aea5680a1 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -289,7 +289,7 @@ impl<'a, 'i> Transform<'a, 'i> { .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; let base_obkv = KvReader::from_slice(base_obkv); if let Some(flattened_obkv) = - Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)? + Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)? { // we recreate our buffer with the flattened documents document_sorter_value_buffer.clear(); @@ -324,7 +324,7 @@ impl<'a, 'i> Transform<'a, 'i> { let flattened_obkv = KvReader::from_slice(&obkv_buffer); if let Some(obkv) = - Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)? + Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)? { document_sorter_value_buffer.clear(); document_sorter_value_buffer.push(Operation::Addition as u8); @@ -531,7 +531,7 @@ impl<'a, 'i> Transform<'a, 'i> { // flatten it and push it as to delete in the flattened_sorter let flattened_obkv = KvReader::from_slice(base_obkv); if let Some(obkv) = - Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)? + Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)? { // we recreate our buffer with the flattened documents document_sorter_value_buffer.clear(); @@ -938,7 +938,7 @@ impl<'a, 'i> Transform<'a, 'i> { if let Some(flattened_obkv_buffer) = flattened_obkv_buffer { // take the non-flattened version if flatten_from_fields_ids_map returns None. let mut fields_ids_map = settings_diff.new.fields_ids_map.clone(); - let flattened = Self::flatten_from_fields_ids_map(&obkv, &mut fields_ids_map)?; + let flattened = Self::flatten_from_fields_ids_map(obkv, &mut fields_ids_map)?; let flattened = flattened.as_deref().map_or(obkv, KvReader::from_slice); flattened_obkv_buffer.clear(); diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index ce8136260..5736fc1d4 100644 --- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::rc::Rc; use heed::RoTxn; diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 48d373598..fe7480fa3 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -5,11 +5,7 @@ mod tokenize_document; use std::fs::File; -pub use extract_fid_word_count_docids::FidWordCountDocidsExtractor; -pub use extract_word_docids::{ - ExactWordDocidsExtractor, WordDocidsExtractor, WordDocidsExtractors, WordDocidsMergers, - WordFidDocidsExtractor, WordPositionDocidsExtractor, -}; +pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; use grenad::Merger; use heed::RoTxn; diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index b744ec65e..bad72d3b2 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use rayon::iter::{ParallelBridge, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; @@ -28,10 +28,11 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result> + Clone + 'p> { let index = param; let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); - Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { + let to_delete: Vec<_> = self.to_delete.into_iter().collect(); + Ok(to_delete.into_par_iter().map_with(items, |items, docid| { items.with(|rtxn| { let current = index.document(rtxn, docid)?; Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index b299124bd..f088370fb 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -2,15 +2,15 @@ use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use heed::types::Bytes; +use heed::types::{Bytes, DecodeIgnore}; use heed::RoTxn; use memmap2::Mmap; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use IndexDocumentsMethod as Idm; use super::super::document_change::DocumentChange; use super::super::items_pool::ItemsPool; -use super::top_level_map::{CowStr, TopLevelMap}; +use super::super::{CowStr, TopLevelMap}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; @@ -73,7 +73,7 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result> + Clone + 'p> { let (index, rtxn, primary_key) = param; let documents_ids = index.documents_ids(rtxn)?; @@ -199,29 +199,26 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { // And finally sort them docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); - Ok(docids_version_offsets - .into_par_iter() - .map_with( - Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))), - move |context_pool, (external_docid, (internal_docid, operations))| { - context_pool.with(|rtxn| { - let document_merge_function = match self.index_documents_method { - Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, - Idm::UpdateDocuments => MergeDocumentForUpdates::merge, - }; + Ok(docids_version_offsets.into_par_iter().map_with( + Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))), + move |context_pool, (external_docid, (internal_docid, operations))| { + context_pool.with(|rtxn| { + let document_merge_function = match self.index_documents_method { + Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, + Idm::UpdateDocuments => MergeDocumentForUpdates::merge, + }; - document_merge_function( - rtxn, - index, - &fields_ids_map, - internal_docid, - external_docid.to_string(), // TODO do not clone - &operations, - ) - }) - }, - ) - .filter_map(Result::transpose)) + document_merge_function( + rtxn, + index, + &fields_ids_map, + internal_docid, + external_docid.to_string(), // TODO do not clone + &operations, + ) + }) + }, + )) } } @@ -239,7 +236,7 @@ trait MergeChanges { docid: DocumentId, external_docid: String, operations: &[InnerDocOp], - ) -> Result>; + ) -> Result; } struct MergeDocumentForReplacement; @@ -266,7 +263,7 @@ impl MergeChanges for MergeDocumentForReplacement { docid: DocumentId, external_docid: String, operations: &[InnerDocOp], - ) -> Result> { + ) -> Result { let current = index.documents.remap_data_type::().get(rtxn, &docid)?; let current: Option<&KvReaderFieldId> = current.map(Into::into); @@ -288,21 +285,21 @@ impl MergeChanges for MergeDocumentForReplacement { let new = writer.into_boxed(); match current { - Some(current) => Ok(Some(DocumentChange::Update(Update::create( - docid, - current.boxed(), - new, - )))), - None => Ok(Some(DocumentChange::Insertion(Insertion::create(docid, new)))), + Some(current) => { + let update = Update::create(docid, current.boxed(), new); + Ok(DocumentChange::Update(update)) + } + None => Ok(DocumentChange::Insertion(Insertion::create(docid, new))), } } - Some(InnerDocOp::Deletion) => match current { - Some(current) => { - Ok(Some(DocumentChange::Deletion(Deletion::create(docid, current.boxed())))) - } - None => Ok(None), - }, - None => Ok(None), // but it's strange + Some(InnerDocOp::Deletion) => { + let deletion = match current { + Some(current) => Deletion::create(docid, current.boxed()), + None => todo!("Do that with Louis"), + }; + Ok(DocumentChange::Deletion(deletion)) + } + None => unreachable!("We must not have empty set of operations on a document"), } } } @@ -332,13 +329,13 @@ impl MergeChanges for MergeDocumentForUpdates { docid: DocumentId, external_docid: String, operations: &[InnerDocOp], - ) -> Result> { + ) -> Result { let mut document = BTreeMap::<_, Cow<_>>::new(); let current = index.documents.remap_data_type::().get(rtxn, &docid)?; let current: Option<&KvReaderFieldId> = current.map(Into::into); if operations.is_empty() { - return Ok(None); // but it's strange + unreachable!("We must not have empty set of operations on a document"); } let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion)); @@ -355,13 +352,11 @@ impl MergeChanges for MergeDocumentForUpdates { } if operations.is_empty() { - match current { - Some(current) => { - let deletion = Deletion::create(docid, current.boxed()); - return Ok(Some(DocumentChange::Deletion(deletion))); - } - None => return Ok(None), - } + let deletion = match current { + Some(current) => Deletion::create(docid, current.boxed()), + None => todo!("Do that with Louis"), + }; + return Ok(DocumentChange::Deletion(deletion)); } for operation in operations { @@ -386,11 +381,11 @@ impl MergeChanges for MergeDocumentForUpdates { match current { Some(current) => { let update = Update::create(docid, current.boxed(), new); - Ok(Some(DocumentChange::Update(update))) + Ok(DocumentChange::Update(update)) } None => { let insertion = Insertion::create(docid, new); - Ok(Some(DocumentChange::Insertion(insertion))) + Ok(DocumentChange::Insertion(insertion)) } } } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 3a9db79b6..b317aefca 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -6,16 +6,15 @@ pub use document_deletion::DocumentDeletion; pub use document_operation::DocumentOperation; use heed::{RoTxn, RwTxn}; pub use partial_dump::PartialDump; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use rayon::ThreadPool; -pub use top_level_map::{CowStr, TopLevelMap}; pub use update_by_function::UpdateByFunction; use super::channel::*; use super::document_change::DocumentChange; use super::extract::*; use super::merger::merge_grenad_entries; -use super::StdResult; +use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::GrenadParameters; @@ -24,7 +23,6 @@ use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; mod document_deletion; mod document_operation; mod partial_dump; -mod top_level_map; mod update_by_function; pub trait DocumentChanges<'p> { @@ -34,7 +32,7 @@ pub trait DocumentChanges<'p> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p>; + ) -> Result> + Clone + 'p>; } /// This is the main function of this crate. @@ -50,8 +48,7 @@ pub fn index( document_changes: PI, ) -> Result<()> where - PI: IntoParallelIterator> + Send, - PI::Iter: Clone, + PI: IndexedParallelIterator> + Send + Clone, { let (merger_sender, writer_receiver) = merger_writer_channel(10_000); // This channel acts as a rendezvous point to ensure that we are one task ahead diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 50768ba82..5f8743e31 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -1,4 +1,4 @@ -use rayon::iter::{ParallelBridge, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, ParallelBridge, ParallelIterator}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; @@ -18,9 +18,7 @@ impl PartialDump { impl<'p, I> DocumentChanges<'p> for PartialDump where - I: IntoIterator, - I::IntoIter: Send + Clone + 'p, - I::Item: Send, + I: IndexedParallelIterator + Clone + 'p, { type Parameter = (&'p FieldsIdsMap, &'p ConcurrentAvailableIds, &'p PrimaryKey<'p>); @@ -32,10 +30,10 @@ where self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result> + Clone + 'p> { let (fields_ids_map, concurrent_available_ids, primary_key) = param; - Ok(self.iter.into_iter().par_bridge().map(|object| { + Ok(self.iter.map(|object| { let docid = match concurrent_available_ids.next() { Some(id) => id, None => return Err(Error::UserError(UserError::DocumentLimitReached)), diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index 36ff432f8..d4c0f837b 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -1,4 +1,4 @@ -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use super::DocumentChanges; use crate::update::new::DocumentChange; @@ -13,7 +13,7 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction { self, _fields_ids_map: &mut FieldsIdsMap, _param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result> + Clone + 'p> { Ok((0..100).into_par_iter().map(|_| todo!())) } } diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 1c7f04974..ca6b213c1 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -5,7 +5,7 @@ use bincode::ErrorKind; use fst::{Set, SetBuilder, Streamer}; use grenad::Merger; use heed::types::Bytes; -use heed::{BoxedError, Database, RoTxn}; +use heed::{Database, RoTxn}; use memmap2::Mmap; use roaring::RoaringBitmap; use tempfile::tempfile; @@ -16,9 +16,7 @@ use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::{ - CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, InternalError, Result, -}; +use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Result}; /// TODO We must return some infos/stats #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 3f5c4b3c9..6389a53c4 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -1,6 +1,6 @@ pub use document_change::{Deletion, DocumentChange, Insertion, Update}; -pub use indexer::{CowStr, TopLevelMap}; pub use items_pool::ItemsPool; +pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; use crate::FieldId; @@ -11,6 +11,7 @@ mod extract; pub mod indexer; mod items_pool; mod merger; +mod top_level_map; /// TODO move them elsewhere pub type StdResult = std::result::Result; diff --git a/milli/src/update/new/indexer/top_level_map.rs b/milli/src/update/new/top_level_map.rs similarity index 100% rename from milli/src/update/new/indexer/top_level_map.rs rename to milli/src/update/new/top_level_map.rs