mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 20:15:07 +08:00
Finialize the GeoExtractor
This commit is contained in:
parent
717a69dc6e
commit
408a7cec6e
@ -1,26 +1,27 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::f32::consts::PI;
|
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufWriter};
|
use std::io::{BufWriter, Write as _};
|
||||||
use std::mem::{self, size_of};
|
use std::mem::size_of;
|
||||||
|
use std::result;
|
||||||
|
|
||||||
use bincode::ErrorKind;
|
|
||||||
use bumpalo::Bump;
|
use bumpalo::Bump;
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
use raw_collections::bbbul::BitPacker4x;
|
use raw_collections::bbbul::BitPacker4x;
|
||||||
use raw_collections::Bbbul;
|
use raw_collections::Bbbul;
|
||||||
|
use serde_json::value::RawValue;
|
||||||
|
use serde_json::Value;
|
||||||
use uell::Uell;
|
use uell::Uell;
|
||||||
|
|
||||||
|
use crate::error::GeoError;
|
||||||
use crate::update::new::document::Document;
|
use crate::update::new::document::Document;
|
||||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
|
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{DocumentId, Index, Result};
|
use crate::{DocumentId, Index, InternalError, Object, Result};
|
||||||
|
|
||||||
pub struct GeoExtractor {
|
pub struct GeoExtractor {
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: GrenadParameters,
|
||||||
// rtree: Option<rstar::RTree<GeoPoint>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GeoExtractor {
|
impl GeoExtractor {
|
||||||
@ -72,22 +73,144 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let rtxn = &context.rtxn;
|
let rtxn = &context.rtxn;
|
||||||
let index = context.index;
|
let index = context.index;
|
||||||
|
let max_memory = self.grenad_parameters.max_memory;
|
||||||
let db_fields_ids_map = context.db_fields_ids_map;
|
let db_fields_ids_map = context.db_fields_ids_map;
|
||||||
let mut data_ref = context.data.borrow_mut_or_yield();
|
let mut data_ref = context.data.borrow_mut_or_yield();
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
|
||||||
for change in changes {
|
for change in changes {
|
||||||
|
if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
|
||||||
|
// We must spill as we allocated too much memory
|
||||||
|
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
||||||
|
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
||||||
|
}
|
||||||
|
|
||||||
match change? {
|
match change? {
|
||||||
DocumentChange::Deletion(deletion) => todo!(),
|
DocumentChange::Deletion(deletion) => {
|
||||||
|
let docid = deletion.docid();
|
||||||
|
match &mut data_ref.spilled_removed {
|
||||||
|
Some(file) => file.write_all(&docid.to_be_bytes()[..])?,
|
||||||
|
None => data_ref.removed.insert(docid),
|
||||||
|
}
|
||||||
|
}
|
||||||
DocumentChange::Update(update) => {
|
DocumentChange::Update(update) => {
|
||||||
let current = update.current(rtxn, index, db_fields_ids_map)?;
|
let current = update.current(rtxn, index, db_fields_ids_map)?;
|
||||||
let current_geo = current.geo_field()?;
|
let external_id = update.external_document_id();
|
||||||
let updated_geo = update.updated().geo_field()?;
|
let docid = update.docid();
|
||||||
// ...
|
|
||||||
|
let current_geo = current
|
||||||
|
.geo_field()?
|
||||||
|
.map(|geo| extract_geo_coordinates(external_id, geo))
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
let updated_geo = update
|
||||||
|
.updated()
|
||||||
|
.geo_field()?
|
||||||
|
.map(|geo| extract_geo_coordinates(external_id, geo))
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
if current_geo != updated_geo {
|
||||||
|
// If the current and new geo points are different it means that
|
||||||
|
// we need to replace the current by the new point and therefore
|
||||||
|
// delete the current point from the RTree.
|
||||||
|
if current_geo.is_some() {
|
||||||
|
match &mut data_ref.spilled_removed {
|
||||||
|
Some(file) => file.write_all(&docid.to_be_bytes()[..])?,
|
||||||
|
None => data_ref.removed.insert(docid),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some([lat, lng]) = updated_geo {
|
||||||
|
let entry = concat_docid_lat_lng(&mut buffer, docid, lat, lng);
|
||||||
|
match &mut data_ref.spilled_inserted {
|
||||||
|
Some(file) => file.write_all(entry)?,
|
||||||
|
None => data_ref.inserted.push(entry.try_into().unwrap()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DocumentChange::Insertion(insertion) => {
|
||||||
|
let external_id = insertion.external_document_id();
|
||||||
|
let docid = insertion.docid();
|
||||||
|
|
||||||
|
let inserted_geo = insertion
|
||||||
|
.inserted()
|
||||||
|
.geo_field()?
|
||||||
|
.map(|geo| extract_geo_coordinates(external_id, geo))
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
if let Some([lat, lng]) = inserted_geo {
|
||||||
|
let entry = concat_docid_lat_lng(&mut buffer, docid, lat, lng);
|
||||||
|
match &mut data_ref.spilled_inserted {
|
||||||
|
Some(file) => file.write_all(entry)?,
|
||||||
|
None => data_ref.inserted.push(entry.try_into().unwrap()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
DocumentChange::Insertion(insertion) => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Extracts and validate the latitude and latitude from a document geo field.
|
||||||
|
///
|
||||||
|
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
|
||||||
|
fn extract_geo_coordinates(external_id: &str, raw_value: &RawValue) -> Result<[f64; 2]> {
|
||||||
|
let mut geo: Object =
|
||||||
|
serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)?;
|
||||||
|
|
||||||
|
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
|
||||||
|
(Some(lat), Some(lng)) => [lat, lng],
|
||||||
|
(Some(_), None) => {
|
||||||
|
return Err(GeoError::MissingLatitude { document_id: Value::from(external_id) }.into())
|
||||||
|
}
|
||||||
|
(None, Some(_)) => {
|
||||||
|
return Err(GeoError::MissingLongitude { document_id: Value::from(external_id) }.into())
|
||||||
|
}
|
||||||
|
(None, None) => {
|
||||||
|
return Err(GeoError::MissingLatitudeAndLongitude {
|
||||||
|
document_id: Value::from(external_id),
|
||||||
|
}
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let lat = extract_finite_float_from_value(lat)
|
||||||
|
.map_err(|value| GeoError::BadLatitude { document_id: Value::from(external_id), value })?;
|
||||||
|
|
||||||
|
let lng = extract_finite_float_from_value(lng)
|
||||||
|
.map_err(|value| GeoError::BadLongitude { document_id: Value::from(external_id), value })?;
|
||||||
|
|
||||||
|
Ok([lat, lng])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extracts and validate that a serde JSON Value is actually a finite f64.
|
||||||
|
pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
|
||||||
|
let number = match value {
|
||||||
|
Value::Number(ref n) => match n.as_f64() {
|
||||||
|
Some(number) => number,
|
||||||
|
None => return Err(value),
|
||||||
|
},
|
||||||
|
Value::String(ref s) => match s.parse::<f64>() {
|
||||||
|
Ok(number) => number,
|
||||||
|
Err(_) => return Err(value),
|
||||||
|
},
|
||||||
|
value => return Err(value),
|
||||||
|
};
|
||||||
|
|
||||||
|
if number.is_finite() {
|
||||||
|
Ok(number)
|
||||||
|
} else {
|
||||||
|
Err(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn concat_docid_lat_lng(buffer: &mut Vec<u8>, docid: DocumentId, lat: f64, lng: f64) -> &[u8] {
|
||||||
|
buffer.clear();
|
||||||
|
buffer.extend_from_slice(&docid.to_be_bytes());
|
||||||
|
buffer.extend_from_slice(&lat.to_be_bytes());
|
||||||
|
buffer.extend_from_slice(&lng.to_be_bytes());
|
||||||
|
&buffer[..]
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user