diff --git a/src/update/index_documents/merge_function.rs b/src/update/index_documents/merge_function.rs index 941b17536..7f91b6716 100644 --- a/src/update/index_documents/merge_function.rs +++ b/src/update/index_documents/merge_function.rs @@ -70,7 +70,7 @@ pub fn documents_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result) { +pub fn merge_two_obkvs(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec) { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 807147747..10fd83f36 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -491,7 +491,7 @@ mod tests { use heed::EnvOpenOptions; #[test] - fn simple_replacement() { + fn simple_document_replacement() { let path = tempfile::tempdir().unwrap(); let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB @@ -516,7 +516,7 @@ mod tests { IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); - // Check that there is **always*** 3 documents. + // Check that there is **always** 3 documents. let rtxn = index.read_txn().unwrap(); let count = index.number_of_documents(&rtxn).unwrap(); assert_eq!(count, 3); @@ -528,10 +528,73 @@ mod tests { IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); - // Check that there is **always*** 3 documents. + // Check that there is **always** 3 documents. let rtxn = index.read_txn().unwrap(); let count = index.number_of_documents(&rtxn).unwrap(); assert_eq!(count, 3); drop(rtxn); } + + #[test] + fn simple_document_merge() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + + let index = Index::new(options, &path).unwrap(); + + // First we send 3 documents with duplicate ids and + // change the index method to merge documents. + let mut wtxn = index.write_txn().unwrap(); + let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..]; + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); + builder.execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is only 1 document now. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 1); + + // Check that we get only one document from the database. + let docs = index.documents(&rtxn, Some(0)).unwrap(); + assert_eq!(docs.len(), 1); + let (id, doc) = docs[0]; + assert_eq!(id, 0); + + // Check that this document is equal to the last one sent. + let mut doc_iter = doc.iter(); + assert_eq!(doc_iter.next(), Some((0, &br#""1""#[..]))); + assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..]))); + assert_eq!(doc_iter.next(), None); + drop(rtxn); + + // Second we send 1 document with id 1, to force it to be merged with the previous one. + let mut wtxn = index.write_txn().unwrap(); + let content = &b"id,age\n1,25\n"[..]; + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); + builder.execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is **always** 1 document. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 1); + + // Check that we get only one document from the database. + let docs = index.documents(&rtxn, Some(0)).unwrap(); + assert_eq!(docs.len(), 1); + let (id, doc) = docs[0]; + assert_eq!(id, 0); + + // Check that this document is equal to the last one sent. + let mut doc_iter = doc.iter(); + assert_eq!(doc_iter.next(), Some((0, &br#""1""#[..]))); + assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..]))); + assert_eq!(doc_iter.next(), Some((2, &br#""25""#[..]))); + assert_eq!(doc_iter.next(), None); + drop(rtxn); + } } diff --git a/src/update/index_documents/transform.rs b/src/update/index_documents/transform.rs index 0c9425ffb..b6ec88879 100644 --- a/src/update/index_documents/transform.rs +++ b/src/update/index_documents/transform.rs @@ -10,7 +10,7 @@ use roaring::RoaringBitmap; use crate::{BEU32, Index, FieldsIdsMap}; use crate::update::AvailableDocumentsIds; -use super::merge_function::merge_two_obkv; +use super::merge_function::merge_two_obkvs; use super::{create_writer, create_sorter, IndexDocumentsMethod}; pub struct TransformOutput { @@ -58,14 +58,14 @@ impl Transform<'_, '_> { fields_ids.push(id); } - /// The last value associated with an id is kept. - fn merge_last_win(_key: &[u8], vals: &[Cow<[u8]>]) -> anyhow::Result> { - vals.last().context("no last value").map(|last| last.clone().into_owned()) + /// Only the last value associated with an id is kept. + fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result> { + obkvs.last().context("no last value").map(|last| last.clone().into_owned()) } // We initialize the sorter with the user indexing settings. let mut sorter = create_sorter( - merge_last_win, + keep_latest_obkv, self.chunk_compression_type, self.chunk_compression_level, self.chunk_fusing_shrink_size, @@ -132,7 +132,7 @@ impl Transform<'_, '_> { let base_obkv = self.index.documents.get(&self.rtxn, &key)? .context("document not found")?; let update_obkv = obkv::KvReader::new(update_obkv); - merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer); + merge_two_obkvs(base_obkv, update_obkv, &mut obkv_buffer); (docid, obkv_buffer.as_slice()) } }