From 609545072f7651349cd0f6b376483a7aea9947a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 12 Nov 2024 16:25:13 +0100 Subject: [PATCH] Always collect the geopoint even when deleting one --- crates/milli/src/update/new/channel.rs | 47 ++++++- .../milli/src/update/new/extract/geo/mod.rs | 130 +++++++++++++----- crates/milli/src/update/new/indexer/mod.rs | 69 ++++++++-- crates/milli/src/update/new/merger.rs | 25 ++++ 4 files changed, 218 insertions(+), 53 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index a4896ee3f..85b8463e5 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -4,10 +4,12 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use hashbrown::HashMap; use heed::types::Bytes; +use memmap2::Mmap; use roaring::RoaringBitmap; use super::extract::FacetKind; use super::StdResult; +use crate::index::main_key::GEO_RTREE_KEY; use crate::update::new::KvReaderFieldId; use crate::vector::Embedding; use crate::{DocumentId, Index}; @@ -26,9 +28,9 @@ pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver) ) } -pub struct KeyValueEntry { - pub key_length: usize, - pub data: Box<[u8]>, +pub enum KeyValueEntry { + Small { key_length: usize, data: Box<[u8]> }, + Large { key_entry: KeyEntry, data: Mmap }, } impl KeyValueEntry { @@ -36,14 +38,25 @@ impl KeyValueEntry { let mut data = Vec::with_capacity(key.len() + value.len()); data.extend_from_slice(key); data.extend_from_slice(value); - KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() } + KeyValueEntry::Small { key_length: key.len(), data: data.into_boxed_slice() } } + + fn from_large_key_value(key: &[u8], value: Mmap) -> Self { + KeyValueEntry::Large { key_entry: KeyEntry::from_key(key), data: value } + } + pub fn key(&self) -> &[u8] { - &self.data[..self.key_length] + match self { + KeyValueEntry::Small { key_length, data } => &data[..*key_length], + KeyValueEntry::Large { key_entry, data: _ } => key_entry.entry(), + } } pub fn value(&self) -> &[u8] { - &self.data[self.key_length..] + match self { + KeyValueEntry::Small { key_length, data } => &data[*key_length..], + KeyValueEntry::Large { key_entry: _, data } => &data[..], + } } } @@ -98,6 +111,7 @@ pub struct DbOperation { #[derive(Debug)] pub enum Database { + Main, Documents, ExternalDocumentsIds, ExactWordDocids, @@ -116,6 +130,7 @@ pub enum Database { impl Database { pub fn database(&self, index: &Index) -> heed::Database { match self { + Database::Main => index.main.remap_types(), Database::Documents => index.documents.remap_types(), Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(), Database::ExactWordDocids => index.exact_word_docids.remap_types(), @@ -208,6 +223,10 @@ impl ExtractorSender { EmbeddingSender(&self.sender) } + pub fn geo(&self) -> GeoSender<'_> { + GeoSender(&self.sender) + } + fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { match self .sender @@ -427,3 +446,19 @@ impl EmbeddingSender<'_> { .map_err(|_| SendError(())) } } + +pub struct GeoSender<'a>(&'a Sender); + +impl GeoSender<'_> { + pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> { + self.0 + .send(WriterOperation::DbOperation(DbOperation { + database: Database::Main, + entry: EntryOperation::Write(KeyValueEntry::from_large_key_value( + GEO_RTREE_KEY.as_bytes(), + value, + )), + })) + .map_err(|_| SendError(())) + } +} diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index d003cf90d..841ee8550 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -1,16 +1,13 @@ use std::cell::RefCell; use std::fs::File; -use std::io::{BufWriter, Write as _}; -use std::mem::size_of; -use std::result; +use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _}; +use std::{mem, result}; use bumpalo::Bump; +use bytemuck::{bytes_of, Pod, Zeroable}; use heed::RoTxn; -use raw_collections::bbbul::BitPacker4x; -use raw_collections::Bbbul; use serde_json::value::RawValue; use serde_json::Value; -use uell::Uell; use crate::error::GeoError; use crate::update::new::document::Document; @@ -40,29 +37,88 @@ impl GeoExtractor { } } +#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(C, packed)] +pub struct GeoPoint { + document_id: DocumentId, + lat_lng: [f64; 2], +} + pub struct GeoExtractorData<'extractor> { /// The set of documents ids that were removed. If a document sees its geo - /// point being updated, we first delete it and then insert it in the inserted. - removed: Bbbul<'extractor, BitPacker4x>, - /// The set of document ids associated to the two f64 geo points. - inserted: Uell<'extractor, [u8; size_of::() + 2 * size_of::()]>, + /// point being updated, we first put it in the deleted and then in the inserted. + removed: bumpalo::collections::Vec<'extractor, GeoPoint>, + inserted: bumpalo::collections::Vec<'extractor, GeoPoint>, /// TODO Do the doc spilled_removed: Option>, /// TODO Do the doc spilled_inserted: Option>, } +impl<'extractor> GeoExtractorData<'extractor> { + pub fn freeze(self) -> Result> { + let GeoExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self; + + Ok(FrozenGeoExtractorData { + removed: removed.into_bump_slice(), + inserted: inserted.into_bump_slice(), + spilled_removed: spilled_removed + .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error())) + .transpose()?, + spilled_inserted: spilled_inserted + .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error())) + .transpose()?, + }) + } +} + unsafe impl MostlySend for GeoExtractorData<'_> {} +pub struct FrozenGeoExtractorData<'extractor> { + pub removed: &'extractor [GeoPoint], + pub inserted: &'extractor [GeoPoint], + pub spilled_removed: Option>, + pub spilled_inserted: Option>, +} + +impl<'extractor> FrozenGeoExtractorData<'extractor> { + pub fn get_removed_and_clear(&mut self) -> io::Result { + let mut output = RoaringBitmap::new(); + + let mut iter = self.removed.iter_and_clear(); + while let Some(block) = iter.next_block() { + let numbers = block.iter().copied(); + output |= RoaringBitmap::from_sorted_iter(numbers).unwrap(); + } + + if let Some(mut file) = self.spilled_removed.take() { + let mut number_bytes = [0u8; mem::size_of::()]; + loop { + match file.read_exact(&mut number_bytes) { + Ok(()) => { + let number = u32::from_be_bytes(number_bytes); + output.insert(number); + } + Err(e) if e.kind() == ErrorKind::UnexpectedEof => (), + Err(e) => return Err(e), + } + } + } + + Ok(output) + } +} + impl<'extractor> Extractor<'extractor> for GeoExtractor { type Data = RefCell>; fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result { Ok(RefCell::new(GeoExtractorData { - inserted: Uell::new_in(extractor_alloc), - removed: Bbbul::new_in(extractor_alloc), - spilled_removed: None, + removed: bumpalo::collections::Vec::new_in(extractor_alloc), + // inserted: Uell::new_in(extractor_alloc), + inserted: bumpalo::collections::Vec::new_in(extractor_alloc), spilled_inserted: None, + spilled_removed: None, })) } @@ -76,7 +132,6 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { let max_memory = self.grenad_parameters.max_memory; let db_fields_ids_map = context.db_fields_ids_map; let mut data_ref = context.data.borrow_mut_or_yield(); - let mut buffer = Vec::new(); for change in changes { if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) { @@ -88,9 +143,19 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { match change? { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); - match &mut data_ref.spilled_removed { - Some(file) => file.write_all(&docid.to_be_bytes()[..])?, - None => data_ref.removed.insert(docid), + let external_id = deletion.external_document_id(); + let current = deletion.current(rtxn, index, db_fields_ids_map)?; + let current_geo = current + .geo_field()? + .map(|geo| extract_geo_coordinates(external_id, geo)) + .transpose()?; + + if let Some(lat_lng) = current_geo { + let geopoint = GeoPoint { document_id: docid, lat_lng }; + match &mut data_ref.spilled_removed { + Some(file) => file.write_all(bytes_of(&geopoint))?, + None => data_ref.removed.push(geopoint), + } } } DocumentChange::Update(update) => { @@ -113,18 +178,19 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { // If the current and new geo points are different it means that // we need to replace the current by the new point and therefore // delete the current point from the RTree. - if current_geo.is_some() { + if let Some(lat_lng) = current_geo { + let geopoint = GeoPoint { document_id: docid, lat_lng }; match &mut data_ref.spilled_removed { - Some(file) => file.write_all(&docid.to_be_bytes()[..])?, - None => data_ref.removed.insert(docid), + Some(file) => file.write_all(bytes_of(&geopoint))?, + None => data_ref.removed.push(geopoint), } } - if let Some([lat, lng]) = updated_geo { - let entry = concat_docid_lat_lng(&mut buffer, docid, lat, lng); + if let Some(lat_lng) = updated_geo { + let geopoint = GeoPoint { document_id: docid, lat_lng }; match &mut data_ref.spilled_inserted { - Some(file) => file.write_all(entry)?, - None => data_ref.inserted.push(entry.try_into().unwrap()), + Some(file) => file.write_all(bytes_of(&geopoint))?, + None => data_ref.inserted.push(geopoint), } } } @@ -139,11 +205,11 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { .map(|geo| extract_geo_coordinates(external_id, geo)) .transpose()?; - if let Some([lat, lng]) = inserted_geo { - let entry = concat_docid_lat_lng(&mut buffer, docid, lat, lng); + if let Some(lat_lng) = inserted_geo { + let geopoint = GeoPoint { document_id: docid, lat_lng }; match &mut data_ref.spilled_inserted { - Some(file) => file.write_all(entry)?, - None => data_ref.inserted.push(entry.try_into().unwrap()), + Some(file) => file.write_all(bytes_of(&geopoint))?, + None => data_ref.inserted.push(geopoint), } } } @@ -206,11 +272,3 @@ pub fn extract_finite_float_from_value(value: Value) -> result::Result, docid: DocumentId, lat: f64, lng: f64) -> &[u8] { - buffer.clear(); - buffer.extend_from_slice(&docid.to_be_bytes()); - buffer.extend_from_slice(&lat.to_be_bytes()); - buffer.extend_from_slice(&lng.to_be_bytes()); - &buffer[..] -} diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 9d490df3b..88a07da54 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -32,6 +32,7 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::proximity::ProximityPrecision; use crate::update::del_add::DelAdd; use crate::update::new::extract::EmbeddingExtractor; +use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::InnerIndexSettings; @@ -92,29 +93,33 @@ mod steps { step(4) } - pub const fn write_db() -> (u16, &'static str) { + pub const fn extract_geo_points() -> (u16, &'static str) { step(5) } - pub const fn write_embedding_db() -> (u16, &'static str) { + pub const fn write_db() -> (u16, &'static str) { step(6) } - pub const fn waiting_extractors() -> (u16, &'static str) { + pub const fn write_embedding_db() -> (u16, &'static str) { step(7) } - pub const fn post_processing_facets() -> (u16, &'static str) { + pub const fn waiting_extractors() -> (u16, &'static str) { step(8) } - pub const fn post_processing_words() -> (u16, &'static str) { + pub const fn post_processing_facets() -> (u16, &'static str) { step(9) } - pub const fn finalizing() -> (u16, &'static str) { + pub const fn post_processing_words() -> (u16, &'static str) { step(10) } + + pub const fn finalizing() -> (u16, &'static str) { + step(11) + } } /// This is the main function of this crate. @@ -324,7 +329,15 @@ where let (finished_steps, step_name) = steps::extract_word_proximity(); - let caches = ::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?; + let caches = ::run_extraction(grenad_parameters, + document_changes, + indexing_context, + &mut extractor_allocs, + finished_steps, + total_steps, + step_name, + )?; + merge_and_send_docids( caches, index.word_pair_proximity_docids.remap_types(), @@ -347,10 +360,15 @@ where let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let (finished_steps, step_name) = steps::extract_embeddings(); - - - extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; - + extract(document_changes, + &extractor, + indexing_context, + &mut extractor_allocs, + &datastore, + finished_steps, + total_steps, + step_name, + )?; let mut user_provided = HashMap::new(); for data in datastore { @@ -369,6 +387,35 @@ where embedding_sender.finish(user_provided).unwrap(); } + 'geo: { + let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); + let _entered = span.enter(); + + // let geo_sender = extractor_sender.geo_points(); + let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else { + break 'geo; + }; + let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); + let (finished_steps, step_name) = steps::extract_geo_points(); + extract(document_changes, + &extractor, + indexing_context, + &mut extractor_allocs, + &datastore, + finished_steps, + total_steps, + step_name, + )?; + + merge_and_send_rtree( + datastore, + &rtxn, + index, + extractor_sender.geo(), + &indexing_context.must_stop_processing, + )?; + } + // TODO THIS IS TOO MUCH // - [ ] Extract fieldid docid facet number // - [ ] Extract fieldid docid facet string diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index a0ac8c907..99f2e8489 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -1,3 +1,5 @@ +use std::cell::RefCell; + use hashbrown::HashSet; use heed::types::Bytes; use heed::{Database, RoTxn}; @@ -7,9 +9,32 @@ use roaring::RoaringBitmap; use super::channel::*; use super::extract::{ merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, + GeoExtractorData, }; use crate::{CboRoaringBitmapCodec, FieldId, Index, InternalError, Result}; +#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] +pub fn merge_and_send_rtree<'extractor, MSP>( + datastore: impl IntoIterator>>, + rtxn: &RoTxn, + index: &Index, + geo_sender: GeoSender<'_>, + must_stop_processing: &MSP, +) -> Result<()> +where + MSP: Fn() -> bool + Sync, +{ + let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default(); + + for data in datastore { + let mut frozen = data.into_inner().freeze()?; + let removed = frozen.get_removed_and_clear()?; + removed.into_iter().for_each(|docid| rtree.remove(t)); + } + + Ok(()) +} + #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] pub fn merge_and_send_docids<'extractor, MSP>( mut caches: Vec>,