From 66ba176df8b82b34d536fc4d207d87e8eeb59725 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 12 Nov 2024 17:28:09 +0100 Subject: [PATCH] Extract and pack the points to inserted them in the rtree --- .../milli/src/update/new/extract/geo/mod.rs | 91 ++++++++++++------- crates/milli/src/update/new/merger.rs | 28 +++++- 2 files changed, 80 insertions(+), 39 deletions(-) diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index 841ee8550..72fc8abf0 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -1,10 +1,10 @@ use std::cell::RefCell; use std::fs::File; use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _}; -use std::{mem, result}; +use std::{iter, mem, result}; use bumpalo::Bump; -use bytemuck::{bytes_of, Pod, Zeroable}; +use bytemuck::{bytes_of, from_bytes, pod_read_unaligned, Pod, Zeroable}; use heed::RoTxn; use serde_json::value::RawValue; use serde_json::Value; @@ -15,7 +15,7 @@ use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extra use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::DocumentChange; use crate::update::GrenadParameters; -use crate::{DocumentId, Index, InternalError, Object, Result}; +use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Object, Result}; pub struct GeoExtractor { grenad_parameters: GrenadParameters, @@ -39,16 +39,26 @@ impl GeoExtractor { #[derive(Pod, Zeroable, Copy, Clone)] #[repr(C, packed)] -pub struct GeoPoint { - document_id: DocumentId, - lat_lng: [f64; 2], +pub struct ExtractedGeoPoint { + pub docid: DocumentId, + pub lat_lng: [f64; 2], +} + +impl From for GeoPoint { + /// Converts the latitude and longitude back to an xyz GeoPoint. + fn from(value: ExtractedGeoPoint) -> Self { + let [lat, lng] = value.lat_lng; + let point = [lat, lng]; + let xyz_point = lat_lng_to_xyz(&point); + GeoPoint::new(xyz_point, (value.docid, point)) + } } pub struct GeoExtractorData<'extractor> { /// The set of documents ids that were removed. If a document sees its geo /// point being updated, we first put it in the deleted and then in the inserted. - removed: bumpalo::collections::Vec<'extractor, GeoPoint>, - inserted: bumpalo::collections::Vec<'extractor, GeoPoint>, + removed: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>, + inserted: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>, /// TODO Do the doc spilled_removed: Option>, /// TODO Do the doc @@ -75,38 +85,49 @@ impl<'extractor> GeoExtractorData<'extractor> { unsafe impl MostlySend for GeoExtractorData<'_> {} pub struct FrozenGeoExtractorData<'extractor> { - pub removed: &'extractor [GeoPoint], - pub inserted: &'extractor [GeoPoint], + pub removed: &'extractor [ExtractedGeoPoint], + pub inserted: &'extractor [ExtractedGeoPoint], pub spilled_removed: Option>, pub spilled_inserted: Option>, } impl<'extractor> FrozenGeoExtractorData<'extractor> { - pub fn get_removed_and_clear(&mut self) -> io::Result { - let mut output = RoaringBitmap::new(); + pub fn iter_and_clear_removed( + &mut self, + ) -> impl IntoIterator> + '_ { + mem::take(&mut self.removed) + .iter() + .copied() + .map(Ok) + .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed)) + } - let mut iter = self.removed.iter_and_clear(); - while let Some(block) = iter.next_block() { - let numbers = block.iter().copied(); - output |= RoaringBitmap::from_sorted_iter(numbers).unwrap(); - } + pub fn iter_and_clear_inserted( + &mut self, + ) -> impl IntoIterator> + '_ { + mem::take(&mut self.inserted) + .iter() + .copied() + .map(Ok) + .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted)) + } +} - if let Some(mut file) = self.spilled_removed.take() { - let mut number_bytes = [0u8; mem::size_of::()]; - loop { - match file.read_exact(&mut number_bytes) { - Ok(()) => { - let number = u32::from_be_bytes(number_bytes); - output.insert(number); - } - Err(e) if e.kind() == ErrorKind::UnexpectedEof => (), - Err(e) => return Err(e), - } +fn iterator_over_spilled_geopoints( + spilled: &mut Option>, +) -> impl IntoIterator> + '_ { + let mut spilled = spilled.take(); + iter::from_fn(move || match &mut spilled { + Some(file) => { + let geopoint_bytes = &mut [0u8; mem::size_of::()]; + match file.read_exact(geopoint_bytes) { + Ok(()) => Some(Ok(pod_read_unaligned(geopoint_bytes))), + Err(e) if e.kind() == ErrorKind::UnexpectedEof => None, + Err(e) => return Some(Err(e)), } } - - Ok(output) - } + None => None, + }) } impl<'extractor> Extractor<'extractor> for GeoExtractor { @@ -151,7 +172,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { .transpose()?; if let Some(lat_lng) = current_geo { - let geopoint = GeoPoint { document_id: docid, lat_lng }; + let geopoint = ExtractedGeoPoint { docid, lat_lng }; match &mut data_ref.spilled_removed { Some(file) => file.write_all(bytes_of(&geopoint))?, None => data_ref.removed.push(geopoint), @@ -179,7 +200,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { // we need to replace the current by the new point and therefore // delete the current point from the RTree. if let Some(lat_lng) = current_geo { - let geopoint = GeoPoint { document_id: docid, lat_lng }; + let geopoint = ExtractedGeoPoint { docid, lat_lng }; match &mut data_ref.spilled_removed { Some(file) => file.write_all(bytes_of(&geopoint))?, None => data_ref.removed.push(geopoint), @@ -187,7 +208,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { } if let Some(lat_lng) = updated_geo { - let geopoint = GeoPoint { document_id: docid, lat_lng }; + let geopoint = ExtractedGeoPoint { docid, lat_lng }; match &mut data_ref.spilled_inserted { Some(file) => file.write_all(bytes_of(&geopoint))?, None => data_ref.inserted.push(geopoint), @@ -206,7 +227,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor { .transpose()?; if let Some(lat_lng) = inserted_geo { - let geopoint = GeoPoint { document_id: docid, lat_lng }; + let geopoint = ExtractedGeoPoint { docid, lat_lng }; match &mut data_ref.spilled_inserted { Some(file) => file.write_all(bytes_of(&geopoint))?, None => data_ref.inserted.push(geopoint), diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 99f2e8489..7d5543ee8 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -1,8 +1,10 @@ use std::cell::RefCell; +use std::io; use hashbrown::HashSet; use heed::types::Bytes; use heed::{Database, RoTxn}; +use memmap2::Mmap; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use roaring::RoaringBitmap; @@ -11,7 +13,7 @@ use super::extract::{ merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind, GeoExtractorData, }; -use crate::{CboRoaringBitmapCodec, FieldId, Index, InternalError, Result}; +use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] pub fn merge_and_send_rtree<'extractor, MSP>( @@ -25,13 +27,31 @@ where MSP: Fn() -> bool + Sync, { let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default(); - for data in datastore { + if must_stop_processing() { + return Err(InternalError::AbortedIndexation.into()); + } + let mut frozen = data.into_inner().freeze()?; - let removed = frozen.get_removed_and_clear()?; - removed.into_iter().for_each(|docid| rtree.remove(t)); + for result in frozen.iter_and_clear_removed() { + let extracted_geo_point = result?; + rtree.remove(&GeoPoint::from(extracted_geo_point)); + } + + for result in frozen.iter_and_clear_inserted() { + let extracted_geo_point = result?; + rtree.insert(GeoPoint::from(extracted_geo_point)); + } } + let mut file = tempfile::tempfile()?; + /// manage error + bincode::serialize_into(&mut file, &rtree).unwrap(); + file.sync_all()?; + + let rtree_mmap = unsafe { Mmap::map(&file)? }; + geo_sender.set_rtree(rtree_mmap).unwrap(); + Ok(()) }