diff --git a/milli/src/index.rs b/milli/src/index.rs
index d2b4598d3..70aefa9be 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -298,7 +298,7 @@ impl Index {

     /* geo rtree */

-    pub(crate) fn put_geo_rtree<A: AsRef<[u8]>>(
+    pub(crate) fn put_geo_rtree(
         &self,
         wtxn: &mut RwTxn,
         rtree: &RTree<GeoPoint>,
diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs
new file mode 100644
index 000000000..9f6e43199
--- /dev/null
+++ b/milli/src/update/index_documents/extract/extract_geo_points.rs
@@ -0,0 +1,46 @@
+use std::fs::File;
+use std::io;
+
+use concat_arrays::concat_arrays;
+use log::warn;
+use serde_json::Value;
+
+use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
+use crate::{FieldId, InternalError, Result};
+
+/// Extracts the geographical coordinates contained in each document under the `_geo` field.
+///
+/// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude)
+pub fn extract_geo_points<R: io::Read>(
+    mut obkv_documents: grenad::Reader<R>,
+    indexer: GrenadParameters,
+    geo_field_id: Option<FieldId>, // if `None`, generate an empty grenad
+) -> Result<grenad::Reader<File>> {
+    let mut writer = tempfile::tempfile().and_then(|file| {
+        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
+    })?;
+
+    // we never encountered any documents with a `_geo` field. We can skip entirely this step
+    if geo_field_id.is_none() {
+        return Ok(writer_into_reader(writer)?);
+    }
+    let geo_field_id = geo_field_id.unwrap();
+
+    while let Some((docid_bytes, value)) = obkv_documents.next()? {
+        let obkv = obkv::KvReader::new(value);
+        let point = obkv.get(geo_field_id).unwrap(); // TODO: TAMO where should we handle this error?
+        let point: Value = serde_json::from_slice(point).map_err(InternalError::SerdeJson)?;
+
+        if let Some((lat, long)) = point["lat"].as_f64().zip(point["long"].as_f64()) {
+            // this will create an array of 16 bytes (two 8 bytes floats)
+            let bytes: [u8; 16] = concat_arrays![lat.to_le_bytes(), long.to_le_bytes()];
+            writer.insert(docid_bytes, bytes)?;
+        } else {
+            // TAMO: improve the warn
+            warn!("Malformed `_geo` field");
+            continue;
+        }
+    }
+
+    Ok(writer_into_reader(writer)?)
+}
diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs
index bb49e3e51..90a279815 100644
--- a/milli/src/update/index_documents/extract/mod.rs
+++ b/milli/src/update/index_documents/extract/mod.rs
@@ -3,6 +3,7 @@ mod extract_facet_number_docids;
 mod extract_facet_string_docids;
 mod extract_fid_docid_facet_values;
 mod extract_fid_word_count_docids;
+mod extract_geo_points;
 mod extract_word_docids;
 mod extract_word_level_position_docids;
 mod extract_word_pair_proximity_docids;
@@ -19,6 +20,7 @@ use self::extract_facet_number_docids::extract_facet_number_docids;
 use self::extract_facet_string_docids::extract_facet_string_docids;
 use self::extract_fid_docid_facet_values::extract_fid_docid_facet_values;
 use self::extract_fid_word_count_docids::extract_fid_word_count_docids;
+use self::extract_geo_points::extract_geo_points;
 use self::extract_word_docids::extract_word_docids;
 use self::extract_word_level_position_docids::extract_word_level_position_docids;
 use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids;
@@ -37,6 +39,7 @@ pub(crate) fn data_from_obkv_documents(
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: Option<HashSet<FieldId>>,
     faceted_fields: HashSet<FieldId>,
+    geo_field_id: Option<FieldId>,
     stop_words: Option<fst::Set<&[u8]>>,
 ) -> Result<()> {
     let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = obkv_chunks
@@ -54,7 +57,7 @@ pub(crate) fn data_from_obkv_documents(
         .collect();

     let (
-        docid_word_positions_chunks,
+        (docid_word_positions_chunks),
         (docid_fid_facet_numbers_chunks, docid_fid_facet_strings_chunks),
     ) = result?;
@@ -118,6 +121,16 @@
         "field-id-facet-number-docids",
     );

+    spawn_extraction_task(
+        documents_chunk,
+        indexer.clone(),
+        lmdb_writer_sx.clone(),
+        move |documents, indexer| extract_geo_points(documents, indexer, geo_field_id),
+        merge_cbo_roaring_bitmaps,
+        TypedChunk::GeoPoints,
+        "geo-points",
+    );
+
     Ok(())
 }
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index 7800ae55a..44b108076 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -233,6 +233,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             self.index.searchable_fields_ids(self.wtxn)?.map(HashSet::from_iter);
         // get filterable fields for facet databases
         let faceted_fields = self.index.faceted_fields_ids(self.wtxn)?;
+        // get the fid of the `_geo` field.
+        let geo_field_id = self.index.fields_ids_map(self.wtxn)?.id("_geo");

         let stop_words = self.index.stop_words(self.wtxn)?;
         // let stop_words = stop_words.as_ref();
@@ -261,6 +263,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
                     lmdb_writer_sx.clone(),
                     searchable_fields,
                     faceted_fields,
+                    geo_field_id,
                     stop_words,
                 )
             });
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index 5f28034fe..dcefee153 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -6,11 +6,12 @@ use heed::{BytesDecode, RwTxn};
 use roaring::RoaringBitmap;

 use super::helpers::{
-    roaring_bitmap_from_u32s_array, serialize_roaring_bitmap, valid_lmdb_key, CursorClonableMmap,
+    self, roaring_bitmap_from_u32s_array, serialize_roaring_bitmap, valid_lmdb_key,
+    CursorClonableMmap,
 };
 use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string};
 use crate::update::index_documents::helpers::into_clonable_grenad;
-use crate::{BoRoaringBitmapCodec, CboRoaringBitmapCodec, Index, Result};
+use crate::{BoRoaringBitmapCodec, CboRoaringBitmapCodec, GeoPoint, Index, Result};

 pub(crate) enum TypedChunk {
     DocidWordPositions(grenad::Reader<CursorClonableMmap>),
@@ -24,6 +25,7 @@ pub(crate) enum TypedChunk {
     WordPairProximityDocids(grenad::Reader<File>),
     FieldIdFacetStringDocids(grenad::Reader<File>),
     FieldIdFacetNumberDocids(grenad::Reader<File>),
+    GeoPoints(grenad::Reader<File>),
 }

 /// Write typed chunk in the corresponding LMDB database of the provided index.
@@ -177,6 +179,22 @@
             )?;
             is_merged_database = true;
         }
+        TypedChunk::GeoPoints(mut geo_points) => {
+            // TODO: TAMO: we should create the rtree with the `RTree::bulk_load` function
+            let mut rtree = index.geo_rtree(&index.read_txn()?)?.unwrap_or_default();
+            while let Some((key, value)) = geo_points.next()? {
+                // convert the key back to a u32 (4 bytes)
+                let (key, _) = helpers::try_split_array_at::<u8, 4>(key).unwrap();
+                let key = u32::from_le_bytes(key);
+
+                // convert the latitude and longitude back to a f64 (8 bytes)
+                let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap();
+                let (long, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap();
+                let point = [f64::from_le_bytes(lat), f64::from_le_bytes(long)];
+                rtree.insert(GeoPoint::new(point, key));
+            }
+            index.put_geo_rtree(wtxn, &rtree)?;
+        }
     }

     Ok((RoaringBitmap::new(), is_merged_database))
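For reviewers, the two sides of this diff agree on a small fixed-width layout: `extract_geo_points` writes the docid as the grenad key and a 16-byte value holding latitude then longitude as little-endian `f64`s, and the `TypedChunk::GeoPoints` arm splits that value straight back apart. Below is a minimal standalone sketch of that round trip, not part of the PR: the `encode_geo_value`/`decode_geo_value` helpers are hypothetical names, and `copy_from_slice`/`try_into` stand in for the PR's `concat_arrays!` macro and `helpers::try_split_array_at` so the snippet compiles on its own.

```rust
// Hypothetical demo of the (lat, long) byte layout used by this PR; std only.
use std::convert::TryInto;

/// Encode (latitude, longitude) as two little-endian f64s, 16 bytes total,
/// like `concat_arrays![lat.to_le_bytes(), long.to_le_bytes()]` in the extractor.
fn encode_geo_value(lat: f64, long: f64) -> [u8; 16] {
    let mut bytes = [0u8; 16];
    bytes[..8].copy_from_slice(&lat.to_le_bytes());
    bytes[8..].copy_from_slice(&long.to_le_bytes());
    bytes
}

/// Decode the value back, mirroring the two `try_split_array_at::<u8, 8>` calls
/// in the `TypedChunk::GeoPoints` arm.
fn decode_geo_value(bytes: &[u8]) -> Option<[f64; 2]> {
    let lat: [u8; 8] = bytes.get(..8)?.try_into().ok()?;
    let long: [u8; 8] = bytes.get(8..16)?.try_into().ok()?;
    Some([f64::from_le_bytes(lat), f64::from_le_bytes(long)])
}

fn main() {
    let encoded = encode_geo_value(48.8566, 2.3522);
    assert_eq!(decode_geo_value(&encoded), Some([48.8566, 2.3522]));

    // The grenad key gets the same fixed-width treatment: u32 docid <-> 4 LE bytes.
    let docid: u32 = 42;
    assert_eq!(u32::from_le_bytes(docid.to_le_bytes()), docid);
    println!("geo value layout round-trips");
}
```

Because every field is fixed-width, no length prefix or delimiter is needed: the write side can split the value blindly and hand the resulting `[lat, long]` array plus the decoded docid to `GeoPoint::new` when inserting into the rtree.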