From db2fb86b8bbb69cb79781d74dda885460ea45560 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 9 Nov 2023 14:19:16 +0100 Subject: [PATCH 1/9] Extract PrimaryKey logic to a type --- milli/src/documents/mod.rs | 10 ++ milli/src/documents/primary_key.rs | 168 +++++++++++++++++++++++++++++ milli/src/fields_ids_map.rs | 6 ++ 3 files changed, 184 insertions(+) create mode 100644 milli/src/documents/primary_key.rs diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs index 7c037b3bf..4429f083d 100644 --- a/milli/src/documents/mod.rs +++ b/milli/src/documents/mod.rs @@ -1,5 +1,6 @@ mod builder; mod enriched; +mod primary_key; mod reader; mod serde_impl; @@ -11,6 +12,9 @@ use bimap::BiHashMap; pub use builder::DocumentsBatchBuilder; pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader}; use obkv::KvReader; +pub use primary_key::{ + DocumentIdExtractionError, FieldDistribution, PrimaryKey, DEFAULT_PRIMARY_KEY, +}; pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader}; use serde::{Deserialize, Serialize}; @@ -87,6 +91,12 @@ impl DocumentsBatchIndex { } } +impl FieldDistribution for DocumentsBatchIndex { + fn id(&self, name: &str) -> Option { + self.id(name) + } +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Error parsing number {value:?} at line {line}: {error}")] diff --git a/milli/src/documents/primary_key.rs b/milli/src/documents/primary_key.rs new file mode 100644 index 000000000..dd97f2608 --- /dev/null +++ b/milli/src/documents/primary_key.rs @@ -0,0 +1,168 @@ +use std::iter; +use std::result::Result as StdResult; + +use serde_json::Value; + +use crate::{FieldId, InternalError, Object, Result, UserError}; + +/// The symbol used to define levels in a nested primary key. +const PRIMARY_KEY_SPLIT_SYMBOL: char = '.'; + +/// The default primary that is used when not specified. +pub const DEFAULT_PRIMARY_KEY: &str = "id"; + +pub trait FieldDistribution { + fn id(&self, name: &str) -> Option; +} + +/// A type that represent the type of primary key that has been set +/// for this index, a classic flat one or a nested one. +#[derive(Debug, Clone, Copy)] +pub enum PrimaryKey<'a> { + Flat { name: &'a str, field_id: FieldId }, + Nested { name: &'a str }, +} + +pub enum DocumentIdExtractionError { + InvalidDocumentId(UserError), + MissingDocumentId, + TooManyDocumentIds(usize), +} + +impl<'a> PrimaryKey<'a> { + pub fn new(path: &'a str, fields: &impl FieldDistribution) -> Option { + Some(if path.contains(PRIMARY_KEY_SPLIT_SYMBOL) { + Self::Nested { name: path } + } else { + let field_id = fields.id(path)?; + Self::Flat { name: path, field_id } + }) + } + + pub fn name(&self) -> &str { + match self { + PrimaryKey::Flat { name, .. } => name, + PrimaryKey::Nested { name } => name, + } + } + + pub fn document_id( + &self, + document: &obkv::KvReader, + fields: &impl FieldDistribution, + ) -> Result> { + match self { + PrimaryKey::Flat { name: _, field_id } => match document.get(*field_id) { + Some(document_id_bytes) => { + let document_id = serde_json::from_slice(document_id_bytes) + .map_err(InternalError::SerdeJson)?; + match validate_document_id_value(document_id)? { + Ok(document_id) => Ok(Ok(document_id)), + Err(user_error) => { + Ok(Err(DocumentIdExtractionError::InvalidDocumentId(user_error))) + } + } + } + None => Ok(Err(DocumentIdExtractionError::MissingDocumentId)), + }, + nested @ PrimaryKey::Nested { .. } => { + let mut matching_documents_ids = Vec::new(); + for (first_level_name, right) in nested.possible_level_names() { + if let Some(field_id) = fields.id(first_level_name) { + if let Some(value_bytes) = document.get(field_id) { + let object = serde_json::from_slice(value_bytes) + .map_err(InternalError::SerdeJson)?; + fetch_matching_values(object, right, &mut matching_documents_ids); + + if matching_documents_ids.len() >= 2 { + return Ok(Err(DocumentIdExtractionError::TooManyDocumentIds( + matching_documents_ids.len(), + ))); + } + } + } + } + + match matching_documents_ids.pop() { + Some(document_id) => match validate_document_id_value(document_id)? { + Ok(document_id) => Ok(Ok(document_id)), + Err(user_error) => { + Ok(Err(DocumentIdExtractionError::InvalidDocumentId(user_error))) + } + }, + None => Ok(Err(DocumentIdExtractionError::MissingDocumentId)), + } + } + } + } + + /// Returns an `Iterator` that gives all the possible fields names the primary key + /// can have depending of the first level name and depth of the objects. + pub fn possible_level_names(&self) -> impl Iterator + '_ { + let name = self.name(); + name.match_indices(PRIMARY_KEY_SPLIT_SYMBOL) + .map(move |(i, _)| (&name[..i], &name[i + PRIMARY_KEY_SPLIT_SYMBOL.len_utf8()..])) + .chain(iter::once((name, ""))) + } +} + +fn fetch_matching_values(value: Value, selector: &str, output: &mut Vec) { + match value { + Value::Object(object) => fetch_matching_values_in_object(object, selector, "", output), + otherwise => output.push(otherwise), + } +} + +fn fetch_matching_values_in_object( + object: Object, + selector: &str, + base_key: &str, + output: &mut Vec, +) { + for (key, value) in object { + let base_key = if base_key.is_empty() { + key.to_string() + } else { + format!("{}{}{}", base_key, PRIMARY_KEY_SPLIT_SYMBOL, key) + }; + + if starts_with(selector, &base_key) { + match value { + Value::Object(object) => { + fetch_matching_values_in_object(object, selector, &base_key, output) + } + value => output.push(value), + } + } + } +} + +fn starts_with(selector: &str, key: &str) -> bool { + selector.strip_prefix(key).map_or(false, |tail| { + tail.chars().next().map(|c| c == PRIMARY_KEY_SPLIT_SYMBOL).unwrap_or(true) + }) +} + +// FIXME: move to a DocumentId struct + +fn validate_document_id(document_id: &str) -> Option<&str> { + if !document_id.is_empty() + && document_id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_')) + { + Some(document_id) + } else { + None + } +} + +pub fn validate_document_id_value(document_id: Value) -> Result> { + match document_id { + Value::String(string) => match validate_document_id(&string) { + Some(s) if s.len() == string.len() => Ok(Ok(string)), + Some(s) => Ok(Ok(s.to_string())), + None => Ok(Err(UserError::InvalidDocumentId { document_id: Value::String(string) })), + }, + Value::Number(number) if number.is_i64() => Ok(Ok(number.to_string())), + content => Ok(Err(UserError::InvalidDocumentId { document_id: content })), + } +} diff --git a/milli/src/fields_ids_map.rs b/milli/src/fields_ids_map.rs index 810ff755b..85320c168 100644 --- a/milli/src/fields_ids_map.rs +++ b/milli/src/fields_ids_map.rs @@ -81,6 +81,12 @@ impl Default for FieldsIdsMap { } } +impl crate::documents::FieldDistribution for FieldsIdsMap { + fn id(&self, name: &str) -> Option { + self.id(name) + } +} + #[cfg(test)] mod tests { use super::*; From 9cef800b2aa8bceb31bd82ca3bcd11a59157a8dc Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 9 Nov 2023 14:22:05 +0100 Subject: [PATCH 2/9] Enrich uses the new type --- milli/src/update/index_documents/enrich.rs | 207 ++++----------------- milli/src/update/index_documents/mod.rs | 5 +- 2 files changed, 34 insertions(+), 178 deletions(-) diff --git a/milli/src/update/index_documents/enrich.rs b/milli/src/update/index_documents/enrich.rs index 22b16f253..03eb3f4de 100644 --- a/milli/src/update/index_documents/enrich.rs +++ b/milli/src/update/index_documents/enrich.rs @@ -1,20 +1,17 @@ +use std::fmt; use std::io::{BufWriter, Read, Seek}; use std::result::Result as StdResult; -use std::{fmt, iter}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader, EnrichedDocumentsBatchReader}; +use crate::documents::{ + DocumentIdExtractionError, DocumentsBatchIndex, DocumentsBatchReader, + EnrichedDocumentsBatchReader, PrimaryKey, DEFAULT_PRIMARY_KEY, +}; use crate::error::{GeoError, InternalError, UserError}; use crate::update::index_documents::{obkv_to_object, writer_into_reader}; -use crate::{FieldId, Index, Object, Result}; - -/// The symbol used to define levels in a nested primary key. -const PRIMARY_KEY_SPLIT_SYMBOL: char = '.'; - -/// The default primary that is used when not specified. -const DEFAULT_PRIMARY_KEY: &str = "id"; +use crate::{FieldId, Index, Result}; /// This function validates and enrich the documents by checking that: /// - we can infer a primary key, @@ -41,14 +38,12 @@ pub fn enrich_documents_batch( // The primary key *field id* that has already been set for this index or the one // we will guess by searching for the first key that contains "id" as a substring. let primary_key = match index.primary_key(rtxn)? { - Some(primary_key) if primary_key.contains(PRIMARY_KEY_SPLIT_SYMBOL) => { - PrimaryKey::nested(primary_key) - } - Some(primary_key) => match documents_batch_index.id(primary_key) { - Some(id) => PrimaryKey::flat(primary_key, id), - None if autogenerate_docids => { - PrimaryKey::flat(primary_key, documents_batch_index.insert(primary_key)) - } + Some(primary_key) => match PrimaryKey::new(primary_key, &documents_batch_index) { + Some(primary_key) => primary_key, + None if autogenerate_docids => PrimaryKey::Flat { + name: primary_key, + field_id: documents_batch_index.insert(primary_key), + }, None => { return match cursor.next_document()? { Some(first_document) => Ok(Err(UserError::MissingDocumentId { @@ -76,14 +71,14 @@ pub fn enrich_documents_batch( }); match guesses.as_slice() { - [] if autogenerate_docids => PrimaryKey::flat( - DEFAULT_PRIMARY_KEY, - documents_batch_index.insert(DEFAULT_PRIMARY_KEY), - ), + [] if autogenerate_docids => PrimaryKey::Flat { + name: DEFAULT_PRIMARY_KEY, + field_id: documents_batch_index.insert(DEFAULT_PRIMARY_KEY), + }, [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), [(field_id, name)] => { log::info!("Primary key was not specified in index. Inferred to '{name}'"); - PrimaryKey::flat(name, *field_id) + PrimaryKey::Flat { name, field_id: *field_id } } multiple => { return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound { @@ -156,92 +151,24 @@ fn fetch_or_generate_document_id( uuid_buffer: &mut [u8; uuid::fmt::Hyphenated::LENGTH], count: u32, ) -> Result> { - match primary_key { - PrimaryKey::Flat { name: primary_key, field_id: primary_key_id } => { - match document.get(primary_key_id) { - Some(document_id_bytes) => { - let document_id = serde_json::from_slice(document_id_bytes) - .map_err(InternalError::SerdeJson)?; - match validate_document_id_value(document_id)? { - Ok(document_id) => Ok(Ok(DocumentId::retrieved(document_id))), - Err(user_error) => Ok(Err(user_error)), - } - } - None if autogenerate_docids => { - let uuid = uuid::Uuid::new_v4().as_hyphenated().encode_lower(uuid_buffer); - Ok(Ok(DocumentId::generated(uuid.to_string(), count))) - } - None => Ok(Err(UserError::MissingDocumentId { - primary_key: primary_key.to_string(), - document: obkv_to_object(document, documents_batch_index)?, - })), - } + Ok(match primary_key.document_id(document, documents_batch_index)? { + Ok(document_id) => Ok(DocumentId::Retrieved { value: document_id }), + Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error), + Err(DocumentIdExtractionError::MissingDocumentId) if autogenerate_docids => { + let uuid = uuid::Uuid::new_v4().as_hyphenated().encode_lower(uuid_buffer); + Ok(DocumentId::Generated { value: uuid.to_string(), document_nth: count }) } - nested @ PrimaryKey::Nested { .. } => { - let mut matching_documents_ids = Vec::new(); - for (first_level_name, right) in nested.possible_level_names() { - if let Some(field_id) = documents_batch_index.id(first_level_name) { - if let Some(value_bytes) = document.get(field_id) { - let object = serde_json::from_slice(value_bytes) - .map_err(InternalError::SerdeJson)?; - fetch_matching_values(object, right, &mut matching_documents_ids); - - if matching_documents_ids.len() >= 2 { - return Ok(Err(UserError::TooManyDocumentIds { - primary_key: nested.name().to_string(), - document: obkv_to_object(document, documents_batch_index)?, - })); - } - } - } - } - - match matching_documents_ids.pop() { - Some(document_id) => match validate_document_id_value(document_id)? { - Ok(document_id) => Ok(Ok(DocumentId::retrieved(document_id))), - Err(user_error) => Ok(Err(user_error)), - }, - None => Ok(Err(UserError::MissingDocumentId { - primary_key: nested.name().to_string(), - document: obkv_to_object(document, documents_batch_index)?, - })), - } + Err(DocumentIdExtractionError::MissingDocumentId) => Err(UserError::MissingDocumentId { + primary_key: primary_key.name().to_string(), + document: obkv_to_object(document, documents_batch_index)?, + }), + Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { + Err(UserError::TooManyDocumentIds { + primary_key: primary_key.name().to_string(), + document: obkv_to_object(document, documents_batch_index)?, + }) } - } -} - -/// A type that represent the type of primary key that has been set -/// for this index, a classic flat one or a nested one. -#[derive(Debug, Clone, Copy)] -enum PrimaryKey<'a> { - Flat { name: &'a str, field_id: FieldId }, - Nested { name: &'a str }, -} - -impl PrimaryKey<'_> { - fn flat(name: &str, field_id: FieldId) -> PrimaryKey { - PrimaryKey::Flat { name, field_id } - } - - fn nested(name: &str) -> PrimaryKey { - PrimaryKey::Nested { name } - } - - fn name(&self) -> &str { - match self { - PrimaryKey::Flat { name, .. } => name, - PrimaryKey::Nested { name } => name, - } - } - - /// Returns an `Iterator` that gives all the possible fields names the primary key - /// can have depending of the first level name and deepnes of the objects. - fn possible_level_names(&self) -> impl Iterator + '_ { - let name = self.name(); - name.match_indices(PRIMARY_KEY_SPLIT_SYMBOL) - .map(move |(i, _)| (&name[..i], &name[i + PRIMARY_KEY_SPLIT_SYMBOL.len_utf8()..])) - .chain(iter::once((name, ""))) - } + }) } /// A type that represents a document id that has been retrieved from a document or auto-generated. @@ -255,14 +182,6 @@ pub enum DocumentId { } impl DocumentId { - fn retrieved(value: String) -> DocumentId { - DocumentId::Retrieved { value } - } - - fn generated(value: String, document_nth: u32) -> DocumentId { - DocumentId::Generated { value, document_nth } - } - fn debug(&self) -> String { format!("{:?}", self) } @@ -290,66 +209,6 @@ impl fmt::Debug for DocumentId { } } -fn starts_with(selector: &str, key: &str) -> bool { - selector.strip_prefix(key).map_or(false, |tail| { - tail.chars().next().map(|c| c == PRIMARY_KEY_SPLIT_SYMBOL).unwrap_or(true) - }) -} - -pub fn fetch_matching_values(value: Value, selector: &str, output: &mut Vec) { - match value { - Value::Object(object) => fetch_matching_values_in_object(object, selector, "", output), - otherwise => output.push(otherwise), - } -} - -pub fn fetch_matching_values_in_object( - object: Object, - selector: &str, - base_key: &str, - output: &mut Vec, -) { - for (key, value) in object { - let base_key = if base_key.is_empty() { - key.to_string() - } else { - format!("{}{}{}", base_key, PRIMARY_KEY_SPLIT_SYMBOL, key) - }; - - if starts_with(selector, &base_key) { - match value { - Value::Object(object) => { - fetch_matching_values_in_object(object, selector, &base_key, output) - } - value => output.push(value), - } - } - } -} - -pub fn validate_document_id(document_id: &str) -> Option<&str> { - if !document_id.is_empty() - && document_id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_')) - { - Some(document_id) - } else { - None - } -} - -/// Parses a Json encoded document id and validate it, returning a user error when it is one. -pub fn validate_document_id_value(document_id: Value) -> Result> { - match document_id { - Value::String(string) => match validate_document_id(&string) { - Some(s) if s.len() == string.len() => Ok(Ok(string)), - Some(s) => Ok(Ok(s.to_string())), - None => Ok(Err(UserError::InvalidDocumentId { document_id: Value::String(string) })), - }, - Value::Number(number) if number.is_i64() => Ok(Ok(number.to_string())), - content => Ok(Err(UserError::InvalidDocumentId { document_id: content })), - } -} - /// Try to extract an `f64` from a JSON `Value` and return the `Value` /// in the `Err` variant if it failed. pub fn extract_finite_float_from_value(value: Value) -> StdResult { diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 2be410ace..d60006289 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -20,10 +20,7 @@ use slice_group_by::GroupBy; use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; use self::enrich::enrich_documents_batch; -pub use self::enrich::{ - extract_finite_float_from_value, validate_document_id, validate_document_id_value, - validate_geo_from_json, DocumentId, -}; +pub use self::enrich::{extract_finite_float_from_value, validate_geo_from_json, DocumentId}; pub use self::helpers::{ as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, From b11c2afac09bd1eae5a1f73e97efe1651add7e67 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 9 Nov 2023 14:22:43 +0100 Subject: [PATCH 3/9] Index::external_id_of --- milli/src/index.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/milli/src/index.rs b/milli/src/index.rs index 86ef6105b..5b705e0b2 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -12,6 +12,7 @@ use rstar::RTree; use time::OffsetDateTime; use crate::distance::NDotProductPoint; +use crate::documents::PrimaryKey; use crate::error::{InternalError, UserError}; use crate::fields_ids_map::FieldsIdsMap; use crate::heed_codec::facet::{ @@ -1176,6 +1177,36 @@ impl Index { self.iter_documents(rtxn, self.documents_ids(rtxn)?) } + pub fn external_id_of<'a, 't: 'a>( + &'a self, + rtxn: &'t RoTxn, + ids: impl IntoIterator + 'a, + ) -> Result> + 'a> { + let fields = self.fields_ids_map(rtxn)?; + + // uses precondition "never called on an empty index" + let primary_key = self.primary_key(rtxn)?.ok_or(InternalError::DatabaseMissingEntry { + db_name: db_name::MAIN, + key: Some(main_key::PRIMARY_KEY_KEY), + })?; + let primary_key = PrimaryKey::new(primary_key, &fields).ok_or_else(|| { + InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldName { + field_name: primary_key.to_owned(), + process: "external_id_of", + }) + })?; + Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> { + let (_docid, obkv) = entry?; + match primary_key.document_id(&obkv, &fields)? { + Ok(document_id) => Ok(document_id), + Err(_) => Err(InternalError::DocumentsError( + crate::documents::Error::InvalidDocumentFormat, + ) + .into()), + } + })) + } + pub fn facets_distribution<'a>(&'a self, rtxn: &'a RoTxn) -> FacetDistribution<'a> { FacetDistribution::new(rtxn, self) } From 3053e01c05df5d840c1d4efe4810cdafef5a8c70 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 9 Nov 2023 14:23:02 +0100 Subject: [PATCH 4/9] Batch::remove_documents_from_db_no_batch --- milli/src/update/index_documents/mod.rs | 33 ++++++++ milli/src/update/index_documents/transform.rs | 83 +++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index d60006289..de40e0b9b 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -194,6 +194,39 @@ where Ok((self, Ok(deleted_documents))) } + /// Removes documents from db using their internal document ids. + /// + /// # Warning + /// + /// This function is dangerous and will only work correctly if: + /// + /// - All the passed ids currently exist in the database + /// - No batching using the standards `remove_documents` and `add_documents` took place + /// + /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function. + pub fn remove_documents_from_db_no_batch( + mut self, + to_delete: &RoaringBitmap, + ) -> Result<(Self, u64)> { + puffin::profile_function!(); + + // Early return when there is no document to add + if to_delete.is_empty() { + return Ok((self, 0)); + } + + let deleted_documents = self + .transform + .as_mut() + .expect("Invalid document deletion state") + .remove_documents_from_db_no_batch(to_delete, self.wtxn, &self.should_abort)? + as u64; + + self.deleted_documents += deleted_documents; + + Ok((self, deleted_documents)) + } + #[logging_timer::time("IndexDocuments::{}")] pub fn execute(mut self) -> Result { puffin::profile_function!(); diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 186974bfe..5f5e698d3 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -481,6 +481,89 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(documents_deleted) } + /// The counter part of `read_documents` that removes documents either from the transform or the database. + /// It can be called before, after or in between two calls of the `read_documents`. + /// + /// It needs to update all the internal datastructure in the transform. + /// - If the document is coming from the database -> it's marked as a to_delete document + /// - If the document to remove was inserted by the `read_documents` method before AND was present in the db, + /// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it. + /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, + /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. + /// - If the document to remove was not present in either the db or the transform we do nothing. + #[logging_timer::time] + pub fn remove_documents_from_db_no_batch( + &mut self, + to_remove: &RoaringBitmap, + wtxn: &mut heed::RwTxn, + should_abort: FA, + ) -> Result + where + FA: Fn() -> bool + Sync, + { + puffin::profile_function!(); + + let mut documents_deleted = 0; + let mut document_sorter_value_buffer = Vec::new(); + let mut document_sorter_key_buffer = Vec::new(); + let external_ids = self.index.external_id_of(wtxn, to_remove.iter())?; + + for (to_remove, external_docid) in to_remove.iter().zip(external_ids) { + let external_docid = external_docid?; + if should_abort() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + self.replaced_documents_ids.insert(to_remove); + + // fetch the obkv document + let original_key = BEU32::new(to_remove); + let base_obkv = self + .index + .documents + .remap_data_type::() + .get(wtxn, &original_key)? + .ok_or(InternalError::DatabaseMissingEntry { + db_name: db_name::DOCUMENTS, + key: None, + })?; + + // Key is the concatenation of the internal docid and the external one. + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&to_remove.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_docid.as_bytes()); + // push it as to delete in the original_sorter + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); + into_del_add_obkv( + KvReaderU16::new(base_obkv), + true, + false, + &mut document_sorter_value_buffer, + )?; + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; + + // flatten it and push it as to delete in the flattened_sorter + let flattened_obkv = KvReader::new(base_obkv); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); + into_del_add_obkv( + KvReaderU16::new(&obkv), + true, + false, + &mut document_sorter_value_buffer, + )?; + } + self.flattened_sorter.insert(to_remove.to_be_bytes(), &document_sorter_value_buffer)?; + + documents_deleted += 1; + } + + Ok(documents_deleted) + } + // Flatten a document from the fields ids map contained in self and insert the new // created fields. Returns `None` if the document doesn't need to be flattened. fn flatten_from_fields_ids_map(&mut self, obkv: KvReader) -> Result>> { From f8289cd974d957d38645ca66c993ca518ec81955 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 9 Nov 2023 14:23:15 +0100 Subject: [PATCH 5/9] Use it from delete-by-filter --- index-scheduler/src/batch.rs | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index c9deedb37..5260a9d7e 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1534,18 +1534,6 @@ fn delete_document_by_filter<'a>( } e => e.into(), })?; - let external_documents_ids = index.external_documents_ids(); - // FIXME: for filters matching a lot of documents, this will allocate a huge vec of external docids (strings). - // Since what we have is an iterator, it would be better to delete in chunks - let external_to_internal: std::result::Result, RoaringBitmap> = - external_documents_ids - .find_external_id_of(wtxn, candidates)? - .only_external_ids() - .collect(); - let document_ids = match external_to_internal { - Ok(external_ids) => external_ids, - Err(remaining_ids) => panic!("Couldn't find some external ids {:?}", remaining_ids), - }; let config = IndexDocumentsConfig { update_method: IndexDocumentsMethod::ReplaceDocuments, @@ -1561,13 +1549,10 @@ fn delete_document_by_filter<'a>( || must_stop_processing.get(), )?; - let (new_builder, user_result) = builder.remove_documents(document_ids)?; + let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?; builder = new_builder; - // Uses Invariant: remove documents actually always returns Ok for the inner result - let count = user_result.unwrap(); let _ = builder.execute()?; - count } else { 0 From 825257da76b33809cbb0496773449c63de023260 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 9 Nov 2023 16:13:15 +0100 Subject: [PATCH 6/9] Use more efficient method for deletion in benchmarks --- benchmarks/benches/indexing.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/benchmarks/benches/indexing.rs b/benchmarks/benches/indexing.rs index c31bfab89..65f581b93 100644 --- a/benchmarks/benches/indexing.rs +++ b/benchmarks/benches/indexing.rs @@ -864,22 +864,12 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec, RoaringBitmap> = - external_documents_ids - .find_external_id_of(&wtxn, ids) - .unwrap() - .only_external_ids() - .collect(); - let ids = external_to_internal.unwrap(); let config = IndexDocumentsConfig::default(); let mut builder = IndexDocuments::new(&mut wtxn, &index, &indexer_config, config, |_| (), || false) .unwrap(); - (builder, _) = builder.remove_documents(ids).unwrap(); + (builder, _) = builder.remove_documents_from_db_no_batch(&ids).unwrap(); builder.execute().unwrap(); } From 264b10ec20956cfe599cfeb5e3fc08ae2298bfc8 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 9 Nov 2023 16:23:20 +0100 Subject: [PATCH 7/9] Fixup documentation --- milli/src/update/index_documents/transform.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 5f5e698d3..23313547a 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -481,16 +481,16 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(documents_deleted) } - /// The counter part of `read_documents` that removes documents either from the transform or the database. - /// It can be called before, after or in between two calls of the `read_documents`. + /// Removes documents from db using their internal document ids. /// - /// It needs to update all the internal datastructure in the transform. - /// - If the document is coming from the database -> it's marked as a to_delete document - /// - If the document to remove was inserted by the `read_documents` method before AND was present in the db, - /// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it. - /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, - /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. - /// - If the document to remove was not present in either the db or the transform we do nothing. + /// # Warning + /// + /// This function is dangerous and will only work correctly if: + /// + /// - All the passed ids currently exist in the database + /// - No batching using the standards `remove_documents` and `add_documents` took place + /// + /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function. #[logging_timer::time] pub fn remove_documents_from_db_no_batch( &mut self, From 378deb0bef48269ee373fc3a6426e24c80393b2e Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 13 Nov 2023 13:37:58 +0100 Subject: [PATCH 8/9] Rename trait --- milli/src/documents/mod.rs | 6 ++---- milli/src/documents/primary_key.rs | 10 +++++++--- milli/src/fields_ids_map.rs | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs index 4429f083d..a874ac17e 100644 --- a/milli/src/documents/mod.rs +++ b/milli/src/documents/mod.rs @@ -12,9 +12,7 @@ use bimap::BiHashMap; pub use builder::DocumentsBatchBuilder; pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader}; use obkv::KvReader; -pub use primary_key::{ - DocumentIdExtractionError, FieldDistribution, PrimaryKey, DEFAULT_PRIMARY_KEY, -}; +pub use primary_key::{DocumentIdExtractionError, FieldIdMapper, PrimaryKey, DEFAULT_PRIMARY_KEY}; pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader}; use serde::{Deserialize, Serialize}; @@ -91,7 +89,7 @@ impl DocumentsBatchIndex { } } -impl FieldDistribution for DocumentsBatchIndex { +impl FieldIdMapper for DocumentsBatchIndex { fn id(&self, name: &str) -> Option { self.id(name) } diff --git a/milli/src/documents/primary_key.rs b/milli/src/documents/primary_key.rs index dd97f2608..16a95c21f 100644 --- a/milli/src/documents/primary_key.rs +++ b/milli/src/documents/primary_key.rs @@ -11,7 +11,11 @@ const PRIMARY_KEY_SPLIT_SYMBOL: char = '.'; /// The default primary that is used when not specified. pub const DEFAULT_PRIMARY_KEY: &str = "id"; -pub trait FieldDistribution { +/// Trait for objects that can map the name of a field to its [`FieldId`]. +pub trait FieldIdMapper { + /// Attempts to map the passed name to its [`FieldId`]. + /// + /// `None` if the field with this name was not found. fn id(&self, name: &str) -> Option; } @@ -30,7 +34,7 @@ pub enum DocumentIdExtractionError { } impl<'a> PrimaryKey<'a> { - pub fn new(path: &'a str, fields: &impl FieldDistribution) -> Option { + pub fn new(path: &'a str, fields: &impl FieldIdMapper) -> Option { Some(if path.contains(PRIMARY_KEY_SPLIT_SYMBOL) { Self::Nested { name: path } } else { @@ -49,7 +53,7 @@ impl<'a> PrimaryKey<'a> { pub fn document_id( &self, document: &obkv::KvReader, - fields: &impl FieldDistribution, + fields: &impl FieldIdMapper, ) -> Result> { match self { PrimaryKey::Flat { name: _, field_id } => match document.get(*field_id) { diff --git a/milli/src/fields_ids_map.rs b/milli/src/fields_ids_map.rs index 85320c168..9c1c87f82 100644 --- a/milli/src/fields_ids_map.rs +++ b/milli/src/fields_ids_map.rs @@ -81,7 +81,7 @@ impl Default for FieldsIdsMap { } } -impl crate::documents::FieldDistribution for FieldsIdsMap { +impl crate::documents::FieldIdMapper for FieldsIdsMap { fn id(&self, name: &str) -> Option { self.id(name) } From 772964125d6f59ef4eaa1957c305211f45f07526 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 13 Nov 2023 13:51:22 +0100 Subject: [PATCH 9/9] Factor removal of document from DB --- milli/src/update/index_documents/transform.rs | 143 +++++++----------- 1 file changed, 56 insertions(+), 87 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 23313547a..8dc88efb9 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -421,52 +421,13 @@ impl<'a, 'i> Transform<'a, 'i> { // Then we push the document in sorters in deletion mode. let deleted_from_db = match external_documents_ids.get(wtxn, &to_remove)? { Some(docid) => { - self.replaced_documents_ids.insert(docid); - - // fetch the obkv document - let original_key = BEU32::new(docid); - let base_obkv = self - .index - .documents - .remap_data_type::() - .get(wtxn, &original_key)? - .ok_or(InternalError::DatabaseMissingEntry { - db_name: db_name::DOCUMENTS, - key: None, - })?; - - // Key is the concatenation of the internal docid and the external one. - document_sorter_key_buffer.clear(); - document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); - document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes()); - // push it as to delete in the original_sorter - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::new(base_obkv), - true, - false, + self.remove_document_from_db( + docid, + to_remove, + wtxn, + &mut document_sorter_key_buffer, &mut document_sorter_value_buffer, )?; - self.original_sorter - .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - - // flatten it and push it as to delete in the flattened_sorter - let flattened_obkv = KvReader::new(base_obkv); - if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { - // we recreate our buffer with the flattened documents - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::new(&obkv), - true, - false, - &mut document_sorter_value_buffer, - )?; - } - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; - true } None => false, @@ -508,55 +469,18 @@ impl<'a, 'i> Transform<'a, 'i> { let mut document_sorter_key_buffer = Vec::new(); let external_ids = self.index.external_id_of(wtxn, to_remove.iter())?; - for (to_remove, external_docid) in to_remove.iter().zip(external_ids) { + for (internal_docid, external_docid) in to_remove.iter().zip(external_ids) { let external_docid = external_docid?; if should_abort() { return Err(Error::InternalError(InternalError::AbortedIndexation)); } - self.replaced_documents_ids.insert(to_remove); - - // fetch the obkv document - let original_key = BEU32::new(to_remove); - let base_obkv = self - .index - .documents - .remap_data_type::() - .get(wtxn, &original_key)? - .ok_or(InternalError::DatabaseMissingEntry { - db_name: db_name::DOCUMENTS, - key: None, - })?; - - // Key is the concatenation of the internal docid and the external one. - document_sorter_key_buffer.clear(); - document_sorter_key_buffer.extend_from_slice(&to_remove.to_be_bytes()); - document_sorter_key_buffer.extend_from_slice(external_docid.as_bytes()); - // push it as to delete in the original_sorter - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::new(base_obkv), - true, - false, + self.remove_document_from_db( + internal_docid, + external_docid, + wtxn, + &mut document_sorter_key_buffer, &mut document_sorter_value_buffer, )?; - self.original_sorter - .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - - // flatten it and push it as to delete in the flattened_sorter - let flattened_obkv = KvReader::new(base_obkv); - if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { - // we recreate our buffer with the flattened documents - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::new(&obkv), - true, - false, - &mut document_sorter_value_buffer, - )?; - } - self.flattened_sorter.insert(to_remove.to_be_bytes(), &document_sorter_value_buffer)?; documents_deleted += 1; } @@ -564,6 +488,51 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(documents_deleted) } + fn remove_document_from_db( + &mut self, + internal_docid: u32, + external_docid: String, + txn: &heed::RoTxn, + document_sorter_key_buffer: &mut Vec, + document_sorter_value_buffer: &mut Vec, + ) -> Result<()> { + self.replaced_documents_ids.insert(internal_docid); + + // fetch the obkv document + let original_key = BEU32::new(internal_docid); + let base_obkv = self + .index + .documents + .remap_data_type::() + .get(txn, &original_key)? + .ok_or(InternalError::DatabaseMissingEntry { + db_name: db_name::DOCUMENTS, + key: None, + })?; + + // Key is the concatenation of the internal docid and the external one. + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&internal_docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_docid.as_bytes()); + // push it as to delete in the original_sorter + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); + into_del_add_obkv(KvReaderU16::new(base_obkv), true, false, document_sorter_value_buffer)?; + self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; + + // flatten it and push it as to delete in the flattened_sorter + let flattened_obkv = KvReader::new(base_obkv); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Deletion as u8); + into_del_add_obkv(KvReaderU16::new(&obkv), true, false, document_sorter_value_buffer)?; + } + self.flattened_sorter + .insert(internal_docid.to_be_bytes(), &document_sorter_value_buffer)?; + Ok(()) + } + // Flatten a document from the fields ids map contained in self and insert the new // created fields. Returns `None` if the document doesn't need to be flattened. fn flatten_from_fields_ids_map(&mut self, obkv: KvReader) -> Result>> {