From 01101d55acf2f3979340d30762b9db737a693219 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Mon, 18 Sep 2023 09:59:38 +0200 Subject: [PATCH] Wip --- Cargo.lock | 1 + milli/Cargo.toml | 1 + milli/src/search/new/tests/sort.rs | 1 + .../extract/extract_docid_word_positions.rs | 19 +-- .../extract/extract_fid_word_count_docids.rs | 67 ++------ .../extract/extract_word_docids.rs | 134 ++++++++++++---- .../extract/extract_word_fid_docids.rs | 2 + .../extract_word_pair_proximity_docids.rs | 145 ++++++++---------- .../extract/extract_word_position_docids.rs | 13 +- .../src/update/index_documents/extract/mod.rs | 33 ++-- .../index_documents/helpers/grenad_helpers.rs | 16 ++ milli/src/update/index_documents/mod.rs | 14 +- .../src/update/index_documents/typed_chunk.rs | 28 +++- 13 files changed, 274 insertions(+), 200 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 652f9daf2..90fc74810 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2704,6 +2704,7 @@ dependencies = [ "logging_timer", "maplit", "md5", + "meili-snap", "memmap2", "mimalloc", "obkv", diff --git a/milli/Cargo.toml b/milli/Cargo.toml index b19b40e85..68bc2d2b5 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -79,6 +79,7 @@ big_s = "1.0.2" insta = "1.29.0" maplit = "1.0.2" md5 = "0.7.0" +meili-snap = { path = "../meili-snap" } rand = { version = "0.8.5", features = ["small_rng"] } [features] diff --git a/milli/src/search/new/tests/sort.rs b/milli/src/search/new/tests/sort.rs index aa6aa971f..8fdf52d44 100644 --- a/milli/src/search/new/tests/sort.rs +++ b/milli/src/search/new/tests/sort.rs @@ -13,6 +13,7 @@ This module tests the `sort` ranking rule: use big_s::S; use maplit::hashset; +use meili_snap::insta; use crate::index::tests::TempIndex; use crate::search::new::tests::collect_field_values; diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index 1c24a0fcf..f99fbbc4e 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -4,11 +4,11 @@ use std::fs::File; use std::{io, mem, str}; use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder}; -use obkv::KvReader; +use obkv::{KvReader, KvWriterU16}; use roaring::RoaringBitmap; use serde_json::Value; -use super::helpers::{concat_u32s_array, create_sorter, sorter_into_reader, GrenadParameters}; +use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters}; use crate::error::{InternalError, SerializationError}; use crate::update::index_documents::MergeFn; use crate::{ @@ -42,7 +42,7 @@ pub fn extract_docid_word_positions( let mut script_language_docids = HashMap::new(); let mut docid_word_positions_sorter = create_sorter( grenad::SortAlgorithm::Stable, - concat_u32s_array, + keep_latest_obkv, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -155,6 +155,7 @@ fn extract_tokens_from_document( let tokens = process_tokens(tokenizer.tokenize(field)) .take_while(|(p, _)| (*p as u32) < max_positions_per_attributes); + let mut writer = KvWriterU16::memory(); for (index, token) in tokens { // if a language has been detected for the token, we update the counter. 
if let Some(language) = token.language { @@ -168,17 +169,17 @@ fn extract_tokens_from_document( } let token = token.lemma().trim(); if !token.is_empty() && token.len() <= MAX_WORD_LENGTH { - buffers.key_buffer.truncate(mem::size_of::()); - buffers.key_buffer.extend_from_slice(token.as_bytes()); - let position: u16 = index .try_into() .map_err(|_| SerializationError::InvalidNumberSerialization)?; - let position = absolute_from_relative_position(field_id, position); - docid_word_positions_sorter - .insert(&buffers.key_buffer, position.to_ne_bytes())?; + writer.insert(position, token.as_bytes())?; } } + + let positions = writer.into_inner()?; + buffers.key_buffer.truncate(mem::size_of::()); + buffers.key_buffer.extend_from_slice(&field_id.to_be_bytes()); + docid_word_positions_sorter.insert(&buffers.key_buffer, positions)?; } } } diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index 79cf4c7fe..8ed084fdc 100644 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -1,16 +1,17 @@ -use std::collections::HashMap; use std::fs::File; use std::io; -use grenad::Sorter; +use obkv::KvReaderU16; use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, - try_split_array_at, GrenadParameters, MergeFn, + create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, + GrenadParameters, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; -use crate::{relative_from_absolute_position, DocumentId, FieldId, Result}; +use crate::Result; + +const MAX_COUNTED_WORDS: usize = 30; /// Extracts the field id word count and the documents ids where /// this field id with this amount of words appear. @@ -35,63 +36,21 @@ pub fn extract_fid_word_count_docids( max_memory, ); - // This map is assumed to not consume a lot of memory. - let mut document_fid_wordcount = HashMap::new(); - let mut current_document_id = None; - + let mut key_buffer = Vec::new(); let mut cursor = docid_word_positions.into_cursor()?; while let Some((key, value)) = cursor.move_on_next()? { - let (document_id_bytes, _word_bytes) = try_split_array_at(key) + let (document_id_bytes, fid_bytes) = try_split_array_at(key) .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); - let curr_document_id = *current_document_id.get_or_insert(document_id); - if curr_document_id != document_id { - drain_document_fid_wordcount_into_sorter( - &mut fid_word_count_docids_sorter, - &mut document_fid_wordcount, - curr_document_id, - )?; - current_document_id = Some(document_id); - } - - for position in read_u32_ne_bytes(value) { - let (field_id, _) = relative_from_absolute_position(position); - - let value = document_fid_wordcount.entry(field_id as FieldId).or_insert(0); - *value += 1; - } - } - - if let Some(document_id) = current_document_id { - // We must make sure that don't lose the current document field id - // word count map if we break because we reached the end of the chunk. 
- drain_document_fid_wordcount_into_sorter( - &mut fid_word_count_docids_sorter, - &mut document_fid_wordcount, - document_id, - )?; - } - - sorter_into_reader(fid_word_count_docids_sorter, indexer) -} - -fn drain_document_fid_wordcount_into_sorter( - fid_word_count_docids_sorter: &mut Sorter, - document_fid_wordcount: &mut HashMap, - document_id: DocumentId, -) -> Result<()> { - let mut key_buffer = Vec::new(); - - for (fid, count) in document_fid_wordcount.drain() { - if count <= 30 { + let word_count = KvReaderU16::new(&value).iter().take(MAX_COUNTED_WORDS + 1).count(); + if word_count <= MAX_COUNTED_WORDS { key_buffer.clear(); - key_buffer.extend_from_slice(&fid.to_be_bytes()); - key_buffer.push(count as u8); - + key_buffer.extend_from_slice(fid_bytes); + key_buffer.push(word_count as u8); fid_word_count_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; } } - Ok(()) + sorter_into_reader(fid_word_count_docids_sorter, indexer) } diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index f1656d024..1860f5fe0 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -1,18 +1,19 @@ -use std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; use std::fs::File; use std::io; use std::iter::FromIterator; +use obkv::KvReaderU16; use roaring::RoaringBitmap; use super::helpers::{ - create_sorter, merge_roaring_bitmaps, serialize_roaring_bitmap, sorter_into_reader, - try_split_array_at, GrenadParameters, + create_sorter, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, serialize_roaring_bitmap, + sorter_into_reader, try_split_array_at, GrenadParameters, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; -use crate::update::index_documents::helpers::read_u32_ne_bytes; -use crate::{relative_from_absolute_position, FieldId, Result}; +use crate::update::MergeFn; +use crate::{DocumentId, FieldId, Result}; /// Extracts the word and the documents ids where this word appear. /// @@ -26,7 +27,7 @@ pub fn extract_word_docids( docid_word_positions: grenad::Reader, indexer: GrenadParameters, exact_attributes: &HashSet, -) -> Result<(grenad::Reader, grenad::Reader)> { +) -> Result<(grenad::Reader, grenad::Reader, grenad::Reader)> { puffin::profile_function!(); let max_memory = indexer.max_memory_by_thread(); @@ -37,7 +38,7 @@ pub fn extract_word_docids( indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, - max_memory.map(|x| x / 2), + max_memory.map(|x| x / 3), ); let mut exact_word_docids_sorter = create_sorter( @@ -46,45 +47,116 @@ pub fn extract_word_docids( indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, - max_memory.map(|x| x / 2), + max_memory.map(|x| x / 3), ); + let mut word_fid_docids_sorter = create_sorter( + grenad::SortAlgorithm::Unstable, + merge_roaring_bitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory.map(|x| x / 3), + ); + + let mut current_document_id = None; + let mut fid = 0; + let mut key_buffer = Vec::new(); let mut value_buffer = Vec::new(); + let mut words = BTreeSet::new(); + let mut exact_words = BTreeSet::new(); let mut cursor = docid_word_positions.into_cursor()?; - while let Some((key, positions)) = cursor.move_on_next()? 
{ - let (document_id_bytes, word_bytes) = try_split_array_at(key) + while let Some((key, value)) = cursor.move_on_next()? { + let (document_id_bytes, fid_bytes) = try_split_array_at(key) + .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; + let (fid_bytes, _) = try_split_array_at(fid_bytes) .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); + fid = u16::from_be_bytes(fid_bytes); - let bitmap = RoaringBitmap::from_iter(Some(document_id)); - serialize_roaring_bitmap(&bitmap, &mut value_buffer)?; + // Drain the word sets into the sorters when the document changes. + if current_document_id.map_or(false, |id| id != document_id) { + words_into_sorters( + document_id, + fid, + &mut key_buffer, + &mut value_buffer, + &mut exact_words, + &mut words, + &mut exact_word_docids_sorter, + &mut word_docids_sorter, + &mut word_fid_docids_sorter, + )?; + } - // If there are no exact attributes, we do not need to iterate over positions. - if exact_attributes.is_empty() { - word_docids_sorter.insert(word_bytes, &value_buffer)?; + current_document_id = Some(document_id); + + // Every word contained in an attribute flagged as exact must be pushed into the exact_words set. + if exact_attributes.contains(&fid) { + for (_pos, word) in KvReaderU16::new(&value).iter() { + exact_words.insert(word.to_vec()); + } } else { - let mut added_to_exact = false; - let mut added_to_word_docids = false; - for position in read_u32_ne_bytes(positions) { - // as soon as we know that this word had been to both readers, we don't need to - // iterate over the positions. - if added_to_exact && added_to_word_docids { - break; - } - let (fid, _) = relative_from_absolute_position(position); - if exact_attributes.contains(&fid) && !added_to_exact { - exact_word_docids_sorter.insert(word_bytes, &value_buffer)?; - added_to_exact = true; - } else if !added_to_word_docids { - word_docids_sorter.insert(word_bytes, &value_buffer)?; - added_to_word_docids = true; - } + for (_pos, word) in KvReaderU16::new(&value).iter() { + words.insert(word.to_vec()); } } } + // We must make sure that we don't lose the words of the last document when we reach the end of the chunk. + if let Some(document_id) = current_document_id { + words_into_sorters( + document_id, + fid, + &mut key_buffer, + &mut value_buffer, + &mut exact_words, + &mut words, + &mut exact_word_docids_sorter, + &mut word_docids_sorter, + &mut word_fid_docids_sorter, + )?; + } + Ok(( sorter_into_reader(word_docids_sorter, indexer)?, sorter_into_reader(exact_word_docids_sorter, indexer)?, + sorter_into_reader(word_fid_docids_sorter, indexer)?, )) } + +fn words_into_sorters( + document_id: DocumentId, + fid: FieldId, + key_buffer: &mut Vec<u8>, + value_buffer: &mut Vec<u8>, + exact_words: &mut BTreeSet<Vec<u8>>, + words: &mut BTreeSet<Vec<u8>>, + exact_word_docids_sorter: &mut grenad::Sorter<MergeFn>, + word_docids_sorter: &mut grenad::Sorter<MergeFn>, + word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>, +) -> Result<()> { + puffin::profile_function!(); + let bitmap = RoaringBitmap::from_iter(Some(document_id)); + serialize_roaring_bitmap(&bitmap, value_buffer)?; + for word_bytes in exact_words.iter() { + exact_word_docids_sorter.insert(word_bytes, &mut *value_buffer)?; + } + + for word_bytes in words.iter() { + word_docids_sorter.insert(word_bytes, &value_buffer)?; + } + + for word_bytes in (&*words | &*exact_words).iter() { + key_buffer.clear(); + key_buffer.extend_from_slice(&word_bytes); + key_buffer.push(0); + key_buffer.extend_from_slice(&fid.to_be_bytes()); +
word_fid_docids_sorter.insert(&key_buffer, &value_buffer)?; + } + + exact_words.clear(); + words.clear(); + + Ok(()) +} diff --git a/milli/src/update/index_documents/extract/extract_word_fid_docids.rs b/milli/src/update/index_documents/extract/extract_word_fid_docids.rs index aaf8fad79..af2dd1c21 100644 --- a/milli/src/update/index_documents/extract/extract_word_fid_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_fid_docids.rs @@ -17,6 +17,8 @@ pub fn extract_word_fid_docids( ) -> Result<grenad::Reader<File>> { puffin::profile_function!(); + todo!("remove me"); + let max_memory = indexer.max_memory_by_thread(); let mut word_fid_docids_sorter = create_sorter( diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 4c910f32e..1d2b0dc96 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -1,11 +1,13 @@ use std::cmp::Ordering; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::HashMap; use std::fs::File; -use std::{cmp, io, mem, str, vec}; +use std::{cmp, io}; + +use obkv::KvReaderU16; use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, - try_split_array_at, GrenadParameters, MergeFn, + create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, + GrenadParameters, MergeFn, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; @@ -34,44 +36,59 @@ pub fn extract_word_pair_proximity_docids( max_memory.map(|m| m / 2), ); - // This map is assumed to not consume a lot of memory. - let mut document_word_positions_heap = BinaryHeap::new(); + let mut word_positions: Vec<(String, u16)> = Vec::with_capacity(MAX_DISTANCE as usize); + let mut word_pair_proximity = HashMap::new(); let mut current_document_id = None; let mut cursor = docid_word_positions.into_cursor()?; while let Some((key, value)) = cursor.move_on_next()?
{ - let (document_id_bytes, word_bytes) = try_split_array_at(key) + let (document_id_bytes, _fid_bytes) = try_split_array_at(key) .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); - let word = str::from_utf8(word_bytes)?; - let curr_document_id = *current_document_id.get_or_insert(document_id); - if curr_document_id != document_id { - let document_word_positions_heap = mem::take(&mut document_word_positions_heap); - document_word_positions_into_sorter( - curr_document_id, - document_word_positions_heap, - &mut word_pair_proximity_docids_sorter, - )?; - current_document_id = Some(document_id); - } + for (position, word) in KvReaderU16::new(&value).iter() { + // if we change document, we fill the sorter + if current_document_id.map_or(false, |id| id != document_id) { + while !word_positions.is_empty() { + word_positions_into_word_pair_proximity( + &mut word_positions, + &mut word_pair_proximity, + )?; + } - let word = word.to_string(); - let mut positions: Vec<_> = read_u32_ne_bytes(value).collect(); - positions.sort_unstable(); - let mut iter = positions.into_iter(); - if let Some(position) = iter.next() { - document_word_positions_heap.push(PeekedWordPosition { word, position, iter }); + document_word_positions_into_sorter( + document_id, + &word_pair_proximity, + &mut word_pair_proximity_docids_sorter, + )?; + word_pair_proximity.clear(); + word_positions.clear(); + } + + // drain the proximity window until the head word is considered close to the word we are inserting. + while word_positions.get(0).map_or(false, |(_w, p)| { + positions_proximity(*p as u32, position as u32) > MAX_DISTANCE + }) { + word_positions_into_word_pair_proximity( + &mut word_positions, + &mut word_pair_proximity, + )?; + } + + // insert the new word. + let word = std::str::from_utf8(word)?; + word_positions.push((word.to_string(), position)); } } if let Some(document_id) = current_document_id { - // We must make sure that don't lose the current document field id - // word count map if we break because we reached the end of the chunk. - let document_word_positions_heap = mem::take(&mut document_word_positions_heap); + while !word_positions.is_empty() { + word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?; + } + document_word_positions_into_sorter( document_id, - document_word_positions_heap, + &word_pair_proximity, &mut word_pair_proximity_docids_sorter, )?; } @@ -85,64 +102,13 @@ pub fn extract_word_pair_proximity_docids( /// close to each other. fn document_word_positions_into_sorter( document_id: DocumentId, - mut word_positions_heap: BinaryHeap>>, + word_pair_proximity: &HashMap<(String, String), u8>, word_pair_proximity_docids_sorter: &mut grenad::Sorter, ) -> Result<()> { - let mut word_pair_proximity = HashMap::new(); - let mut ordered_peeked_word_positions = Vec::new(); - while !word_positions_heap.is_empty() { - while let Some(peeked_word_position) = word_positions_heap.pop() { - ordered_peeked_word_positions.push(peeked_word_position); - if ordered_peeked_word_positions.len() == 7 { - break; - } - } - - if let Some((head, tail)) = ordered_peeked_word_positions.split_first() { - for PeekedWordPosition { word, position, .. 
} in tail { - let prox = positions_proximity(head.position, *position); - if prox > 0 && prox < MAX_DISTANCE { - word_pair_proximity - .entry((head.word.clone(), word.clone())) - .and_modify(|p| { - *p = cmp::min(*p, prox); - }) - .or_insert(prox); - } - } - - // Push the tail in the heap. - let tail_iter = ordered_peeked_word_positions.drain(1..); - word_positions_heap.extend(tail_iter); - - // Advance the head and push it in the heap. - if let Some(mut head) = ordered_peeked_word_positions.pop() { - if let Some(next_position) = head.iter.next() { - let prox = positions_proximity(head.position, next_position); - - if prox > 0 && prox < MAX_DISTANCE { - word_pair_proximity - .entry((head.word.clone(), head.word.clone())) - .and_modify(|p| { - *p = cmp::min(*p, prox); - }) - .or_insert(prox); - } - - word_positions_heap.push(PeekedWordPosition { - word: head.word, - position: next_position, - iter: head.iter, - }); - } - } - } - } - let mut key_buffer = Vec::new(); for ((w1, w2), prox) in word_pair_proximity { key_buffer.clear(); - key_buffer.push(prox as u8); + key_buffer.push(*prox as u8); key_buffer.extend_from_slice(w1.as_bytes()); key_buffer.push(0); key_buffer.extend_from_slice(w2.as_bytes()); @@ -153,6 +119,23 @@ fn document_word_positions_into_sorter( Ok(()) } +fn word_positions_into_word_pair_proximity( + word_positions: &mut Vec<(String, u16)>, + word_pair_proximity: &mut HashMap<(String, String), u8>, +) -> Result<()> { + let (head_word, head_position) = word_positions.remove(0); + for (word, position) in word_positions.iter() { + let prox = positions_proximity(head_position as u32, *position as u32) as u8; + word_pair_proximity + .entry((head_word.clone(), word.clone())) + .and_modify(|p| { + *p = cmp::min(*p, prox); + }) + .or_insert(prox); + } + Ok(()) +} + struct PeekedWordPosition { word: String, position: u32, diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index e945833e6..0e2aa2021 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -1,13 +1,15 @@ use std::fs::File; use std::io; +use obkv::KvReaderU16; + use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, - try_split_array_at, GrenadParameters, + create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, + GrenadParameters, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; -use crate::{bucketed_position, relative_from_absolute_position, DocumentId, Result}; +use crate::{bucketed_position, DocumentId, Result}; /// Extracts the word positions and the documents ids where this word appear. /// @@ -34,15 +36,14 @@ pub fn extract_word_position_docids( let mut key_buffer = Vec::new(); let mut cursor = docid_word_positions.into_cursor()?; while let Some((key, value)) = cursor.move_on_next()? 
{ - let (document_id_bytes, word_bytes) = try_split_array_at(key) + let (document_id_bytes, fid_bytes) = try_split_array_at(key) .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = DocumentId::from_be_bytes(document_id_bytes); - for position in read_u32_ne_bytes(value) { + for (position, word_bytes) in KvReaderU16::new(&value).iter() { key_buffer.clear(); key_buffer.extend_from_slice(word_bytes); key_buffer.push(0); - let (_, position) = relative_from_absolute_position(position); let position = bucketed_position(position); key_buffer.extend_from_slice(&position.to_be_bytes()); word_position_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index c3a023e71..1e1b3d3e2 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -172,15 +172,22 @@ pub(crate) fn data_from_obkv_documents( "field-id-wordcount-docids", ); - spawn_extraction_task::<_, _, Vec<(grenad::Reader, grenad::Reader)>>( + spawn_extraction_task::< + _, + _, + Vec<(grenad::Reader, grenad::Reader, grenad::Reader)>, + >( docid_word_positions_chunks.clone(), indexer, lmdb_writer_sx.clone(), move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes), merge_roaring_bitmaps, - |(word_docids_reader, exact_word_docids_reader)| TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, + |(word_docids_reader, exact_word_docids_reader, word_fid_docids_reader)| { + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } }, "word-docids", ); @@ -194,15 +201,15 @@ pub(crate) fn data_from_obkv_documents( TypedChunk::WordPositionDocids, "word-position-docids", ); - spawn_extraction_task::<_, _, Vec>>( - docid_word_positions_chunks, - indexer, - lmdb_writer_sx.clone(), - extract_word_fid_docids, - merge_cbo_roaring_bitmaps, - TypedChunk::WordFidDocids, - "word-fid-docids", - ); + // spawn_extraction_task::<_, _, Vec>>( + // docid_word_positions_chunks, + // indexer, + // lmdb_writer_sx.clone(), + // extract_word_fid_docids, + // merge_cbo_roaring_bitmaps, + // TypedChunk::WordFidDocids, + // "word-fid-docids", + // ); spawn_extraction_task::<_, _, Vec>>( docid_fid_facet_strings_chunks, diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index d5f5ac0bd..8048199ca 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -113,6 +113,22 @@ impl MergeableReader for Vec<(grenad::Reader, grenad::Reader)> { } } +impl MergeableReader for Vec<(grenad::Reader, grenad::Reader, grenad::Reader)> { + type Output = (grenad::Reader, grenad::Reader, grenad::Reader); + + fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result { + let mut m1 = MergerBuilder::new(merge_fn); + let mut m2 = MergerBuilder::new(merge_fn); + let mut m3 = MergerBuilder::new(merge_fn); + for (r1, r2, r3) in self.into_iter() { + m1.push(r1)?; + m2.push(r2)?; + m3.push(r3)?; + } + Ok((m1.finish(params)?, m2.finish(params)?, m3.finish(params)?)) + } +} + struct MergerBuilder(grenad::MergerBuilder); impl MergerBuilder { diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 52aa1113e..58219f28c 100644 --- a/milli/src/update/index_documents/mod.rs +++ 
b/milli/src/update/index_documents/mod.rs @@ -406,13 +406,23 @@ where } let typed_chunk = match result? { - TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => { + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } => { let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? }; word_docids = Some(cloneable_chunk); let cloneable_chunk = unsafe { as_cloneable_grenad(&exact_word_docids_reader)? }; exact_word_docids = Some(cloneable_chunk); - TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } + let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? }; + word_fid_docids = Some(cloneable_chunk); + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } } TypedChunk::WordPairProximityDocids(chunk) => { let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? }; diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 788aaf93d..013189387 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -32,6 +32,7 @@ pub(crate) enum TypedChunk { WordDocids { word_docids_reader: grenad::Reader, exact_word_docids_reader: grenad::Reader, + word_fid_docids_reader: grenad::Reader, }, WordPositionDocids(grenad::Reader), WordFidDocids(grenad::Reader), @@ -64,10 +65,15 @@ impl TypedChunk { TypedChunk::NewDocumentsIds(grenad) => { format!("NewDocumentsIds {{ number_of_entries: {} }}", grenad.len()) } - TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => format!( - "WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {} }}", + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } => format!( + "WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {}, word_fid_docids_reader: {} }}", word_docids_reader.len(), - exact_word_docids_reader.len() + exact_word_docids_reader.len(), + word_fid_docids_reader.len() ), TypedChunk::WordPositionDocids(grenad) => { format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len()) @@ -138,7 +144,11 @@ pub(crate) fn write_typed_chunk_into_index( TypedChunk::NewDocumentsIds(documents_ids) => { return Ok((documents_ids, is_merged_database)) } - TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => { + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } => { let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?; append_entries_into_database( word_docids_iter.clone(), @@ -159,6 +169,16 @@ pub(crate) fn write_typed_chunk_into_index( merge_roaring_bitmaps, )?; + let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?; + append_entries_into_database( + word_fid_docids_iter, + &index.word_fid_docids, + wtxn, + index_is_empty, + |value, _buffer| Ok(value), + merge_cbo_roaring_bitmaps, + )?; + // create fst from word docids let fst = merge_word_docids_reader_into_fst(word_docids_iter, exact_word_docids_iter)?; let db_fst = index.words_fst(wtxn)?;
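// For reference: a minimal sketch of the new `docid_word_positions` entry layout that the
// extractors above rely on, assuming only the obkv API already used in this patch
// (`KvWriterU16::memory`, `insert`, `into_inner`, `KvReaderU16::new`). The helper names
// `build_chunk_entry` and `read_chunk_entry` are illustrative and do not exist in the patch.
use obkv::{KvReaderU16, KvWriterU16};

// Build one entry: the key is the big-endian document id followed by the big-endian field id,
// and the value is an obkv map from relative position (u16) to the word bytes.
fn build_chunk_entry(
    docid: u32,
    fid: u16,
    // positions are expected in increasing order, as produced by the tokenizer loop above.
    words: &[(u16, &str)],
) -> std::io::Result<(Vec<u8>, Vec<u8>)> {
    let mut key = Vec::new();
    key.extend_from_slice(&docid.to_be_bytes());
    key.extend_from_slice(&fid.to_be_bytes());

    let mut writer = KvWriterU16::memory();
    for (position, word) in words {
        writer.insert(*position, word.as_bytes())?;
    }
    Ok((key, writer.into_inner()?))
}

// Read an entry back the way the word docids, word count, position, and proximity
// extractors above do: iterate the obkv value as (position, word bytes) pairs.
fn read_chunk_entry(value: &[u8]) -> Vec<(u16, String)> {
    KvReaderU16::new(value)
        .iter()
        .map(|(position, word)| (position, String::from_utf8_lossy(word).into_owned()))
        .collect()
}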