From c683fa98e6bccad193cd9affc3bdffa0f8f99087 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 28 Aug 2024 18:45:16 +0200 Subject: [PATCH] WIP Co-authored-by: Kerollmops Co-authored-by: ManyTheFish --- Cargo.lock | 33 +- milli/Cargo.toml | 5 + milli/src/index.rs | 8 + milli/src/update/mod.rs | 1 + milli/src/update/new/document_change.rs | 78 ++++ milli/src/update/new/extract/cache.rs | 248 +++++++++++ .../update/new/extract/extract_word_docids.rs | 84 ++++ milli/src/update/new/extract/mod.rs | 2 + .../update/new/extract/tokenize_document.rs | 195 +++++++++ milli/src/update/new/global_fields_ids_map.rs | 65 +++ milli/src/update/new/items_pool.rs | 54 +++ milli/src/update/new/mod.rs | 414 ++++++++++++++++++ 12 files changed, 1184 insertions(+), 3 deletions(-) create mode 100644 milli/src/update/new/document_change.rs create mode 100644 milli/src/update/new/extract/cache.rs create mode 100644 milli/src/update/new/extract/extract_word_docids.rs create mode 100644 milli/src/update/new/extract/mod.rs create mode 100644 milli/src/update/new/extract/tokenize_document.rs create mode 100644 milli/src/update/new/global_fields_ids_map.rs create mode 100644 milli/src/update/new/items_pool.rs create mode 100644 milli/src/update/new/mod.rs diff --git a/Cargo.lock b/Cargo.lock index dd67520ea..c3e9532e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2230,6 +2230,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "grenad" +version = "0.4.7" +source = "git+https://github.com/meilisearch/grenad?branch=various-improvements#d7512aedb854c247acc7cd18d0bfa148d3779923" +dependencies = [ + "bytemuck", + "byteorder", + "tempfile", +] + [[package]] name = "h2" version = "0.3.26" @@ -3313,6 +3323,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "lzma-rs" version = "0.3.0" @@ -3415,7 +3434,7 @@ dependencies = [ "mimalloc", "mime", "num_cpus", - "obkv", + "obkv 0.2.2", "once_cell", "ordered-float", "parking_lot", @@ -3565,7 +3584,8 @@ dependencies = [ "fst", "fxhash", "geoutils", - "grenad", + "grenad 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "grenad 0.4.7 (git+https://github.com/meilisearch/grenad?branch=various-improvements)", "heed", "hf-hub", "indexmap", @@ -3574,13 +3594,15 @@ dependencies = [ "json-depth-checker", "levenshtein_automata", "liquid", + "lru", "maplit", "md5", "meili-snap", "memchr", "memmap2", "mimalloc", - "obkv", + "obkv 0.2.2", + "obkv 0.3.0", "once_cell", "ordered-float", "rand", @@ -3833,6 +3855,11 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2e27bcfe835a379d32352112f6b8dbae2d99d16a5fff42abe6e5ba5386c1e5a" +[[package]] +name = "obkv" +version = "0.3.0" +source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#d248eb7edd3453ff758afc2883f6ae25684eb69e" + [[package]] name = "once_cell" version = "1.19.0" diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 79b61b4f1..9fa270d46 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -30,6 +30,9 @@ grenad = { version = "0.4.7", default-features = false, features = [ "rayon", "tempfile", ] } +grenad2 = { package = "grenad", version = "0.4.7", default-features = false, features 
= [ + "tempfile" +], git = "https://github.com/meilisearch/grenad", branch = "various-improvements" } heed = { version = "0.20.3", default-features = false, features = [ "serde-json", "serde-bincode", @@ -38,9 +41,11 @@ heed = { version = "0.20.3", default-features = false, features = [ indexmap = { version = "2.2.6", features = ["serde"] } json-depth-checker = { path = "../json-depth-checker" } levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] } +lru = "0.12.3" memchr = "2.5.0" memmap2 = "0.9.4" obkv = "0.2.2" +obkv2 = { package = "obkv", git = "https://github.com/kerollmops/obkv", branch = "unsized-kvreader" } once_cell = "1.19.0" ordered-float = "4.2.1" rayon = "1.10.0" diff --git a/milli/src/index.rs b/milli/src/index.rs index 512e911aa..5d651e144 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1251,6 +1251,14 @@ impl Index { /* documents */ + /// Returns a document by using the document id. + pub fn document<'t>(&self, rtxn: &'t RoTxn, id: DocumentId) -> Result> { + self.documents + .get(rtxn, &id)? + .ok_or(UserError::UnknownInternalDocumentId { document_id: id }) + .map_err(Into::into) + } + /// Returns an iterator over the requested documents. The next item will be an error if a document is missing. pub fn iter_documents<'a, 't: 'a>( &'a self, diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index 195b95d1e..adfc85174 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -19,6 +19,7 @@ pub(crate) mod del_add; pub(crate) mod facet; mod index_documents; mod indexer_config; +mod new; mod settings; mod update_step; mod word_prefix_docids; diff --git a/milli/src/update/new/document_change.rs b/milli/src/update/new/document_change.rs new file mode 100644 index 000000000..e7c8bf012 --- /dev/null +++ b/milli/src/update/new/document_change.rs @@ -0,0 +1,78 @@ +use heed::RoTxn; +use obkv2::KvReader; + +use super::indexer::KvReaderFieldId; +use crate::{DocumentId, FieldId}; + +pub enum DocumentChange { + Deletion(Deletion), + Update(Update), + Insertion(Insertion), +} + +pub struct Deletion { + docid: DocumentId, + external_docid: String, // ? + current: Box, +} + +pub struct Update { + docid: DocumentId, + external_docid: String, // ? + current: Box, + new: Box, +} + +pub struct Insertion { + docid: DocumentId, + external_docid: String, // ? 
+ new: Box, +} + +impl DocumentChange { + fn docid(&self) -> DocumentId { + match &self { + Self::Deletion(inner) => inner.docid(), + Self::Update(inner) => inner.docid(), + Self::Insertion(inner) => inner.docid(), + } + } +} + +impl Deletion { + pub fn new(docid: DocumentId, external_docid: String, current: Box) -> Self { + Self { docid, external_docid, current } + } + + fn docid(&self) -> DocumentId { + self.docid + } + + fn current(&self, rtxn: &RoTxn) -> &KvReader { + unimplemented!() + } +} + +impl Insertion { + fn docid(&self) -> DocumentId { + self.docid + } + + fn new(&self) -> &KvReader { + unimplemented!() + } +} + +impl Update { + fn docid(&self) -> DocumentId { + self.docid + } + + fn current(&self, rtxn: &RoTxn) -> &KvReader { + unimplemented!() + } + + fn new(&self) -> &KvReader { + unimplemented!() + } +} diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs new file mode 100644 index 000000000..0d72a5a8d --- /dev/null +++ b/milli/src/update/new/extract/cache.rs @@ -0,0 +1,248 @@ +use std::borrow::Cow; +use std::num::NonZeroUsize; +use std::{io, mem}; + +use grenad2::{MergeFunction, Sorter}; +use lru::LruCache; +use roaring::RoaringBitmap; +use smallvec::SmallVec; + +use crate::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; + +#[derive(Debug)] +pub struct CachedSorter { + cache: lru::LruCache, DelAddRoaringBitmap>, + sorter: Sorter, + deladd_buffer: Vec, + cbo_buffer: Vec, +} + +impl CachedSorter { + pub fn new(cap: NonZeroUsize, sorter: Sorter) -> Self { + CachedSorter { + cache: lru::LruCache::new(cap), + sorter, + deladd_buffer: Vec::new(), + cbo_buffer: Vec::new(), + } + } +} + +impl CachedSorter { + pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del, add: _ }) => { + del.get_or_insert_with(RoaringBitmap::new).insert(n); + } + None => { + let value = DelAddRoaringBitmap::new_del_u32(n); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry(key, deladd)?; + } + } + } + + Ok(()) + } + + pub fn insert_del( + &mut self, + key: &[u8], + bitmap: RoaringBitmap, + ) -> grenad::Result<(), MF::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del, add: _ }) => { + *del.get_or_insert_with(RoaringBitmap::new) |= bitmap; + } + None => { + let value = DelAddRoaringBitmap::new_del(bitmap); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry(key, deladd)?; + } + } + } + + Ok(()) + } + + pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del: _, add }) => { + add.get_or_insert_with(RoaringBitmap::new).insert(n); + } + None => { + let value = DelAddRoaringBitmap::new_add_u32(n); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry(key, deladd)?; + } + } + } + + Ok(()) + } + + pub fn insert_add( + &mut self, + key: &[u8], + bitmap: RoaringBitmap, + ) -> grenad::Result<(), MF::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del: _, add }) => { + *add.get_or_insert_with(RoaringBitmap::new) |= bitmap; + } + None => { + let value = DelAddRoaringBitmap::new_add(bitmap); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry(key, deladd)?; + } + } + } + + Ok(()) + } + + pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { + match 
self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del, add }) => { + del.get_or_insert_with(RoaringBitmap::new).insert(n); + add.get_or_insert_with(RoaringBitmap::new).insert(n); + } + None => { + let value = DelAddRoaringBitmap::new_del_add_u32(n); + if let Some((key, deladd)) = self.cache.push(key.into(), value) { + self.write_entry(key, deladd)?; + } + } + } + + Ok(()) + } + + fn write_entry>( + &mut self, + key: A, + deladd: DelAddRoaringBitmap, + ) -> grenad::Result<(), MF::Error> { + self.deladd_buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer); + match deladd { + DelAddRoaringBitmap { del: Some(del), add: None } => { + self.cbo_buffer.clear(); + RoaringBitmap::serialize_into(&del, &mut self.cbo_buffer)?; + value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; + } + DelAddRoaringBitmap { del: None, add: Some(add) } => { + self.cbo_buffer.clear(); + RoaringBitmap::serialize_into(&add, &mut self.cbo_buffer)?; + value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; + } + DelAddRoaringBitmap { del: Some(del), add: Some(add) } => { + self.cbo_buffer.clear(); + RoaringBitmap::serialize_into(&del, &mut self.cbo_buffer)?; + value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; + + self.cbo_buffer.clear(); + RoaringBitmap::serialize_into(&add, &mut self.cbo_buffer)?; + value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; + } + DelAddRoaringBitmap { del: None, add: None } => return Ok(()), + } + let bytes = value_writer.into_inner().unwrap(); + self.sorter.insert(key, bytes) + } + + pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> grenad::Result<(), MF::Error> { + self.sorter.insert(key, val) + } + + pub fn into_sorter(mut self) -> grenad::Result, MF::Error> { + let default_arc = LruCache::new(NonZeroUsize::MIN); + for (key, deladd) in mem::replace(&mut self.cache, default_arc) { + self.write_entry(key, deladd)?; + } + Ok(self.sorter) + } +} + +#[derive(Debug, Clone)] +pub struct DelAddRoaringBitmap { + pub del: Option, + pub add: Option, +} + +impl DelAddRoaringBitmap { + fn new_del_add_u32(n: u32) -> Self { + DelAddRoaringBitmap { + del: Some(RoaringBitmap::from([n])), + add: Some(RoaringBitmap::from([n])), + } + } + + fn new_del(bitmap: RoaringBitmap) -> Self { + DelAddRoaringBitmap { del: Some(bitmap), add: None } + } + + fn new_del_u32(n: u32) -> Self { + DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None } + } + + fn new_add(bitmap: RoaringBitmap) -> Self { + DelAddRoaringBitmap { del: None, add: Some(bitmap) } + } + + fn new_add_u32(n: u32) -> Self { + DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } + } +} + +/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv +/// separately and outputs a new DelAdd with both unions. 
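+///
+/// As a worked example of the intended semantics (not taken from a test), merging the two
+/// serialized entries `{ del: [1, 2] }` and `{ del: [2, 3], add: [4] }` produces
+/// `{ del: [1, 2, 3], add: [4] }`.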
+pub struct DelAddRoaringBitmapMerger;
+
+impl MergeFunction for DelAddRoaringBitmapMerger {
+    type Error = io::Error;
+
+    fn merge<'a>(
+        &self,
+        _key: &[u8],
+        values: &[Cow<'a, [u8]>],
+    ) -> std::result::Result<Cow<'a, [u8]>, Self::Error> {
+        if values.len() == 1 {
+            Ok(values[0].clone())
+        } else {
+            // Retrieve the bitmaps from both sides
+            let mut del_bitmaps_bytes = Vec::new();
+            let mut add_bitmaps_bytes = Vec::new();
+            for value in values {
+                let obkv: &KvReaderDelAdd = value.as_ref().into();
+                if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
+                    del_bitmaps_bytes.push(bitmap_bytes);
+                }
+                if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) {
+                    add_bitmaps_bytes.push(bitmap_bytes);
+                }
+            }
+
+            let mut output_deladd_obkv = KvWriterDelAdd::memory();
+
+            // Deletion
+            let mut buffer = Vec::new();
+            let mut merged = RoaringBitmap::new();
+            for bytes in del_bitmaps_bytes {
+                merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?;
+            }
+            merged.serialize_into(&mut buffer)?;
+            output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?;
+
+            // Addition
+            buffer.clear();
+            merged.clear();
+            for bytes in add_bitmaps_bytes {
+                merged |= RoaringBitmap::deserialize_unchecked_from(bytes)?;
+            }
+            merged.serialize_into(&mut buffer)?;
+            output_deladd_obkv.insert(DelAdd::Addition, &buffer)?;
+
+            output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
+        }
+    }
+}
diff --git a/milli/src/update/new/extract/extract_word_docids.rs b/milli/src/update/new/extract/extract_word_docids.rs
new file mode 100644
index 000000000..e2e1520bc
--- /dev/null
+++ b/milli/src/update/new/extract/extract_word_docids.rs
@@ -0,0 +1,84 @@
+pub fn extract_word_docids(
+    document_change: DocumentChange,
+    _tokenizer: &Tokenizer,
+    output: &mut CachedSorter<DelAddRoaringBitmapMerger>,
+) -> grenad::Result<(), io::Error> {
+    match document_change {
+        DocumentChange::Deletion(inner) => {
+            unimplemented!()
+        }
+        DocumentChange::Update(inner) => {
+            unimplemented!()
+        }
+        DocumentChange::Insertion(inner) => {
+            unimplemented!()
+        }
+    }
+
+    let normalizer_options = NormalizerOption::default();
+
+    if let Some(previous_doc) = previous_doc {
+        for (_, v) in previous_doc.iter() {
+            // Only manage the direct JSON strings
+            // TODO manage the JSON strings correctly (escaped chars)
+            if v.first().zip(v.last()) == Some((&b'"', &b'"')) {
+                let s = std::str::from_utf8(&v[1..v.len() - 1]).unwrap();
+                // for token in tokenizer.tokenize(s).filter(|t| t.is_word()) {
+                //     let key = token.lemma().normalize(&normalizer_options);
+                for token in s.split_whitespace() {
+                    let key = token.normalize(&normalizer_options);
+                    output.insert_del_u32(key.as_bytes(), docid)?;
+                }
+            }
+        }
+    }
+
+    for (_, v) in new_doc.iter() {
+        // Only manage the direct JSON strings
+        // TODO manage the JSON strings correctly (escaped chars)
+        if v.first().zip(v.last()) == Some((&b'"', &b'"')) {
+            let s = std::str::from_utf8(&v[1..v.len() - 1]).unwrap();
+            // for token in tokenizer.tokenize(s).filter(|t| t.is_word()) {
+            //     let key = token.lemma().normalize(&normalizer_options);
+            for token in s.split_whitespace() {
+                let key = token.normalize(&normalizer_options);
+                output.insert_add_u32(key.as_bytes(), docid)?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Takes an iterator of tokens and computes their relative positions depending on separator kinds:
+/// if it is a `Hard` separator we add an additional relative proximity of 8 between words,
+/// otherwise we keep the standard proximity of 1 between words.
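+///
+/// For example, with the default separators, `"hello. world"` yields the positions
+/// `(0, "hello")` and `(8, "world")`, while `"hello, world"` yields `(0, "hello")` and `(1, "world")`.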
+fn process_tokens<'a>( + tokens: impl Iterator>, +) -> impl Iterator)> { + tokens + .skip_while(|token| token.is_separator()) + .scan((0, None), |(offset, prev_kind), mut token| { + match token.kind { + TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => { + *offset += match *prev_kind { + Some(TokenKind::Separator(SeparatorKind::Hard)) => 8, + Some(_) => 1, + None => 0, + }; + *prev_kind = Some(token.kind) + } + TokenKind::Separator(SeparatorKind::Hard) => { + *prev_kind = Some(token.kind); + } + TokenKind::Separator(SeparatorKind::Soft) + if *prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) => + { + *prev_kind = Some(token.kind); + } + _ => token.kind = TokenKind::Unknown, + } + Some((*offset, token)) + }) + .filter(|(_, t)| t.is_word()) +} diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs new file mode 100644 index 000000000..26732d4c8 --- /dev/null +++ b/milli/src/update/new/extract/mod.rs @@ -0,0 +1,2 @@ +mod cache; +mod extract_word_docids; diff --git a/milli/src/update/new/extract/tokenize_document.rs b/milli/src/update/new/extract/tokenize_document.rs new file mode 100644 index 000000000..8793063b0 --- /dev/null +++ b/milli/src/update/new/extract/tokenize_document.rs @@ -0,0 +1,195 @@ +pub struct DocumentTokenizer { + tokenizer: &Tokenizer, + searchable_attributes: Option<&[String]>, + localized_attributes_rules: &[LocalizedAttributesRule], + max_positions_per_attributes: u32, +} + +impl DocumentTokenizer { + // pub fn new(tokenizer: &Tokenizer, settings: &InnerIndexSettings) -> Self { + // Self { tokenizer, settings } + // } + + pub fn tokenize_document<'a>( + obkv: &KvReader<'a, FieldId>, + field_id_map: &FieldsIdsMap, + token_fn: impl Fn(FieldId, u16, &str), + ) { + let mut field_position = Hashmap::new(); + for (field_id, field_bytes) in obkv { + let field_name = field_id_map.name(field_id); + + let tokenize_field = |name, value| { + let field_id = field_id_map.id(name); + match value { + Number(n) => { + let token = n.to_string(); + let position = field_position + .entry(field_id) + .and_modify(|counter| *counter += 8) + .or_insert(0); + token_fn(field_id, position, token.as_str()); + } + String(text) => { + // create an iterator of token with their positions. + let locales = self + .localized_attributes_rules + .iter() + .first(|rule| rule.match_str(field_name)) + .map(|rule| rule.locales(field_id)); + let tokens = + process_tokens(tokenizer.tokenize_with_allow_list(field, locales)) + .take_while(|(p, _)| { + (*p as u32) < self.max_positions_per_attributes + }); + + for (index, token) in tokens { + // keep a word only if it is not empty and fit in a LMDB key. + let token = token.lemma().trim(); + if !token.is_empty() && token.len() <= MAX_WORD_LENGTH { + let position: u16 = index + .try_into() + .map_err(|_| SerializationError::InvalidNumberSerialization)?; + writer.insert(position, token.as_bytes())?; + } + } + } + _ => (), + } + }; + + // if the current field is searchable or contains a searchable attribute + if searchable_attributes.map_or(true, |attributes| { + attributes.iter().any(|name| contained_in(name, field_name)) + }) { + // parse json. + match serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)? 
{ + Value::Object(object) => { + seek_leaf_values_in_object(object, selectors, &field_name, tokenize_field) + } + Value::Array(array) => { + seek_leaf_values_in_array(array, selectors, &field_name, tokenize_field) + } + value => tokenize_field(&base_key, value), + } + } + } + } +} + +/// take an iterator on tokens and compute their relative position depending on separator kinds +/// if it's an `Hard` separator we add an additional relative proximity of 8 between words, +/// else we keep the standard proximity of 1 between words. +fn process_tokens<'a>( + tokens: impl Iterator>, +) -> impl Iterator)> { + tokens + .skip_while(|token| token.is_separator()) + .scan((0, None), |(offset, prev_kind), mut token| { + match token.kind { + TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => { + *offset += match *prev_kind { + Some(TokenKind::Separator(SeparatorKind::Hard)) => 8, + Some(_) => 1, + None => 0, + }; + *prev_kind = Some(token.kind) + } + TokenKind::Separator(SeparatorKind::Hard) => { + *prev_kind = Some(token.kind); + } + TokenKind::Separator(SeparatorKind::Soft) + if *prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) => + { + *prev_kind = Some(token.kind); + } + _ => token.kind = TokenKind::Unknown, + } + Some((*offset, token)) + }) + .filter(|(_, t)| t.is_word()) +} + +/// Returns `true` if the `selector` match the `key`. +/// +/// ```text +/// Example: +/// `animaux` match `animaux` +/// `animaux.chien` match `animaux` +/// `animaux.chien` match `animaux` +/// `animaux.chien.nom` match `animaux` +/// `animaux.chien.nom` match `animaux.chien` +/// ----------------------------------------- +/// `animaux` doesn't match `animaux.chien` +/// `animaux.` doesn't match `animaux` +/// `animaux.ch` doesn't match `animaux.chien` +/// `animau` doesn't match `animaux` +/// ``` +fn contained_in(selector: &str, key: &str) -> bool { + selector.starts_with(key) + && selector[key.len()..].chars().next().map(|c| c == SPLIT_SYMBOL).unwrap_or(true) +} + +/// TODO move in permissive json pointer +mod perm_json_p { + pub fn seek_leaf_values<'a>( + value: &Map, + selectors: impl IntoIterator, + seeker: impl Fn(&str, &Value), + ) { + let selectors: Vec<_> = selectors.into_iter().collect(); + seek_leaf_values_in_object(value, &selectors, "", &seeker); + } + + pub fn seek_leaf_values_in_object( + value: &Map, + selectors: &[&str], + base_key: &str, + seeker: &impl Fn(&str, &Value), + ) { + for (key, value) in value.iter() { + let base_key = if base_key.is_empty() { + key.to_string() + } else { + format!("{}{}{}", base_key, SPLIT_SYMBOL, key) + }; + + // here if the user only specified `doggo` we need to iterate in all the fields of `doggo` + // so we check the contained_in on both side + let should_continue = selectors.iter().any(|selector| { + contained_in(selector, &base_key) || contained_in(&base_key, selector) + }); + + if should_continue { + match value { + Value::Object(object) => { + seek_leaf_values_in_object(object, selectors, &base_key, seeker) + } + Value::Array(array) => { + seek_leaf_values_in_array(array, selectors, &base_key, seeker) + } + value => seeker(&base_key, value), + } + } + } + } + + pub fn seek_leaf_values_in_array( + values: &mut [Value], + selectors: &[&str], + base_key: &str, + seeker: &impl Fn(&str, &Value), + ) { + for value in values.iter_mut() { + match value { + Value::Object(object) => { + seek_leaf_values_in_object(object, selectors, base_key, seeker) + } + Value::Array(array) => { + seek_leaf_values_in_array(array, selectors, base_key, seeker) + } 
+                value => seeker(base_key, value),
+            }
+        }
+    }
+}
diff --git a/milli/src/update/new/global_fields_ids_map.rs b/milli/src/update/new/global_fields_ids_map.rs
new file mode 100644
index 000000000..4bd7b27d9
--- /dev/null
+++ b/milli/src/update/new/global_fields_ids_map.rs
@@ -0,0 +1,65 @@
+use std::sync::{Arc, RwLock};
+
+use crate::{FieldId, FieldsIdsMap};
+
+/// A fields ids map that can be globally updated to add fields
+pub struct GlobalFieldsIdsMap {
+    global: Arc<RwLock<FieldsIdsMap>>,
+    local: FieldsIdsMap,
+}
+
+impl GlobalFieldsIdsMap {
+    pub fn new(global: FieldsIdsMap) -> Self {
+        Self { local: global.clone(), global: Arc::new(RwLock::new(global)) }
+    }
+
+    /// Returns the number of fields ids in the map.
+    pub fn global_len(&self) -> usize {
+        todo!()
+    }
+
+    /// Returns `true` if the map is empty.
+    pub fn global_is_empty(&self) -> bool {
+        todo!()
+    }
+
+    /// Returns the field id related to a field name; it will create a new field id if the
+    /// name is not already known. Returns `None` if the maximum field id has been reached.
+    pub fn insert(&mut self, name: &str) -> Option<FieldId> {
+        match self.names_ids.get(name) {
+            Some(id) => Some(*id),
+            None => {
+                let id = self.next_id?;
+                self.next_id = id.checked_add(1);
+                self.names_ids.insert(name.to_owned(), id);
+                self.ids_names.insert(id, name.to_owned());
+                Some(id)
+            }
+        }
+    }
+
+    /// Get the id of a field based on its name.
+    pub fn id(&self, name: &str) -> Option<FieldId> {
+        self.names_ids.get(name).copied()
+    }
+
+    /// Get the name of a field based on its id.
+    pub fn name(&self, id: FieldId) -> Option<&str> {
+        self.ids_names.get(&id).map(String::as_str)
+    }
+
+    /// Iterate over the ids and names in the ids order.
+    pub fn iter(&self) -> impl Iterator<Item = (FieldId, &str)> {
+        self.ids_names.iter().map(|(id, name)| (*id, name.as_str()))
+    }
+
+    /// Iterate over the ids in the order of the ids.
+    pub fn ids(&'_ self) -> impl Iterator<Item = FieldId> + '_ {
+        self.ids_names.keys().copied()
+    }
+
+    /// Iterate over the names in the order of the ids.
+    pub fn names(&self) -> impl Iterator<Item = &str> {
+        self.ids_names.values().map(AsRef::as_ref)
+    }
+}
diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs
new file mode 100644
index 000000000..e90ce97db
--- /dev/null
+++ b/milli/src/update/new/items_pool.rs
@@ -0,0 +1,54 @@
+use crossbeam_channel::{Receiver, Sender, TryRecvError};
+
+/// A pool of items that can be pulled and generated on demand.
+pub struct ItemsPool<F, T, E>
+where
+    F: Fn() -> Result<T, E>,
+{
+    init: F,
+    sender: Sender<T>,
+    receiver: Receiver<T>,
+}
+
+impl<F, T, E> ItemsPool<F, T, E>
+where
+    F: Fn() -> Result<T, E>,
+{
+    /// Create a new unbounded items pool with the specified function
+    /// to generate items when needed.
+    ///
+    /// The `init` function will be invoked whenever a call to `with` requires new items.
+    pub fn new(init: F) -> Self {
+        let (sender, receiver) = crossbeam_channel::unbounded();
+        ItemsPool { init, sender, receiver }
+    }
+
+    /// Consumes the pool to retrieve all remaining items.
+    ///
+    /// This method is useful for cleaning up and managing the items once they are no longer needed.
+    pub fn into_items(self) -> crossbeam_channel::IntoIter<T> {
+        self.receiver.into_iter()
+    }
+
+    /// Allows running a function on an item from the pool,
+    /// potentially generating a new item if the pool is empty.
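+    ///
+    /// A minimal usage sketch (assuming an `index: &Index` is in scope):
+    ///
+    /// ```ignore
+    /// let pool = ItemsPool::new(|| index.read_txn().map_err(crate::Error::from));
+    /// let count = pool.with(|rtxn| index.number_of_documents(rtxn))?;
+    /// ```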
+ pub fn with(&self, f: G) -> Result + where + G: FnOnce(&mut T) -> Result, + { + let mut item = match self.receiver.try_recv() { + Ok(item) => item, + Err(TryRecvError::Empty) => (self.init)()?, + Err(TryRecvError::Disconnected) => unreachable!(), + }; + + // Run the user's closure with the retrieved item + let result = f(&mut item); + + if let Err(e) = self.sender.send(item) { + unreachable!("error when sending into channel {e}"); + } + + result + } +} diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs new file mode 100644 index 000000000..41b04219f --- /dev/null +++ b/milli/src/update/new/mod.rs @@ -0,0 +1,414 @@ +mod document_change; +// mod extract; +mod items_pool; + +mod global_fields_ids_map; + +mod indexer { + use std::collections::{BTreeMap, HashMap}; + use std::fs::File; + use std::io::Cursor; + use std::os::unix::fs::MetadataExt; + use std::sync::Arc; + + use heed::RoTxn; + use memmap2::Mmap; + use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; + use roaring::RoaringBitmap; + use serde_json::Value; + + use super::document_change::{self, DocumentChange}; + use super::items_pool::ItemsPool; + use crate::documents::{ + obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, + }; + use crate::update::{AvailableDocumentsIds, IndexDocumentsMethod}; + use crate::{ + DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError, Result, UserError, + }; + + pub type KvReaderFieldId = obkv2::KvReader; + pub type KvWriterFieldId = obkv2::KvWriter; + + pub struct DocumentOperationIndexer { + operations: Vec, + method: IndexDocumentsMethod, + } + + enum Payload { + Addition(File), + Deletion(Vec), + } + + pub struct PayloadStats { + pub document_count: usize, + pub bytes: u64, + } + + enum DocumentOperation { + Addition(DocumentOffset), + Deletion, + } + + /// Represents an offset where a document lives + /// in an mmapped grenad reader file. + struct DocumentOffset { + /// The mmapped grenad reader file. + pub content: Arc, // grenad::Reader + /// The offset of the document in the file. + pub offset: u32, + } + + impl DocumentOperationIndexer { + pub fn new(method: IndexDocumentsMethod) -> Self { + Self { operations: Default::default(), method } + } + + /// TODO please give me a type + /// The payload is expected to be in the grenad format + pub fn add_documents(&mut self, payload: File) -> Result { + let reader = DocumentsBatchReader::from_reader(&payload)?; + let bytes = payload.metadata()?.size(); + let document_count = reader.documents_count() as usize; + + self.operations.push(Payload::Addition(payload)); + + Ok(PayloadStats { bytes, document_count }) + } + + pub fn delete_documents(&mut self, to_delete: Vec) { + self.operations.push(Payload::Deletion(to_delete)) + } + + pub fn document_changes<'a>( + self, + index: &'a Index, + rtxn: &'a RoTxn, + mut fields_ids_map: FieldsIdsMap, + primary_key: &'a PrimaryKey<'a>, + ) -> Result + 'a> { + let documents_ids = index.documents_ids(rtxn)?; + let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids); + let mut docids_version_offsets = HashMap::::new(); + + for operation in self.operations { + match operation { + Payload::Addition(payload) => { + let content = unsafe { Mmap::map(&payload).map(Arc::new)? 
}; + let cursor = Cursor::new(content.as_ref()); + let reader = DocumentsBatchReader::from_reader(cursor)?; + + let (mut batch_cursor, batch_index) = reader.into_cursor_and_fields_index(); + // TODO Fetch all document fields to fill the fields ids map + batch_index.iter().for_each(|(_, name)| { + fields_ids_map.insert(name); + }); + + let mut offset: u32 = 0; + while let Some(document) = batch_cursor.next_document()? { + let external_document_id = + match primary_key.document_id(&document, &batch_index)? { + Ok(document_id) => Ok(document_id), + Err(DocumentIdExtractionError::InvalidDocumentId( + user_error, + )) => Err(user_error), + Err(DocumentIdExtractionError::MissingDocumentId) => { + Err(UserError::MissingDocumentId { + primary_key: primary_key.name().to_string(), + document: obkv_to_object(&document, &batch_index)?, + }) + } + Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { + Err(UserError::TooManyDocumentIds { + primary_key: primary_key.name().to_string(), + document: obkv_to_object(&document, &batch_index)?, + }) + } + }?; + + let content = content.clone(); + let document_offset = DocumentOffset { content, offset }; + let document_operation = DocumentOperation::Addition(document_offset); + + match docids_version_offsets.get_mut(&external_document_id) { + None => { + let docid = match index + .external_documents_ids() + .get(rtxn, &external_document_id)? + { + Some(docid) => docid, + None => available_docids.next().ok_or(Error::UserError( + UserError::DocumentLimitReached, + ))?, + }; + + docids_version_offsets.insert( + external_document_id.into(), + (docid, vec![document_operation]), + ); + } + Some((_, offsets)) => offsets.push(document_operation), + } + offset += 1; + } + } + Payload::Deletion(to_delete) => { + for external_document_id in to_delete { + match docids_version_offsets.get_mut(&external_document_id) { + None => { + let docid = match index + .external_documents_ids() + .get(rtxn, &external_document_id)? + { + Some(docid) => docid, + None => available_docids.next().ok_or(Error::UserError( + UserError::DocumentLimitReached, + ))?, + }; + + docids_version_offsets.insert( + external_document_id, + (docid, vec![DocumentOperation::Deletion]), + ); + } + Some((_, offsets)) => offsets.push(DocumentOperation::Deletion), + } + } + } + } + } + + let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); + docids_version_offsets.into_par_iter().map_with( + items, + |context_pool, (external_docid, (internal_docid, operations))| { + context_pool.with(|rtxn| match self.method { + IndexDocumentsMethod::ReplaceDocuments => todo!(), + // TODO Remap the documents to match the db fields_ids_map + IndexDocumentsMethod::UpdateDocuments => merge_document_obkv_for_updates( + rtxn, + index, + &fields_ids_map, + internal_docid, + external_docid, + &operations, + ), + }) + }, + ); + + Ok(vec![].into_par_iter()) + + // let mut file_count: usize = 0; + // for result in WalkDir::new(update_files_path) + // // TODO handle errors + // .sort_by_key(|entry| entry.metadata().unwrap().created().unwrap()) + // { + // let entry = result?; + // if !entry.file_type().is_file() { + // continue; + // } + + // let file = File::open(entry.path()) + // .with_context(|| format!("While opening {}", entry.path().display()))?; + // let content = unsafe { + // Mmap::map(&file) + // .map(Arc::new) + // .with_context(|| format!("While memory mapping {}", entry.path().display()))? 
+ // }; + + // let reader = + // crate::documents::DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?; + // let (mut batch_cursor, batch_index) = reader.into_cursor_and_fields_index(); + // batch_index.iter().for_each(|(_, name)| { + // fields_ids_map.insert(name); + // }); + // let mut offset: u32 = 0; + // while let Some(document) = batch_cursor.next_document()? { + // let primary_key = batch_index.id(primary_key).unwrap(); + // let document_id = document.get(primary_key).unwrap(); + // let document_id = std::str::from_utf8(document_id).unwrap(); + + // let document_offset = DocumentOffset { content: content.clone(), offset }; + // match docids_version_offsets.get_mut(document_id) { + // None => { + // let docid = match maindb.external_documents_ids.get(rtxn, document_id)? { + // Some(docid) => docid, + // None => sequential_docids.next().context("no more available docids")?, + // }; + // docids_version_offsets + // .insert(document_id.into(), (docid, smallvec![document_offset])); + // } + // Some((_, offsets)) => offsets.push(document_offset), + // } + // offset += 1; + // p.inc(1); + // } + + // file_count += 1; + // } + } + } + + pub struct DeleteDocumentIndexer { + to_delete: RoaringBitmap, + } + + impl DeleteDocumentIndexer { + pub fn new() -> Self { + Self { to_delete: Default::default() } + } + + pub fn delete_documents_by_docids(&mut self, docids: RoaringBitmap) { + self.to_delete |= docids; + } + + // let fields = index.fields_ids_map(rtxn)?; + // let primary_key = + // index.primary_key(rtxn)?.ok_or(InternalError::DatabaseMissingEntry { + // db_name: db_name::MAIN, + // key: Some(main_key::PRIMARY_KEY_KEY), + // })?; + // let primary_key = PrimaryKey::new(primary_key, &fields).ok_or_else(|| { + // InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldName { + // field_name: primary_key.to_owned(), + // process: "external_id_of", + // }) + // })?; + pub fn document_changes<'a, F>( + self, + index: &'a Index, + fields: &'a FieldsIdsMap, + primary_key: &'a PrimaryKey<'a>, + ) -> Result> + 'a> + { + let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); + Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { + items.with(|rtxn| { + let document = index.document(rtxn, docid)?; + let external_docid = match primary_key.document_id(&document, fields)? 
{ + Ok(document_id) => Ok(document_id) as Result<_>, + Err(_) => Err(InternalError::DocumentsError( + crate::documents::Error::InvalidDocumentFormat, + ) + .into()), + }?; + Ok(DocumentChange::Deletion(document_change::Deletion::new( + docid, + external_docid, + ))) + }) + })) + } + } + + pub struct DumpIndexer; + + impl DumpIndexer { + pub fn new() -> Self { + todo!() + } + + pub fn document_changes_from_json_iter( + self, + iter: I, + index: &Index, + ) -> impl ParallelIterator + where + I: IntoIterator, + { + // let items = Arc::new(ItemsPool::new(|| { + // let rtxn = index.read_txn()?; + // let fields = index.fields_ids_map(&rtxn)?; + // let primary_key = + // index.primary_key(&rtxn)?.ok_or(InternalError::DatabaseMissingEntry { + // db_name: db_name::MAIN, + // key: Some(main_key::PRIMARY_KEY_KEY), + // })?; + // let primary_key = PrimaryKey::new(primary_key, &fields).ok_or_else(|| { + // InternalError::FieldIdMapMissingEntry( + // crate::FieldIdMapMissingEntry::FieldName { + // field_name: primary_key.to_owned(), + // process: "external_id_of", + // }, + // ) + // })?; + // Ok(DeleteDocumentExternalDocumentIdGetter { rtxn, fields, primary_key }) + // as crate::Result<_> + // })); + + todo!(); + vec![].into_par_iter() + } + } + + pub struct UpdateByFunctionIndexer; + // DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))? + + /// Reads the previous version of a document from the database, the new versions + /// in the grenad update files and merges them to generate a new boxed obkv. + /// + /// This function is only meant to be used when doing an update and not a replacement. + pub fn merge_document_obkv_for_updates( + rtxn: &RoTxn, + // Let's construct the new obkv in memory + index: &Index, + fields_ids_map: &FieldsIdsMap, + docid: DocumentId, + external_docid: String, + operations: &[DocumentOperation], + ) -> Result> { + let mut document = BTreeMap::new(); + let original_obkv = + index.documents.remap_data_type::().get(rtxn, &docid)?; + let original_obkv: Option<&KvReaderFieldId> = original_obkv.map(Into::into); + + if let Some(original_obkv) = original_obkv { + original_obkv.into_iter().for_each(|(k, v)| { + document.insert(k, v.to_vec()); + }); + } + + let last_deletion = operations + .iter() + .rposition(|operation| matches!(operation, DocumentOperation::Deletion)); + + let operations = &operations[last_deletion.map_or(0, |i| i + 1)..]; + + if operations.is_empty() { + match original_obkv { + Some(original_obkv) => { + let current = original_obkv.as_bytes().to_vec().into_boxed_slice().into(); + return Ok(Some(DocumentChange::Deletion(document_change::Deletion::new( + docid, + external_docid, + current, + )))); + } + None => return Ok(None), + } + } + + for operation in operations { + let DocumentOffset { content, offset } = ; + + let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?; + let (mut cursor, batch_index) = reader.into_cursor_and_fields_index(); + let obkv = cursor.get(*offset)?.expect("must exists"); + + obkv.into_iter().for_each(|(k, v)| { + let field_name = batch_index.name(k).unwrap(); + let id = fields_ids_map.id(field_name).unwrap(); + document.insert(id, v.to_vec()); + }); + } + + let mut writer = KvWriterFieldId::memory(); + document.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap()); + let boxed = writer.into_inner().unwrap().into_boxed_slice(); + + // Box + + Ok(boxed.into()) + } +}
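
Usage note, outside the diff: the `ItemsPool` + rayon pattern introduced here can be exercised on its own. A minimal sketch, assuming `index: &Index` and `docids: Vec<DocumentId>` are in scope, mirroring the `context_pool.with(|rtxn| ...)` call in `DocumentOperationIndexer::document_changes`:

    use std::sync::Arc;
    use rayon::iter::{IntoParallelIterator, ParallelIterator};

    // One read transaction per rayon worker, reused across items.
    let pool = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
    let field_counts: crate::Result<Vec<usize>> = docids
        .into_par_iter()
        .map_with(pool, |pool, docid| {
            // Reuse a transaction from the pool or create a new one on demand.
            pool.with(|rtxn| Ok(index.document(rtxn, docid)?.iter().count()))
        })
        .collect();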