Finialize the GeoExtractor

This commit is contained in:
Clément Renault 2024-11-12 14:37:23 +01:00
parent e727cf35fb
commit e1454db5ba
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,26 +1,27 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::f32::consts::PI;
use std::fs::File; use std::fs::File;
use std::io::{self, BufWriter}; use std::io::{BufWriter, Write as _};
use std::mem::{self, size_of}; use std::mem::size_of;
use std::result;
use bincode::ErrorKind;
use bumpalo::Bump; use bumpalo::Bump;
use heed::RoTxn; use heed::RoTxn;
use raw_collections::bbbul::BitPacker4x; use raw_collections::bbbul::BitPacker4x;
use raw_collections::Bbbul; use raw_collections::Bbbul;
use serde_json::value::RawValue;
use serde_json::Value;
use uell::Uell; use uell::Uell;
use crate::error::GeoError;
use crate::update::new::document::Document; use crate::update::new::document::Document;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend}; use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
use crate::{DocumentId, Index, Result}; use crate::{DocumentId, Index, InternalError, Object, Result};
pub struct GeoExtractor { pub struct GeoExtractor {
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
// rtree: Option<rstar::RTree<GeoPoint>>,
} }
impl GeoExtractor { impl GeoExtractor {
@ -72,22 +73,144 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
) -> Result<()> { ) -> Result<()> {
let rtxn = &context.rtxn; let rtxn = &context.rtxn;
let index = context.index; let index = context.index;
let max_memory = self.grenad_parameters.max_memory;
let db_fields_ids_map = context.db_fields_ids_map; let db_fields_ids_map = context.db_fields_ids_map;
let mut data_ref = context.data.borrow_mut_or_yield(); let mut data_ref = context.data.borrow_mut_or_yield();
let mut buffer = Vec::new();
for change in changes { for change in changes {
if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
// We must spill as we allocated too much memory
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
}
match change? { match change? {
DocumentChange::Deletion(deletion) => todo!(), DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(&docid.to_be_bytes()[..])?,
None => data_ref.removed.insert(docid),
}
}
DocumentChange::Update(update) => { DocumentChange::Update(update) => {
let current = update.current(rtxn, index, db_fields_ids_map)?; let current = update.current(rtxn, index, db_fields_ids_map)?;
let current_geo = current.geo_field()?; let external_id = update.external_document_id();
let updated_geo = update.updated().geo_field()?; let docid = update.docid();
// ...
let current_geo = current
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
let updated_geo = update
.updated()
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if current_geo != updated_geo {
// If the current and new geo points are different it means that
// we need to replace the current by the new point and therefore
// delete the current point from the RTree.
if current_geo.is_some() {
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(&docid.to_be_bytes()[..])?,
None => data_ref.removed.insert(docid),
}
}
if let Some([lat, lng]) = updated_geo {
let entry = concat_docid_lat_lng(&mut buffer, docid, lat, lng);
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(entry)?,
None => data_ref.inserted.push(entry.try_into().unwrap()),
}
}
}
}
DocumentChange::Insertion(insertion) => {
let external_id = insertion.external_document_id();
let docid = insertion.docid();
let inserted_geo = insertion
.inserted()
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if let Some([lat, lng]) = inserted_geo {
let entry = concat_docid_lat_lng(&mut buffer, docid, lat, lng);
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(entry)?,
None => data_ref.inserted.push(entry.try_into().unwrap()),
}
}
} }
DocumentChange::Insertion(insertion) => {}
} }
} }
Ok(()) Ok(())
} }
} }
/// Extracts and validate the latitude and latitude from a document geo field.
///
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
fn extract_geo_coordinates(external_id: &str, raw_value: &RawValue) -> Result<[f64; 2]> {
let mut geo: Object =
serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)?;
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
(Some(lat), Some(lng)) => [lat, lng],
(Some(_), None) => {
return Err(GeoError::MissingLatitude { document_id: Value::from(external_id) }.into())
}
(None, Some(_)) => {
return Err(GeoError::MissingLongitude { document_id: Value::from(external_id) }.into())
}
(None, None) => {
return Err(GeoError::MissingLatitudeAndLongitude {
document_id: Value::from(external_id),
}
.into())
}
};
let lat = extract_finite_float_from_value(lat)
.map_err(|value| GeoError::BadLatitude { document_id: Value::from(external_id), value })?;
let lng = extract_finite_float_from_value(lng)
.map_err(|value| GeoError::BadLongitude { document_id: Value::from(external_id), value })?;
Ok([lat, lng])
}
/// Extracts and validate that a serde JSON Value is actually a finite f64.
pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
let number = match value {
Value::Number(ref n) => match n.as_f64() {
Some(number) => number,
None => return Err(value),
},
Value::String(ref s) => match s.parse::<f64>() {
Ok(number) => number,
Err(_) => return Err(value),
},
value => return Err(value),
};
if number.is_finite() {
Ok(number)
} else {
Err(value)
}
}
fn concat_docid_lat_lng(buffer: &mut Vec<u8>, docid: DocumentId, lat: f64, lng: f64) -> &[u8] {
buffer.clear();
buffer.extend_from_slice(&docid.to_be_bytes());
buffer.extend_from_slice(&lat.to_be_bytes());
buffer.extend_from_slice(&lng.to_be_bytes());
&buffer[..]
}