Compare commits


1 Commit

Author: Louis Dureuil
SHA1: c890bd2cdf
Message: Merge a01bc7b454 into 94fb55bb6f
Date: 2024-11-13 12:21:41 +01:00
22 changed files with 140 additions and 766 deletions

Cargo.lock (generated, 10 changed lines)
View File

@ -3664,7 +3664,6 @@ dependencies = [
"time", "time",
"tokenizers", "tokenizers",
"tracing", "tracing",
"uell",
"ureq", "ureq",
"url", "url",
"uuid", "uuid",
@ -5793,15 +5792,6 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "uell"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40de5982e28612e20330e77d81f1559b74f66caf3c7fc10b19ada4843f4b4fd7"
dependencies = [
"bumpalo",
]
[[package]] [[package]]
name = "unescaper" name = "unescaper"
version = "0.1.5" version = "0.1.5"

View File

@ -1664,7 +1664,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "The `_geo` field in the document with the id: `\"11\"` is not an object. Was expecting an object with the `_geo.lat` and `_geo.lng` fields but instead got `\"foobar\"`.", "message": "The `_geo` field in the document with the id: `11` is not an object. Was expecting an object with the `_geo.lat` and `_geo.lng` fields but instead got `\"foobar\"`.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1701,7 +1701,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find latitude nor longitude in the document with the id: `\"11\"`. Was expecting `_geo.lat` and `_geo.lng` fields.", "message": "Could not find latitude nor longitude in the document with the id: `11`. Was expecting `_geo.lat` and `_geo.lng` fields.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1738,7 +1738,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find latitude nor longitude in the document with the id: `\"11\"`. Was expecting `_geo.lat` and `_geo.lng` fields.", "message": "Could not find latitude nor longitude in the document with the id: `11`. Was expecting `_geo.lat` and `_geo.lng` fields.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1775,7 +1775,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find longitude in the document with the id: `\"11\"`. Was expecting a `_geo.lng` field.", "message": "Could not find longitude in the document with the id: `11`. Was expecting a `_geo.lng` field.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1812,7 +1812,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find latitude in the document with the id: `\"11\"`. Was expecting a `_geo.lat` field.", "message": "Could not find latitude in the document with the id: `11`. Was expecting a `_geo.lat` field.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1849,7 +1849,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find longitude in the document with the id: `\"11\"`. Was expecting a `_geo.lng` field.", "message": "Could not find longitude in the document with the id: `11`. Was expecting a `_geo.lng` field.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1886,7 +1886,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find latitude in the document with the id: `\"11\"`. Was expecting a `_geo.lat` field.", "message": "Could not find latitude in the document with the id: `11`. Was expecting a `_geo.lat` field.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1923,7 +1923,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not parse latitude nor longitude in the document with the id: `\"11\"`. Was expecting finite numbers but instead got `false` and `true`.", "message": "Could not parse latitude nor longitude in the document with the id: `11`. Was expecting finite numbers but instead got `false` and `true`.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1960,7 +1960,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find longitude in the document with the id: `\"11\"`. Was expecting a `_geo.lng` field.", "message": "Could not find longitude in the document with the id: `11`. Was expecting a `_geo.lng` field.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1997,7 +1997,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not find latitude in the document with the id: `\"11\"`. Was expecting a `_geo.lat` field.", "message": "Could not find latitude in the document with the id: `11`. Was expecting a `_geo.lat` field.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2034,7 +2034,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not parse latitude nor longitude in the document with the id: `\"11\"`. Was expecting finite numbers but instead got `\"doggo\"` and `\"doggo\"`.", "message": "Could not parse latitude nor longitude in the document with the id: `11`. Was expecting finite numbers but instead got `\"doggo\"` and `\"doggo\"`.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2071,7 +2071,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "The `_geo` field in the document with the id: `\"11\"` contains the following unexpected fields: `{\"doggo\":\"are the best\"}`.", "message": "The `_geo` field in the document with the id: `11` contains the following unexpected fields: `{\"doggo\":\"are the best\"}`.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2109,7 +2109,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not parse longitude in the document with the id: `\"12\"`. Was expecting a finite number but instead got `null`.", "message": "Could not parse longitude in the document with the id: `12`. Was expecting a finite number but instead got `null`.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2145,7 +2145,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not parse latitude in the document with the id: `\"12\"`. Was expecting a finite number but instead got `null`.", "message": "Could not parse latitude in the document with the id: `12`. Was expecting a finite number but instead got `null`.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2181,7 +2181,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0 "indexedDocuments": 0
}, },
"error": { "error": {
"message": "Could not parse latitude nor longitude in the document with the id: `\"13\"`. Was expecting finite numbers but instead got `null` and `null`.", "message": "Could not parse latitude nor longitude in the document with the id: `13`. Was expecting finite numbers but instead got `null` and `null`.",
"code": "invalid_document_geo_field", "code": "invalid_document_geo_field",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field" "link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2201,7 +2201,7 @@ async fn add_invalid_geo_and_then_settings() {
let index = server.index("test"); let index = server.index("test");
index.create(Some("id")).await; index.create(Some("id")).await;
// _geo is not a correct object // _geo is not an object
let documents = json!([ let documents = json!([
{ {
"id": "11", "id": "11",

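For context, a minimal sketch of document payloads that would trigger the `_geo` errors asserted above; the shapes are inferred from the error messages, not copied from the test file, and the ids and values are illustrative only.

use serde_json::json;

fn main() {
    // Illustrative payloads only; each line mirrors one error message asserted above.
    let _not_an_object = json!({ "id": "11", "_geo": "foobar" });
    let _missing_lat_and_lng = json!({ "id": "11", "_geo": {} });
    let _missing_lng = json!({ "id": "11", "_geo": { "lat": 1.0 } });
    let _not_finite = json!({ "id": "11", "_geo": { "lat": false, "lng": true } });
    let _unexpected_field = json!({ "id": "11", "_geo": { "lat": 1.0, "lng": 2.0, "doggo": "are the best" } });
}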
View File

@ -70,8 +70,8 @@ async fn geo_bounding_box_with_string_and_number() {
let documents = DOCUMENTS.clone(); let documents = DOCUMENTS.clone();
index.update_settings_filterable_attributes(json!(["_geo"])).await; index.update_settings_filterable_attributes(json!(["_geo"])).await;
index.update_settings_sortable_attributes(json!(["_geo"])).await; index.update_settings_sortable_attributes(json!(["_geo"])).await;
let (ret, _code) = index.add_documents(documents, None).await; index.add_documents(documents, None).await;
index.wait_task(ret.uid()).await.succeeded(); index.wait_task(2).await;
index index
.search( .search(

View File

@ -100,7 +100,6 @@ bumpalo = "3.16.0"
thread_local = "1.1.8" thread_local = "1.1.8"
allocator-api2 = "0.2.18" allocator-api2 = "0.2.18"
rustc-hash = "2.0.0" rustc-hash = "2.0.0"
uell = "0.1.0"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false } mimalloc = { version = "0.1.43", default-features = false }

View File

@ -27,32 +27,15 @@ impl heed::BytesEncode<'_> for OrderedF64Codec {
fn bytes_encode(f: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> { fn bytes_encode(f: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
let mut buffer = [0u8; 16]; let mut buffer = [0u8; 16];
encode_f64_into_ordered_bytes(*f, &mut buffer)?; // write the globally ordered float
let bytes = f64_into_bytes(*f).ok_or(InvalidGloballyOrderedFloatError { float: *f })?;
Ok(Cow::Owned(buffer.to_vec()))
}
}
impl OrderedF64Codec {
pub fn serialize_into(
f: f64,
buffer: &mut [u8; 16],
) -> Result<(), InvalidGloballyOrderedFloatError> {
encode_f64_into_ordered_bytes(f, buffer)
}
}
fn encode_f64_into_ordered_bytes(
f: f64,
buffer: &mut [u8; 16],
) -> Result<(), InvalidGloballyOrderedFloatError> {
let bytes = f64_into_bytes(f).ok_or(InvalidGloballyOrderedFloatError { float: f })?;
buffer[..8].copy_from_slice(&bytes[..]); buffer[..8].copy_from_slice(&bytes[..]);
// Then the f64 value just to be able to read it back // Then the f64 value just to be able to read it back
let bytes = f.to_be_bytes(); let bytes = f.to_be_bytes();
buffer[8..16].copy_from_slice(&bytes[..]); buffer[8..16].copy_from_slice(&bytes[..]);
Ok(()) Ok(Cow::Owned(buffer.to_vec()))
}
} }
#[derive(Error, Debug)] #[derive(Error, Debug)]

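As the hunk above shows, both versions of the codec fill the same 16-byte buffer: the first 8 bytes sort like the float, the last 8 bytes are the plain big-endian value so it can be read back. A minimal sketch of that layout follows; `to_orderable_bytes` is a stand-in for milli's `f64_into_bytes` (whose real implementation is not shown here), using the classic sign-bit trick as an assumption.

// Sketch only: encodes an f64 as 8 order-preserving bytes followed by the raw
// big-endian f64, matching the 16-byte layout of OrderedF64Codec above.
fn to_orderable_bytes(f: f64) -> Option<[u8; 8]> {
    if !f.is_finite() {
        return None;
    }
    // Flip the sign bit for non-negative floats and all bits for negative ones,
    // so byte-wise comparison matches numeric order.
    let bits = f.to_bits();
    let ordered = if f.is_sign_negative() { !bits } else { bits ^ (1u64 << 63) };
    Some(ordered.to_be_bytes())
}

fn encode_ordered_f64(f: f64) -> Option<[u8; 16]> {
    let mut buffer = [0u8; 16];
    buffer[..8].copy_from_slice(&to_orderable_bytes(f)?);
    buffer[8..].copy_from_slice(&f.to_be_bytes());
    Some(buffer)
}

fn decode_ordered_f64(buffer: &[u8; 16]) -> f64 {
    // Only the trailing raw value is needed to read the float back.
    f64::from_be_bytes(buffer[8..16].try_into().unwrap())
}

Byte-wise comparison of the first 8 bytes then matches numeric ordering, which is what lets LMDB iterate facet numbers in sorted order.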
View File

@ -737,7 +737,7 @@ pub(crate) fn write_typed_chunk_into_index(
} }
/// Converts the latitude and longitude back to an xyz GeoPoint. /// Converts the latitude and longitude back to an xyz GeoPoint.
pub fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint { fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap(); let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap();
let (lng, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap(); let (lng, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap();
let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)]; let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)];

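The hunk above only changes the function's visibility; for reference, a small sketch of the 16-byte value it consumes (latitude then longitude, both native-endian f64s), with the docid and xyz conversion left out. This is an illustration of the layout, not code from the repository.

// Sketch of the byte layout read by `extract_geo_point`: 8 bytes of latitude
// followed by 8 bytes of longitude, both stored with native endianness.
fn pack_lat_lng(lat: f64, lng: f64) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[..8].copy_from_slice(&lat.to_ne_bytes());
    buf[8..].copy_from_slice(&lng.to_ne_bytes());
    buf
}

fn unpack_lat_lng(value: [u8; 16]) -> (f64, f64) {
    let lat = f64::from_ne_bytes(value[..8].try_into().unwrap());
    let lng = f64::from_ne_bytes(value[8..].try_into().unwrap());
    (lat, lng)
}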
View File

@ -3,14 +3,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use heed::types::Bytes; use heed::types::Bytes;
use heed::BytesDecode;
use memmap2::Mmap;
use roaring::RoaringBitmap;
use super::extract::FacetKind; use super::extract::FacetKind;
use super::StdResult; use super::StdResult;
use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::update::new::KvReaderFieldId; use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding; use crate::vector::Embedding;
@ -30,9 +25,9 @@ pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver)
) )
} }
pub enum KeyValueEntry { pub struct KeyValueEntry {
Small { key_length: usize, data: Box<[u8]> }, pub key_length: usize,
Large { key_entry: KeyEntry, data: Mmap }, pub data: Box<[u8]>,
} }
impl KeyValueEntry { impl KeyValueEntry {
@ -40,25 +35,14 @@ impl KeyValueEntry {
let mut data = Vec::with_capacity(key.len() + value.len()); let mut data = Vec::with_capacity(key.len() + value.len());
data.extend_from_slice(key); data.extend_from_slice(key);
data.extend_from_slice(value); data.extend_from_slice(value);
KeyValueEntry::Small { key_length: key.len(), data: data.into_boxed_slice() } KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
} }
fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
KeyValueEntry::Large { key_entry: KeyEntry::from_key(key), data: value }
}
pub fn key(&self) -> &[u8] { pub fn key(&self) -> &[u8] {
match self { &self.data[..self.key_length]
KeyValueEntry::Small { key_length, data } => &data[..*key_length],
KeyValueEntry::Large { key_entry, data: _ } => key_entry.entry(),
}
} }
pub fn value(&self) -> &[u8] { pub fn value(&self) -> &[u8] {
match self { &self.data[self.key_length..]
KeyValueEntry::Small { key_length, data } => &data[*key_length..],
KeyValueEntry::Large { key_entry: _, data } => &data[..],
}
} }
} }
@ -113,7 +97,6 @@ pub struct DbOperation {
#[derive(Debug)] #[derive(Debug)]
pub enum Database { pub enum Database {
Main,
Documents, Documents,
ExternalDocumentsIds, ExternalDocumentsIds,
ExactWordDocids, ExactWordDocids,
@ -127,14 +110,11 @@ pub enum Database {
FacetIdExistsDocids, FacetIdExistsDocids,
FacetIdF64NumberDocids, FacetIdF64NumberDocids,
FacetIdStringDocids, FacetIdStringDocids,
FieldIdDocidFacetStrings,
FieldIdDocidFacetF64s,
} }
impl Database { impl Database {
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> { pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
match self { match self {
Database::Main => index.main.remap_types(),
Database::Documents => index.documents.remap_types(), Database::Documents => index.documents.remap_types(),
Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(), Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(),
Database::ExactWordDocids => index.exact_word_docids.remap_types(), Database::ExactWordDocids => index.exact_word_docids.remap_types(),
@ -148,8 +128,6 @@ impl Database {
Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(), Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(), Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(), Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(),
Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(),
} }
} }
} }
@ -221,10 +199,6 @@ impl ExtractorSender {
FacetDocidsSender { sender: self } FacetDocidsSender { sender: self }
} }
pub fn field_id_docid_facet_sender(&self) -> FieldIdDocidFacetSender<'_> {
FieldIdDocidFacetSender(self)
}
pub fn documents(&self) -> DocumentsSender<'_> { pub fn documents(&self) -> DocumentsSender<'_> {
DocumentsSender(self) DocumentsSender(self)
} }
@ -233,10 +207,6 @@ impl ExtractorSender {
EmbeddingSender(&self.sender) EmbeddingSender(&self.sender)
} }
pub fn geo(&self) -> GeoSender<'_> {
GeoSender(&self.sender)
}
fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
match self match self
.sender .sender
@ -361,36 +331,6 @@ impl DocidsSender for FacetDocidsSender<'_> {
} }
} }
pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender);
impl FieldIdDocidFacetSender<'_> {
pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &value));
self.0
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
}
pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &[]));
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
}
pub fn delete_facet_string(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
self.0
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
}
pub fn delete_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
}
}
pub struct DocumentsSender<'a>(&'a ExtractorSender); pub struct DocumentsSender<'a>(&'a ExtractorSender);
impl DocumentsSender<'_> { impl DocumentsSender<'_> {
@ -483,34 +423,3 @@ impl EmbeddingSender<'_> {
.map_err(|_| SendError(())) .map_err(|_| SendError(()))
} }
} }
pub struct GeoSender<'a>(&'a Sender<WriterOperation>);
impl GeoSender<'_> {
pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
self.0
.send(WriterOperation::DbOperation(DbOperation {
database: Database::Main,
entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
GEO_RTREE_KEY.as_bytes(),
value,
)),
}))
.map_err(|_| SendError(()))
}
pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> {
let mut buffer = Vec::new();
bitmap.serialize_into(&mut buffer).unwrap();
self.0
.send(WriterOperation::DbOperation(DbOperation {
database: Database::Main,
entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
&buffer,
)),
}))
.map_err(|_| SendError(()))
}
}

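Stepping back from the removed senders, this file is built around one pattern: extractors push database write/delete operations into a bounded crossbeam channel, and a single writer thread drains it. A minimal sketch of that shape, with simplified stand-in types (the real `DbOperation`/`EntryOperation` carry more variants and target concrete heed databases):

use crossbeam_channel::{bounded, Receiver, Sender};

// Simplified stand-ins for the real EntryOperation/DbOperation types.
enum Op {
    Write { db: &'static str, key: Vec<u8>, value: Vec<u8> },
    Delete { db: &'static str, key: Vec<u8> },
}

fn extractor_writer_channel(cap: usize) -> (Sender<Op>, Receiver<Op>) {
    // Bounded so extractors back off when the writer falls behind.
    bounded(cap)
}

fn writer_loop(receiver: Receiver<Op>) {
    // A single thread owns the write transaction and drains the channel,
    // so extractors never touch LMDB directly.
    for op in receiver {
        match op {
            Op::Write { db, key, value } => {
                // put `key` -> `value` into the database selected by `db`
                let _ = (db, key, value);
            }
            Op::Delete { db, key } => {
                // delete `key` from the database selected by `db`
                let _ = (db, key);
            }
        }
    }
}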
View File

@ -352,13 +352,6 @@ where
unordered_field_buffer.push((vectors_fid, &vectors_value)); unordered_field_buffer.push((vectors_fid, &vectors_value));
} }
if let Some(geo_value) = document.geo_field()? {
let fid = fields_ids_map.id_or_insert("_geo").ok_or(UserError::AttributeLimitReached)?;
fields_ids_map.id_or_insert("_geo.lat").ok_or(UserError::AttributeLimitReached)?;
fields_ids_map.id_or_insert("_geo.lng").ok_or(UserError::AttributeLimitReached)?;
unordered_field_buffer.push((fid, geo_value));
}
unordered_field_buffer.sort_by_key(|(fid, _)| *fid); unordered_field_buffer.sort_by_key(|(fid, _)| *fid);
for (fid, value) in unordered_field_buffer.iter() { for (fid, value) in unordered_field_buffer.iter() {
writer.insert(*fid, value.get().as_bytes()).unwrap(); writer.insert(*fid, value.get().as_bytes()).unwrap();
@ -413,7 +406,6 @@ impl<'doc> Versions<'doc> {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.data.is_empty() self.data.is_empty()
} }
pub fn top_level_field(&self, k: &str) -> Option<&'doc RawValue> { pub fn top_level_field(&self, k: &str) -> Option<&'doc RawValue> {
if k == RESERVED_VECTORS_FIELD_NAME || k == "_geo" { if k == RESERVED_VECTORS_FIELD_NAME || k == "_geo" {
return None; return None;

View File

@ -54,12 +54,11 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
DocumentChange::Deletion(deletion) => { DocumentChange::Deletion(deletion) => {
let docid = deletion.docid(); let docid = deletion.docid();
let content = deletion.current( let content = deletion.current(
&context.rtxn, &context.txn,
context.index, context.index,
&context.db_fields_ids_map, &context.db_fields_ids_map,
)?; )?;
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); for res in content.iter_top_level_fields() {
for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data
.field_distribution_delta .field_distribution_delta
@ -73,9 +72,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
DocumentChange::Update(update) => { DocumentChange::Update(update) => {
let docid = update.docid(); let docid = update.docid();
let content = let content =
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?; update.current(&context.txn, context.index, &context.db_fields_ids_map)?;
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); for res in content.iter_top_level_fields() {
for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data
.field_distribution_delta .field_distribution_delta
@ -84,8 +82,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
*entry -= 1; *entry -= 1;
} }
let content = update.updated(); let content = update.updated();
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); for res in content.iter_top_level_fields() {
for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data
.field_distribution_delta .field_distribution_delta
@ -95,9 +92,9 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
} }
let content = let content =
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?; update.merged(&context.txn, context.index, &context.db_fields_ids_map)?;
let vector_content = update.merged_vectors( let vector_content = update.merged_vectors(
&context.rtxn, &context.txn,
context.index, context.index,
&context.db_fields_ids_map, &context.db_fields_ids_map,
&context.doc_alloc, &context.doc_alloc,
@ -114,8 +111,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
DocumentChange::Insertion(insertion) => { DocumentChange::Insertion(insertion) => {
let docid = insertion.docid(); let docid = insertion.docid();
let content = insertion.inserted(); let content = insertion.inserted();
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); for res in content.iter_top_level_fields() {
for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data
.field_distribution_delta .field_distribution_delta

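The extractor above maintains a per-field delta of the index's field distribution: a deletion decrements the counter of every top-level field of the old version, an insertion increments it for the new version (the increment itself falls outside this excerpt), and an update does both. A minimal, self-contained sketch of that bookkeeping with simplified names:

use std::collections::HashMap;

// Sketch of the field_distribution_delta bookkeeping: negative entries mean
// fewer documents carry the field after this batch, positive entries mean more.
fn apply_deletion(delta: &mut HashMap<String, i64>, old_fields: &[&str]) {
    for f in old_fields {
        *delta.entry((*f).to_string()).or_insert(0) -= 1;
    }
}

fn apply_insertion(delta: &mut HashMap<String, i64>, new_fields: &[&str]) {
    for f in new_fields {
        *delta.entry((*f).to_string()).or_insert(0) += 1;
    }
}

fn apply_update(delta: &mut HashMap<String, i64>, old_fields: &[&str], new_fields: &[&str]) {
    // An update counts the old version out and the new version in.
    apply_deletion(delta, old_fields);
    apply_insertion(delta, new_fields);
}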
View File

@ -2,18 +2,15 @@ use std::cell::RefCell;
use std::collections::HashSet; use std::collections::HashSet;
use std::ops::DerefMut as _; use std::ops::DerefMut as _;
use bumpalo::collections::Vec as BVec;
use bumpalo::Bump; use bumpalo::Bump;
use hashbrown::HashMap;
use heed::RoTxn; use heed::RoTxn;
use serde_json::Value; use serde_json::Value;
use super::super::cache::BalancedCaches; use super::super::cache::BalancedCaches;
use super::facet_document::extract_document_facets; use super::facet_document::extract_document_facets;
use super::FacetKind; use super::FacetKind;
use crate::heed_codec::facet::OrderedF64Codec; use crate::facet::value_encoding::f64_into_bytes;
use crate::update::del_add::DelAdd; use crate::update::new::extract::DocidsExtractor;
use crate::update::new::channel::FieldIdDocidFacetSender;
use crate::update::new::indexer::document_changes::{ use crate::update::new::indexer::document_changes::{
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext, extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
Progress, ThreadLocal, Progress, ThreadLocal,
@ -25,7 +22,6 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedExtractorData<'a> { pub struct FacetedExtractorData<'a> {
attributes_to_extract: &'a [&'a str], attributes_to_extract: &'a [&'a str],
sender: &'a FieldIdDocidFacetSender<'a>,
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
buckets: usize, buckets: usize,
} }
@ -52,7 +48,6 @@ impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
context, context,
self.attributes_to_extract, self.attributes_to_extract,
change, change,
self.sender,
)? )?
} }
Ok(()) Ok(())
@ -66,28 +61,22 @@ impl FacetedDocidsExtractor {
context: &DocumentChangeContext<RefCell<BalancedCaches>>, context: &DocumentChangeContext<RefCell<BalancedCaches>>,
attributes_to_extract: &[&str], attributes_to_extract: &[&str],
document_change: DocumentChange, document_change: DocumentChange,
sender: &FieldIdDocidFacetSender,
) -> Result<()> { ) -> Result<()> {
let index = &context.index; let index = &context.index;
let rtxn = &context.rtxn; let rtxn = &context.txn;
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let mut cached_sorter = context.data.borrow_mut_or_yield(); let mut cached_sorter = context.data.borrow_mut_or_yield();
let mut del_add_facet_value = DelAddFacetValue::new(&context.doc_alloc); match document_change {
let docid = document_change.docid();
let res = match document_change {
DocumentChange::Deletion(inner) => extract_document_facets( DocumentChange::Deletion(inner) => extract_document_facets(
attributes_to_extract, attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?, inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(), new_fields_ids_map.deref_mut(),
&mut |fid, value| { &mut |fid, value| {
Self::facet_fn_with_options( Self::facet_fn_with_options(
&context.doc_alloc, &context.doc_alloc,
cached_sorter.deref_mut(), cached_sorter.deref_mut(),
BalancedCaches::insert_del_u32, BalancedCaches::insert_del_u32,
&mut del_add_facet_value, inner.docid(),
DelAddFacetValue::insert_del,
docid,
fid, fid,
value, value,
) )
@ -97,16 +86,13 @@ impl FacetedDocidsExtractor {
extract_document_facets( extract_document_facets(
attributes_to_extract, attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?, inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(), new_fields_ids_map.deref_mut(),
&mut |fid, value| { &mut |fid, value| {
Self::facet_fn_with_options( Self::facet_fn_with_options(
&context.doc_alloc, &context.doc_alloc,
cached_sorter.deref_mut(), cached_sorter.deref_mut(),
BalancedCaches::insert_del_u32, BalancedCaches::insert_del_u32,
&mut del_add_facet_value, inner.docid(),
DelAddFacetValue::insert_del,
docid,
fid, fid,
value, value,
) )
@ -116,16 +102,13 @@ impl FacetedDocidsExtractor {
extract_document_facets( extract_document_facets(
attributes_to_extract, attributes_to_extract,
inner.merged(rtxn, index, context.db_fields_ids_map)?, inner.merged(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(), new_fields_ids_map.deref_mut(),
&mut |fid, value| { &mut |fid, value| {
Self::facet_fn_with_options( Self::facet_fn_with_options(
&context.doc_alloc, &context.doc_alloc,
cached_sorter.deref_mut(), cached_sorter.deref_mut(),
BalancedCaches::insert_add_u32, BalancedCaches::insert_add_u32,
&mut del_add_facet_value, inner.docid(),
DelAddFacetValue::insert_add,
docid,
fid, fid,
value, value,
) )
@ -135,38 +118,30 @@ impl FacetedDocidsExtractor {
DocumentChange::Insertion(inner) => extract_document_facets( DocumentChange::Insertion(inner) => extract_document_facets(
attributes_to_extract, attributes_to_extract,
inner.inserted(), inner.inserted(),
inner.external_document_id(),
new_fields_ids_map.deref_mut(), new_fields_ids_map.deref_mut(),
&mut |fid, value| { &mut |fid, value| {
Self::facet_fn_with_options( Self::facet_fn_with_options(
&context.doc_alloc, &context.doc_alloc,
cached_sorter.deref_mut(), cached_sorter.deref_mut(),
BalancedCaches::insert_add_u32, BalancedCaches::insert_add_u32,
&mut del_add_facet_value, inner.docid(),
DelAddFacetValue::insert_add,
docid,
fid, fid,
value, value,
) )
}, },
), ),
}; }
del_add_facet_value.send_data(docid, sender, &context.doc_alloc).unwrap();
res
} }
fn facet_fn_with_options<'extractor, 'doc>( fn facet_fn_with_options<'extractor>(
doc_alloc: &'doc Bump, doc_alloc: &Bump,
cached_sorter: &mut BalancedCaches<'extractor>, cached_sorter: &mut BalancedCaches<'extractor>,
cache_fn: impl Fn(&mut BalancedCaches<'extractor>, &[u8], u32) -> Result<()>, cache_fn: impl Fn(&mut BalancedCaches<'extractor>, &[u8], u32) -> Result<()>,
del_add_facet_value: &mut DelAddFacetValue<'doc>,
facet_fn: impl Fn(&mut DelAddFacetValue<'doc>, FieldId, BVec<'doc, u8>, FacetKind),
docid: DocumentId, docid: DocumentId,
fid: FieldId, fid: FieldId,
value: &Value, value: &Value,
) -> Result<()> { ) -> Result<()> {
let mut buffer = BVec::new_in(doc_alloc); let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
// Exists // Exists
// key: fid // key: fid
buffer.push(FacetKind::Exists as u8); buffer.push(FacetKind::Exists as u8);
@ -177,21 +152,15 @@ impl FacetedDocidsExtractor {
// Number // Number
// key: fid - level - orderedf64 - orignalf64 // key: fid - level - orderedf64 - orignalf64
Value::Number(number) => { Value::Number(number) => {
let mut ordered = [0u8; 16]; if let Some((n, ordered)) =
if number number.as_f64().and_then(|n| f64_into_bytes(n).map(|ordered| (n, ordered)))
.as_f64()
.and_then(|n| OrderedF64Codec::serialize_into(n, &mut ordered).ok())
.is_some()
{ {
let mut number = BVec::with_capacity_in(16, doc_alloc);
number.extend_from_slice(&ordered);
facet_fn(del_add_facet_value, fid, number, FacetKind::Number);
buffer.clear(); buffer.clear();
buffer.push(FacetKind::Number as u8); buffer.push(FacetKind::Number as u8);
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(0); // level 0 buffer.push(0); // level 0
buffer.extend_from_slice(&ordered); buffer.extend_from_slice(&ordered);
buffer.extend_from_slice(&n.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid) cache_fn(cached_sorter, &buffer, docid)
} else { } else {
Ok(()) Ok(())
@ -200,10 +169,6 @@ impl FacetedDocidsExtractor {
// String // String
// key: fid - level - truncated_string // key: fid - level - truncated_string
Value::String(s) => { Value::String(s) => {
let mut string = BVec::new_in(doc_alloc);
string.extend_from_slice(s.as_bytes());
facet_fn(del_add_facet_value, fid, string, FacetKind::String);
let normalized = crate::normalize_facet(s); let normalized = crate::normalize_facet(s);
let truncated = truncate_str(&normalized); let truncated = truncate_str(&normalized);
buffer.clear(); buffer.clear();
@ -246,83 +211,6 @@ impl FacetedDocidsExtractor {
} }
} }
struct DelAddFacetValue<'doc> {
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
}
impl<'doc> DelAddFacetValue<'doc> {
fn new(doc_alloc: &'doc Bump) -> Self {
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc) }
}
fn insert_add(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
let cache = match kind {
FacetKind::String => &mut self.strings,
FacetKind::Number => &mut self.f64s,
_ => return,
};
let key = (fid, value);
if let Some(DelAdd::Deletion) = cache.get(&key) {
cache.remove(&key);
} else {
cache.insert(key, DelAdd::Addition);
}
}
fn insert_del(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
let cache = match kind {
FacetKind::String => &mut self.strings,
FacetKind::Number => &mut self.f64s,
_ => return,
};
let key = (fid, value);
if let Some(DelAdd::Addition) = cache.get(&key) {
cache.remove(&key);
} else {
cache.insert(key, DelAdd::Deletion);
}
}
fn send_data(
self,
docid: DocumentId,
sender: &FieldIdDocidFacetSender,
doc_alloc: &Bump,
) -> std::result::Result<(), crossbeam_channel::SendError<()>> {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
for ((fid, value), deladd) in self.strings {
if let Ok(s) = std::str::from_utf8(&value) {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.extend_from_slice(&docid.to_be_bytes());
let normalized = crate::normalize_facet(s);
let truncated = truncate_str(&normalized);
buffer.extend_from_slice(truncated.as_bytes());
match deladd {
DelAdd::Deletion => sender.delete_facet_string(&buffer)?,
DelAdd::Addition => sender.write_facet_string(&buffer, &value)?,
}
}
}
for ((fid, value), deladd) in self.f64s {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.extend_from_slice(&docid.to_be_bytes());
buffer.extend_from_slice(&value);
match deladd {
DelAdd::Deletion => sender.delete_facet_f64(&buffer)?,
DelAdd::Addition => sender.write_facet_f64(&buffer)?,
}
}
Ok(())
}
}
/// Truncates a string to the biggest valid LMDB key size. /// Truncates a string to the biggest valid LMDB key size.
fn truncate_str(s: &str) -> &str { fn truncate_str(s: &str) -> &str {
let index = s let index = s
@ -335,23 +223,13 @@ fn truncate_str(s: &str) -> &str {
&s[..index.unwrap_or(0)] &s[..index.unwrap_or(0)]
} }
impl FacetedDocidsExtractor { impl DocidsExtractor for FacetedDocidsExtractor {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
pub fn run_extraction< fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP, SP>(
'pl,
'fid,
'indexer,
'index,
'extractor,
DC: DocumentChanges<'pl>,
MSP,
SP,
>(
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
sender: &FieldIdDocidFacetSender,
finished_steps: u16, finished_steps: u16,
total_steps: u16, total_steps: u16,
step_name: &'static str, step_name: &'static str,
@ -376,7 +254,6 @@ impl FacetedDocidsExtractor {
attributes_to_extract: &attributes_to_extract, attributes_to_extract: &attributes_to_extract,
grenad_parameters, grenad_parameters,
buckets: rayon::current_num_threads(), buckets: rayon::current_num_threads(),
sender,
}; };
extract( extract(
document_changes, document_changes,

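One piece worth calling out in the fuller of the two versions above is `DelAddFacetValue`: a facet value deleted and re-added by the same document update cancels out, so only net deletions and net additions are sent through the `FieldIdDocidFacetSender`. A minimal sketch of that cancellation rule over simplified keys:

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq)]
enum DelAdd { Deletion, Addition }

// Mirrors insert_add above: adding a value that was pending deletion is a no-op.
fn insert_add(cache: &mut HashMap<Vec<u8>, DelAdd>, key: Vec<u8>) {
    if cache.get(&key) == Some(&DelAdd::Deletion) {
        cache.remove(&key); // the value existed before and still exists: nothing to send
    } else {
        cache.insert(key, DelAdd::Addition);
    }
}

// Mirrors insert_del above: deleting a value that was pending addition is a no-op.
fn insert_del(cache: &mut HashMap<Vec<u8>, DelAdd>, key: Vec<u8>) {
    if cache.get(&key) == Some(&DelAdd::Addition) {
        cache.remove(&key); // added then removed within the same change: nothing to send
    } else {
        cache.insert(key, DelAdd::Deletion);
    }
}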
View File

@ -1,14 +1,12 @@
use serde_json::Value; use serde_json::Value;
use crate::update::new::document::Document; use crate::update::new::document::Document;
use crate::update::new::extract::geo::extract_geo_coordinates;
use crate::update::new::extract::perm_json_p; use crate::update::new::extract::perm_json_p;
use crate::{FieldId, GlobalFieldsIdsMap, InternalError, Result, UserError}; use crate::{FieldId, GlobalFieldsIdsMap, InternalError, Result, UserError};
pub fn extract_document_facets<'doc>( pub fn extract_document_facets<'doc>(
attributes_to_extract: &[&str], attributes_to_extract: &[&str],
document: impl Document<'doc>, document: impl Document<'doc>,
external_document_id: &str,
field_id_map: &mut GlobalFieldsIdsMap, field_id_map: &mut GlobalFieldsIdsMap,
facet_fn: &mut impl FnMut(FieldId, &Value) -> Result<()>, facet_fn: &mut impl FnMut(FieldId, &Value) -> Result<()>,
) -> Result<()> { ) -> Result<()> {
@ -43,19 +41,5 @@ pub fn extract_document_facets<'doc>(
} }
} }
if attributes_to_extract.contains(&"_geo") {
if let Some(geo_value) = document.geo_field()? {
if let Some([lat, lng]) = extract_geo_coordinates(external_document_id, geo_value)? {
let (lat_fid, lng_fid) = field_id_map
.id_or_insert("_geo.lat")
.zip(field_id_map.id_or_insert("_geo.lng"))
.ok_or(UserError::AttributeLimitReached)?;
facet_fn(lat_fid, &lat.into())?;
facet_fn(lng_fid, &lng.into())?;
}
}
}
Ok(()) Ok(())
} }

View File

@ -1,323 +0,0 @@
use std::cell::RefCell;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _};
use std::{iter, mem, result};
use bumpalo::Bump;
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
use heed::RoTxn;
use serde_json::value::RawValue;
use serde_json::Value;
use crate::error::GeoError;
use crate::update::new::document::Document;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
pub struct GeoExtractor {
grenad_parameters: GrenadParameters,
}
impl GeoExtractor {
pub fn new(
rtxn: &RoTxn,
index: &Index,
grenad_parameters: GrenadParameters,
) -> Result<Option<Self>> {
let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
if is_sortable || is_filterable {
Ok(Some(GeoExtractor { grenad_parameters }))
} else {
Ok(None)
}
}
}
#[derive(Pod, Zeroable, Copy, Clone)]
#[repr(C, packed)]
pub struct ExtractedGeoPoint {
pub docid: DocumentId,
pub lat_lng: [f64; 2],
}
impl From<ExtractedGeoPoint> for GeoPoint {
/// Converts the latitude and longitude back to an xyz GeoPoint.
fn from(value: ExtractedGeoPoint) -> Self {
let [lat, lng] = value.lat_lng;
let point = [lat, lng];
let xyz_point = lat_lng_to_xyz(&point);
GeoPoint::new(xyz_point, (value.docid, point))
}
}
pub struct GeoExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geo
/// point being updated, we first put it in the deleted and then in the inserted.
removed: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
inserted: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
/// TODO Do the doc
spilled_removed: Option<BufWriter<File>>,
/// TODO Do the doc
spilled_inserted: Option<BufWriter<File>>,
}
impl<'extractor> GeoExtractorData<'extractor> {
pub fn freeze(self) -> Result<FrozenGeoExtractorData<'extractor>> {
let GeoExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
Ok(FrozenGeoExtractorData {
removed: removed.into_bump_slice(),
inserted: inserted.into_bump_slice(),
spilled_removed: spilled_removed
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
.transpose()?,
spilled_inserted: spilled_inserted
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
.transpose()?,
})
}
}
unsafe impl MostlySend for GeoExtractorData<'_> {}
pub struct FrozenGeoExtractorData<'extractor> {
pub removed: &'extractor [ExtractedGeoPoint],
pub inserted: &'extractor [ExtractedGeoPoint],
pub spilled_removed: Option<BufReader<File>>,
pub spilled_inserted: Option<BufReader<File>>,
}
impl<'extractor> FrozenGeoExtractorData<'extractor> {
pub fn iter_and_clear_removed(
&mut self,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
mem::take(&mut self.removed)
.iter()
.copied()
.map(Ok)
.chain(iterator_over_spilled_geopoints(&mut self.spilled_removed))
}
pub fn iter_and_clear_inserted(
&mut self,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
mem::take(&mut self.inserted)
.iter()
.copied()
.map(Ok)
.chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted))
}
}
fn iterator_over_spilled_geopoints(
spilled: &mut Option<BufReader<File>>,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
let mut spilled = spilled.take();
iter::from_fn(move || match &mut spilled {
Some(file) => {
let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
match file.read_exact(geopoint_bytes) {
Ok(()) => Some(Ok(pod_read_unaligned(geopoint_bytes))),
Err(e) if e.kind() == ErrorKind::UnexpectedEof => None,
Err(e) => Some(Err(e)),
}
}
None => None,
})
}
impl<'extractor> Extractor<'extractor> for GeoExtractor {
type Data = RefCell<GeoExtractorData<'extractor>>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(GeoExtractorData {
removed: bumpalo::collections::Vec::new_in(extractor_alloc),
// inserted: Uell::new_in(extractor_alloc),
inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
spilled_inserted: None,
spilled_removed: None,
}))
}
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
context: &'doc DocumentChangeContext<Self::Data>,
) -> Result<()> {
let rtxn = &context.rtxn;
let index = context.index;
let max_memory = self.grenad_parameters.max_memory;
let db_fields_ids_map = context.db_fields_ids_map;
let mut data_ref = context.data.borrow_mut_or_yield();
for change in changes {
if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
// We must spill as we allocated too much memory
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
}
match change? {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
let external_id = deletion.external_document_id();
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
let current_geo = current
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if let Some(lat_lng) = current_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.removed.push(geopoint),
}
}
}
DocumentChange::Update(update) => {
let current = update.current(rtxn, index, db_fields_ids_map)?;
let external_id = update.external_document_id();
let docid = update.docid();
let current_geo = current
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
let updated_geo = update
.updated()
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if current_geo != updated_geo {
// If the current and new geo points are different it means that
// we need to replace the current by the new point and therefore
// delete the current point from the RTree.
if let Some(lat_lng) = current_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.removed.push(geopoint),
}
}
if let Some(lat_lng) = updated_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.inserted.push(geopoint),
}
}
}
}
DocumentChange::Insertion(insertion) => {
let external_id = insertion.external_document_id();
let docid = insertion.docid();
let inserted_geo = insertion
.inserted()
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if let Some(lat_lng) = inserted_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.inserted.push(geopoint),
}
}
}
}
}
Ok(())
}
}
/// Extracts and validate the latitude and latitude from a document geo field.
///
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
pub fn extract_geo_coordinates(
external_id: &str,
raw_value: &RawValue,
) -> Result<Option<[f64; 2]>> {
let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
Value::Null => return Ok(None),
Value::Object(map) => map,
value => {
return Err(
GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
)
}
};
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
(Some(lat), Some(lng)) => {
if geo.is_empty() {
[lat, lng]
} else {
return Err(GeoError::UnexpectedExtraFields {
document_id: Value::from(external_id),
value: Value::from(geo),
}
.into());
}
}
(Some(_), None) => {
return Err(GeoError::MissingLongitude { document_id: Value::from(external_id) }.into())
}
(None, Some(_)) => {
return Err(GeoError::MissingLatitude { document_id: Value::from(external_id) }.into())
}
(None, None) => {
return Err(GeoError::MissingLatitudeAndLongitude {
document_id: Value::from(external_id),
}
.into())
}
};
match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
(Ok(lat), Ok(lng)) => Ok(Some([lat, lng])),
(Ok(_), Err(value)) => {
Err(GeoError::BadLongitude { document_id: Value::from(external_id), value }.into())
}
(Err(value), Ok(_)) => {
Err(GeoError::BadLatitude { document_id: Value::from(external_id), value }.into())
}
(Err(lat), Err(lng)) => Err(GeoError::BadLatitudeAndLongitude {
document_id: Value::from(external_id),
lat,
lng,
}
.into()),
}
}
/// Extracts and validate that a serde JSON Value is actually a finite f64.
pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
let number = match value {
Value::Number(ref n) => match n.as_f64() {
Some(number) => number,
None => return Err(value),
},
Value::String(ref s) => match s.parse::<f64>() {
Ok(number) => number,
Err(_) => return Err(value),
},
value => return Err(value),
};
if number.is_finite() {
Ok(number)
} else {
Err(value)
}
}

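Since the whole extractor is removed in this comparison, here is a short usage sketch of `extract_geo_coordinates` as defined above, on made-up inputs: `lat` and `lng` may each be a finite number or a string that parses to one, `null` means the document has no geo point, and anything else is a `GeoError`.

use serde_json::value::RawValue;

fn example() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative inputs only; the document id "doc-1" is made up.
    let ok: &RawValue = serde_json::from_str(r#"{ "lat": 12.3, "lng": "4.56" }"#)?;
    // extract_geo_coordinates("doc-1", ok)? would yield Some([12.3, 4.56]):
    // numeric strings are accepted by extract_finite_float_from_value.

    let null: &RawValue = serde_json::from_str("null")?;
    // extract_geo_coordinates("doc-1", null)? would yield None (no geo point).

    let bad: &RawValue = serde_json::from_str(r#"{ "lat": 1.0 }"#)?;
    // extract_geo_coordinates("doc-1", bad) would fail with GeoError::MissingLongitude.

    let _ = (ok, null, bad);
    Ok(())
}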
View File

@ -1,7 +1,6 @@
mod cache; mod cache;
mod documents; mod documents;
mod faceted; mod faceted;
mod geo;
mod searchable; mod searchable;
mod vectors; mod vectors;
@ -9,7 +8,6 @@ use bumpalo::Bump;
pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap}; pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
pub use documents::*; pub use documents::*;
pub use faceted::*; pub use faceted::*;
pub use geo::*;
pub use searchable::*; pub use searchable::*;
pub use vectors::EmbeddingExtractor; pub use vectors::EmbeddingExtractor;

View File

@ -326,7 +326,7 @@ impl WordDocidsExtractors {
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()> { ) -> Result<()> {
let index = &context.index; let index = &context.index;
let rtxn = &context.rtxn; let rtxn = &context.txn;
let mut cached_sorter_ref = context.data.borrow_mut_or_yield(); let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
let cached_sorter = cached_sorter_ref.as_mut().unwrap(); let cached_sorter = cached_sorter_ref.as_mut().unwrap();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
@ -419,6 +419,6 @@ impl WordDocidsExtractors {
} }
fn attributes_to_skip<'a>(_rtxn: &'a RoTxn, _index: &'a Index) -> Result<Vec<&'a str>> { fn attributes_to_skip<'a>(_rtxn: &'a RoTxn, _index: &'a Index) -> Result<Vec<&'a str>> {
Ok(vec!["_geo"]) Ok(vec![])
} }
} }

View File

@ -25,7 +25,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
} }
fn attributes_to_skip<'a>(_rtxn: &'a RoTxn, _index: &'a Index) -> Result<Vec<&'a str>> { fn attributes_to_skip<'a>(_rtxn: &'a RoTxn, _index: &'a Index) -> Result<Vec<&'a str>> {
Ok(vec!["_geo"]) Ok(vec![])
} }
// This method is reimplemented to count the number of words in the document in each field // This method is reimplemented to count the number of words in the document in each field
@ -39,7 +39,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
let doc_alloc = &context.doc_alloc; let doc_alloc = &context.doc_alloc;
let index = context.index; let index = context.index;
let rtxn = &context.rtxn; let rtxn = &context.txn;
let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc); let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc);
let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc); let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc);

View File

@ -2,13 +2,13 @@ use std::cell::RefCell;
use bumpalo::collections::Vec as BVec; use bumpalo::collections::Vec as BVec;
use bumpalo::Bump; use bumpalo::Bump;
use hashbrown::{DefaultHashBuilder, HashMap}; use hashbrown::HashMap;
use super::cache::DelAddRoaringBitmap; use super::cache::DelAddRoaringBitmap;
use crate::error::FaultSource; use crate::error::FaultSource;
use crate::prompt::Prompt; use crate::prompt::Prompt;
use crate::update::new::channel::EmbeddingSender; use crate::update::new::channel::EmbeddingSender;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend}; use crate::update::new::indexer::document_changes::{Extractor, MostlySend};
use crate::update::new::vector_document::VectorDocument; use crate::update::new::vector_document::VectorDocument;
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::vector::error::{ use crate::vector::error::{
@ -37,7 +37,7 @@ impl<'a> EmbeddingExtractor<'a> {
} }
pub struct EmbeddingExtractorData<'extractor>( pub struct EmbeddingExtractorData<'extractor>(
pub HashMap<String, DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>, pub HashMap<String, DelAddRoaringBitmap, hashbrown::DefaultHashBuilder, &'extractor Bump>,
); );
unsafe impl MostlySend for EmbeddingExtractorData<'_> {} unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
@ -52,7 +52,9 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
fn process<'doc>( fn process<'doc>(
&'doc self, &'doc self,
changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>, changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
context: &'doc DocumentChangeContext<Self::Data>, context: &'doc crate::update::new::indexer::document_changes::DocumentChangeContext<
Self::Data,
>,
) -> crate::Result<()> { ) -> crate::Result<()> {
let embedders = self.embedders.inner_as_ref(); let embedders = self.embedders.inner_as_ref();
let mut unused_vectors_distribution = let mut unused_vectors_distribution =
@ -61,7 +63,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc); let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
for (embedder_name, (embedder, prompt, _is_quantized)) in embedders { for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
let embedder_id = let embedder_id =
context.index.embedder_category_id.get(&context.rtxn, embedder_name)?.ok_or_else( context.index.embedder_category_id.get(&context.txn, embedder_name)?.ok_or_else(
|| InternalError::DatabaseMissingEntry { || InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id", db_name: "embedder_category_id",
key: None, key: None,
@ -93,7 +95,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
} }
DocumentChange::Update(update) => { DocumentChange::Update(update) => {
let old_vectors = update.current_vectors( let old_vectors = update.current_vectors(
&context.rtxn, &context.txn,
context.index, context.index,
context.db_fields_ids_map, context.db_fields_ids_map,
&context.doc_alloc, &context.doc_alloc,
@ -130,7 +132,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
} else if new_vectors.regenerate { } else if new_vectors.regenerate {
let new_rendered = prompt.render_document( let new_rendered = prompt.render_document(
update.current( update.current(
&context.rtxn, &context.txn,
context.index, context.index,
context.db_fields_ids_map, context.db_fields_ids_map,
)?, )?,
@ -139,7 +141,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
)?; )?;
let old_rendered = prompt.render_document( let old_rendered = prompt.render_document(
update.merged( update.merged(
&context.rtxn, &context.txn,
context.index, context.index,
context.db_fields_ids_map, context.db_fields_ids_map,
)?, )?,
@ -158,7 +160,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
} else if old_vectors.regenerate { } else if old_vectors.regenerate {
let old_rendered = prompt.render_document( let old_rendered = prompt.render_document(
update.current( update.current(
&context.rtxn, &context.txn,
context.index, context.index,
context.db_fields_ids_map, context.db_fields_ids_map,
)?, )?,
@ -167,7 +169,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
)?; )?;
let new_rendered = prompt.render_document( let new_rendered = prompt.render_document(
update.merged( update.merged(
&context.rtxn, &context.txn,
context.index, context.index,
context.db_fields_ids_map, context.db_fields_ids_map,
)?, )?,

View File

@@ -197,7 +197,7 @@ pub struct DocumentChangeContext<
     /// inside of the DB.
     pub db_fields_ids_map: &'indexer FieldsIdsMap,
     /// A transaction providing data from the DB before all indexing operations
-    pub rtxn: RoTxn<'indexer>,
+    pub txn: RoTxn<'indexer>,
     /// Global field id map that is up to date with the current state of the indexing process.
     ///
@@ -255,7 +255,7 @@ impl<
         let txn = index.read_txn()?;
         Ok(DocumentChangeContext {
             index,
-            rtxn: txn,
+            txn,
             db_fields_ids_map,
             new_fields_ids_map: fields_ids_map,
             doc_alloc,
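
Note: after this rename every extractor reaches the long-lived read transaction as `context.txn` (see the `embedder_category_id.get(&context.txn, …)` and `context.index.document(&context.txn, …)` call sites elsewhere in this compare). A minimal sketch of the underlying access pattern, using a generic heed database rather than milli's actual schema:

use heed::types::Str;
use heed::{Database, RoTxn};

// Illustrative only: every lookup made during extraction borrows the single
// read-only LMDB transaction carried by the DocumentChangeContext.
fn lookup<'t>(db: Database<Str, Str>, txn: &'t RoTxn, key: &str) -> heed::Result<Option<&'t str>> {
    db.get(txn, key)
}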


@@ -63,7 +63,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     where
         'pl: 'doc, // the payload must survive the process calls
     {
-        let current = context.index.document(&context.rtxn, *docid)?;
+        let current = context.index.document(&context.txn, *docid)?;
         let external_document_id = self.primary_key.extract_docid_from_db(
             current,


@@ -310,10 +310,10 @@ impl MergeChanges for MergeDocumentForReplacement {
                 }
                 Some(InnerDocOp::Deletion) => {
                     return if is_new {
-                        Ok(None)
-                    } else {
                         let deletion = Deletion::create(docid, external_doc);
                         Ok(Some(DocumentChange::Deletion(deletion)))
+                    } else {
+                        Ok(None)
                     };
                 }
                 None => unreachable!("We must not have empty set of operations on a document"),


@@ -33,7 +33,6 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
 use crate::proximity::ProximityPrecision;
 use crate::update::del_add::DelAdd;
 use crate::update::new::extract::EmbeddingExtractor;
-use crate::update::new::merger::merge_and_send_rtree;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
@@ -58,7 +57,6 @@ mod steps {
         "extracting words",
         "extracting word proximity",
         "extracting embeddings",
-        "writing geo points",
         "writing to database",
         "writing embeddings to database",
         "waiting for extractors",
@@ -95,32 +93,28 @@ mod steps {
         step(4)
     }
-    pub const fn extract_geo_points() -> (u16, &'static str) {
+    pub const fn write_db() -> (u16, &'static str) {
         step(5)
     }
-    pub const fn write_db() -> (u16, &'static str) {
+    pub const fn write_embedding_db() -> (u16, &'static str) {
         step(6)
     }
-    pub const fn write_embedding_db() -> (u16, &'static str) {
+    pub const fn waiting_extractors() -> (u16, &'static str) {
         step(7)
     }
-    pub const fn waiting_extractors() -> (u16, &'static str) {
+    pub const fn post_processing_facets() -> (u16, &'static str) {
         step(8)
     }
-    pub const fn post_processing_facets() -> (u16, &'static str) {
+    pub const fn post_processing_words() -> (u16, &'static str) {
         step(9)
     }
-    pub const fn post_processing_words() -> (u16, &'static str) {
-        step(10)
-    }
     pub const fn finalizing() -> (u16, &'static str) {
-        step(11)
+        step(10)
     }
 }
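
The renumbering above is mechanical: dropping the geo step shifts every later step down by one, which is why `write_db` moves from `step(5)` occupied by `extract_geo_points` and `finalizing` goes from `step(11)` to `step(10)`. A sketch of the pattern these constants follow; the `STEPS` slice, the first two labels, and `total_steps` are assumptions for illustration, not copied from the file:

mod steps {
    // Labels surfaced by the progress API. Only the entries visible in this
    // compare are certain; the first two are assumed for the sketch.
    const STEPS: &[&str] = &[
        "extracting documents",
        "extracting facets",
        "extracting words",
        "extracting word proximity",
        "extracting embeddings",
        "writing to database",
        "writing embeddings to database",
        "waiting for extractors",
        "post-processing facets",
        "post-processing words",
        "finalizing",
    ];

    const fn step(step: u16) -> (u16, &'static str) {
        (step, STEPS[step as usize])
    }

    // Illustrative helper: the total naturally tracks the list length.
    pub const fn total_steps() -> u16 {
        STEPS.len() as u16
    }

    // Removing "writing geo points" from the list renumbers everything below it.
    pub const fn write_db() -> (u16, &'static str) {
        step(5)
    }

    pub const fn finalizing() -> (u16, &'static str) {
        step(10)
    }
}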
@@ -150,8 +144,11 @@ where
     let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
     let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
     let new_fields_ids_map = RwLock::new(new_fields_ids_map);
     let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
     let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
     let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
@@ -223,7 +220,7 @@ where
                     let (finished_steps, step_name) = steps::extract_facets();
                     facet_field_ids_delta = merge_and_send_facet_docids(
-                        FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), finished_steps, total_steps, step_name)?,
+                        FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?,
                         FacetDatabases::new(index),
                         index,
                         extractor_sender.facet_docids(),
@@ -331,15 +328,7 @@ where
                     let (finished_steps, step_name) = steps::extract_word_proximity();
-                    let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
-                        document_changes,
-                        indexing_context,
-                        &mut extractor_allocs,
-                        finished_steps,
-                        total_steps,
-                        step_name,
-                    )?;
+                    let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
                     merge_and_send_docids(
                         caches,
                         index.word_pair_proximity_docids.remap_types(),
@@ -362,6 +351,8 @@ where
                     let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
                     let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
                     let (finished_steps, step_name) = steps::extract_embeddings();
                     extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
                     for config in &mut index_embeddings {
@@ -375,35 +366,6 @@ where
                     embedding_sender.finish(index_embeddings).unwrap();
                 }
-                'geo: {
-                    let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
-                    let _entered = span.enter();
-
-                    // let geo_sender = extractor_sender.geo_points();
-                    let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
-                        break 'geo;
-                    };
-                    let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
-                    let (finished_steps, step_name) = steps::extract_geo_points();
-
-                    extract(document_changes,
-                        &extractor,
-                        indexing_context,
-                        &mut extractor_allocs,
-                        &datastore,
-                        finished_steps,
-                        total_steps,
-                        step_name,
-                    )?;
-
-                    merge_and_send_rtree(
-                        datastore,
-                        &rtxn,
-                        index,
-                        extractor_sender.geo(),
-                        &indexing_context.must_stop_processing,
-                    )?;
-                }
                 // TODO THIS IS TOO MUCH
                 // - [ ] Extract fieldid docid facet number
                 // - [ ] Extract fieldid docid facet string
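
The removed 'geo' block above fed a thread-local datastore into `merge_and_send_rtree`, which rebuilt the R-tree and shipped it to the writer as a memory-mapped temp file. A self-contained sketch of that persistence step, with milli's `GeoPoint` swapped for a plain coordinate pair and the error type simplified, so it is an illustration of the technique rather than the crate's actual code:

use std::error::Error;
use std::io::Write;

use memmap2::Mmap;
use rstar::RTree;

// Serialize the R-tree with bincode into an anonymous temp file, then mmap it
// so the writer thread can consume it without an extra copy.
// Assumes rstar is built with its "serde" feature so RTree implements Serialize.
fn persist_rtree(rtree: &RTree<[f64; 2]>) -> Result<Mmap, Box<dyn Error>> {
    let mut file = tempfile::tempfile()?;
    bincode::serialize_into(&mut file, rtree)?;
    file.flush()?;
    file.sync_all()?;
    // SAFETY: the file is private to this process and is not resized while mapped.
    Ok(unsafe { Mmap::map(&file)? })
}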


@@ -93,7 +93,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
         let DocumentChangeContext {
             index,
             db_fields_ids_map,
-            rtxn: txn,
+            txn,
             new_fields_ids_map,
             doc_alloc,
             ..


@@ -1,63 +1,68 @@
-use std::cell::RefCell;
-use std::io;
+use std::io::{self};
 
+use bincode::ErrorKind;
 use hashbrown::HashSet;
 use heed::types::Bytes;
 use heed::{Database, RoTxn};
-use memmap2::Mmap;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 
 use super::channel::*;
 use super::extract::{
     merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
-    GeoExtractorData,
 };
-use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
+use super::DocumentChange;
+use crate::{
+    CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, InternalError,
+    Result,
+};
 
-#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-pub fn merge_and_send_rtree<'extractor, MSP>(
-    datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
-    rtxn: &RoTxn,
-    index: &Index,
-    geo_sender: GeoSender<'_>,
-    must_stop_processing: &MSP,
-) -> Result<()>
-where
-    MSP: Fn() -> bool + Sync,
-{
-    let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default();
-    let mut faceted = index.geo_faceted_documents_ids(rtxn)?;
-
-    for data in datastore {
-        if must_stop_processing() {
-            return Err(InternalError::AbortedIndexation.into());
-        }
-
-        let mut frozen = data.into_inner().freeze()?;
-        for result in frozen.iter_and_clear_removed() {
-            let extracted_geo_point = result?;
-            debug_assert!(rtree.remove(&GeoPoint::from(extracted_geo_point)).is_some());
-            debug_assert!(faceted.remove(extracted_geo_point.docid));
-        }
-
-        for result in frozen.iter_and_clear_inserted() {
-            let extracted_geo_point = result?;
-            rtree.insert(GeoPoint::from(extracted_geo_point));
-            debug_assert!(faceted.insert(extracted_geo_point.docid));
-        }
-    }
-
-    let mut file = tempfile::tempfile()?;
-    /// manage error
-    bincode::serialize_into(&mut file, &rtree).unwrap();
-    file.sync_all()?;
-
-    let rtree_mmap = unsafe { Mmap::map(&file)? };
-    geo_sender.set_rtree(rtree_mmap).unwrap();
-    geo_sender.set_geo_faceted(&faceted).unwrap();
-
-    Ok(())
+pub struct GeoExtractor {
+    rtree: Option<rstar::RTree<GeoPoint>>,
+}
+
+impl GeoExtractor {
+    pub fn new(rtxn: &RoTxn, index: &Index) -> Result<Option<Self>> {
+        let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
+        let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
+        if is_sortable || is_filterable {
+            Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn manage_change(
+        &mut self,
+        fidmap: &mut GlobalFieldsIdsMap,
+        change: &DocumentChange,
+    ) -> Result<()> {
+        match change {
+            DocumentChange::Deletion(_) => todo!(),
+            DocumentChange::Update(_) => todo!(),
+            DocumentChange::Insertion(_) => todo!(),
+        }
+    }
+
+    pub fn serialize_rtree<W: io::Write>(self, writer: &mut W) -> Result<bool> {
+        match self.rtree {
+            Some(rtree) => {
+                // TODO What should I do?
+                bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e {
+                    ErrorKind::Io(e) => Error::IoError(e),
+                    ErrorKind::InvalidUtf8Encoding(_) => todo!(),
+                    ErrorKind::InvalidBoolEncoding(_) => todo!(),
+                    ErrorKind::InvalidCharEncoding => todo!(),
+                    ErrorKind::InvalidTagEncoding(_) => todo!(),
+                    ErrorKind::DeserializeAnyNotSupported => todo!(),
+                    ErrorKind::SizeLimit => todo!(),
+                    ErrorKind::SequenceMustHaveLength => todo!(),
+                    ErrorKind::Custom(_) => todo!(),
+                })
+            }
+            None => Ok(false),
+        }
+    }
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
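
For orientation, the right-hand side of this file restores the earlier `GeoExtractor` API in the merger module. A sketch of how a caller might drive it, assuming milli's internal types (`Index`, `DocumentChange`, `GlobalFieldsIdsMap`) are in scope; the loop and the output writer are illustrative, not taken from this compare:

// Sketch only: wire GeoExtractor::new / manage_change / serialize_rtree together.
fn extract_geo<'doc>(
    rtxn: &heed::RoTxn,
    index: &Index,
    fidmap: &mut GlobalFieldsIdsMap,
    changes: impl IntoIterator<Item = DocumentChange<'doc>>,
    writer: &mut impl std::io::Write,
) -> Result<bool> {
    // None means `_geo` is neither filterable nor sortable, so geo is skipped.
    let Some(mut extractor) = GeoExtractor::new(rtxn, index)? else {
        return Ok(false);
    };
    for change in changes {
        // Still a todo!() in this version; shown only to illustrate the intended flow.
        extractor.manage_change(fidmap, &change)?;
    }
    // Ok(true) when an R-tree was serialized, Ok(false) when there was none.
    extractor.serialize_rtree(writer)
}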