diff --git a/Cargo.lock b/Cargo.lock index b0e5978b5..03261cacf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3661,6 +3661,7 @@ dependencies = [ "time", "tokenizers", "tracing", + "uell", "ureq", "url", "uuid", @@ -5789,6 +5790,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "uell" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40de5982e28612e20330e77d81f1559b74f66caf3c7fc10b19ada4843f4b4fd7" +dependencies = [ + "bumpalo", +] + [[package]] name = "unescaper" version = "0.1.5" diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 005393411..622292e8a 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -100,6 +100,7 @@ bumpalo = "3.16.0" thread_local = "1.1.8" allocator-api2 = "0.2.18" rustc-hash = "2.0.0" +uell = "0.1.0" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs index 2c30220bc..a97569800 100644 --- a/crates/milli/src/update/index_documents/typed_chunk.rs +++ b/crates/milli/src/update/index_documents/typed_chunk.rs @@ -737,7 +737,7 @@ pub(crate) fn write_typed_chunk_into_index( } /// Converts the latitude and longitude back to an xyz GeoPoint. -fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint { +pub fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint { let (lat, tail) = helpers::try_split_array_at::(value).unwrap(); let (lng, _) = helpers::try_split_array_at::(tail).unwrap(); let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)]; diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs index 2c93a5def..b76fe207a 100644 --- a/crates/milli/src/update/new/extract/documents.rs +++ b/crates/milli/src/update/new/extract/documents.rs @@ -54,7 +54,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); let content = deletion.current( - &context.txn, + &context.rtxn, context.index, &context.db_fields_ids_map, )?; @@ -72,7 +72,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> { DocumentChange::Update(update) => { let docid = update.docid(); let content = - update.current(&context.txn, context.index, &context.db_fields_ids_map)?; + update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?; for res in content.iter_top_level_fields() { let (f, _) = res?; let entry = document_extractor_data @@ -92,9 +92,9 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> { } let content = - update.merged(&context.txn, context.index, &context.db_fields_ids_map)?; + update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?; let vector_content = update.merged_vectors( - &context.txn, + &context.rtxn, context.index, &context.db_fields_ids_map, &context.doc_alloc, diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index 11dc8f3c7..d0dc425ae 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -63,7 +63,7 @@ impl FacetedDocidsExtractor { document_change: DocumentChange, ) -> Result<()> { let index = &context.index; - let rtxn = &context.txn; + let rtxn = &context.rtxn; let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let mut cached_sorter = context.data.borrow_mut_or_yield(); match document_change { diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs new file mode 100644 index 000000000..1dca78e34 --- /dev/null +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -0,0 +1,100 @@ +use std::cell::RefCell; +use std::f32::consts::PI; +use std::fs::File; +use std::io::{self, BufWriter}; +use std::mem::{self, size_of}; + +use bincode::ErrorKind; +use heed::RoTxn; +use raw_collections::bbbul::BitPacker4x; +use raw_collections::Bbbul; +use uell::Uell; + +use crate::update::new::document::Document; +use crate::update::new::indexer::document_changes::{ + DocumentChangeContext, Extractor, FullySend, MostlySend, +}; +use crate::update::new::ref_cell_ext::RefCellExt as _; +use crate::update::new::DocumentChange; +use crate::update::GrenadParameters; +use crate::{ + CboRoaringBitmapCodec, DocumentId, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, + InternalError, Result, +}; + +pub struct GeoExtractor { + grenad_parameters: GrenadParameters, + // rtree: Option>, +} + +impl GeoExtractor { + pub fn new( + rtxn: &RoTxn, + index: &Index, + grenad_parameters: GrenadParameters, + ) -> Result> { + let is_sortable = index.sortable_fields(rtxn)?.contains("_geo"); + let is_filterable = index.filterable_fields(rtxn)?.contains("_geo"); + if is_sortable || is_filterable { + // Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? })) + Ok(Some(GeoExtractor { grenad_parameters })) + } else { + Ok(None) + } + } +} + +pub struct GeoExtractorData<'extractor> { + /// The set of documents ids that were removed. If a document sees its geo + /// point being updated, we first delete it and then insert it in the inserted. + removed: Bbbul<'extractor, BitPacker4x>, + /// The set of document ids associated to the two f64 geo points. + inserted: Uell<'extractor, [u8; size_of::() + 2 * size_of::()]>, + /// TODO Do the doc + spilled_removed: Option>, + /// TODO Do the doc + spilled_inserted: Option>, +} + +unsafe impl MostlySend for GeoExtractorData<'_> {} + +impl<'extractor> Extractor<'extractor> for GeoExtractor { + type Data = RefCell>; + + fn init_data<'doc>( + &'doc self, + extractor_alloc: &'extractor bumpalo::Bump, + ) -> Result { + Ok(RefCell::new(GeoExtractorData { + inserted: Uell::new_in(extractor_alloc), + removed: Bbbul::new_in(extractor_alloc), + spilled: None, + })) + } + + fn process<'doc>( + &'doc self, + changes: impl Iterator>>, + context: &'doc DocumentChangeContext, + ) -> Result<()> { + let rtxn = &context.rtxn; + let index = context.index; + let db_fields_ids_map = context.db_fields_ids_map; + let mut data_ref = context.data.borrow_mut_or_yield(); + + for change in changes { + match change? { + DocumentChange::Deletion(deletion) => todo!(), + DocumentChange::Update(update) => { + let current = update.current(rtxn, index, db_fields_ids_map)?; + let current_geo = current.geo_field()?; + let updated_geo = update.updated().geo_field()?; + // ... + } + DocumentChange::Insertion(insertion) => todo!(), + } + } + + Ok(()) + } +} diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index af6a29d07..14cfa83cb 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -1,6 +1,7 @@ mod cache; mod documents; mod faceted; +mod geo; mod searchable; mod vectors; @@ -8,6 +9,7 @@ use bumpalo::Bump; pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap}; pub use documents::*; pub use faceted::*; +pub use geo::*; pub use searchable::*; pub use vectors::EmbeddingExtractor; diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index 89583bd93..0223895e6 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -326,7 +326,7 @@ impl WordDocidsExtractors { document_change: DocumentChange, ) -> Result<()> { let index = &context.index; - let rtxn = &context.txn; + let rtxn = &context.rtxn; let mut cached_sorter_ref = context.data.borrow_mut_or_yield(); let cached_sorter = cached_sorter_ref.as_mut().unwrap(); let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index 7f9fff38f..f637cff49 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -39,7 +39,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { let doc_alloc = &context.doc_alloc; let index = context.index; - let rtxn = &context.txn; + let rtxn = &context.rtxn; let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc); let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc); diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 55121fb14..ddfabc01a 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -2,13 +2,13 @@ use std::cell::RefCell; use bumpalo::collections::Vec as BVec; use bumpalo::Bump; -use hashbrown::HashMap; +use hashbrown::{DefaultHashBuilder, HashMap}; use super::cache::DelAddRoaringBitmap; use crate::error::FaultSource; use crate::prompt::Prompt; use crate::update::new::channel::EmbeddingSender; -use crate::update::new::indexer::document_changes::{Extractor, MostlySend}; +use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend}; use crate::update::new::vector_document::VectorDocument; use crate::update::new::DocumentChange; use crate::vector::error::{ @@ -37,7 +37,7 @@ impl<'a> EmbeddingExtractor<'a> { } pub struct EmbeddingExtractorData<'extractor>( - pub HashMap, + pub HashMap, ); unsafe impl MostlySend for EmbeddingExtractorData<'_> {} @@ -52,9 +52,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { fn process<'doc>( &'doc self, changes: impl Iterator>>, - context: &'doc crate::update::new::indexer::document_changes::DocumentChangeContext< - Self::Data, - >, + context: &'doc DocumentChangeContext, ) -> crate::Result<()> { let embedders = self.embedders.inner_as_ref(); let mut unused_vectors_distribution = @@ -63,7 +61,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc); for (embedder_name, (embedder, prompt, _is_quantized)) in embedders { let embedder_id = - context.index.embedder_category_id.get(&context.txn, embedder_name)?.ok_or_else( + context.index.embedder_category_id.get(&context.rtxn, embedder_name)?.ok_or_else( || InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None, @@ -90,7 +88,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { } DocumentChange::Update(update) => { let old_vectors = update.current_vectors( - &context.txn, + &context.rtxn, context.index, context.db_fields_ids_map, &context.doc_alloc, @@ -130,7 +128,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { } else if new_vectors.regenerate { let new_rendered = prompt.render_document( update.current( - &context.txn, + &context.rtxn, context.index, context.db_fields_ids_map, )?, @@ -139,7 +137,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { )?; let old_rendered = prompt.render_document( update.merged( - &context.txn, + &context.rtxn, context.index, context.db_fields_ids_map, )?, @@ -157,7 +155,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { } else if old_vectors.regenerate { let old_rendered = prompt.render_document( update.current( - &context.txn, + &context.rtxn, context.index, context.db_fields_ids_map, )?, @@ -166,7 +164,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { )?; let new_rendered = prompt.render_document( update.merged( - &context.txn, + &context.rtxn, context.index, context.db_fields_ids_map, )?, diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index b9bf79e47..e4b088f31 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -197,7 +197,7 @@ pub struct DocumentChangeContext< /// inside of the DB. pub db_fields_ids_map: &'indexer FieldsIdsMap, /// A transaction providing data from the DB before all indexing operations - pub txn: RoTxn<'indexer>, + pub rtxn: RoTxn<'indexer>, /// Global field id map that is up to date with the current state of the indexing process. /// @@ -255,7 +255,7 @@ impl< let txn = index.read_txn()?; Ok(DocumentChangeContext { index, - txn, + rtxn: txn, db_fields_ids_map, new_fields_ids_map: fields_ids_map, doc_alloc, diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index d7648acd8..e89b04223 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -63,7 +63,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { where 'pl: 'doc, // the payload must survive the process calls { - let current = context.index.document(&context.txn, *docid)?; + let current = context.index.document(&context.rtxn, *docid)?; let external_document_id = self.primary_key.extract_docid_from_db( current, diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 001f59fe4..9d490df3b 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -143,11 +143,8 @@ where let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000); let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; - let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder); - let new_fields_ids_map = RwLock::new(new_fields_ids_map); - let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads()); let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs index eb7252445..f6df3981d 100644 --- a/crates/milli/src/update/new/indexer/update_by_function.rs +++ b/crates/milli/src/update/new/indexer/update_by_function.rs @@ -93,7 +93,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { let DocumentChangeContext { index, db_fields_ids_map, - txn, + rtxn: txn, new_fields_ids_map, doc_alloc, .. diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 4eca113ea..a0ac8c907 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -1,6 +1,3 @@ -use std::io::{self}; - -use bincode::ErrorKind; use hashbrown::HashSet; use heed::types::Bytes; use heed::{Database, RoTxn}; @@ -11,59 +8,7 @@ use super::channel::*; use super::extract::{ merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, }; -use super::DocumentChange; -use crate::{ - CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, InternalError, - Result, -}; - -pub struct GeoExtractor { - rtree: Option>, -} - -impl GeoExtractor { - pub fn new(rtxn: &RoTxn, index: &Index) -> Result> { - let is_sortable = index.sortable_fields(rtxn)?.contains("_geo"); - let is_filterable = index.filterable_fields(rtxn)?.contains("_geo"); - if is_sortable || is_filterable { - Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? })) - } else { - Ok(None) - } - } - - pub fn manage_change( - &mut self, - fidmap: &mut GlobalFieldsIdsMap, - change: &DocumentChange, - ) -> Result<()> { - match change { - DocumentChange::Deletion(_) => todo!(), - DocumentChange::Update(_) => todo!(), - DocumentChange::Insertion(_) => todo!(), - } - } - - pub fn serialize_rtree(self, writer: &mut W) -> Result { - match self.rtree { - Some(rtree) => { - // TODO What should I do? - bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e { - ErrorKind::Io(e) => Error::IoError(e), - ErrorKind::InvalidUtf8Encoding(_) => todo!(), - ErrorKind::InvalidBoolEncoding(_) => todo!(), - ErrorKind::InvalidCharEncoding => todo!(), - ErrorKind::InvalidTagEncoding(_) => todo!(), - ErrorKind::DeserializeAnyNotSupported => todo!(), - ErrorKind::SizeLimit => todo!(), - ErrorKind::SequenceMustHaveLength => todo!(), - ErrorKind::Custom(_) => todo!(), - }) - } - None => Ok(false), - } - } -} +use crate::{CboRoaringBitmapCodec, FieldId, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] pub fn merge_and_send_docids<'extractor, MSP>(