diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 2bd20b6e8..5ce658dd9 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -30,7 +30,9 @@ use meilisearch_types::error::Code;
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 use meilisearch_types::milli::heed::CompactionOption;
-use meilisearch_types::milli::update::new::indexer::{self, retrieve_or_guess_primary_key};
+use meilisearch_types::milli::update::new::indexer::{
+    self, retrieve_or_guess_primary_key, UpdateByFunction,
+};
 use meilisearch_types::milli::update::{
     IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
 };
@@ -1392,7 +1394,7 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             IndexOperation::DocumentEdition { mut task, .. } => {
-                let (filter, context, function) =
+                let (filter, context, code) =
                     if let KindWithContent::DocumentEdition {
                         filter_expr, context, function, ..
                     } = &task.kind
@@ -1401,52 +1403,84 @@ impl IndexScheduler {
                     } else {
                         unreachable!()
                     };
-                let result_count = edit_documents_by_function(
-                    index_wtxn,
-                    filter,
-                    context.clone(),
-                    function,
-                    self.index_mapper.indexer_config(),
-                    self.must_stop_processing.clone(),
-                    index,
-                );
-                let (original_filter, context, function) = if let Some(Details::DocumentEdition {
-                    original_filter,
-                    context,
-                    function,
-                    ..
-                }) = task.details
-                {
-                    (original_filter, context, function)
-                } else {
-                    // In the case of a `documentEdition` the details MUST be set
-                    unreachable!();
+
+                let candidates = match filter.as_ref().map(Filter::from_json) {
+                    Some(Ok(Some(filter))) => {
+                        filter.evaluate(index_wtxn, index).map_err(|err| match err {
+                            milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
+                                Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
+                            }
+                            e => e.into(),
+                        })?
+                    }
+                    None | Some(Ok(None)) => index.documents_ids(index_wtxn)?,
+                    Some(Err(e)) => return Err(e.into()),
                 };
-                match result_count {
-                    Ok((deleted_documents, edited_documents)) => {
-                        task.status = Status::Succeeded;
-                        task.details = Some(Details::DocumentEdition {
-                            original_filter,
-                            context,
-                            function,
-                            deleted_documents: Some(deleted_documents),
-                            edited_documents: Some(edited_documents),
-                        });
-                    }
-                    Err(e) => {
-                        task.status = Status::Failed;
-                        task.details = Some(Details::DocumentEdition {
-                            original_filter,
-                            context,
-                            function,
-                            deleted_documents: Some(0),
-                            edited_documents: Some(0),
-                        });
-                        task.error = Some(e.into());
-                    }
+                let rtxn = index.read_txn()?;
+                let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
+                let mut new_fields_ids_map = db_fields_ids_map.clone();
+                let primary_key =
+                    retrieve_or_guess_primary_key(&rtxn, index, &mut new_fields_ids_map, None)?
+                        .unwrap();
+
+                if task.error.is_none() {
+                    // TODO create a pool if needed
+                    // let pool = indexer_config.thread_pool.unwrap();
+                    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
+
+                    let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
+                    let document_changes = indexer.into_changes(&primary_key)?;
+
+                    indexer::index(
+                        index_wtxn,
+                        index,
+                        &db_fields_ids_map,
+                        new_fields_ids_map,
+                        &pool,
+                        &document_changes,
+                    )?;
+
+                    // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
                 }
+                // let (original_filter, context, function) = if let Some(Details::DocumentEdition {
+                //     original_filter,
+                //     context,
+                //     function,
+                //     ..
+                // }) = task.details
+                // {
+                //     (original_filter, context, function)
+                // } else {
+                //     // In the case of a `documentEdition` the details MUST be set
+                //     unreachable!();
+                // };
+
+                // match result_count {
+                //     Ok((deleted_documents, edited_documents)) => {
+                //         task.status = Status::Succeeded;
+                //         task.details = Some(Details::DocumentEdition {
+                //             original_filter,
+                //             context,
+                //             function,
+                //             deleted_documents: Some(deleted_documents),
+                //             edited_documents: Some(edited_documents),
+                //         });
+                //     }
+                //     Err(e) => {
+                //         task.status = Status::Failed;
+                //         task.details = Some(Details::DocumentEdition {
+                //             original_filter,
+                //             context,
+                //             function,
+                //             deleted_documents: Some(0),
+                //             edited_documents: Some(0),
+                //         });
+                //         task.error = Some(e.into());
+                //     }
+                // }
+
                 Ok(vec![task])
             }
             IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => {
diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs
index 2967311df..18c7cdf02 100644
--- a/milli/src/update/new/indexer/document_changes.rs
+++ b/milli/src/update/new/indexer/document_changes.rs
@@ -287,7 +287,7 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
         item: Self::Item,
-    ) -> Result<DocumentChange<'doc>> where 'pl: 'doc // the payload must survive the process calls
+    ) -> Result<Option<DocumentChange<'doc>>> where 'pl: 'doc // the payload must survive the process calls
     ;
 }
@@ -352,8 +352,11 @@ where
         // Clean up and reuse the document-specific allocator
         context.doc_alloc.reset();
 
-        let change =
-            document_changes.item_to_document_change(context, item).map_err(Arc::new)?;
+        let Some(change) =
+            document_changes.item_to_document_change(context, item).map_err(Arc::new)?
+        else {
+            return Ok(());
+        };
 
         let res = extractor.process(change, context).map_err(Arc::new);
diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs
index 99ed4f54c..7744bcf18 100644
--- a/milli/src/update/new/indexer/document_deletion.rs
+++ b/milli/src/update/new/indexer/document_deletion.rs
@@ -55,7 +55,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
         docid: Self::Item,
-    ) -> Result<DocumentChange<'doc>>
+    ) -> Result<Option<DocumentChange<'doc>>>
     where
         'pl: 'doc, // the payload must survive the process calls
     {
@@ -69,7 +69,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
 
         let external_document_id = external_document_id.to_bump(&context.doc_alloc);
 
-        Ok(DocumentChange::Deletion(Deletion::create(docid, external_document_id)))
+        Ok(Some(DocumentChange::Deletion(Deletion::create(docid, external_document_id))))
     }
 }
diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index 143244a6b..fcab6773a 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -240,7 +240,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
         item: Self::Item,
-    ) -> Result<DocumentChange<'doc>>
+    ) -> Result<Option<DocumentChange<'doc>>>
     where
         'pl: 'doc,
     {
@@ -275,7 +275,7 @@ trait MergeChanges {
         is_new: bool,
         doc_alloc: &'doc Bump,
         operations: &'doc [InnerDocOp],
-    ) -> Result<DocumentChange<'doc>>;
+    ) -> Result<Option<DocumentChange<'doc>>>;
 }
 
 struct MergeDocumentForReplacement;
@@ -301,7 +301,7 @@ impl MergeChanges for MergeDocumentForReplacement {
         is_new: bool,
         doc_alloc: &'doc Bump,
         operations: &'doc [InnerDocOp],
-    ) -> Result<DocumentChange<'doc>> {
+    ) -> Result<Option<DocumentChange<'doc>>> {
         match operations.last() {
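+            // For replacements only the last queued operation matters:
+            // it overwrites everything that came before it.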
             Some(InnerDocOp::Addition(DocumentOffset { content })) => {
                 let document = serde_json::from_slice(content).unwrap();
@@ -312,18 +312,27 @@ impl MergeChanges for MergeDocumentForReplacement {
                 let document = DocumentFromVersions::new(Versions::Single(document));
 
                 if is_new {
-                    Ok(DocumentChange::Insertion(Insertion::create(docid, external_doc, document)))
+                    Ok(Some(DocumentChange::Insertion(Insertion::create(
+                        docid,
+                        external_doc,
+                        document,
+                    ))))
                 } else {
-                    Ok(DocumentChange::Update(Update::create(docid, external_doc, document, true)))
+                    Ok(Some(DocumentChange::Update(Update::create(
+                        docid,
+                        external_doc,
+                        document,
+                        true,
+                    ))))
                 }
             }
             Some(InnerDocOp::Deletion) => {
-                let deletion = if is_new {
-                    Deletion::create(docid, external_doc)
+                return if is_new {
+                    Ok(None)
                 } else {
-                    todo!("Do that with Louis")
+                    let deletion = Deletion::create(docid, external_doc);
+                    Ok(Some(DocumentChange::Deletion(deletion)))
                 };
-                Ok(DocumentChange::Deletion(deletion))
             }
             None => unreachable!("We must not have empty set of operations on a document"),
         }
     }
@@ -354,7 +363,7 @@ impl MergeChanges for MergeDocumentForUpdates {
         is_new: bool,
         doc_alloc: &'doc Bump,
         operations: &'doc [InnerDocOp],
-    ) -> Result<DocumentChange<'doc>> {
+    ) -> Result<Option<DocumentChange<'doc>>> {
         if operations.is_empty() {
             unreachable!("We must not have empty set of operations on a document");
         }
@@ -365,13 +374,12 @@ impl MergeChanges for MergeDocumentForUpdates {
         let has_deletion = last_deletion.is_some();
 
         if operations.is_empty() {
-            let deletion = if !is_new {
-                Deletion::create(docid, external_docid)
+            return if !is_new {
+                let deletion = Deletion::create(docid, external_docid);
+                Ok(Some(DocumentChange::Deletion(deletion)))
             } else {
-                todo!("Do that with Louis")
+                Ok(None)
             };
-
-            return Ok(DocumentChange::Deletion(deletion));
         }
 
         let mut versions = bumpalo::collections::Vec::with_capacity_in(operations.len(), doc_alloc);
@@ -401,14 +409,14 @@ impl MergeChanges for MergeDocumentForUpdates {
         let document = DocumentFromVersions::new(versions);
 
         if is_new {
-            Ok(DocumentChange::Insertion(Insertion::create(docid, external_docid, document)))
+            Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, document))))
         } else {
-            Ok(DocumentChange::Update(Update::create(
+            Ok(Some(DocumentChange::Update(Update::create(
                 docid,
                 external_docid,
                 document,
                 has_deletion,
-            )))
+            ))))
         }
     }
 }
diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs
index 4d31f600d..3b528d5e8 100644
--- a/milli/src/update/new/indexer/partial_dump.rs
+++ b/milli/src/update/new/indexer/partial_dump.rs
@@ -53,7 +53,7 @@ where
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
         document: Self::Item,
-    ) -> Result<DocumentChange<'doc>>
+    ) -> Result<Option<DocumentChange<'doc>>>
     where
         'index: 'doc,
     {
@@ -80,6 +80,6 @@ where
         let document = DocumentFromVersions::new(Versions::Single(document));
         let insertion = Insertion::create(docid, external_document_id, document);
 
-        Ok(DocumentChange::Insertion(insertion))
+        Ok(Some(DocumentChange::Insertion(insertion)))
     }
 }
diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs
index 9bff15b5c..d9b09bd21 100644
--- a/milli/src/update/new/indexer/update_by_function.rs
+++ b/milli/src/update/new/indexer/update_by_function.rs
@@ -1,33 +1,236 @@
-use rayon::iter::IntoParallelIterator;
+use std::collections::BTreeMap;
 
-use super::document_changes::{DocumentChangeContext, DocumentChanges};
-use crate::Result;
+use raw_collections::RawMap;
+use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
+use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
+use roaring::RoaringBitmap;
 
-pub struct UpdateByFunction;
+use super::document_changes::{DocumentChangeContext, MostlySend};
+use super::DocumentChanges;
+use crate::documents::Error::InvalidDocumentFormat;
+use crate::documents::PrimaryKey;
+use crate::error::{FieldIdMapMissingEntry, InternalError};
+use crate::update::new::document::DocumentFromVersions;
+use crate::update::new::document_change::Versions;
+use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, KvWriterFieldId, Update};
+use crate::{all_obkv_to_json, Error, FieldsIdsMap, GlobalFieldsIdsMap, Object, Result, UserError};
+
+pub struct UpdateByFunction {
+    documents: RoaringBitmap,
+    context: Option<Object>,
+    code: String,
+}
+
+pub struct UpdateByFunctionChanges<'doc> {
+    primary_key: &'doc PrimaryKey<'doc>,
+    engine: Engine,
+    ast: AST,
+    context: Option<Dynamic>,
+    // It is sad that the RoaringBitmap doesn't
+    // implement IndexedParallelIterator
+    documents: Vec<u32>,
+}
 
 impl UpdateByFunction {
-    pub fn into_changes(self) -> UpdateByFunctionChanges {
-        UpdateByFunctionChanges
+    pub fn new(documents: RoaringBitmap, context: Option<Object>, code: String) -> Self {
+        UpdateByFunction { documents, context, code }
+    }
+
+    pub fn into_changes<'index>(
+        self,
+        primary_key: &'index PrimaryKey,
+    ) -> Result<UpdateByFunctionChanges<'index>> {
+        let Self { documents, context, code } = self;
+
+        // Setup the security and limits of the Engine
+        let mut engine = Engine::new();
+        engine.set_optimization_level(OptimizationLevel::Full);
+        engine.set_max_call_levels(1000);
+        // It is an arbitrary value. We need to let users define this in the settings.
+        engine.set_max_operations(1_000_000);
+        engine.set_max_variables(1000);
+        engine.set_max_functions(30);
+        engine.set_max_expr_depths(100, 1000);
+        engine.set_max_string_size(1024 * 1024 * 1024); // 1 GiB
+        engine.set_max_array_size(10_000);
+        engine.set_max_map_size(10_000);
+
+        let ast = engine.compile(code).map_err(UserError::DocumentEditionCompilationError)?;
+        let context = match context {
+            Some(context) => {
+                Some(serde_json::from_value(context.into()).map_err(InternalError::SerdeJson)?)
+            }
+            None => None,
+        };
+
+        Ok(UpdateByFunctionChanges {
+            primary_key,
+            engine,
+            ast,
+            context,
+            documents: documents.into_iter().collect(),
+        })
     }
 }
 
-pub struct UpdateByFunctionChanges;
-
-impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges {
+impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
     type Item = u32;
 
-    fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator<Item = Self::Item> {
-        (0..100).into_par_iter()
+    fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item> {
+        self.documents.par_iter().copied()
     }
 
-    fn item_to_document_change<'doc, T: super::document_changes::MostlySend + 'doc>(
+    fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &self,
-        _context: &'doc DocumentChangeContext<T>,
-        _item: Self::Item,
-    ) -> Result<DocumentChange<'doc>>
+        context: &'doc DocumentChangeContext<T>,
+        docid: Self::Item,
+    ) -> Result<Option<DocumentChange<'doc>>>
     where
         'index: 'doc,
     {
-        todo!()
+        let DocumentChangeContext {
+            index,
+            db_fields_ids_map,
+            txn,
+            new_fields_ids_map,
+            doc_alloc,
+            ..
+        } = context;
+
+        // safety: The document *must* exist in the database as
+        // its ID comes from the list of document ids.
+        let document = index.document(txn, docid)?;
+        let rhai_document = obkv_to_rhaimap(document, db_fields_ids_map)?;
+        let json_document = all_obkv_to_json(document, db_fields_ids_map)?;
+
+        let document_id = self
+            .primary_key
+            .document_id(document, db_fields_ids_map)?
+            .map_err(|_| InvalidDocumentFormat)?;
+
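+        // Expose the optional user context and the document itself to the script
+        // through a fresh Rhai scope, then evaluate the compiled AST against it.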
+        let mut scope = Scope::new();
+        if let Some(context) = self.context.as_ref().cloned() {
+            scope.push_constant_dynamic("context", context);
+        }
+        scope.push("doc", rhai_document);
+        // That's where the magic happens. We run the user script,
+        // which edits the "doc" scope variable representing the document,
+        // and we ignore the output and even its type, i.e., Dynamic.
+        let _ = self
+            .engine
+            .eval_ast_with_scope::<Dynamic>(&mut scope, &self.ast)
+            .map_err(UserError::DocumentEditionRuntimeError)?;
+
+        match scope.remove::<Dynamic>("doc") {
+            // If the "doc" variable has been set to (), we effectively delete the document.
+            Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create(
+                docid,
+                doc_alloc.alloc_str(&document_id),
+            )))),
+            None => unreachable!("missing doc variable from the Rhai scope"),
+            Some(new_document) => match new_document.try_cast() {
+                Some(new_rhai_document) => {
+                    let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
+                    serde_json::to_writer(&mut buffer, &new_rhai_document)
+                        .map_err(InternalError::SerdeJson)?;
+                    let raw_new_doc = serde_json::from_slice(buffer.into_bump_slice())
+                        .map_err(InternalError::SerdeJson)?;
+
+                    // Note: This condition is not perfect. Sometimes it detects changes,
+                    // e.g., with floating-point numbers, and considers updating
+                    // the document even if nothing actually changed.
+                    //
+                    // Future: Use a custom Rhai function to track changes.
+                    //
+                    if json_document != rhaimap_to_object(new_rhai_document) {
+                        let mut global_fields_ids_map = new_fields_ids_map.borrow_mut();
+                        let new_document_id = self
+                            .primary_key
+                            .extract_fields_and_docid(
+                                raw_new_doc,
+                                &mut *global_fields_ids_map,
+                                doc_alloc,
+                            )?
+                            .to_de();
+
+                        if document_id != new_document_id {
+                            Err(Error::UserError(UserError::DocumentEditionCannotModifyPrimaryKey))
+                        } else {
+                            let raw_new_doc = RawMap::from_raw_value(raw_new_doc, doc_alloc)
+                                .map_err(InternalError::SerdeJson)?;
+                            let new_doc_version = DocumentFromVersions::new(Versions::Single(
+                                raw_new_doc.into_bump_slice(),
+                            ));
+                            Ok(Some(DocumentChange::Update(Update::create(
+                                docid,
+                                new_document_id,
+                                new_doc_version,
+                                true, // It is like document replacement
+                            ))))
+                        }
+                    } else {
+                        Ok(None)
+                    }
+                }
+                None => Err(Error::UserError(UserError::DocumentEditionDocumentMustBeObject)),
+            },
+        }
     }
 }
+
+fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
+    let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
+    let map: Result<rhai::Map> = all_keys
+        .iter()
+        .copied()
+        .flat_map(|id| obkv.get(id).map(|value| (id, value)))
+        .map(|(id, value)| {
+            let name = fields_ids_map.name(id).ok_or(FieldIdMapMissingEntry::FieldId {
+                field_id: id,
+                process: "obkv_to_rhaimap",
+            })?;
+            let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
+            Ok((name.into(), value))
+        })
+        .collect();
+
+    map
+}
+
+fn rhaimap_to_object(map: rhai::Map) -> Object {
+    let mut output = Object::new();
+    for (key, value) in map {
+        let value = serde_json::to_value(&value).unwrap();
+        output.insert(key.into(), value);
+    }
+    output
+}
+
+fn rhaimap_to_obkv(
+    map: rhai::Map,
+    global_fields_ids_map: &mut GlobalFieldsIdsMap,
+    buffer: &mut Vec<u8>,
+) -> Result<Box<KvReaderFieldId>> {
+    let result: Result<Vec<_>> = map
+        .keys()
+        .map(|key| {
+            global_fields_ids_map
+                .id_or_insert(key)
+                .ok_or(UserError::AttributeLimitReached)
+                .map_err(Error::from)
+                .map(|fid| (fid, key))
+        })
+        .collect();
+
+    let ordered_fields = result?;
+    let mut writer = KvWriterFieldId::memory();
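+    // Write the fields in the computed (fid, key) order, serializing each
+    // value to JSON and reusing `buffer` between iterations.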
+    for (fid, key) in ordered_fields {
+        let value = map.get(key).unwrap();
+        let value = serde_json::to_value(value).unwrap();
+        buffer.clear();
+        serde_json::to_writer(&mut *buffer, &value).unwrap();
+        writer.insert(fid, &buffer)?;
+    }
+
+    Ok(writer.into_boxed())
+}
diff --git a/milli/src/update/new/words_prefix_docids.rs b/milli/src/update/new/words_prefix_docids.rs
index 8795fd9a4..d45f6397e 100644
--- a/milli/src/update/new/words_prefix_docids.rs
+++ b/milli/src/update/new/words_prefix_docids.rs
@@ -1,8 +1,8 @@
 use std::collections::HashSet;
 
 use hashbrown::HashMap;
-use heed::{types::Bytes, RwTxn};
-use heed::{BytesDecode, Database};
+use heed::types::Bytes;
+use heed::{BytesDecode, Database, RwTxn};
 use roaring::RoaringBitmap;
 
 use crate::heed_codec::StrBEU16Codec;
@@ -110,7 +110,7 @@ impl WordPrefixIntegerDocids {
                 key_buffer.extend_from_slice(prefix);
                 key_buffer.push(0);
                 key_buffer.extend_from_slice(&pos.to_be_bytes());
-                self.prefix_database.put(wtxn, &key_buffer, &docids)?;
+                self.prefix_database.put(wtxn, &key_buffer, docids)?;
             }
             docids.clear();
         }
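
For reviewers unfamiliar with Rhai, the following standalone sketch shows the evaluate-and-read-back pattern that the new `update_by_function.rs` builds on: a sandboxed `Engine` with operation limits, a script compiled once into an `AST`, a per-document `Scope` holding the editable `doc` variable, and the unit value `()` as the deletion marker. The document fields (`title`, `remove_me`) and the edit script are invented for illustration; only the `rhai` calls (`compile`, `eval_ast_with_scope`, `Scope::remove`) mirror what the patch uses.

```rust
use rhai::{Dynamic, Engine, Scope};

fn main() -> Result<(), Box<rhai::EvalAltResult>> {
    // A sandboxed engine, as in `UpdateByFunction::into_changes`.
    let mut engine = Engine::new();
    engine.set_max_operations(1_000_000);

    // The "edit function": uppercase a field, or delete the document
    // entirely by assigning the unit value to `doc`.
    let ast = engine.compile(
        r#"
        if doc.remove_me == true {
            doc = (); // `()` marks the document for deletion
        } else {
            doc.title = doc.title.to_upper();
        }
        "#,
    )?;

    // One scope per document: push the document as the `doc` variable.
    let mut doc = rhai::Map::new();
    doc.insert("title".into(), "hello world".into());
    doc.insert("remove_me".into(), Dynamic::FALSE);

    let mut scope = Scope::new();
    scope.push("doc", doc);

    // Run the script for its side effects on `doc`; the output is ignored.
    let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast)?;

    // Read the document back out of the scope and interpret the result.
    match scope.remove::<Dynamic>("doc") {
        Some(doc) if doc.is_unit() => println!("document deleted"),
        Some(doc) => println!("edited document: {doc:?}"),
        None => unreachable!("`doc` is pushed before every evaluation"),
    }

    Ok(())
}
```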