Mirror of https://github.com/meilisearch/meilisearch.git (synced 2024-11-23 02:27:40 +08:00)
Merge #4203
4203: Extract external document docids from docs on deletion by filter r=Kerollmops a=dureuill

This fixes some of the performance regression observed on `diff-indexing` when doing delete-by-filter with a filter matching many documents.

To delete 19 768 771 documents (hackernews dataset, all documents matching `type = comment`), here are the observed times:

|branch (commit sha1sum)|time|speed-down factor (lower is better)|
|--|--|--|
|`main` (48865470d7)|1212.885536s (~20min)|x1.0 (baseline)|
|`diff-indexing` (523519fdbf)|5385.550543s (90min)|x4.44|
|**`diff-indexing-extract-primary-key`** (f8289cd974)|2582.323324s (43min)|x2.13|

So we're still suffering a speed-down of x2.13, but that's much better than x4.44.

---

Changes:

- Refactor the logic of `PrimaryKey` extraction to a struct
- Add a trait to abstract the extraction of field id from a name between `DocumentBatch` and `FieldIdMap`.
- Add `Index::external_id_of` to get the external ids of a bitmap of internal ids.
- Use this new method to add new Transform and Batch methods to remove documents that are known to be in the DB.
- Modify delete-by-filter to use the new method.

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Commit 72d3fa4898
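For a sense of the caller-side change, here is a minimal sketch of the post-PR deletion path. The types and methods are the ones appearing in the diff below, but the wrapper function itself is hypothetical: it elides setup and transaction management and assumes milli's `heed` re-export, so treat it as illustrative rather than exact.

```rust
use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig};
use roaring::RoaringBitmap;

// Hypothetical wrapper: `candidates` holds the *internal* ids matched by the
// filter. The old path resolved every id to an external docid String and fed
// those back through `remove_documents`; the new path hands the bitmap over.
fn delete_matched(
    index: &milli::Index,
    wtxn: &mut milli::heed::RwTxn,
    candidates: RoaringBitmap,
) -> milli::Result<u64> {
    let indexer_config = IndexerConfig::default();
    let config = IndexDocumentsConfig::default();
    let builder =
        IndexDocuments::new(wtxn, index, &indexer_config, config, |_| (), || false)?;
    // Returns the number of documents actually removed from the DB.
    let (builder, deleted) = builder.remove_documents_from_db_no_batch(&candidates)?;
    let _ = builder.execute()?;
    Ok(deleted)
}
```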
```diff
@@ -864,22 +864,12 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec<RoaringBi
     let indexer_config = IndexerConfig::default();
     for ids in document_ids_to_delete {
-        let external_documents_ids = index.external_documents_ids();
-        // FIXME: for filters matching a lot of documents, this will allocate a huge vec of external docids (strings).
-        // Since what we have is an iterator, it would be better to delete in chunks
-        let external_to_internal: std::result::Result<Vec<_>, RoaringBitmap> =
-            external_documents_ids
-                .find_external_id_of(&wtxn, ids)
-                .unwrap()
-                .only_external_ids()
-                .collect();
-        let ids = external_to_internal.unwrap();
         let config = IndexDocumentsConfig::default();

         let mut builder =
             IndexDocuments::new(&mut wtxn, &index, &indexer_config, config, |_| (), || false)
                 .unwrap();
-        (builder, _) = builder.remove_documents(ids).unwrap();
+        (builder, _) = builder.remove_documents_from_db_no_batch(&ids).unwrap();
         builder.execute().unwrap();
     }
```
```diff
@@ -1534,18 +1534,6 @@ fn delete_document_by_filter<'a>(
                 }
                 e => e.into(),
             })?;
-            let external_documents_ids = index.external_documents_ids();
-            // FIXME: for filters matching a lot of documents, this will allocate a huge vec of external docids (strings).
-            // Since what we have is an iterator, it would be better to delete in chunks
-            let external_to_internal: std::result::Result<Vec<_>, RoaringBitmap> =
-                external_documents_ids
-                    .find_external_id_of(wtxn, candidates)?
-                    .only_external_ids()
-                    .collect();
-            let document_ids = match external_to_internal {
-                Ok(external_ids) => external_ids,
-                Err(remaining_ids) => panic!("Couldn't find some external ids {:?}", remaining_ids),
-            };

             let config = IndexDocumentsConfig {
                 update_method: IndexDocumentsMethod::ReplaceDocuments,
@@ -1561,13 +1549,10 @@ fn delete_document_by_filter<'a>(
                 || must_stop_processing.get(),
             )?;

-            let (new_builder, user_result) = builder.remove_documents(document_ids)?;
+            let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?;
             builder = new_builder;
-            // Uses Invariant: remove documents actually always returns Ok for the inner result
-            let count = user_result.unwrap();
-
             let _ = builder.execute()?;

             count
         } else {
             0
```
```diff
@@ -1,5 +1,6 @@
 mod builder;
 mod enriched;
+mod primary_key;
 mod reader;
 mod serde_impl;
@@ -11,6 +12,7 @@ use bimap::BiHashMap;
 pub use builder::DocumentsBatchBuilder;
 pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader};
 use obkv::KvReader;
+pub use primary_key::{DocumentIdExtractionError, FieldIdMapper, PrimaryKey, DEFAULT_PRIMARY_KEY};
 pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
 use serde::{Deserialize, Serialize};

@@ -87,6 +89,12 @@ impl DocumentsBatchIndex {
     }
 }

+impl FieldIdMapper for DocumentsBatchIndex {
+    fn id(&self, name: &str) -> Option<FieldId> {
+        self.id(name)
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("Error parsing number {value:?} at line {line}: {error}")]
```
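The `FieldIdMapper` impl above is what lets the same `PrimaryKey` logic run against either a `DocumentsBatchIndex` or a `FieldsIdsMap`. A self-contained sketch of the abstraction, with a hypothetical `HashMap`-backed mapper standing in for the real types:

```rust
use std::collections::HashMap;

type FieldId = u16; // matches milli's FieldId

trait FieldIdMapper {
    fn id(&self, name: &str) -> Option<FieldId>;
}

// Hypothetical mapper: any name-to-id store can back primary-key extraction.
impl FieldIdMapper for HashMap<String, FieldId> {
    fn id(&self, name: &str) -> Option<FieldId> {
        self.get(name).copied()
    }
}

fn main() {
    let fields: HashMap<String, FieldId> = [("id".to_string(), 0)].into();
    assert_eq!(FieldIdMapper::id(&fields, "id"), Some(0));
    assert_eq!(FieldIdMapper::id(&fields, "missing"), None);
}
```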
milli/src/documents/primary_key.rs (new file, 172 lines)

```diff
@@ -0,0 +1,172 @@
+use std::iter;
+use std::result::Result as StdResult;
+
+use serde_json::Value;
+
+use crate::{FieldId, InternalError, Object, Result, UserError};
+
+/// The symbol used to define levels in a nested primary key.
+const PRIMARY_KEY_SPLIT_SYMBOL: char = '.';
+
+/// The default primary that is used when not specified.
+pub const DEFAULT_PRIMARY_KEY: &str = "id";
+
+/// Trait for objects that can map the name of a field to its [`FieldId`].
+pub trait FieldIdMapper {
+    /// Attempts to map the passed name to its [`FieldId`].
+    ///
+    /// `None` if the field with this name was not found.
+    fn id(&self, name: &str) -> Option<FieldId>;
+}
+
+/// A type that represent the type of primary key that has been set
+/// for this index, a classic flat one or a nested one.
+#[derive(Debug, Clone, Copy)]
+pub enum PrimaryKey<'a> {
+    Flat { name: &'a str, field_id: FieldId },
+    Nested { name: &'a str },
+}
+
+pub enum DocumentIdExtractionError {
+    InvalidDocumentId(UserError),
+    MissingDocumentId,
+    TooManyDocumentIds(usize),
+}
+
+impl<'a> PrimaryKey<'a> {
+    pub fn new(path: &'a str, fields: &impl FieldIdMapper) -> Option<Self> {
+        Some(if path.contains(PRIMARY_KEY_SPLIT_SYMBOL) {
+            Self::Nested { name: path }
+        } else {
+            let field_id = fields.id(path)?;
+            Self::Flat { name: path, field_id }
+        })
+    }
+
+    pub fn name(&self) -> &str {
+        match self {
+            PrimaryKey::Flat { name, .. } => name,
+            PrimaryKey::Nested { name } => name,
+        }
+    }
+
+    pub fn document_id(
+        &self,
+        document: &obkv::KvReader<FieldId>,
+        fields: &impl FieldIdMapper,
+    ) -> Result<StdResult<String, DocumentIdExtractionError>> {
+        match self {
+            PrimaryKey::Flat { name: _, field_id } => match document.get(*field_id) {
+                Some(document_id_bytes) => {
+                    let document_id = serde_json::from_slice(document_id_bytes)
+                        .map_err(InternalError::SerdeJson)?;
+                    match validate_document_id_value(document_id)? {
+                        Ok(document_id) => Ok(Ok(document_id)),
+                        Err(user_error) => {
+                            Ok(Err(DocumentIdExtractionError::InvalidDocumentId(user_error)))
+                        }
+                    }
+                }
+                None => Ok(Err(DocumentIdExtractionError::MissingDocumentId)),
+            },
+            nested @ PrimaryKey::Nested { .. } => {
+                let mut matching_documents_ids = Vec::new();
+                for (first_level_name, right) in nested.possible_level_names() {
+                    if let Some(field_id) = fields.id(first_level_name) {
+                        if let Some(value_bytes) = document.get(field_id) {
+                            let object = serde_json::from_slice(value_bytes)
+                                .map_err(InternalError::SerdeJson)?;
+                            fetch_matching_values(object, right, &mut matching_documents_ids);
+
+                            if matching_documents_ids.len() >= 2 {
+                                return Ok(Err(DocumentIdExtractionError::TooManyDocumentIds(
+                                    matching_documents_ids.len(),
+                                )));
+                            }
+                        }
+                    }
+                }
+
+                match matching_documents_ids.pop() {
+                    Some(document_id) => match validate_document_id_value(document_id)? {
+                        Ok(document_id) => Ok(Ok(document_id)),
+                        Err(user_error) => {
+                            Ok(Err(DocumentIdExtractionError::InvalidDocumentId(user_error)))
+                        }
+                    },
+                    None => Ok(Err(DocumentIdExtractionError::MissingDocumentId)),
+                }
+            }
+        }
+    }
+
+    /// Returns an `Iterator` that gives all the possible fields names the primary key
+    /// can have depending of the first level name and depth of the objects.
+    pub fn possible_level_names(&self) -> impl Iterator<Item = (&str, &str)> + '_ {
+        let name = self.name();
+        name.match_indices(PRIMARY_KEY_SPLIT_SYMBOL)
+            .map(move |(i, _)| (&name[..i], &name[i + PRIMARY_KEY_SPLIT_SYMBOL.len_utf8()..]))
+            .chain(iter::once((name, "")))
+    }
+}
+
+fn fetch_matching_values(value: Value, selector: &str, output: &mut Vec<Value>) {
+    match value {
+        Value::Object(object) => fetch_matching_values_in_object(object, selector, "", output),
+        otherwise => output.push(otherwise),
+    }
+}
+
+fn fetch_matching_values_in_object(
+    object: Object,
+    selector: &str,
+    base_key: &str,
+    output: &mut Vec<Value>,
+) {
+    for (key, value) in object {
+        let base_key = if base_key.is_empty() {
+            key.to_string()
+        } else {
+            format!("{}{}{}", base_key, PRIMARY_KEY_SPLIT_SYMBOL, key)
+        };
+
+        if starts_with(selector, &base_key) {
+            match value {
+                Value::Object(object) => {
+                    fetch_matching_values_in_object(object, selector, &base_key, output)
+                }
+                value => output.push(value),
+            }
+        }
+    }
+}
+
+fn starts_with(selector: &str, key: &str) -> bool {
+    selector.strip_prefix(key).map_or(false, |tail| {
+        tail.chars().next().map(|c| c == PRIMARY_KEY_SPLIT_SYMBOL).unwrap_or(true)
+    })
+}
+
+// FIXME: move to a DocumentId struct
+
+fn validate_document_id(document_id: &str) -> Option<&str> {
+    if !document_id.is_empty()
+        && document_id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
+    {
+        Some(document_id)
+    } else {
+        None
+    }
+}
+
+pub fn validate_document_id_value(document_id: Value) -> Result<StdResult<String, UserError>> {
+    match document_id {
+        Value::String(string) => match validate_document_id(&string) {
+            Some(s) if s.len() == string.len() => Ok(Ok(string)),
+            Some(s) => Ok(Ok(s.to_string())),
+            None => Ok(Err(UserError::InvalidDocumentId { document_id: Value::String(string) })),
+        },
+        Value::Number(number) if number.is_i64() => Ok(Ok(number.to_string())),
+        content => Ok(Err(UserError::InvalidDocumentId { document_id: content })),
+    }
+}
```
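To make the nested-key machinery concrete, here is a standalone re-implementation of `possible_level_names` (for illustration only; the real method lives on `PrimaryKey` above) together with the splits it yields for a dotted path:

```rust
use std::iter;

/// Every (first-level field, remaining path) split of a dotted primary key,
/// plus the whole name with an empty remainder.
fn possible_level_names(name: &str) -> Vec<(&str, &str)> {
    const SPLIT: char = '.'; // PRIMARY_KEY_SPLIT_SYMBOL
    name.match_indices(SPLIT)
        .map(|(i, _)| (&name[..i], &name[i + SPLIT.len_utf8()..]))
        .chain(iter::once((name, "")))
        .collect()
}

fn main() {
    // A nested key "metadata.ids.uid" may be stored under any of these
    // top-level fields, with the rest resolved inside the JSON object.
    assert_eq!(
        possible_level_names("metadata.ids.uid"),
        [("metadata", "ids.uid"), ("metadata.ids", "uid"), ("metadata.ids.uid", "")]
    );
}
```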
```diff
@@ -81,6 +81,12 @@ impl Default for FieldsIdsMap {
     }
 }

+impl crate::documents::FieldIdMapper for FieldsIdsMap {
+    fn id(&self, name: &str) -> Option<FieldId> {
+        self.id(name)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
```
```diff
@@ -12,6 +12,7 @@ use rstar::RTree;
 use time::OffsetDateTime;

 use crate::distance::NDotProductPoint;
+use crate::documents::PrimaryKey;
 use crate::error::{InternalError, UserError};
 use crate::fields_ids_map::FieldsIdsMap;
 use crate::heed_codec::facet::{
@@ -1176,6 +1177,36 @@ impl Index {
         self.iter_documents(rtxn, self.documents_ids(rtxn)?)
     }

+    pub fn external_id_of<'a, 't: 'a>(
+        &'a self,
+        rtxn: &'t RoTxn,
+        ids: impl IntoIterator<Item = DocumentId> + 'a,
+    ) -> Result<impl IntoIterator<Item = Result<String>> + 'a> {
+        let fields = self.fields_ids_map(rtxn)?;
+
+        // uses precondition "never called on an empty index"
+        let primary_key = self.primary_key(rtxn)?.ok_or(InternalError::DatabaseMissingEntry {
+            db_name: db_name::MAIN,
+            key: Some(main_key::PRIMARY_KEY_KEY),
+        })?;
+        let primary_key = PrimaryKey::new(primary_key, &fields).ok_or_else(|| {
+            InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldName {
+                field_name: primary_key.to_owned(),
+                process: "external_id_of",
+            })
+        })?;
+        Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
+            let (_docid, obkv) = entry?;
+            match primary_key.document_id(&obkv, &fields)? {
+                Ok(document_id) => Ok(document_id),
+                Err(_) => Err(InternalError::DocumentsError(
+                    crate::documents::Error::InvalidDocumentFormat,
+                )
+                .into()),
+            }
+        }))
+    }
+
     pub fn facets_distribution<'a>(&'a self, rtxn: &'a RoTxn) -> FacetDistribution<'a> {
         FacetDistribution::new(rtxn, self)
     }
```
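A sketch of how a caller can drain `external_id_of` into owned strings. The helper function is hypothetical and assumes milli re-exports `heed`; each yielded item is itself a `Result`, so a single malformed document surfaces as an error:

```rust
use roaring::RoaringBitmap;

// Hypothetical helper: collect the external docids of a bitmap of internal ids.
fn external_ids(
    index: &milli::Index,
    rtxn: &milli::heed::RoTxn,
    ids: &RoaringBitmap,
) -> milli::Result<Vec<String>> {
    index.external_id_of(rtxn, ids.iter())?.into_iter().collect()
}
```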
```diff
@@ -1,20 +1,17 @@
+use std::fmt;
 use std::io::{BufWriter, Read, Seek};
 use std::result::Result as StdResult;
-use std::{fmt, iter};

 use serde::{Deserialize, Serialize};
 use serde_json::Value;

-use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader, EnrichedDocumentsBatchReader};
+use crate::documents::{
+    DocumentIdExtractionError, DocumentsBatchIndex, DocumentsBatchReader,
+    EnrichedDocumentsBatchReader, PrimaryKey, DEFAULT_PRIMARY_KEY,
+};
 use crate::error::{GeoError, InternalError, UserError};
 use crate::update::index_documents::{obkv_to_object, writer_into_reader};
-use crate::{FieldId, Index, Object, Result};
+use crate::{FieldId, Index, Result};

-/// The symbol used to define levels in a nested primary key.
-const PRIMARY_KEY_SPLIT_SYMBOL: char = '.';
-
-/// The default primary that is used when not specified.
-const DEFAULT_PRIMARY_KEY: &str = "id";
-
 /// This function validates and enrich the documents by checking that:
 /// - we can infer a primary key,
@@ -41,14 +38,12 @@ pub fn enrich_documents_batch<R: Read + Seek>(
     // The primary key *field id* that has already been set for this index or the one
     // we will guess by searching for the first key that contains "id" as a substring.
     let primary_key = match index.primary_key(rtxn)? {
-        Some(primary_key) if primary_key.contains(PRIMARY_KEY_SPLIT_SYMBOL) => {
-            PrimaryKey::nested(primary_key)
-        }
-        Some(primary_key) => match documents_batch_index.id(primary_key) {
-            Some(id) => PrimaryKey::flat(primary_key, id),
-            None if autogenerate_docids => {
-                PrimaryKey::flat(primary_key, documents_batch_index.insert(primary_key))
-            }
+        Some(primary_key) => match PrimaryKey::new(primary_key, &documents_batch_index) {
+            Some(primary_key) => primary_key,
+            None if autogenerate_docids => PrimaryKey::Flat {
+                name: primary_key,
+                field_id: documents_batch_index.insert(primary_key),
+            },
             None => {
                 return match cursor.next_document()? {
                     Some(first_document) => Ok(Err(UserError::MissingDocumentId {
@@ -76,14 +71,14 @@ pub fn enrich_documents_batch<R: Read + Seek>(
     });

     match guesses.as_slice() {
-        [] if autogenerate_docids => PrimaryKey::flat(
-            DEFAULT_PRIMARY_KEY,
-            documents_batch_index.insert(DEFAULT_PRIMARY_KEY),
-        ),
+        [] if autogenerate_docids => PrimaryKey::Flat {
+            name: DEFAULT_PRIMARY_KEY,
+            field_id: documents_batch_index.insert(DEFAULT_PRIMARY_KEY),
+        },
         [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
         [(field_id, name)] => {
             log::info!("Primary key was not specified in index. Inferred to '{name}'");
-            PrimaryKey::flat(name, *field_id)
+            PrimaryKey::Flat { name, field_id: *field_id }
         }
         multiple => {
             return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
@@ -156,92 +151,24 @@ fn fetch_or_generate_document_id(
     uuid_buffer: &mut [u8; uuid::fmt::Hyphenated::LENGTH],
     count: u32,
 ) -> Result<StdResult<DocumentId, UserError>> {
-    match primary_key {
-        PrimaryKey::Flat { name: primary_key, field_id: primary_key_id } => {
-            match document.get(primary_key_id) {
-                Some(document_id_bytes) => {
-                    let document_id = serde_json::from_slice(document_id_bytes)
-                        .map_err(InternalError::SerdeJson)?;
-                    match validate_document_id_value(document_id)? {
-                        Ok(document_id) => Ok(Ok(DocumentId::retrieved(document_id))),
-                        Err(user_error) => Ok(Err(user_error)),
-                    }
-                }
-                None if autogenerate_docids => {
-                    let uuid = uuid::Uuid::new_v4().as_hyphenated().encode_lower(uuid_buffer);
-                    Ok(Ok(DocumentId::generated(uuid.to_string(), count)))
-                }
-                None => Ok(Err(UserError::MissingDocumentId {
-                    primary_key: primary_key.to_string(),
-                    document: obkv_to_object(document, documents_batch_index)?,
-                })),
-            }
-        }
-        nested @ PrimaryKey::Nested { .. } => {
-            let mut matching_documents_ids = Vec::new();
-            for (first_level_name, right) in nested.possible_level_names() {
-                if let Some(field_id) = documents_batch_index.id(first_level_name) {
-                    if let Some(value_bytes) = document.get(field_id) {
-                        let object = serde_json::from_slice(value_bytes)
-                            .map_err(InternalError::SerdeJson)?;
-                        fetch_matching_values(object, right, &mut matching_documents_ids);
-
-                        if matching_documents_ids.len() >= 2 {
-                            return Ok(Err(UserError::TooManyDocumentIds {
-                                primary_key: nested.name().to_string(),
-                                document: obkv_to_object(document, documents_batch_index)?,
-                            }));
-                        }
-                    }
-                }
-            }
-
-            match matching_documents_ids.pop() {
-                Some(document_id) => match validate_document_id_value(document_id)? {
-                    Ok(document_id) => Ok(Ok(DocumentId::retrieved(document_id))),
-                    Err(user_error) => Ok(Err(user_error)),
-                },
-                None => Ok(Err(UserError::MissingDocumentId {
-                    primary_key: nested.name().to_string(),
-                    document: obkv_to_object(document, documents_batch_index)?,
-                })),
-            }
-        }
-    }
-}
-
-/// A type that represent the type of primary key that has been set
-/// for this index, a classic flat one or a nested one.
-#[derive(Debug, Clone, Copy)]
-enum PrimaryKey<'a> {
-    Flat { name: &'a str, field_id: FieldId },
-    Nested { name: &'a str },
-}
-
-impl PrimaryKey<'_> {
-    fn flat(name: &str, field_id: FieldId) -> PrimaryKey {
-        PrimaryKey::Flat { name, field_id }
-    }
-
-    fn nested(name: &str) -> PrimaryKey {
-        PrimaryKey::Nested { name }
-    }
-
-    fn name(&self) -> &str {
-        match self {
-            PrimaryKey::Flat { name, .. } => name,
-            PrimaryKey::Nested { name } => name,
-        }
-    }
-
-    /// Returns an `Iterator` that gives all the possible fields names the primary key
-    /// can have depending of the first level name and deepnes of the objects.
-    fn possible_level_names(&self) -> impl Iterator<Item = (&str, &str)> + '_ {
-        let name = self.name();
-        name.match_indices(PRIMARY_KEY_SPLIT_SYMBOL)
-            .map(move |(i, _)| (&name[..i], &name[i + PRIMARY_KEY_SPLIT_SYMBOL.len_utf8()..]))
-            .chain(iter::once((name, "")))
-    }
+    Ok(match primary_key.document_id(document, documents_batch_index)? {
+        Ok(document_id) => Ok(DocumentId::Retrieved { value: document_id }),
+        Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error),
+        Err(DocumentIdExtractionError::MissingDocumentId) if autogenerate_docids => {
+            let uuid = uuid::Uuid::new_v4().as_hyphenated().encode_lower(uuid_buffer);
+            Ok(DocumentId::Generated { value: uuid.to_string(), document_nth: count })
+        }
+        Err(DocumentIdExtractionError::MissingDocumentId) => Err(UserError::MissingDocumentId {
+            primary_key: primary_key.name().to_string(),
+            document: obkv_to_object(document, documents_batch_index)?,
+        }),
+        Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
+            Err(UserError::TooManyDocumentIds {
+                primary_key: primary_key.name().to_string(),
+                document: obkv_to_object(document, documents_batch_index)?,
+            })
+        }
+    })
 }
@@ -255,14 +182,6 @@ pub enum DocumentId {
 }

 impl DocumentId {
-    fn retrieved(value: String) -> DocumentId {
-        DocumentId::Retrieved { value }
-    }
-
-    fn generated(value: String, document_nth: u32) -> DocumentId {
-        DocumentId::Generated { value, document_nth }
-    }
-
     fn debug(&self) -> String {
         format!("{:?}", self)
     }
@@ -290,66 +209,6 @@ impl fmt::Debug for DocumentId {
     }
 }

-fn starts_with(selector: &str, key: &str) -> bool {
-    selector.strip_prefix(key).map_or(false, |tail| {
-        tail.chars().next().map(|c| c == PRIMARY_KEY_SPLIT_SYMBOL).unwrap_or(true)
-    })
-}
-
-pub fn fetch_matching_values(value: Value, selector: &str, output: &mut Vec<Value>) {
-    match value {
-        Value::Object(object) => fetch_matching_values_in_object(object, selector, "", output),
-        otherwise => output.push(otherwise),
-    }
-}
-
-pub fn fetch_matching_values_in_object(
-    object: Object,
-    selector: &str,
-    base_key: &str,
-    output: &mut Vec<Value>,
-) {
-    for (key, value) in object {
-        let base_key = if base_key.is_empty() {
-            key.to_string()
-        } else {
-            format!("{}{}{}", base_key, PRIMARY_KEY_SPLIT_SYMBOL, key)
-        };
-
-        if starts_with(selector, &base_key) {
-            match value {
-                Value::Object(object) => {
-                    fetch_matching_values_in_object(object, selector, &base_key, output)
-                }
-                value => output.push(value),
-            }
-        }
-    }
-}
-
-pub fn validate_document_id(document_id: &str) -> Option<&str> {
-    if !document_id.is_empty()
-        && document_id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
-    {
-        Some(document_id)
-    } else {
-        None
-    }
-}
-
-/// Parses a Json encoded document id and validate it, returning a user error when it is one.
-pub fn validate_document_id_value(document_id: Value) -> Result<StdResult<String, UserError>> {
-    match document_id {
-        Value::String(string) => match validate_document_id(&string) {
-            Some(s) if s.len() == string.len() => Ok(Ok(string)),
-            Some(s) => Ok(Ok(s.to_string())),
-            None => Ok(Err(UserError::InvalidDocumentId { document_id: Value::String(string) })),
-        },
-        Value::Number(number) if number.is_i64() => Ok(Ok(number.to_string())),
-        content => Ok(Err(UserError::InvalidDocumentId { document_id: content })),
-    }
-}
-
 /// Try to extract an `f64` from a JSON `Value` and return the `Value`
 /// in the `Err` variant if it failed.
 pub fn extract_finite_float_from_value(value: Value) -> StdResult<f64, Value> {
```
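The helpers removed here move to `primary_key.rs` unchanged. As a self-contained illustration of the id-validation rule they encode (re-implemented as a plain predicate for the example):

```rust
// Document ids must be non-empty and restricted to [a-zA-Z0-9_-].
fn is_valid_document_id(id: &str) -> bool {
    !id.is_empty()
        && id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
}

fn main() {
    assert!(is_valid_document_id("doc-42_A"));
    assert!(!is_valid_document_id("doc 42")); // whitespace is rejected
    assert!(!is_valid_document_id(""));       // empty ids are rejected
}
```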
```diff
@@ -20,10 +20,7 @@ use slice_group_by::GroupBy;
 use typed_chunk::{write_typed_chunk_into_index, TypedChunk};

 use self::enrich::enrich_documents_batch;
-pub use self::enrich::{
-    extract_finite_float_from_value, validate_document_id, validate_document_id_value,
-    validate_geo_from_json, DocumentId,
-};
+pub use self::enrich::{extract_finite_float_from_value, validate_geo_from_json, DocumentId};
 pub use self::helpers::{
     as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
     fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps,
@@ -197,6 +194,39 @@ where
         Ok((self, Ok(deleted_documents)))
     }

+    /// Removes documents from db using their internal document ids.
+    ///
+    /// # Warning
+    ///
+    /// This function is dangerous and will only work correctly if:
+    ///
+    /// - All the passed ids currently exist in the database
+    /// - No batching using the standards `remove_documents` and `add_documents` took place
+    ///
+    /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function.
+    pub fn remove_documents_from_db_no_batch(
+        mut self,
+        to_delete: &RoaringBitmap,
+    ) -> Result<(Self, u64)> {
+        puffin::profile_function!();
+
+        // Early return when there is no document to add
+        if to_delete.is_empty() {
+            return Ok((self, 0));
+        }
+
+        let deleted_documents = self
+            .transform
+            .as_mut()
+            .expect("Invalid document deletion state")
+            .remove_documents_from_db_no_batch(to_delete, self.wtxn, &self.should_abort)?
+            as u64;
+
+        self.deleted_documents += deleted_documents;
+
+        Ok((self, deleted_documents))
+    }
+
     #[logging_timer::time("IndexDocuments::{}")]
     pub fn execute(mut self) -> Result<DocumentAdditionResult> {
         puffin::profile_function!();
```
```diff
@@ -421,52 +421,13 @@ impl<'a, 'i> Transform<'a, 'i> {
             // Then we push the document in sorters in deletion mode.
             let deleted_from_db = match external_documents_ids.get(wtxn, &to_remove)? {
                 Some(docid) => {
-                    self.replaced_documents_ids.insert(docid);
-
-                    // fetch the obkv document
-                    let original_key = BEU32::new(docid);
-                    let base_obkv = self
-                        .index
-                        .documents
-                        .remap_data_type::<heed::types::ByteSlice>()
-                        .get(wtxn, &original_key)?
-                        .ok_or(InternalError::DatabaseMissingEntry {
-                            db_name: db_name::DOCUMENTS,
-                            key: None,
-                        })?;
-
-                    // Key is the concatenation of the internal docid and the external one.
-                    document_sorter_key_buffer.clear();
-                    document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
-                    document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes());
-                    // push it as to delete in the original_sorter
-                    document_sorter_value_buffer.clear();
-                    document_sorter_value_buffer.push(Operation::Deletion as u8);
-                    into_del_add_obkv(
-                        KvReaderU16::new(base_obkv),
-                        true,
-                        false,
-                        &mut document_sorter_value_buffer,
-                    )?;
-                    self.original_sorter
-                        .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
-
-                    // flatten it and push it as to delete in the flattened_sorter
-                    let flattened_obkv = KvReader::new(base_obkv);
-                    if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
-                        // we recreate our buffer with the flattened documents
-                        document_sorter_value_buffer.clear();
-                        document_sorter_value_buffer.push(Operation::Deletion as u8);
-                        into_del_add_obkv(
-                            KvReaderU16::new(&obkv),
-                            true,
-                            false,
-                            &mut document_sorter_value_buffer,
-                        )?;
-                    }
-                    self.flattened_sorter
-                        .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
-
+                    self.remove_document_from_db(
+                        docid,
+                        to_remove,
+                        wtxn,
+                        &mut document_sorter_key_buffer,
+                        &mut document_sorter_value_buffer,
+                    )?;
                     true
                 }
                 None => false,
@@ -481,6 +442,97 @@ impl<'a, 'i> Transform<'a, 'i> {
         Ok(documents_deleted)
     }

+    /// Removes documents from db using their internal document ids.
+    ///
+    /// # Warning
+    ///
+    /// This function is dangerous and will only work correctly if:
+    ///
+    /// - All the passed ids currently exist in the database
+    /// - No batching using the standards `remove_documents` and `add_documents` took place
+    ///
+    /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function.
+    #[logging_timer::time]
+    pub fn remove_documents_from_db_no_batch<FA>(
+        &mut self,
+        to_remove: &RoaringBitmap,
+        wtxn: &mut heed::RwTxn,
+        should_abort: FA,
+    ) -> Result<usize>
+    where
+        FA: Fn() -> bool + Sync,
+    {
+        puffin::profile_function!();
+
+        let mut documents_deleted = 0;
+        let mut document_sorter_value_buffer = Vec::new();
+        let mut document_sorter_key_buffer = Vec::new();
+        let external_ids = self.index.external_id_of(wtxn, to_remove.iter())?;
+
+        for (internal_docid, external_docid) in to_remove.iter().zip(external_ids) {
+            let external_docid = external_docid?;
+            if should_abort() {
+                return Err(Error::InternalError(InternalError::AbortedIndexation));
+            }
+            self.remove_document_from_db(
+                internal_docid,
+                external_docid,
+                wtxn,
+                &mut document_sorter_key_buffer,
+                &mut document_sorter_value_buffer,
+            )?;
+
+            documents_deleted += 1;
+        }
+
+        Ok(documents_deleted)
+    }
+
+    fn remove_document_from_db(
+        &mut self,
+        internal_docid: u32,
+        external_docid: String,
+        txn: &heed::RoTxn,
+        document_sorter_key_buffer: &mut Vec<u8>,
+        document_sorter_value_buffer: &mut Vec<u8>,
+    ) -> Result<()> {
+        self.replaced_documents_ids.insert(internal_docid);
+
+        // fetch the obkv document
+        let original_key = BEU32::new(internal_docid);
+        let base_obkv = self
+            .index
+            .documents
+            .remap_data_type::<heed::types::ByteSlice>()
+            .get(txn, &original_key)?
+            .ok_or(InternalError::DatabaseMissingEntry {
+                db_name: db_name::DOCUMENTS,
+                key: None,
+            })?;
+
+        // Key is the concatenation of the internal docid and the external one.
+        document_sorter_key_buffer.clear();
+        document_sorter_key_buffer.extend_from_slice(&internal_docid.to_be_bytes());
+        document_sorter_key_buffer.extend_from_slice(external_docid.as_bytes());
+        // push it as to delete in the original_sorter
+        document_sorter_value_buffer.clear();
+        document_sorter_value_buffer.push(Operation::Deletion as u8);
+        into_del_add_obkv(KvReaderU16::new(base_obkv), true, false, document_sorter_value_buffer)?;
+        self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
+
+        // flatten it and push it as to delete in the flattened_sorter
+        let flattened_obkv = KvReader::new(base_obkv);
+        if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
+            // we recreate our buffer with the flattened documents
+            document_sorter_value_buffer.clear();
+            document_sorter_value_buffer.push(Operation::Deletion as u8);
+            into_del_add_obkv(KvReaderU16::new(&obkv), true, false, document_sorter_value_buffer)?;
+        }
+        self.flattened_sorter
+            .insert(internal_docid.to_be_bytes(), &document_sorter_value_buffer)?;
+        Ok(())
+    }
+
     // Flatten a document from the fields ids map contained in self and insert the new
     // created fields. Returns `None` if the document doesn't need to be flattened.
     fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> {
```
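A small aside on the key layout used by `remove_document_from_db` above: the key written to `original_sorter` is the internal docid as big-endian `u32` bytes followed by the external docid's UTF-8 bytes. A standalone illustration (helper name hypothetical):

```rust
// Builds the sorter key: 4 big-endian bytes of the internal docid, then the
// external docid's bytes, matching `document_sorter_key_buffer` above.
fn sorter_key(internal_docid: u32, external_docid: &str) -> Vec<u8> {
    let mut key = Vec::with_capacity(4 + external_docid.len());
    key.extend_from_slice(&internal_docid.to_be_bytes());
    key.extend_from_slice(external_docid.as_bytes());
    key
}

fn main() {
    assert_eq!(sorter_key(5, "abc"), [0, 0, 0, 5, b'a', b'b', b'c']);
}
```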