mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-25 19:45:05 +08:00
Remove useless Transform methods
This commit is contained in:
parent
41dbdd2d18
commit
670aff5553
@ -5,27 +5,16 @@ mod parallel;
|
|||||||
mod transform;
|
mod transform;
|
||||||
mod typed_chunk;
|
mod typed_chunk;
|
||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::HashSet;
|
||||||
use std::io::{Read, Seek};
|
|
||||||
use std::iter;
|
|
||||||
use std::num::NonZeroU32;
|
use std::num::NonZeroU32;
|
||||||
use std::result::Result as StdResult;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use crossbeam_channel::{Receiver, Sender};
|
use grenad::Merger;
|
||||||
use grenad::{Merger, MergerBuilder};
|
|
||||||
use heed::types::Str;
|
use heed::types::Str;
|
||||||
use heed::Database;
|
use heed::Database;
|
||||||
use rand::SeedableRng;
|
|
||||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
|
||||||
use rhai::{Dynamic, Engine, OptimizationLevel, Scope};
|
|
||||||
use roaring::RoaringBitmap;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use slice_group_by::GroupBy;
|
use slice_group_by::GroupBy;
|
||||||
use tracing::debug;
|
use typed_chunk::TypedChunk;
|
||||||
use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk};
|
|
||||||
|
|
||||||
use self::enrich::enrich_documents_batch;
|
|
||||||
pub use self::enrich::{extract_finite_float_from_value, DocumentId};
|
pub use self::enrich::{extract_finite_float_from_value, DocumentId};
|
||||||
pub use self::helpers::*;
|
pub use self::helpers::*;
|
||||||
pub use self::transform::{Transform, TransformOutput};
|
pub use self::transform::{Transform, TransformOutput};
|
||||||
@ -128,602 +117,11 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a batch of documents to the current builder.
|
|
||||||
///
|
|
||||||
/// Since the documents are progressively added to the writer, a failure will cause only
|
|
||||||
/// return an error and not the `IndexDocuments` struct as it is invalid to use it afterward.
|
|
||||||
///
|
|
||||||
/// Returns the number of documents added to the builder.
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
|
|
||||||
pub fn add_documents<R: Read + Seek>(
|
|
||||||
mut self,
|
|
||||||
reader: DocumentsBatchReader<R>,
|
|
||||||
) -> Result<(Self, StdResult<u64, UserError>)> {
|
|
||||||
// Early return when there is no document to add
|
|
||||||
if reader.is_empty() {
|
|
||||||
return Ok((self, Ok(0)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// We check for user errors in this validator and if there is one, we can return
|
|
||||||
// the `IndexDocument` struct as it is valid to send more documents into it.
|
|
||||||
// However, if there is an internal error we throw it away!
|
|
||||||
let enriched_documents_reader = match enrich_documents_batch(
|
|
||||||
self.wtxn,
|
|
||||||
self.index,
|
|
||||||
self.config.autogenerate_docids,
|
|
||||||
reader,
|
|
||||||
)? {
|
|
||||||
Ok(reader) => reader,
|
|
||||||
Err(user_error) => return Ok((self, Err(user_error))),
|
|
||||||
};
|
|
||||||
|
|
||||||
let indexed_documents =
|
|
||||||
self.transform.as_mut().expect("Invalid document addition state").read_documents(
|
|
||||||
enriched_documents_reader,
|
|
||||||
self.wtxn,
|
|
||||||
&self.progress,
|
|
||||||
&self.should_abort,
|
|
||||||
)? as u64;
|
|
||||||
|
|
||||||
self.added_documents += indexed_documents;
|
|
||||||
|
|
||||||
Ok((self, Ok(indexed_documents)))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
|
|
||||||
pub fn edit_documents(
|
|
||||||
self,
|
|
||||||
documents: &RoaringBitmap,
|
|
||||||
context: Option<Object>,
|
|
||||||
code: &str,
|
|
||||||
) -> Result<(Self, StdResult<(u64, u64), UserError>)> {
|
|
||||||
// Early return when there is no document to edit
|
|
||||||
if documents.is_empty() {
|
|
||||||
return Ok((self, Ok((0, 0))));
|
|
||||||
}
|
|
||||||
|
|
||||||
fn rhaimap_to_object(map: rhai::Map) -> Object {
|
|
||||||
let mut output = Object::new();
|
|
||||||
for (key, value) in map {
|
|
||||||
let value = serde_json::to_value(&value).unwrap();
|
|
||||||
output.insert(key.into(), value);
|
|
||||||
}
|
|
||||||
output
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup the security and limits of the Engine
|
|
||||||
let mut engine = Engine::new();
|
|
||||||
engine.set_optimization_level(OptimizationLevel::Full);
|
|
||||||
engine.set_max_call_levels(1000);
|
|
||||||
// It is an arbitrary value. We need to let users define this in the settings.
|
|
||||||
engine.set_max_operations(1_000_000);
|
|
||||||
engine.set_max_variables(1000);
|
|
||||||
engine.set_max_functions(30);
|
|
||||||
engine.set_max_expr_depths(100, 1000);
|
|
||||||
engine.set_max_string_size(1024 * 1024 * 1024); // 1 GiB
|
|
||||||
engine.set_max_array_size(10_000);
|
|
||||||
engine.set_max_map_size(10_000);
|
|
||||||
|
|
||||||
let ast = engine.compile(code).map_err(UserError::DocumentEditionCompilationError)?;
|
|
||||||
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
|
|
||||||
let primary_key = self.index.primary_key(self.wtxn)?.unwrap();
|
|
||||||
let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?;
|
|
||||||
let mut documents_to_remove = RoaringBitmap::new();
|
|
||||||
|
|
||||||
let context: Option<Dynamic> = match context {
|
|
||||||
Some(context) => {
|
|
||||||
Some(serde_json::from_value(context.into()).map_err(InternalError::SerdeJson)?)
|
|
||||||
}
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
enum DocumentEdition {
|
|
||||||
Deleted(crate::DocumentId),
|
|
||||||
Edited(Object),
|
|
||||||
Nothing,
|
|
||||||
}
|
|
||||||
|
|
||||||
let immutable_obkvs = ImmutableObkvs::new(
|
|
||||||
self.wtxn,
|
|
||||||
self.index.documents,
|
|
||||||
fields_ids_map.clone(),
|
|
||||||
documents.clone(),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let processing = documents.into_iter().par_bridge().map(|docid| {
|
|
||||||
// safety: Both documents *must* exists in the database as
|
|
||||||
// their IDs comes from the list of documents ids.
|
|
||||||
let rhai_document = immutable_obkvs.rhai_map(docid)?.unwrap();
|
|
||||||
let json_document = immutable_obkvs.json_map(docid)?.unwrap();
|
|
||||||
let document_id = &json_document[primary_key];
|
|
||||||
|
|
||||||
let mut scope = Scope::new();
|
|
||||||
if let Some(context) = context.as_ref().cloned() {
|
|
||||||
scope.push_constant_dynamic("context", context.clone());
|
|
||||||
}
|
|
||||||
scope.push("doc", rhai_document);
|
|
||||||
// That's were the magic happens. We run the user script
|
|
||||||
// which edits "doc" scope variable reprensenting the document
|
|
||||||
// and ignore the output and even the type of it, i.e., Dynamic.
|
|
||||||
let _ = engine
|
|
||||||
.eval_ast_with_scope::<Dynamic>(&mut scope, &ast)
|
|
||||||
.map_err(UserError::DocumentEditionRuntimeError)?;
|
|
||||||
|
|
||||||
match scope.remove::<Dynamic>("doc") {
|
|
||||||
// If the "doc" variable has set to (), we effectively delete the document.
|
|
||||||
Some(doc) if doc.is_unit() => Ok(DocumentEdition::Deleted(docid)),
|
|
||||||
None => unreachable!("missing doc variable from the Rhai scope"),
|
|
||||||
Some(document) => match document.try_cast() {
|
|
||||||
Some(document) => {
|
|
||||||
let new_document = rhaimap_to_object(document);
|
|
||||||
// Note: This condition is not perfect. Sometimes it detect changes
|
|
||||||
// like with floating points numbers and consider updating
|
|
||||||
// the document even if nothing actually changed.
|
|
||||||
if json_document != new_document {
|
|
||||||
if Some(document_id) != new_document.get(primary_key) {
|
|
||||||
Err(Error::UserError(
|
|
||||||
UserError::DocumentEditionCannotModifyPrimaryKey,
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
Ok(DocumentEdition::Edited(new_document))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Ok(DocumentEdition::Nothing)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => Err(Error::UserError(UserError::DocumentEditionDocumentMustBeObject)),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
rayon_par_bridge::par_bridge(100, processing, |iterator| {
|
|
||||||
for result in iterator {
|
|
||||||
if (self.should_abort)() {
|
|
||||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
|
||||||
}
|
|
||||||
|
|
||||||
match result? {
|
|
||||||
DocumentEdition::Deleted(docid) => {
|
|
||||||
documents_to_remove.insert(docid);
|
|
||||||
}
|
|
||||||
DocumentEdition::Edited(new_document) => {
|
|
||||||
documents_batch_builder.append_json_object(&new_document)?;
|
|
||||||
}
|
|
||||||
DocumentEdition::Nothing => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let file = documents_batch_builder.into_inner()?;
|
|
||||||
let reader = DocumentsBatchReader::from_reader(file)?;
|
|
||||||
|
|
||||||
let (this, removed) = self.remove_documents_from_db_no_batch(&documents_to_remove)?;
|
|
||||||
let (this, result) = this.add_documents(reader)?;
|
|
||||||
|
|
||||||
Ok((this, result.map(|added| (removed, added))))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self {
|
pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self {
|
||||||
self.embedders = embedders;
|
self.embedders = embedders;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a batch of documents from the current builder.
|
|
||||||
///
|
|
||||||
/// Returns the number of documents deleted from the builder.
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
|
|
||||||
pub fn remove_documents(
|
|
||||||
mut self,
|
|
||||||
to_delete: Vec<String>,
|
|
||||||
) -> Result<(Self, StdResult<u64, UserError>)> {
|
|
||||||
// Early return when there is no document to add
|
|
||||||
if to_delete.is_empty() {
|
|
||||||
// Maintains Invariant: remove documents actually always returns Ok for the inner result
|
|
||||||
return Ok((self, Ok(0)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let deleted_documents = self
|
|
||||||
.transform
|
|
||||||
.as_mut()
|
|
||||||
.expect("Invalid document deletion state")
|
|
||||||
.remove_documents(to_delete, self.wtxn, &self.should_abort)?
|
|
||||||
as u64;
|
|
||||||
|
|
||||||
self.deleted_documents += deleted_documents;
|
|
||||||
|
|
||||||
// Maintains Invariant: remove documents actually always returns Ok for the inner result
|
|
||||||
Ok((self, Ok(deleted_documents)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Removes documents from db using their internal document ids.
|
|
||||||
///
|
|
||||||
/// # Warning
|
|
||||||
///
|
|
||||||
/// This function is dangerous and will only work correctly if:
|
|
||||||
///
|
|
||||||
/// - All the passed ids currently exist in the database
|
|
||||||
/// - No batching using the standards `remove_documents` and `add_documents` took place
|
|
||||||
///
|
|
||||||
/// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function.
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::details")]
|
|
||||||
pub fn remove_documents_from_db_no_batch(
|
|
||||||
mut self,
|
|
||||||
to_delete: &RoaringBitmap,
|
|
||||||
) -> Result<(Self, u64)> {
|
|
||||||
// Early return when there is no document to add
|
|
||||||
if to_delete.is_empty() {
|
|
||||||
return Ok((self, 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
let deleted_documents = self
|
|
||||||
.transform
|
|
||||||
.as_mut()
|
|
||||||
.expect("Invalid document deletion state")
|
|
||||||
.remove_documents_from_db_no_batch(to_delete, self.wtxn, &self.should_abort)?
|
|
||||||
as u64;
|
|
||||||
|
|
||||||
self.deleted_documents += deleted_documents;
|
|
||||||
|
|
||||||
Ok((self, deleted_documents))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(
|
|
||||||
level = "trace"
|
|
||||||
skip_all,
|
|
||||||
target = "indexing::documents",
|
|
||||||
name = "index_documents"
|
|
||||||
)]
|
|
||||||
pub fn execute(mut self) -> Result<DocumentAdditionResult> {
|
|
||||||
if self.added_documents == 0 && self.deleted_documents == 0 {
|
|
||||||
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
|
|
||||||
return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents });
|
|
||||||
}
|
|
||||||
let output = self
|
|
||||||
.transform
|
|
||||||
.take()
|
|
||||||
.expect("Invalid document addition state")
|
|
||||||
.output_from_sorter(self.wtxn, &self.progress)?;
|
|
||||||
|
|
||||||
let indexed_documents = output.documents_count as u64;
|
|
||||||
let number_of_documents = self.execute_raw(output)?;
|
|
||||||
|
|
||||||
Ok(DocumentAdditionResult { indexed_documents, number_of_documents })
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the total number of documents in the index after the update.
|
|
||||||
#[tracing::instrument(
|
|
||||||
level = "trace",
|
|
||||||
skip_all,
|
|
||||||
target = "indexing::details",
|
|
||||||
name = "index_documents_raw"
|
|
||||||
)]
|
|
||||||
pub fn execute_raw(self, output: TransformOutput) -> Result<u64>
|
|
||||||
where
|
|
||||||
FP: Fn(UpdateIndexingStep) + Sync,
|
|
||||||
FA: Fn() -> bool + Sync,
|
|
||||||
{
|
|
||||||
let TransformOutput {
|
|
||||||
primary_key,
|
|
||||||
mut settings_diff,
|
|
||||||
field_distribution,
|
|
||||||
documents_count,
|
|
||||||
original_documents,
|
|
||||||
flattened_documents,
|
|
||||||
} = output;
|
|
||||||
|
|
||||||
// update the internal facet and searchable list,
|
|
||||||
// because they might have changed due to the nested documents flattening.
|
|
||||||
settings_diff.new.recompute_facets(self.wtxn, self.index)?;
|
|
||||||
settings_diff.new.recompute_searchables(self.wtxn, self.index)?;
|
|
||||||
|
|
||||||
let settings_diff = Arc::new(settings_diff);
|
|
||||||
let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?);
|
|
||||||
|
|
||||||
let possible_embedding_mistakes =
|
|
||||||
crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution);
|
|
||||||
|
|
||||||
let backup_pool;
|
|
||||||
let pool = match self.indexer_config.thread_pool {
|
|
||||||
Some(ref pool) => pool,
|
|
||||||
None => {
|
|
||||||
// We initialize a backup pool with the default
|
|
||||||
// settings if none have already been set.
|
|
||||||
#[allow(unused_mut)]
|
|
||||||
let mut pool_builder = ThreadPoolNoAbortBuilder::new();
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
{
|
|
||||||
pool_builder = pool_builder.num_threads(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
backup_pool = pool_builder.build()?;
|
|
||||||
&backup_pool
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// create LMDB writer channel
|
|
||||||
let (lmdb_writer_sx, lmdb_writer_rx): (
|
|
||||||
Sender<Result<TypedChunk>>,
|
|
||||||
Receiver<Result<TypedChunk>>,
|
|
||||||
) = crossbeam_channel::unbounded();
|
|
||||||
|
|
||||||
// get the primary key field id
|
|
||||||
let primary_key_id = settings_diff.new.fields_ids_map.id(&primary_key).unwrap();
|
|
||||||
|
|
||||||
let pool_params = GrenadParameters {
|
|
||||||
chunk_compression_type: self.indexer_config.chunk_compression_type,
|
|
||||||
chunk_compression_level: self.indexer_config.chunk_compression_level,
|
|
||||||
max_memory: self.indexer_config.max_memory,
|
|
||||||
max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen.
|
|
||||||
};
|
|
||||||
let documents_chunk_size = match self.indexer_config.documents_chunk_size {
|
|
||||||
Some(chunk_size) => chunk_size,
|
|
||||||
None => {
|
|
||||||
let default_chunk_size = 1024 * 1024 * 4; // 4MiB
|
|
||||||
let min_chunk_size = 1024 * 512; // 512KiB
|
|
||||||
|
|
||||||
// compute the chunk size from the number of available threads and the inputed data size.
|
|
||||||
let total_size = match flattened_documents.as_ref() {
|
|
||||||
Some(flattened_documents) => flattened_documents.metadata().map(|m| m.len()),
|
|
||||||
None => Ok(default_chunk_size as u64),
|
|
||||||
};
|
|
||||||
let current_num_threads = pool.current_num_threads();
|
|
||||||
// if we have more than 2 thread, create a number of chunk equal to 3/4 threads count
|
|
||||||
let chunk_count = if current_num_threads > 2 {
|
|
||||||
(current_num_threads * 3 / 4).max(2)
|
|
||||||
} else {
|
|
||||||
current_num_threads
|
|
||||||
};
|
|
||||||
total_size
|
|
||||||
.map_or(default_chunk_size, |size| (size as usize) / chunk_count)
|
|
||||||
.max(min_chunk_size)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let original_documents = match original_documents {
|
|
||||||
Some(original_documents) => Some(grenad::Reader::new(original_documents)?),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
let flattened_documents = match flattened_documents {
|
|
||||||
Some(flattened_documents) => Some(grenad::Reader::new(flattened_documents)?),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;
|
|
||||||
|
|
||||||
let mut final_documents_ids = RoaringBitmap::new();
|
|
||||||
let mut databases_seen = 0;
|
|
||||||
let mut word_position_docids = None;
|
|
||||||
let mut word_fid_docids = None;
|
|
||||||
let mut word_docids = None;
|
|
||||||
let mut exact_word_docids = None;
|
|
||||||
let mut chunk_accumulator = ChunkAccumulator::default();
|
|
||||||
let mut dimension = HashMap::new();
|
|
||||||
|
|
||||||
let current_span = tracing::Span::current();
|
|
||||||
|
|
||||||
// Run extraction pipeline in parallel.
|
|
||||||
pool.install(|| {
|
|
||||||
let settings_diff_cloned = settings_diff.clone();
|
|
||||||
rayon::spawn(move || {
|
|
||||||
let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_and_send_grenad_chunks");
|
|
||||||
let _enter = child_span.enter();
|
|
||||||
|
|
||||||
// split obkv file into several chunks
|
|
||||||
let original_chunk_iter = match original_documents {
|
|
||||||
Some(original_documents) => {
|
|
||||||
grenad_obkv_into_chunks(original_documents,pool_params,documents_chunk_size).map(either::Left)
|
|
||||||
},
|
|
||||||
None => Ok(either::Right(iter::empty())),
|
|
||||||
};
|
|
||||||
|
|
||||||
// split obkv file into several chunks
|
|
||||||
let flattened_chunk_iter = match flattened_documents {
|
|
||||||
Some(flattened_documents) => {
|
|
||||||
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size).map(either::Left)
|
|
||||||
},
|
|
||||||
None => Ok(either::Right(iter::empty())),
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = original_chunk_iter.and_then(|original_chunk| {
|
|
||||||
let flattened_chunk = flattened_chunk_iter?;
|
|
||||||
// extract all databases from the chunked obkv douments
|
|
||||||
extract::data_from_obkv_documents(
|
|
||||||
original_chunk,
|
|
||||||
flattened_chunk,
|
|
||||||
pool_params,
|
|
||||||
lmdb_writer_sx.clone(),
|
|
||||||
primary_key_id,
|
|
||||||
embedders_configs.clone(),
|
|
||||||
settings_diff_cloned,
|
|
||||||
max_positions_per_attributes,
|
|
||||||
Arc::new(possible_embedding_mistakes)
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(e) = result {
|
|
||||||
let _ = lmdb_writer_sx.send(Err(e));
|
|
||||||
}
|
|
||||||
|
|
||||||
// needs to be dropped to avoid channel waiting lock.
|
|
||||||
drop(lmdb_writer_sx);
|
|
||||||
});
|
|
||||||
|
|
||||||
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
|
|
||||||
databases_seen,
|
|
||||||
total_databases: TOTAL_POSTING_DATABASE_COUNT,
|
|
||||||
});
|
|
||||||
|
|
||||||
loop {
|
|
||||||
if (self.should_abort)() {
|
|
||||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
|
||||||
}
|
|
||||||
|
|
||||||
match lmdb_writer_rx.clone().recv_timeout(std::time::Duration::from_millis(500)) {
|
|
||||||
Err(status) => {
|
|
||||||
if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
|
|
||||||
let (docids, is_merged_database) =
|
|
||||||
write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks)?;
|
|
||||||
if !docids.is_empty() {
|
|
||||||
final_documents_ids |= docids;
|
|
||||||
let documents_seen_count = final_documents_ids.len();
|
|
||||||
(self.progress)(UpdateIndexingStep::IndexDocuments {
|
|
||||||
documents_seen: documents_seen_count as usize,
|
|
||||||
total_documents: documents_count,
|
|
||||||
});
|
|
||||||
debug!(documents = documents_seen_count, total = documents_count, "Seen");
|
|
||||||
}
|
|
||||||
if is_merged_database {
|
|
||||||
databases_seen += 1;
|
|
||||||
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
|
|
||||||
databases_seen,
|
|
||||||
total_databases: TOTAL_POSTING_DATABASE_COUNT,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
// If no more chunk remains in the chunk accumulator and the channel is disconected, break.
|
|
||||||
} else if status == crossbeam_channel::RecvTimeoutError::Disconnected {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
rayon::yield_now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(result) => {
|
|
||||||
let typed_chunk = match result? {
|
|
||||||
TypedChunk::WordDocids {
|
|
||||||
word_docids_reader,
|
|
||||||
exact_word_docids_reader,
|
|
||||||
word_fid_docids_reader,
|
|
||||||
} => {
|
|
||||||
let cloneable_chunk =
|
|
||||||
unsafe { as_cloneable_grenad(&word_docids_reader)? };
|
|
||||||
let word_docids = word_docids.get_or_insert_with(|| {
|
|
||||||
MergerBuilder::new(MergeDeladdCboRoaringBitmaps)
|
|
||||||
});
|
|
||||||
word_docids.push(cloneable_chunk.into_cursor()?);
|
|
||||||
let cloneable_chunk =
|
|
||||||
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
|
|
||||||
let exact_word_docids =
|
|
||||||
exact_word_docids.get_or_insert_with(|| {
|
|
||||||
MergerBuilder::new(
|
|
||||||
MergeDeladdCboRoaringBitmaps,
|
|
||||||
)
|
|
||||||
});
|
|
||||||
exact_word_docids.push(cloneable_chunk.into_cursor()?);
|
|
||||||
let cloneable_chunk =
|
|
||||||
unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
|
|
||||||
let word_fid_docids = word_fid_docids.get_or_insert_with(|| {
|
|
||||||
MergerBuilder::new(MergeDeladdCboRoaringBitmaps)
|
|
||||||
});
|
|
||||||
word_fid_docids.push(cloneable_chunk.into_cursor()?);
|
|
||||||
TypedChunk::WordDocids {
|
|
||||||
word_docids_reader,
|
|
||||||
exact_word_docids_reader,
|
|
||||||
word_fid_docids_reader,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
TypedChunk::WordPositionDocids(chunk) => {
|
|
||||||
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
|
|
||||||
let word_position_docids =
|
|
||||||
word_position_docids.get_or_insert_with(|| {
|
|
||||||
MergerBuilder::new(
|
|
||||||
MergeDeladdCboRoaringBitmaps,
|
|
||||||
)
|
|
||||||
});
|
|
||||||
word_position_docids.push(cloneable_chunk.into_cursor()?);
|
|
||||||
TypedChunk::WordPositionDocids(chunk)
|
|
||||||
}
|
|
||||||
TypedChunk::VectorPoints {
|
|
||||||
expected_dimension,
|
|
||||||
remove_vectors,
|
|
||||||
embeddings,
|
|
||||||
manual_vectors,
|
|
||||||
embedder_name,
|
|
||||||
add_to_user_provided,
|
|
||||||
remove_from_user_provided,
|
|
||||||
} => {
|
|
||||||
dimension.insert(embedder_name.clone(), expected_dimension);
|
|
||||||
TypedChunk::VectorPoints {
|
|
||||||
remove_vectors,
|
|
||||||
embeddings,
|
|
||||||
expected_dimension,
|
|
||||||
manual_vectors,
|
|
||||||
embedder_name,
|
|
||||||
add_to_user_provided,
|
|
||||||
remove_from_user_provided,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
otherwise => otherwise,
|
|
||||||
};
|
|
||||||
|
|
||||||
chunk_accumulator.insert(typed_chunk);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}).map_err(InternalError::from)??;
|
|
||||||
|
|
||||||
// We write the field distribution into the main database
|
|
||||||
self.index.put_field_distribution(self.wtxn, &field_distribution)?;
|
|
||||||
|
|
||||||
// We write the primary key field id into the main database
|
|
||||||
self.index.put_primary_key(self.wtxn, &primary_key)?;
|
|
||||||
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
|
|
||||||
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
|
||||||
|
|
||||||
// If an embedder wasn't used in the typedchunk but must be binary quantized
|
|
||||||
// we should insert it in `dimension`
|
|
||||||
for (name, action) in settings_diff.embedding_config_updates.iter() {
|
|
||||||
if action.is_being_quantized && !dimension.contains_key(name.as_str()) {
|
|
||||||
let index = self.index.embedder_category_id.get(self.wtxn, name)?.ok_or(
|
|
||||||
InternalError::DatabaseMissingEntry {
|
|
||||||
db_name: "embedder_category_id",
|
|
||||||
key: None,
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
let reader =
|
|
||||||
ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized);
|
|
||||||
let dim = reader.dimensions(self.wtxn)?;
|
|
||||||
dimension.insert(name.to_string(), dim);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (embedder_name, dimension) in dimension {
|
|
||||||
let wtxn = &mut *self.wtxn;
|
|
||||||
let vector_arroy = self.index.vector_arroy;
|
|
||||||
let cancel = &self.should_abort;
|
|
||||||
|
|
||||||
let embedder_index = self.index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or(
|
|
||||||
InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None },
|
|
||||||
)?;
|
|
||||||
let embedder_config = settings_diff.embedding_config_updates.get(&embedder_name);
|
|
||||||
let was_quantized = settings_diff
|
|
||||||
.old
|
|
||||||
.embedding_configs
|
|
||||||
.get(&embedder_name)
|
|
||||||
.map_or(false, |conf| conf.2);
|
|
||||||
let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized);
|
|
||||||
|
|
||||||
pool.install(|| {
|
|
||||||
let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
|
|
||||||
writer.build_and_quantize(wtxn, &mut rng, dimension, is_quantizing, cancel)?;
|
|
||||||
Result::Ok(())
|
|
||||||
})
|
|
||||||
.map_err(InternalError::from)??;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.execute_prefix_databases(
|
|
||||||
word_docids.map(MergerBuilder::build),
|
|
||||||
exact_word_docids.map(MergerBuilder::build),
|
|
||||||
word_position_docids.map(MergerBuilder::build),
|
|
||||||
word_fid_docids.map(MergerBuilder::build),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
Ok(number_of_documents)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
level = "trace",
|
level = "trace",
|
||||||
skip_all,
|
skip_all,
|
||||||
|
@ -1,33 +1,29 @@
|
|||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::collections::btree_map::Entry as BEntry;
|
|
||||||
use std::collections::hash_map::Entry as HEntry;
|
|
||||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{Read, Seek};
|
|
||||||
|
|
||||||
use either::Either;
|
use either::Either;
|
||||||
use fxhash::FxHashMap;
|
use fxhash::FxHashMap;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use obkv::{KvReader, KvReaderU16, KvWriter};
|
use obkv::{KvReader, KvWriter};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use smartstring::SmartString;
|
use smartstring::SmartString;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, create_writer, sorter_into_reader, EitherObkvMerge,
|
create_sorter, sorter_into_reader, EitherObkvMerge, ObkvsKeepLastAdditionMergeDeletions,
|
||||||
ObkvsKeepLastAdditionMergeDeletions, ObkvsMergeAdditionsAndDeletions,
|
ObkvsMergeAdditionsAndDeletions,
|
||||||
};
|
};
|
||||||
use super::{IndexDocumentsMethod, IndexerConfig, KeepFirst};
|
use super::{IndexDocumentsMethod, IndexerConfig, KeepFirst};
|
||||||
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
|
use crate::documents::DocumentsBatchIndex;
|
||||||
use crate::error::{Error, InternalError, UserError};
|
use crate::error::{Error, InternalError, UserError};
|
||||||
use crate::index::{db_name, main_key};
|
use crate::index::{db_name, main_key};
|
||||||
use crate::update::del_add::{
|
use crate::update::del_add::{
|
||||||
into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAdd, DelAddOperation,
|
into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAddOperation,
|
||||||
KvReaderDelAdd,
|
|
||||||
};
|
};
|
||||||
use crate::update::index_documents::GrenadParameters;
|
use crate::update::index_documents::GrenadParameters;
|
||||||
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
|
use crate::update::settings::InnerIndexSettingsDiff;
|
||||||
use crate::update::{AvailableIds, UpdateIndexingStep};
|
use crate::update::AvailableIds;
|
||||||
use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
|
use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
|
||||||
use crate::vector::settings::WriteBackToDocuments;
|
use crate::vector::settings::WriteBackToDocuments;
|
||||||
use crate::vector::ArroyWrapper;
|
use crate::vector::ArroyWrapper;
|
||||||
@ -157,399 +153,6 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
|
|
||||||
pub fn read_documents<R, FP, FA>(
|
|
||||||
&mut self,
|
|
||||||
reader: EnrichedDocumentsBatchReader<R>,
|
|
||||||
wtxn: &mut heed::RwTxn<'_>,
|
|
||||||
progress_callback: FP,
|
|
||||||
should_abort: FA,
|
|
||||||
) -> Result<usize>
|
|
||||||
where
|
|
||||||
R: Read + Seek,
|
|
||||||
FP: Fn(UpdateIndexingStep) + Sync,
|
|
||||||
FA: Fn() -> bool + Sync,
|
|
||||||
{
|
|
||||||
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
|
|
||||||
let external_documents_ids = self.index.external_documents_ids();
|
|
||||||
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
|
|
||||||
|
|
||||||
let primary_key = cursor.primary_key().to_string();
|
|
||||||
let primary_key_id =
|
|
||||||
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
|
|
||||||
|
|
||||||
let mut obkv_buffer = Vec::new();
|
|
||||||
let mut document_sorter_value_buffer = Vec::new();
|
|
||||||
let mut document_sorter_key_buffer = Vec::new();
|
|
||||||
let mut documents_count = 0;
|
|
||||||
let mut docid_buffer: Vec<u8> = Vec::new();
|
|
||||||
let mut field_buffer: Vec<(u16, Cow<'_, [u8]>)> = Vec::new();
|
|
||||||
while let Some(enriched_document) = cursor.next_enriched_document()? {
|
|
||||||
let EnrichedDocument { document, document_id } = enriched_document;
|
|
||||||
|
|
||||||
if should_abort() {
|
|
||||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
|
||||||
}
|
|
||||||
|
|
||||||
// drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer
|
|
||||||
// does not keep references from the cursor between loop iterations
|
|
||||||
let mut field_buffer_cache = drop_and_reuse(field_buffer);
|
|
||||||
if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
|
|
||||||
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
|
|
||||||
documents_seen: documents_count,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// When the document id has been auto-generated by the `enrich_documents_batch`
|
|
||||||
// we must insert this document id into the remaped document.
|
|
||||||
let external_id = document_id.value();
|
|
||||||
if document_id.is_generated() {
|
|
||||||
serde_json::to_writer(&mut docid_buffer, external_id)
|
|
||||||
.map_err(InternalError::SerdeJson)?;
|
|
||||||
field_buffer_cache.push((primary_key_id, Cow::from(&docid_buffer)));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (k, v) in document.iter() {
|
|
||||||
let mapped_id =
|
|
||||||
*mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?;
|
|
||||||
field_buffer_cache.push((mapped_id, Cow::from(v)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insertion in a obkv need to be done with keys ordered. For now they are ordered
|
|
||||||
// according to the document addition key order, so we sort it according to the
|
|
||||||
// fieldids map keys order.
|
|
||||||
field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2));
|
|
||||||
|
|
||||||
// Build the new obkv document.
|
|
||||||
let mut writer = KvWriter::new(&mut obkv_buffer);
|
|
||||||
for (k, v) in field_buffer_cache.iter() {
|
|
||||||
writer.insert(*k, v)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut original_docid = None;
|
|
||||||
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
|
|
||||||
HEntry::Occupied(entry) => *entry.get() as u32,
|
|
||||||
HEntry::Vacant(entry) => {
|
|
||||||
let docid = match external_documents_ids.get(wtxn, entry.key())? {
|
|
||||||
Some(docid) => {
|
|
||||||
// If it was already in the list of replaced documents it means it was deleted
|
|
||||||
// by the remove_document method. We should starts as if it never existed.
|
|
||||||
if self.replaced_documents_ids.insert(docid) {
|
|
||||||
original_docid = Some(docid);
|
|
||||||
}
|
|
||||||
|
|
||||||
docid
|
|
||||||
}
|
|
||||||
None => self
|
|
||||||
.available_documents_ids
|
|
||||||
.next()
|
|
||||||
.ok_or(UserError::DocumentLimitReached)?,
|
|
||||||
};
|
|
||||||
entry.insert(docid as u64);
|
|
||||||
docid
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut skip_insertion = false;
|
|
||||||
if let Some(original_docid) = original_docid {
|
|
||||||
let original_key = original_docid;
|
|
||||||
let base_obkv = self
|
|
||||||
.index
|
|
||||||
.documents
|
|
||||||
.remap_data_type::<heed::types::Bytes>()
|
|
||||||
.get(wtxn, &original_key)?
|
|
||||||
.ok_or(InternalError::DatabaseMissingEntry {
|
|
||||||
db_name: db_name::DOCUMENTS,
|
|
||||||
key: None,
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// we check if the two documents are exactly equal. If it's the case we can skip this document entirely
|
|
||||||
if base_obkv == obkv_buffer {
|
|
||||||
// we're not replacing anything
|
|
||||||
self.replaced_documents_ids.remove(original_docid);
|
|
||||||
// and we need to put back the original id as it was before
|
|
||||||
self.new_external_documents_ids_builder.remove(external_id);
|
|
||||||
skip_insertion = true;
|
|
||||||
} else {
|
|
||||||
// we associate the base document with the new key, everything will get merged later.
|
|
||||||
let deladd_operation = match self.index_documents_method {
|
|
||||||
IndexDocumentsMethod::UpdateDocuments => {
|
|
||||||
DelAddOperation::DeletionAndAddition
|
|
||||||
}
|
|
||||||
IndexDocumentsMethod::ReplaceDocuments => DelAddOperation::Deletion,
|
|
||||||
};
|
|
||||||
document_sorter_key_buffer.clear();
|
|
||||||
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
|
|
||||||
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
|
|
||||||
document_sorter_value_buffer.clear();
|
|
||||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
|
||||||
into_del_add_obkv(
|
|
||||||
KvReaderU16::from_slice(base_obkv),
|
|
||||||
deladd_operation,
|
|
||||||
&mut document_sorter_value_buffer,
|
|
||||||
)?;
|
|
||||||
self.original_sorter
|
|
||||||
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
|
|
||||||
let base_obkv = KvReader::from_slice(base_obkv);
|
|
||||||
if let Some(flattened_obkv) =
|
|
||||||
Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)?
|
|
||||||
{
|
|
||||||
// we recreate our buffer with the flattened documents
|
|
||||||
document_sorter_value_buffer.clear();
|
|
||||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
|
||||||
into_del_add_obkv(
|
|
||||||
KvReaderU16::from_slice(&flattened_obkv),
|
|
||||||
deladd_operation,
|
|
||||||
&mut document_sorter_value_buffer,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
self.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !skip_insertion {
|
|
||||||
self.new_documents_ids.insert(docid);
|
|
||||||
|
|
||||||
document_sorter_key_buffer.clear();
|
|
||||||
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
|
|
||||||
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
|
|
||||||
document_sorter_value_buffer.clear();
|
|
||||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
|
||||||
into_del_add_obkv(
|
|
||||||
KvReaderU16::from_slice(&obkv_buffer),
|
|
||||||
DelAddOperation::Addition,
|
|
||||||
&mut document_sorter_value_buffer,
|
|
||||||
)?;
|
|
||||||
// We use the extracted/generated user id as the key for this document.
|
|
||||||
self.original_sorter
|
|
||||||
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
|
|
||||||
|
|
||||||
let flattened_obkv = KvReader::from_slice(&obkv_buffer);
|
|
||||||
if let Some(obkv) =
|
|
||||||
Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)?
|
|
||||||
{
|
|
||||||
document_sorter_value_buffer.clear();
|
|
||||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
|
||||||
into_del_add_obkv(
|
|
||||||
KvReaderU16::from_slice(&obkv),
|
|
||||||
DelAddOperation::Addition,
|
|
||||||
&mut document_sorter_value_buffer,
|
|
||||||
)?
|
|
||||||
}
|
|
||||||
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
|
|
||||||
}
|
|
||||||
documents_count += 1;
|
|
||||||
|
|
||||||
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
|
|
||||||
documents_seen: documents_count,
|
|
||||||
});
|
|
||||||
|
|
||||||
field_buffer = drop_and_reuse(field_buffer_cache);
|
|
||||||
docid_buffer.clear();
|
|
||||||
obkv_buffer.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
|
|
||||||
documents_seen: documents_count,
|
|
||||||
});
|
|
||||||
|
|
||||||
self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?;
|
|
||||||
self.index.put_primary_key(wtxn, &primary_key)?;
|
|
||||||
self.documents_count += documents_count;
|
|
||||||
// Now that we have a valid sorter that contains the user id and the obkv we
|
|
||||||
// give it to the last transforming function which returns the TransformOutput.
|
|
||||||
Ok(documents_count)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The counter part of `read_documents` that removes documents either from the transform or the database.
|
|
||||||
/// It can be called before, after or in between two calls of the `read_documents`.
|
|
||||||
///
|
|
||||||
/// It needs to update all the internal datastructure in the transform.
|
|
||||||
/// - If the document is coming from the database -> it's marked as a to_delete document
|
|
||||||
/// - If the document to remove was inserted by the `read_documents` method before AND was present in the db,
|
|
||||||
/// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it.
|
|
||||||
/// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db,
|
|
||||||
/// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids.
|
|
||||||
/// - If the document to remove was not present in either the db or the transform we do nothing.
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
|
|
||||||
pub fn remove_documents<FA>(
|
|
||||||
&mut self,
|
|
||||||
mut to_remove: Vec<String>,
|
|
||||||
wtxn: &mut heed::RwTxn<'_>,
|
|
||||||
should_abort: FA,
|
|
||||||
) -> Result<usize>
|
|
||||||
where
|
|
||||||
FA: Fn() -> bool + Sync,
|
|
||||||
{
|
|
||||||
// there may be duplicates in the documents to remove.
|
|
||||||
to_remove.sort_unstable();
|
|
||||||
to_remove.dedup();
|
|
||||||
|
|
||||||
let external_documents_ids = self.index.external_documents_ids();
|
|
||||||
|
|
||||||
let mut documents_deleted = 0;
|
|
||||||
let mut document_sorter_value_buffer = Vec::new();
|
|
||||||
let mut document_sorter_key_buffer = Vec::new();
|
|
||||||
for to_remove in to_remove {
|
|
||||||
if should_abort() {
|
|
||||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the document has been added in the current indexing process.
|
|
||||||
let deleted_from_current =
|
|
||||||
match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
|
|
||||||
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
|
|
||||||
HEntry::Occupied(entry) => {
|
|
||||||
let docid = *entry.get() as u32;
|
|
||||||
// Key is the concatenation of the internal docid and the external one.
|
|
||||||
document_sorter_key_buffer.clear();
|
|
||||||
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
|
|
||||||
document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes());
|
|
||||||
document_sorter_value_buffer.clear();
|
|
||||||
document_sorter_value_buffer.push(Operation::Deletion as u8);
|
|
||||||
obkv::KvWriterU16::new(&mut document_sorter_value_buffer).finish().unwrap();
|
|
||||||
self.original_sorter
|
|
||||||
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
|
|
||||||
self.flattened_sorter
|
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
|
|
||||||
|
|
||||||
// we must NOT update the list of replaced_documents_ids
|
|
||||||
// Either:
|
|
||||||
// 1. It's already in it and there is nothing to do
|
|
||||||
// 2. It wasn't in it because the document was created by a previous batch and since
|
|
||||||
// we're removing it there is nothing to do.
|
|
||||||
self.new_documents_ids.remove(docid);
|
|
||||||
entry.remove_entry();
|
|
||||||
true
|
|
||||||
}
|
|
||||||
HEntry::Vacant(_) => false,
|
|
||||||
};
|
|
||||||
|
|
||||||
// If the document was already in the db we mark it as a `to_delete` document.
|
|
||||||
// Then we push the document in sorters in deletion mode.
|
|
||||||
let deleted_from_db = match external_documents_ids.get(wtxn, &to_remove)? {
|
|
||||||
Some(docid) => {
|
|
||||||
self.remove_document_from_db(
|
|
||||||
docid,
|
|
||||||
to_remove,
|
|
||||||
wtxn,
|
|
||||||
&mut document_sorter_key_buffer,
|
|
||||||
&mut document_sorter_value_buffer,
|
|
||||||
)?;
|
|
||||||
true
|
|
||||||
}
|
|
||||||
None => false,
|
|
||||||
};
|
|
||||||
|
|
||||||
// increase counter only if the document existed somewhere before.
|
|
||||||
if deleted_from_current || deleted_from_db {
|
|
||||||
documents_deleted += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(documents_deleted)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Removes documents from db using their internal document ids.
|
|
||||||
///
|
|
||||||
/// # Warning
|
|
||||||
///
|
|
||||||
/// This function is dangerous and will only work correctly if:
|
|
||||||
///
|
|
||||||
/// - All the passed ids currently exist in the database
|
|
||||||
/// - No batching using the standards `remove_documents` and `add_documents` took place
|
|
||||||
///
|
|
||||||
/// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function.
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::details")]
|
|
||||||
pub fn remove_documents_from_db_no_batch<FA>(
|
|
||||||
&mut self,
|
|
||||||
to_remove: &RoaringBitmap,
|
|
||||||
wtxn: &mut heed::RwTxn<'_>,
|
|
||||||
should_abort: FA,
|
|
||||||
) -> Result<usize>
|
|
||||||
where
|
|
||||||
FA: Fn() -> bool + Sync,
|
|
||||||
{
|
|
||||||
let mut documents_deleted = 0;
|
|
||||||
let mut document_sorter_value_buffer = Vec::new();
|
|
||||||
let mut document_sorter_key_buffer = Vec::new();
|
|
||||||
let external_ids = self.index.external_id_of(wtxn, to_remove.iter())?;
|
|
||||||
|
|
||||||
for (internal_docid, external_docid) in to_remove.iter().zip(external_ids) {
|
|
||||||
let external_docid = external_docid?;
|
|
||||||
if should_abort() {
|
|
||||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
|
||||||
}
|
|
||||||
self.remove_document_from_db(
|
|
||||||
internal_docid,
|
|
||||||
external_docid,
|
|
||||||
wtxn,
|
|
||||||
&mut document_sorter_key_buffer,
|
|
||||||
&mut document_sorter_value_buffer,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
documents_deleted += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(documents_deleted)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_document_from_db(
|
|
||||||
&mut self,
|
|
||||||
internal_docid: u32,
|
|
||||||
external_docid: String,
|
|
||||||
txn: &heed::RoTxn<'_>,
|
|
||||||
document_sorter_key_buffer: &mut Vec<u8>,
|
|
||||||
document_sorter_value_buffer: &mut Vec<u8>,
|
|
||||||
) -> Result<()> {
|
|
||||||
self.replaced_documents_ids.insert(internal_docid);
|
|
||||||
|
|
||||||
// fetch the obkv document
|
|
||||||
let original_key = internal_docid;
|
|
||||||
let base_obkv = self
|
|
||||||
.index
|
|
||||||
.documents
|
|
||||||
.remap_data_type::<heed::types::Bytes>()
|
|
||||||
.get(txn, &original_key)?
|
|
||||||
.ok_or(InternalError::DatabaseMissingEntry {
|
|
||||||
db_name: db_name::DOCUMENTS,
|
|
||||||
key: None,
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// Key is the concatenation of the internal docid and the external one.
|
|
||||||
document_sorter_key_buffer.clear();
|
|
||||||
document_sorter_key_buffer.extend_from_slice(&internal_docid.to_be_bytes());
|
|
||||||
document_sorter_key_buffer.extend_from_slice(external_docid.as_bytes());
|
|
||||||
// push it as to delete in the original_sorter
|
|
||||||
document_sorter_value_buffer.clear();
|
|
||||||
document_sorter_value_buffer.push(Operation::Deletion as u8);
|
|
||||||
into_del_add_obkv(
|
|
||||||
KvReaderU16::from_slice(base_obkv),
|
|
||||||
DelAddOperation::Deletion,
|
|
||||||
document_sorter_value_buffer,
|
|
||||||
)?;
|
|
||||||
self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
|
|
||||||
|
|
||||||
// flatten it and push it as to delete in the flattened_sorter
|
|
||||||
let flattened_obkv = KvReader::from_slice(base_obkv);
|
|
||||||
if let Some(obkv) =
|
|
||||||
Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)?
|
|
||||||
{
|
|
||||||
// we recreate our buffer with the flattened documents
|
|
||||||
document_sorter_value_buffer.clear();
|
|
||||||
document_sorter_value_buffer.push(Operation::Deletion as u8);
|
|
||||||
into_del_add_obkv(
|
|
||||||
KvReaderU16::from_slice(&obkv),
|
|
||||||
DelAddOperation::Deletion,
|
|
||||||
document_sorter_value_buffer,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
self.flattened_sorter
|
|
||||||
.insert(internal_docid.to_be_bytes(), &document_sorter_value_buffer)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flatten a document from the fields ids map contained in self and insert the new
|
// Flatten a document from the fields ids map contained in self and insert the new
|
||||||
// created fields. Returns `None` if the document doesn't need to be flattened.
|
// created fields. Returns `None` if the document doesn't need to be flattened.
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
@ -677,167 +280,6 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate the `TransformOutput` based on the given sorter that can be generated from any
|
|
||||||
/// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
|
|
||||||
/// id for the user side and the value must be an obkv where keys are valid fields ids.
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::transform")]
|
|
||||||
pub(crate) fn output_from_sorter<F>(
|
|
||||||
self,
|
|
||||||
wtxn: &mut heed::RwTxn<'_>,
|
|
||||||
progress_callback: F,
|
|
||||||
) -> Result<TransformOutput>
|
|
||||||
where
|
|
||||||
F: Fn(UpdateIndexingStep) + Sync,
|
|
||||||
{
|
|
||||||
let primary_key = self
|
|
||||||
.index
|
|
||||||
.primary_key(wtxn)?
|
|
||||||
.ok_or(Error::InternalError(InternalError::DatabaseMissingEntry {
|
|
||||||
db_name: db_name::MAIN,
|
|
||||||
key: Some(main_key::PRIMARY_KEY_KEY),
|
|
||||||
}))?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// We create a final writer to write the new documents in order from the sorter.
|
|
||||||
let mut writer = create_writer(
|
|
||||||
self.indexer_settings.chunk_compression_type,
|
|
||||||
self.indexer_settings.chunk_compression_level,
|
|
||||||
tempfile::tempfile()?,
|
|
||||||
);
|
|
||||||
|
|
||||||
// To compute the field distribution we need to;
|
|
||||||
// 1. Remove all the deleted documents from the field distribution
|
|
||||||
// 2. Add all the new documents to the field distribution
|
|
||||||
let mut field_distribution = self.index.field_distribution(wtxn)?;
|
|
||||||
|
|
||||||
// Here we are going to do the document count + field distribution + `write_into_stream_writer`
|
|
||||||
let mut iter = self.original_sorter.into_stream_merger_iter()?;
|
|
||||||
// used only for the callback
|
|
||||||
let mut documents_count = 0;
|
|
||||||
|
|
||||||
while let Some((key, val)) = iter.next()? {
|
|
||||||
// skip first byte corresponding to the operation type (Deletion or Addition).
|
|
||||||
let val = &val[1..];
|
|
||||||
|
|
||||||
// send a callback to show at which step we are
|
|
||||||
documents_count += 1;
|
|
||||||
progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
|
|
||||||
documents_seen: documents_count,
|
|
||||||
total_documents: self.documents_count,
|
|
||||||
});
|
|
||||||
|
|
||||||
for (key, value) in KvReader::from_slice(val) {
|
|
||||||
let reader = KvReaderDelAdd::from_slice(value);
|
|
||||||
match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
|
|
||||||
(None, None) => (),
|
|
||||||
(None, Some(_)) => {
|
|
||||||
// New field
|
|
||||||
let name = self.fields_ids_map.name(key).ok_or(
|
|
||||||
FieldIdMapMissingEntry::FieldId {
|
|
||||||
field_id: key,
|
|
||||||
process: "Computing field distribution in transform.",
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
*field_distribution.entry(name.to_string()).or_insert(0) += 1;
|
|
||||||
}
|
|
||||||
(Some(_), None) => {
|
|
||||||
// Field removed
|
|
||||||
let name = self.fields_ids_map.name(key).ok_or(
|
|
||||||
FieldIdMapMissingEntry::FieldId {
|
|
||||||
field_id: key,
|
|
||||||
process: "Computing field distribution in transform.",
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
match field_distribution.entry(name.to_string()) {
|
|
||||||
BEntry::Vacant(_) => { /* Bug? trying to remove a non-existing field */
|
|
||||||
}
|
|
||||||
BEntry::Occupied(mut entry) => {
|
|
||||||
// attempt to remove one
|
|
||||||
match entry.get_mut().checked_sub(1) {
|
|
||||||
Some(0) => {
|
|
||||||
entry.remove();
|
|
||||||
}
|
|
||||||
Some(new_val) => {
|
|
||||||
*entry.get_mut() = new_val;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
unreachable!("Attempting to remove a field that wasn't in the field distribution")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(Some(_), Some(_)) => {
|
|
||||||
// Value change, no field distribution change
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writer.insert(key, val)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut original_documents = writer.into_inner()?;
|
|
||||||
// We then extract the file and reset the seek to be able to read it again.
|
|
||||||
original_documents.rewind()?;
|
|
||||||
|
|
||||||
// We create a final writer to write the new documents in order from the sorter.
|
|
||||||
let mut writer = create_writer(
|
|
||||||
self.indexer_settings.chunk_compression_type,
|
|
||||||
self.indexer_settings.chunk_compression_level,
|
|
||||||
tempfile::tempfile()?,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Once we have written all the documents into the final sorter, we write the nested documents
|
|
||||||
// into this writer.
|
|
||||||
// We get rids of the `Operation` byte and skip the deleted documents as well.
|
|
||||||
let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
|
|
||||||
while let Some((key, val)) = iter.next()? {
|
|
||||||
// skip first byte corresponding to the operation type (Deletion or Addition).
|
|
||||||
let val = &val[1..];
|
|
||||||
writer.insert(key, val)?;
|
|
||||||
}
|
|
||||||
let mut flattened_documents = writer.into_inner()?;
|
|
||||||
flattened_documents.rewind()?;
|
|
||||||
|
|
||||||
let mut new_external_documents_ids_builder: Vec<_> =
|
|
||||||
self.new_external_documents_ids_builder.into_iter().collect();
|
|
||||||
|
|
||||||
new_external_documents_ids_builder
|
|
||||||
.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));
|
|
||||||
let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory();
|
|
||||||
new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| {
|
|
||||||
fst_new_external_documents_ids_builder.insert(key, value)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?;
|
|
||||||
let fields_ids_map = self.fields_ids_map;
|
|
||||||
let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name));
|
|
||||||
let mut new_inner_settings = old_inner_settings.clone();
|
|
||||||
new_inner_settings.fields_ids_map = fields_ids_map;
|
|
||||||
|
|
||||||
let embedding_config_updates = Default::default();
|
|
||||||
let settings_update_only = false;
|
|
||||||
let settings_diff = InnerIndexSettingsDiff::new(
|
|
||||||
old_inner_settings,
|
|
||||||
new_inner_settings,
|
|
||||||
primary_key_id,
|
|
||||||
embedding_config_updates,
|
|
||||||
settings_update_only,
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(TransformOutput {
|
|
||||||
primary_key,
|
|
||||||
settings_diff,
|
|
||||||
field_distribution,
|
|
||||||
documents_count: self.documents_count,
|
|
||||||
original_documents: Some(
|
|
||||||
original_documents.into_inner().map_err(|err| err.into_error())?,
|
|
||||||
),
|
|
||||||
flattened_documents: Some(
|
|
||||||
flattened_documents.into_inner().map_err(|err| err.into_error())?,
|
|
||||||
),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Rebind the field_ids of the provided document to their values
|
/// Rebind the field_ids of the provided document to their values
|
||||||
/// based on the field_ids_maps difference between the old and the new settings,
|
/// based on the field_ids_maps difference between the old and the new settings,
|
||||||
/// then fill the provided buffers with delta documents using KvWritterDelAdd.
|
/// then fill the provided buffers with delta documents using KvWritterDelAdd.
|
||||||
@ -1145,6 +587,7 @@ fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use grenad::MergeFunction;
|
use grenad::MergeFunction;
|
||||||
|
use obkv::KvReaderU16;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user