Compare commits

...

4 Commits

Author            SHA1         Message                                                   Date
Clément Renault   74da0ddc82   Accept null as a valid geo value                          2024-11-12 18:08:27 +01:00
Clément Renault   66ba176df8   Extract and pack the points to insert them in the rtree  2024-11-12 17:28:09 +01:00
Clément Renault   609545072f   Always collect the geopoint even when deleting one        2024-11-12 16:25:13 +01:00
Clément Renault   e1454db5ba   Finalize the GeoExtractor                                 2024-11-12 14:37:23 +01:00
4 changed files with 376 additions and 39 deletions

View File

@@ -4,10 +4,12 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
 use hashbrown::HashMap;
 use heed::types::Bytes;
+use memmap2::Mmap;
 use roaring::RoaringBitmap;
 
 use super::extract::FacetKind;
 use super::StdResult;
+use crate::index::main_key::GEO_RTREE_KEY;
 use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
 use crate::{DocumentId, Index};
@@ -26,9 +28,9 @@ pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver)
     )
 }
 
-pub struct KeyValueEntry {
-    pub key_length: usize,
-    pub data: Box<[u8]>,
+pub enum KeyValueEntry {
+    Small { key_length: usize, data: Box<[u8]> },
+    Large { key_entry: KeyEntry, data: Mmap },
 }
 
 impl KeyValueEntry {
@@ -36,14 +38,25 @@ impl KeyValueEntry {
         let mut data = Vec::with_capacity(key.len() + value.len());
         data.extend_from_slice(key);
         data.extend_from_slice(value);
-        KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
+        KeyValueEntry::Small { key_length: key.len(), data: data.into_boxed_slice() }
     }
 
+    fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
+        KeyValueEntry::Large { key_entry: KeyEntry::from_key(key), data: value }
+    }
+
     pub fn key(&self) -> &[u8] {
-        &self.data[..self.key_length]
+        match self {
+            KeyValueEntry::Small { key_length, data } => &data[..*key_length],
+            KeyValueEntry::Large { key_entry, data: _ } => key_entry.entry(),
+        }
     }
 
     pub fn value(&self) -> &[u8] {
-        &self.data[self.key_length..]
+        match self {
+            KeyValueEntry::Small { key_length, data } => &data[*key_length..],
+            KeyValueEntry::Large { key_entry: _, data } => &data[..],
+        }
     }
 }
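
The enum split is the interesting part of this hunk: small entries keep key and value packed in a single boxed allocation, while large entries (like a serialized rtree) reference memory-mapped bytes instead of copying them. A standalone sketch of the same layout, with a Vec<u8> standing in for memmap2::Mmap so it runs without dependencies:

    // Sketch only: Vec<u8> stands in for memmap2::Mmap so this runs standalone.
    enum Entry {
        // Key and value packed into one allocation, split at key_len.
        Small { key_len: usize, data: Box<[u8]> },
        // The key stays separate so the value can be a zero-copy mmap in real code.
        Large { key: Box<[u8]>, data: Vec<u8> },
    }

    impl Entry {
        fn key(&self) -> &[u8] {
            match self {
                Entry::Small { key_len, data } => &data[..*key_len],
                Entry::Large { key, .. } => key,
            }
        }

        fn value(&self) -> &[u8] {
            match self {
                Entry::Small { key_len, data } => &data[*key_len..],
                Entry::Large { data, .. } => data,
            }
        }
    }

    fn main() {
        let mut packed = Vec::new();
        packed.extend_from_slice(b"key");
        packed.extend_from_slice(b"value");
        let small = Entry::Small { key_len: 3, data: packed.into_boxed_slice() };
        assert_eq!(small.key(), b"key");
        assert_eq!(small.value(), b"value");
    }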
@@ -98,6 +111,7 @@ pub struct DbOperation {
 #[derive(Debug)]
 pub enum Database {
+    Main,
     Documents,
     ExternalDocumentsIds,
     ExactWordDocids,
@@ -116,6 +130,7 @@ pub enum Database {
 impl Database {
     pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
         match self {
+            Database::Main => index.main.remap_types(),
             Database::Documents => index.documents.remap_types(),
             Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(),
             Database::ExactWordDocids => index.exact_word_docids.remap_types(),
@@ -208,6 +223,10 @@ impl ExtractorSender {
         EmbeddingSender(&self.sender)
     }
 
+    pub fn geo(&self) -> GeoSender<'_> {
+        GeoSender(&self.sender)
+    }
+
     fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
         match self
             .sender
@@ -427,3 +446,19 @@ impl EmbeddingSender<'_> {
             .map_err(|_| SendError(()))
     }
 }
+
+pub struct GeoSender<'a>(&'a Sender<WriterOperation>);
+
+impl GeoSender<'_> {
+    pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
+        self.0
+            .send(WriterOperation::DbOperation(DbOperation {
+                database: Database::Main,
+                entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
+                    GEO_RTREE_KEY.as_bytes(),
+                    value,
+                )),
+            }))
+            .map_err(|_| SendError(()))
+    }
+}
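
set_rtree moves an Mmap through the channel, so only the handle crosses threads and the writer reads the mapped pages directly. A rough sketch of that pattern, with crossbeam-channel, memmap2, and tempfile as assumed dependencies (GeoSender itself wraps the same kind of sender):

    use std::io::Write as _;
    use std::thread;

    use crossbeam_channel::bounded;
    use memmap2::Mmap;

    fn main() -> std::io::Result<()> {
        let (sender, receiver) = bounded::<Mmap>(10);

        let mut file = tempfile::tempfile()?;
        file.write_all(b"serialized rtree bytes")?;
        file.sync_all()?;
        // SAFETY: the file is not mutated while the map is alive.
        let mmap = unsafe { Mmap::map(&file)? };

        let writer = thread::spawn(move || {
            // The writer thread sees the mapped bytes; nothing was copied.
            let value = receiver.recv().unwrap();
            assert_eq!(&value[..], b"serialized rtree bytes");
        });

        sender.send(mmap).unwrap();
        drop(sender);
        writer.join().unwrap();
        Ok(())
    }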

View File

@@ -1,26 +1,24 @@
 use std::cell::RefCell;
-use std::f32::consts::PI;
 use std::fs::File;
-use std::io::{self, BufWriter};
-use std::mem::{self, size_of};
+use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _};
+use std::{iter, mem, result};
 
-use bincode::ErrorKind;
 use bumpalo::Bump;
+use bytemuck::{bytes_of, from_bytes, pod_read_unaligned, Pod, Zeroable};
 use heed::RoTxn;
-use raw_collections::bbbul::BitPacker4x;
-use raw_collections::Bbbul;
-use uell::Uell;
+use serde_json::value::RawValue;
+use serde_json::Value;
 
+use crate::error::GeoError;
 use crate::update::new::document::Document;
 use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::DocumentChange;
 use crate::update::GrenadParameters;
-use crate::{DocumentId, Index, Result};
+use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Object, Result};
 
 pub struct GeoExtractor {
     grenad_parameters: GrenadParameters,
-    // rtree: Option<rstar::RTree<GeoPoint>>,
 }
 
 impl GeoExtractor {
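
The dependency swap above (raw_collections' Bbbul and uell out, bumpalo collections in) means each extractor thread now grows plain vectors inside the bump arena it was already handed. A minimal sketch of that allocation pattern, assuming bumpalo with its collections feature enabled; tuples stand in for ExtractedGeoPoint:

    use bumpalo::collections::Vec as BumpVec;
    use bumpalo::Bump;

    fn main() {
        let extractor_alloc = Bump::new();

        // Both buffers grow inside the arena and are freed with it in one shot.
        let mut removed = BumpVec::new_in(&extractor_alloc);
        let mut inserted = BumpVec::new_in(&extractor_alloc);
        removed.push((0u32, [48.8566f64, 2.3522]));
        inserted.push((0u32, [40.7128f64, -74.0060]));

        // allocated_bytes() is the figure process() compares against max_memory
        // to decide when to start spilling points to disk instead.
        assert!(extractor_alloc.allocated_bytes() > 0);
    }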
@@ -39,29 +37,109 @@ impl GeoExtractor {
     }
 }
 
+#[derive(Pod, Zeroable, Copy, Clone)]
+#[repr(C, packed)]
+pub struct ExtractedGeoPoint {
+    pub docid: DocumentId,
+    pub lat_lng: [f64; 2],
+}
+
+impl From<ExtractedGeoPoint> for GeoPoint {
+    /// Converts the latitude and longitude back to an xyz GeoPoint.
+    fn from(value: ExtractedGeoPoint) -> Self {
+        let [lat, lng] = value.lat_lng;
+        let point = [lat, lng];
+        let xyz_point = lat_lng_to_xyz(&point);
+        GeoPoint::new(xyz_point, (value.docid, point))
+    }
+}
+
 pub struct GeoExtractorData<'extractor> {
     /// The set of documents ids that were removed. If a document sees its geo
-    /// point being updated, we first delete it and then insert it in the inserted.
-    removed: Bbbul<'extractor, BitPacker4x>,
-    /// The set of document ids associated to the two f64 geo points.
-    inserted: Uell<'extractor, [u8; size_of::<DocumentId>() + 2 * size_of::<f64>()]>,
+    /// point being updated, we first put it in the deleted and then in the inserted.
+    removed: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
+    inserted: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
     /// TODO Do the doc
     spilled_removed: Option<BufWriter<File>>,
     /// TODO Do the doc
     spilled_inserted: Option<BufWriter<File>>,
 }
 
+impl<'extractor> GeoExtractorData<'extractor> {
+    pub fn freeze(self) -> Result<FrozenGeoExtractorData<'extractor>> {
+        let GeoExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
+
+        Ok(FrozenGeoExtractorData {
+            removed: removed.into_bump_slice(),
+            inserted: inserted.into_bump_slice(),
+            spilled_removed: spilled_removed
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+            spilled_inserted: spilled_inserted
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+        })
+    }
+}
+
 unsafe impl MostlySend for GeoExtractorData<'_> {}
 
+pub struct FrozenGeoExtractorData<'extractor> {
+    pub removed: &'extractor [ExtractedGeoPoint],
+    pub inserted: &'extractor [ExtractedGeoPoint],
+    pub spilled_removed: Option<BufReader<File>>,
+    pub spilled_inserted: Option<BufReader<File>>,
+}
+
+impl<'extractor> FrozenGeoExtractorData<'extractor> {
+    pub fn iter_and_clear_removed(
+        &mut self,
+    ) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+        mem::take(&mut self.removed)
+            .iter()
+            .copied()
+            .map(Ok)
+            .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed))
+    }
+
+    pub fn iter_and_clear_inserted(
+        &mut self,
+    ) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+        mem::take(&mut self.inserted)
+            .iter()
+            .copied()
+            .map(Ok)
+            .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted))
+    }
+}
+
+fn iterator_over_spilled_geopoints(
+    spilled: &mut Option<BufReader<File>>,
+) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+    let mut spilled = spilled.take();
+    iter::from_fn(move || match &mut spilled {
+        Some(file) => {
+            let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
+            match file.read_exact(geopoint_bytes) {
+                Ok(()) => Some(Ok(pod_read_unaligned(geopoint_bytes))),
+                Err(e) if e.kind() == ErrorKind::UnexpectedEof => None,
+                Err(e) => Some(Err(e)),
+            }
+        }
+        None => None,
+    })
+}
+
 impl<'extractor> Extractor<'extractor> for GeoExtractor {
     type Data = RefCell<GeoExtractorData<'extractor>>;
 
     fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
         Ok(RefCell::new(GeoExtractorData {
-            inserted: Uell::new_in(extractor_alloc),
-            removed: Bbbul::new_in(extractor_alloc),
-            spilled_removed: None,
-            spilled_inserted: None,
+            removed: bumpalo::collections::Vec::new_in(extractor_alloc),
+            inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
+            spilled_inserted: None,
+            spilled_removed: None,
         }))
     }
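
Spilling works because ExtractedGeoPoint is #[repr(C, packed)] and Pod: every point is a fixed 20-byte record (a 4-byte docid plus two f64), so bytes_of can write it raw and pod_read_unaligned can read it back until read_exact reports a clean EOF, exactly as iterator_over_spilled_geopoints does above. A self-contained round-trip sketch, assuming bytemuck (with its derive feature) and tempfile:

    use std::io::{ErrorKind, Read as _, Seek as _, SeekFrom, Write as _};

    use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};

    #[derive(Pod, Zeroable, Copy, Clone)]
    #[repr(C, packed)]
    struct Record {
        docid: u32,
        lat_lng: [f64; 2],
    }

    fn main() -> std::io::Result<()> {
        let mut spill = tempfile::tempfile()?;

        // Write fixed-size records exactly like the spilled_* files do.
        for record in [
            Record { docid: 0, lat_lng: [48.8566, 2.3522] },
            Record { docid: 1, lat_lng: [51.5074, -0.1278] },
        ] {
            spill.write_all(bytes_of(&record))?;
        }
        spill.seek(SeekFrom::Start(0))?;

        // Read back until the clean EOF, as iterator_over_spilled_geopoints does.
        let mut docids = Vec::new();
        let buf = &mut [0u8; std::mem::size_of::<Record>()];
        loop {
            match spill.read_exact(buf) {
                Ok(()) => {
                    let record: Record = pod_read_unaligned(buf);
                    docids.push(record.docid);
                }
                Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }
        assert_eq!(docids, [0, 1]);
        Ok(())
    }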
@@ -72,22 +150,153 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
     ) -> Result<()> {
         let rtxn = &context.rtxn;
         let index = context.index;
+        let max_memory = self.grenad_parameters.max_memory;
         let db_fields_ids_map = context.db_fields_ids_map;
         let mut data_ref = context.data.borrow_mut_or_yield();
 
         for change in changes {
+            if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
+                // We must spill as we allocated too much memory
+                data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+                data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+            }
+
             match change? {
-                DocumentChange::Deletion(deletion) => todo!(),
+                DocumentChange::Deletion(deletion) => {
+                    let docid = deletion.docid();
+                    let external_id = deletion.external_document_id();
+                    let current = deletion.current(rtxn, index, db_fields_ids_map)?;
+                    let current_geo = current
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if let Some(lat_lng) = current_geo.flatten() {
+                        let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                        match &mut data_ref.spilled_removed {
+                            Some(file) => file.write_all(bytes_of(&geopoint))?,
+                            None => data_ref.removed.push(geopoint),
+                        }
+                    }
+                }
                 DocumentChange::Update(update) => {
                     let current = update.current(rtxn, index, db_fields_ids_map)?;
-                    let current_geo = current.geo_field()?;
-                    let updated_geo = update.updated().geo_field()?;
-                    // ...
+                    let external_id = update.external_document_id();
+                    let docid = update.docid();
+
+                    let current_geo = current
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    let updated_geo = update
+                        .updated()
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if current_geo != updated_geo {
+                        // If the current and new geo points are different it means that
+                        // we need to replace the current by the new point and therefore
+                        // delete the current point from the RTree.
+                        if let Some(lat_lng) = current_geo.flatten() {
+                            let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                            match &mut data_ref.spilled_removed {
+                                Some(file) => file.write_all(bytes_of(&geopoint))?,
+                                None => data_ref.removed.push(geopoint),
+                            }
+                        }
+
+                        if let Some(lat_lng) = updated_geo.flatten() {
+                            let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                            match &mut data_ref.spilled_inserted {
+                                Some(file) => file.write_all(bytes_of(&geopoint))?,
+                                None => data_ref.inserted.push(geopoint),
+                            }
+                        }
+                    }
                 }
-                DocumentChange::Insertion(insertion) => {}
+                DocumentChange::Insertion(insertion) => {
+                    let external_id = insertion.external_document_id();
+                    let docid = insertion.docid();
+
+                    let inserted_geo = insertion
+                        .inserted()
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if let Some(lat_lng) = inserted_geo.flatten() {
+                        let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                        match &mut data_ref.spilled_inserted {
+                            Some(file) => file.write_all(bytes_of(&geopoint))?,
+                            None => data_ref.inserted.push(geopoint),
+                        }
+                    }
+                }
             }
         }
 
         Ok(())
     }
 }
+
+/// Extracts and validates the latitude and longitude from a document geo field.
+///
+/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
+fn extract_geo_coordinates(external_id: &str, raw_value: &RawValue) -> Result<Option<[f64; 2]>> {
+    let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
+        Value::Null => return Ok(None),
+        Value::Object(map) => map,
+        value => {
+            return Err(
+                GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
+            )
+        }
+    };
+
+    let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
+        (Some(lat), Some(lng)) => [lat, lng],
+        (Some(_), None) => {
+            return Err(GeoError::MissingLatitude { document_id: Value::from(external_id) }.into())
+        }
+        (None, Some(_)) => {
+            return Err(GeoError::MissingLongitude { document_id: Value::from(external_id) }.into())
+        }
+        (None, None) => {
+            return Err(GeoError::MissingLatitudeAndLongitude {
+                document_id: Value::from(external_id),
+            }
+            .into())
+        }
+    };
+
+    let lat = extract_finite_float_from_value(lat)
+        .map_err(|value| GeoError::BadLatitude { document_id: Value::from(external_id), value })?;
+
+    let lng = extract_finite_float_from_value(lng)
+        .map_err(|value| GeoError::BadLongitude { document_id: Value::from(external_id), value })?;
+
+    Ok(Some([lat, lng]))
+}
+
+/// Extracts and validates that a serde JSON Value is actually a finite f64.
+pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
+    let number = match value {
+        Value::Number(ref n) => match n.as_f64() {
+            Some(number) => number,
+            None => return Err(value),
+        },
+        Value::String(ref s) => match s.parse::<f64>() {
+            Ok(number) => number,
+            Err(_) => return Err(value),
+        },
+        value => return Err(value),
+    };
+
+    if number.is_finite() {
+        Ok(number)
+    } else {
+        Err(value)
+    }
+}
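
Taken together, the two helpers accept lat and lng as JSON numbers or numeric strings, treat a null geo field as "no point", and reject anything non-finite. A compact standalone sketch of those rules on serde_json::Value (the finite_float and parse_lat_lng helpers are illustrative, not part of the crate):

    use serde_json::{json, Value};

    // Illustrative helper mirroring extract_finite_float_from_value: accept
    // finite numbers, or strings that parse to finite numbers.
    fn finite_float(value: &Value) -> Option<f64> {
        let number = match value {
            Value::Number(n) => n.as_f64()?,
            Value::String(s) => s.parse::<f64>().ok()?,
            _ => return None,
        };
        number.is_finite().then_some(number)
    }

    // Illustrative helper following extract_geo_coordinates' rules, minus the
    // per-document error reporting.
    fn parse_lat_lng(geo: &Value) -> Option<[f64; 2]> {
        let map = geo.as_object()?;
        Some([finite_float(map.get("lat")?)?, finite_float(map.get("lng")?)?])
    }

    fn main() {
        // Numbers and numeric strings are both accepted.
        assert_eq!(parse_lat_lng(&json!({ "lat": 0.0, "lng": "1.0" })), Some([0.0, 1.0]));
        // A missing coordinate or a non-finite value is rejected.
        assert_eq!(parse_lat_lng(&json!({ "lat": 12.0 })), None);
        assert_eq!(parse_lat_lng(&json!({ "lat": "NaN", "lng": 2.0 })), None);
    }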

View File

@@ -32,6 +32,7 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
 use crate::proximity::ProximityPrecision;
 use crate::update::del_add::DelAdd;
 use crate::update::new::extract::EmbeddingExtractor;
+use crate::update::new::merger::merge_and_send_rtree;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
@@ -56,6 +57,7 @@ mod steps {
         "extracting words",
         "extracting word proximity",
         "extracting embeddings",
+        "writing geo points",
         "writing to database",
         "writing embeddings to database",
         "waiting for extractors",
@@ -92,29 +94,33 @@ mod steps {
         step(4)
     }
 
-    pub const fn write_db() -> (u16, &'static str) {
+    pub const fn extract_geo_points() -> (u16, &'static str) {
         step(5)
     }
 
-    pub const fn write_embedding_db() -> (u16, &'static str) {
+    pub const fn write_db() -> (u16, &'static str) {
         step(6)
     }
 
-    pub const fn waiting_extractors() -> (u16, &'static str) {
+    pub const fn write_embedding_db() -> (u16, &'static str) {
         step(7)
     }
 
-    pub const fn post_processing_facets() -> (u16, &'static str) {
+    pub const fn waiting_extractors() -> (u16, &'static str) {
         step(8)
     }
 
-    pub const fn post_processing_words() -> (u16, &'static str) {
+    pub const fn post_processing_facets() -> (u16, &'static str) {
        step(9)
     }
 
-    pub const fn finalizing() -> (u16, &'static str) {
+    pub const fn post_processing_words() -> (u16, &'static str) {
         step(10)
     }
+
+    pub const fn finalizing() -> (u16, &'static str) {
+        step(11)
+    }
 }
 
 /// This is the main function of this crate.
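
The churn in this hunk is pure index bookkeeping: every helper returns a (u16, &'static str) pair, and inserting extract_geo_points at step 5 bumps each later step by one, in lockstep with the name list from the earlier hunk. An illustrative toy version (the real mod steps is private and derives the names from its list):

    // Toy version of the renumbering; the real `mod steps` ties these pairs
    // to the ordered name list shown in the previous hunk.
    const fn extract_geo_points() -> (u16, &'static str) {
        (5, "writing geo points")
    }

    const fn write_db() -> (u16, &'static str) {
        (6, "writing to database") // held step 5 before this series
    }

    fn main() {
        let (geo, _) = extract_geo_points();
        let (db, _) = write_db();
        assert_eq!(db, geo + 1);
    }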
@@ -324,7 +330,15 @@ where
             let (finished_steps, step_name) = steps::extract_word_proximity();
 
-            let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
+            let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
+                document_changes,
+                indexing_context,
+                &mut extractor_allocs,
+                finished_steps,
+                total_steps,
+                step_name,
+            )?;
 
             merge_and_send_docids(
                 caches,
                 index.word_pair_proximity_docids.remap_types(),
@@ -347,10 +361,15 @@ where
                     let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
                     let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
                     let (finished_steps, step_name) = steps::extract_embeddings();
 
-                    extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
+                    extract(document_changes,
+                        &extractor,
+                        indexing_context,
+                        &mut extractor_allocs,
+                        &datastore,
+                        finished_steps,
+                        total_steps,
+                        step_name,
+                    )?;
 
                     let mut user_provided = HashMap::new();
                     for data in datastore {
@@ -369,6 +388,35 @@ where
                     embedding_sender.finish(user_provided).unwrap();
                 }
 
+                'geo: {
+                    let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
+                    let _entered = span.enter();
+
+                    let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
+                        break 'geo;
+                    };
+                    let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
+                    let (finished_steps, step_name) = steps::extract_geo_points();
+
+                    extract(document_changes,
+                        &extractor,
+                        indexing_context,
+                        &mut extractor_allocs,
+                        &datastore,
+                        finished_steps,
+                        total_steps,
+                        step_name,
+                    )?;
+
+                    merge_and_send_rtree(
+                        datastore,
+                        &rtxn,
+                        index,
+                        extractor_sender.geo(),
+                        &indexing_context.must_stop_processing,
+                    )?;
+                }
+
                 // TODO THIS IS TOO MUCH
                 // - [ ] Extract fieldid docid facet number
                 // - [ ] Extract fieldid docid facet string
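
The 'geo: labeled block is what lets the indexer skip the whole phase cheaply: the let-else either binds the extractor or breaks out of the block when the index defines no geo field. A standalone sketch of that control flow (the geo_extractor function here is hypothetical):

    // Hypothetical stand-in for GeoExtractor::new: None means "no geo field".
    fn geo_extractor(enabled: bool) -> Option<&'static str> {
        enabled.then_some("geo extractor")
    }

    fn main() {
        for enabled in [false, true] {
            'geo: {
                // let-else either binds or jumps past the whole phase,
                // exactly like `break 'geo` in the indexer.
                let Some(extractor) = geo_extractor(enabled) else {
                    println!("no geo field, skipping phase");
                    break 'geo;
                };
                println!("running {extractor}");
            }
        }
    }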

View File

@@ -1,14 +1,59 @@
+use std::cell::RefCell;
+use std::io;
+
 use hashbrown::HashSet;
 use heed::types::Bytes;
 use heed::{Database, RoTxn};
+use memmap2::Mmap;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 
 use super::channel::*;
 use super::extract::{
     merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
+    GeoExtractorData,
 };
-use crate::{CboRoaringBitmapCodec, FieldId, Index, InternalError, Result};
+use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
+
+#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
+pub fn merge_and_send_rtree<'extractor, MSP>(
+    datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
+    rtxn: &RoTxn,
+    index: &Index,
+    geo_sender: GeoSender<'_>,
+    must_stop_processing: &MSP,
+) -> Result<()>
+where
+    MSP: Fn() -> bool + Sync,
+{
+    let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default();
+    for data in datastore {
+        if must_stop_processing() {
+            return Err(InternalError::AbortedIndexation.into());
+        }
+
+        let mut frozen = data.into_inner().freeze()?;
+        for result in frozen.iter_and_clear_removed() {
+            let extracted_geo_point = result?;
+            rtree.remove(&GeoPoint::from(extracted_geo_point));
+        }
+
+        for result in frozen.iter_and_clear_inserted() {
+            let extracted_geo_point = result?;
+            rtree.insert(GeoPoint::from(extracted_geo_point));
+        }
+    }
+
+    let mut file = tempfile::tempfile()?;
+    // TODO: manage this error instead of unwrapping
+    bincode::serialize_into(&mut file, &rtree).unwrap();
+    file.sync_all()?;
+
+    let rtree_mmap = unsafe { Mmap::map(&file)? };
+    geo_sender.set_rtree(rtree_mmap).unwrap();
+
+    Ok(())
+}
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
 pub fn merge_and_send_docids<'extractor, MSP>(
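
merge_and_send_rtree replays removals and then insertions against the rstar tree, serializes it with bincode into a temp file, and hands the writer an mmap of the result. A reduced sketch of the remove/insert/serialize cycle, assuming rstar with its serde feature plus bincode; the Point alias mirrors how milli's GeoPoint pairs an xyz geometry with (docid, [lat, lng]) data:

    use rstar::primitives::GeomWithData;
    use rstar::RTree;

    // Mirrors the shape of milli's GeoPoint: an xyz geometry tagged with
    // (docid, [lat, lng]) payload data.
    type Point = GeomWithData<[f64; 3], (u32, [f64; 2])>;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut rtree: RTree<Point> = RTree::new();

        let old = Point::new([1.0, 0.0, 0.0], (0, [0.0, 0.0]));
        let new = Point::new([0.0, 1.0, 0.0], (0, [0.0, 90.0]));

        // Replay the frozen extractor data: removals first, then insertions.
        rtree.insert(old.clone());
        rtree.remove(&old);
        rtree.insert(new);
        assert_eq!(rtree.size(), 1);

        // Round-trip through bincode like merge_and_send_rtree, but with the
        // error handled rather than unwrapped.
        let bytes = bincode::serialize(&rtree)?;
        let roundtrip: RTree<Point> = bincode::deserialize(&bytes)?;
        assert_eq!(roundtrip.size(), 1);
        Ok(())
    }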