diff --git a/Cargo.lock b/Cargo.lock index ed62f0716..961ebab28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,9 +296,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.16" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "anes" @@ -664,6 +664,10 @@ name = "bumpalo" version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +dependencies = [ + "allocator-api2", + "serde", +] [[package]] name = "byte-unit" @@ -1887,6 +1891,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2315,6 +2325,18 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", + "serde", +] + [[package]] name = "heapless" version = "0.8.0" @@ -2557,6 +2579,7 @@ dependencies = [ "arroy", "big_s", "bincode", + "bumpalo", "crossbeam", "csv", "derive_builder 0.20.0", @@ -3549,6 +3572,7 @@ dependencies = [ "bimap", "bincode", "bstr", + "bumpalo", "bytemuck", "byteorder", "candle-core", @@ -3585,6 +3609,7 @@ dependencies = [ "once_cell", "ordered-float", "rand", + "raw-collections", "rayon", "rayon-par-bridge", "rhai", @@ -4406,6 +4431,18 @@ dependencies = [ "rand", ] +[[package]] +name = "raw-collections" +version = "0.1.0" +source = "git+https://github.com/dureuill/raw-collections.git#0ecd143c1707d237e3c4d749bc685418da2fccc2" +dependencies = [ + "allocator-api2", + "bumpalo", + "hashbrown 0.15.0", + "serde", + "serde_json", +] + [[package]] name = "raw-cpuid" version = "10.7.0" @@ -4869,12 +4906,13 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ "indexmap", "itoa", + "memchr", "ryu", "serde", ] diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index 4731be68b..88f9488b5 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -39,6 +39,7 @@ time = { version = "0.3.36", features = [ tracing = "0.1.40" ureq = "2.10.0" uuid = { version = "1.10.0", features = ["serde", "v4"] } +bumpalo = "3.16.0" [dev-dependencies] arroy = { git = "https://github.com/meilisearch/arroy/", rev = "2386594dfb009ce08821a925ccc89fb8e30bf73d" } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 69eb28372..446efd0c4 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -23,14 +23,15 @@ use std::fmt; use std::fs::{self, File}; use std::io::BufWriter; +use bumpalo::collections::CollectIn; +use bumpalo::Bump; use dump::IndexMetadata; use 
meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::heed::CompactionOption; -use meilisearch_types::milli::update::new::indexer::{ - self, retrieve_or_guess_primary_key, DocumentChanges, -}; +use meilisearch_types::milli::update::new::indexer::document_changes::DocumentChanges; +use meilisearch_types::milli::update::new::indexer::{self, retrieve_or_guess_primary_key}; use meilisearch_types::milli::update::{ IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings, }; @@ -1219,6 +1220,8 @@ impl IndexScheduler { index: &'i Index, operation: IndexOperation, ) -> Result> { + let indexer_alloc = Bump::new(); + match operation { IndexOperation::DocumentClear { mut tasks, .. } => { let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?; @@ -1252,6 +1255,9 @@ impl IndexScheduler { let mut primary_key_has_been_set = false; let must_stop_processing = self.must_stop_processing.clone(); let indexer_config = self.index_mapper.indexer_config(); + // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. + // this is made difficult by the fact we're doing private clones of the index scheduler and sending it + // to a fresh thread. /// TODO manage errors correctly let rtxn = index.read_txn()?; @@ -1274,7 +1280,9 @@ impl IndexScheduler { } } - let mut fields_ids_map = index.fields_ids_map(&rtxn)?; + let db_fields_ids_map = index.fields_ids_map(&rtxn)?; + let mut new_fields_ids_map = db_fields_ids_map.clone(); + let first_document = match content_files.first() { Some(mmap) => { let mut iter = serde_json::Deserializer::from_slice(mmap).into_iter(); @@ -1286,7 +1294,7 @@ impl IndexScheduler { let primary_key = retrieve_or_guess_primary_key( &rtxn, index, - &mut fields_ids_map, + &mut new_fields_ids_map, first_document.as_ref(), )? 
.unwrap(); @@ -1320,7 +1328,11 @@ impl IndexScheduler { } DocumentOperation::Delete(document_ids) => { let count = document_ids.len(); - indexer.delete_documents(document_ids); + let document_ids: bumpalo::collections::vec::Vec<_> = document_ids + .iter() + .map(|s| &*indexer_alloc.alloc_str(s)) + .collect_in(&indexer_alloc); + indexer.delete_documents(document_ids.into_bump_slice()); // Uses Invariant: remove documents actually always returns Ok for the inner result // let count = user_result.unwrap(); let provided_ids = @@ -1347,10 +1359,22 @@ impl IndexScheduler { // let pool = indexer_config.thread_pool.unwrap(); let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); - let param = (index, &rtxn, &primary_key); - let document_changes = indexer.document_changes(&mut fields_ids_map, param)?; - /// TODO pass/write the FieldsIdsMap - indexer::index(index_wtxn, index, fields_ids_map, &pool, document_changes)?; + let document_changes = indexer.into_changes( + &indexer_alloc, + index, + &rtxn, + &primary_key, + &mut new_fields_ids_map, + )?; + + indexer::index( + index_wtxn, + index, + &db_fields_ids_map, + new_fields_ids_map, + &pool, + &document_changes, + )?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } @@ -1501,10 +1525,11 @@ impl IndexScheduler { } let rtxn = index.read_txn()?; - let mut fields_ids_map = index.fields_ids_map(&rtxn)?; + let db_fields_ids_map = index.fields_ids_map(&rtxn)?; + let mut new_fields_ids_map = db_fields_ids_map.clone(); let primary_key = - retrieve_or_guess_primary_key(&rtxn, index, &mut fields_ids_map, None)? + retrieve_or_guess_primary_key(&rtxn, index, &mut new_fields_ids_map, None)? .unwrap(); if !tasks.iter().all(|res| res.error.is_some()) { @@ -1512,19 +1537,17 @@ impl IndexScheduler { // let pool = indexer_config.thread_pool.unwrap(); let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); - let param = (index, &fields_ids_map, &primary_key); let mut indexer = indexer::DocumentDeletion::new(); indexer.delete_documents_by_docids(to_delete); - /// TODO remove this fields-ids-map, it's useless for the deletion pipeline (the &mut cloned one). 
- let document_changes = - indexer.document_changes(&mut fields_ids_map.clone(), param)?; - /// TODO pass/write the FieldsIdsMap + let document_changes = indexer.into_changes(&indexer_alloc, primary_key); + indexer::index( index_wtxn, index, - fields_ids_map.clone(), + &db_fields_ids_map, + new_fields_ids_map, &pool, - document_changes, + &document_changes, )?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 72f3daa4e..fc522994e 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -29,8 +29,8 @@ fst = "0.4.7" fxhash = "0.2.1" geoutils = "0.5.1" grenad = { version = "0.4.7", default-features = false, features = [ - "rayon", # TODO Should we keep this feature - "tempfile" + "rayon", # TODO Should we keep this feature + "tempfile", ], git = "https://github.com/meilisearch/grenad", branch = "various-improvements" } heed = { version = "0.20.3", default-features = false, features = [ "serde-json", @@ -81,7 +81,13 @@ hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", ] } tiktoken-rs = "0.5.9" liquid = "0.26.6" -rhai = { version = "1.19.0", features = ["serde", "no_module", "no_custom_syntax", "no_time", "sync"] } +rhai = { version = "1.19.0", features = [ + "serde", + "no_module", + "no_custom_syntax", + "no_time", + "sync", +] } arroy = { git = "https://github.com/meilisearch/arroy/", rev = "2386594dfb009ce08821a925ccc89fb8e30bf73d" } rand = "0.8.5" tracing = "0.1.40" @@ -89,6 +95,8 @@ ureq = { version = "2.10.0", features = ["json"] } url = "2.5.2" rayon-par-bridge = "0.1.0" hashbrown = "0.14.5" +raw-collections = { git = "https://github.com/dureuill/raw-collections.git", version = "0.1.0" } +bumpalo = "3.16.0" thread_local = "1.1.8" [dev-dependencies] diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs index 036981b65..001e2293a 100644 --- a/milli/src/documents/mod.rs +++ b/milli/src/documents/mod.rs @@ -13,8 +13,8 @@ pub use builder::DocumentsBatchBuilder; pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader}; use obkv::KvReader; pub use primary_key::{ - validate_document_id_value, DocumentIdExtractionError, FieldIdMapper, PrimaryKey, - DEFAULT_PRIMARY_KEY, + validate_document_id_str, validate_document_id_value, DocumentIdExtractionError, FieldIdMapper, + PrimaryKey, DEFAULT_PRIMARY_KEY, }; pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader}; use serde::{Deserialize, Serialize}; @@ -96,6 +96,10 @@ impl FieldIdMapper for DocumentsBatchIndex { fn id(&self, name: &str) -> Option { self.id(name) } + + fn name(&self, id: FieldId) -> Option<&str> { + self.name(id) + } } #[derive(Debug, thiserror::Error)] diff --git a/milli/src/documents/primary_key.rs b/milli/src/documents/primary_key.rs index b6a236623..1662ed2e6 100644 --- a/milli/src/documents/primary_key.rs +++ b/milli/src/documents/primary_key.rs @@ -19,6 +19,21 @@ pub trait FieldIdMapper { /// /// `None` if the field with this name was not found. 
fn id(&self, name: &str) -> Option<FieldId>;
+
+    fn name(&self, id: FieldId) -> Option<&str>;
+}
+
+impl<T> FieldIdMapper for &T
+where
+    T: FieldIdMapper,
+{
+    fn id(&self, name: &str) -> Option<FieldId> {
+        T::id(self, name)
+    }
+
+    fn name(&self, id: FieldId) -> Option<&str> {
+        T::name(self, id)
+    }
 }
 
 /// A type that represent the type of primary key that has been set
@@ -190,7 +205,7 @@ fn starts_with(selector: &str, key: &str) -> bool {
 
 // FIXME: move to a DocumentId struct
 
-fn validate_document_id(document_id: &str) -> Option<&str> {
+pub fn validate_document_id_str(document_id: &str) -> Option<&str> {
     if !document_id.is_empty()
         && document_id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
     {
@@ -202,7 +217,7 @@ pub fn validate_document_id_value(document_id: Value) -> StdResult<String, UserError> {
     match document_id {
-        Value::String(string) => match validate_document_id(&string) {
+        Value::String(string) => match validate_document_id_str(&string) {
             Some(s) if s.len() == string.len() => Ok(string),
             Some(s) => Ok(s.to_string()),
             None => Err(UserError::InvalidDocumentId { document_id: Value::String(string) }),
diff --git a/milli/src/fields_ids_map.rs b/milli/src/fields_ids_map.rs
index 52e02045d..af96f6a86 100644
--- a/milli/src/fields_ids_map.rs
+++ b/milli/src/fields_ids_map.rs
@@ -98,6 +98,20 @@ impl crate::documents::FieldIdMapper for FieldsIdsMap {
     fn id(&self, name: &str) -> Option<FieldId> {
         self.id(name)
     }
+
+    fn name(&self, id: FieldId) -> Option<&str> {
+        self.name(id)
+    }
+}
+
+pub trait MutFieldIdMapper {
+    fn insert(&mut self, name: &str) -> Option<FieldId>;
+}
+
+impl MutFieldIdMapper for FieldsIdsMap {
+    fn insert(&mut self, name: &str) -> Option<FieldId> {
+        self.insert(name)
+    }
 }
 
 #[cfg(test)]
diff --git a/milli/src/fields_ids_map/global.rs b/milli/src/fields_ids_map/global.rs
index 93908aea8..40d7f389b 100644
--- a/milli/src/fields_ids_map/global.rs
+++ b/milli/src/fields_ids_map/global.rs
@@ -1,6 +1,8 @@
 use std::collections::BTreeMap;
 use std::sync::RwLock;
 
+use super::MutFieldIdMapper;
+use crate::documents::FieldIdMapper;
 use crate::{FieldId, FieldsIdsMap};
 
 /// A fields ids map that can be globally updated to add fields
@@ -11,11 +13,21 @@ pub struct GlobalFieldsIdsMap<'indexing> {
 }
 
 #[derive(Debug, Clone)]
-struct LocalFieldsIdsMap {
+pub struct LocalFieldsIdsMap {
     names_ids: BTreeMap<String, FieldId>,
     ids_names: BTreeMap<FieldId, String>,
 }
 
+impl FieldIdMapper for LocalFieldsIdsMap {
+    fn id(&self, name: &str) -> Option<FieldId> {
+        self.id(name)
+    }
+
+    fn name(&self, id: FieldId) -> Option<&str> {
+        self.name(id)
+    }
+}
+
 impl LocalFieldsIdsMap {
     fn new(global: &RwLock<FieldsIdsMap>) -> Self {
         let global = global.read().unwrap();
@@ -83,4 +95,14 @@ impl<'indexing> GlobalFieldsIdsMap<'indexing> {
         self.local.name(id)
     }
+
+    pub fn local_map(&self) -> &LocalFieldsIdsMap {
+        &self.local
+    }
+}
+
+impl<'indexing> MutFieldIdMapper for GlobalFieldsIdsMap<'indexing> {
+    fn insert(&mut self, name: &str) -> Option<FieldId> {
+        self.id_or_insert(name)
+    }
 }
diff --git a/milli/src/update/new/document.rs b/milli/src/update/new/document.rs
new file mode 100644
index 000000000..96d0e9cca
--- /dev/null
+++ b/milli/src/update/new/document.rs
@@ -0,0 +1,255 @@
+use std::collections::BTreeSet;
+
+use heed::RoTxn;
+use serde_json::value::RawValue;
+
+use super::document_change::{Entry, Versions};
+use super::{KvReaderFieldId, KvWriterFieldId};
+use crate::documents::FieldIdMapper;
+use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
+use crate::{DocumentId, FieldId, Index, InternalError, Result};
+
+/// A view
into a document that can represent either the current version from the DB, +/// the update data from payload or other means, or the merged updated version. +/// +/// The 'doc lifetime is meant to live sufficiently for the document to be handled by the extractors. +pub trait Document<'doc> { + /// Iterate over all **top-level** fields of the document, returning their name and raw JSON value. + /// + /// - The returned values *may* contain nested fields. + /// - The `_vectors` field is **ignored** by this method, meaning it is **not returned** by this method. + fn iter_top_level_fields(&self) -> impl Iterator>; +} + +#[derive(Clone, Copy)] +pub struct DocumentFromDb<'t, Mapper: FieldIdMapper> +where + Mapper: FieldIdMapper, +{ + fields_ids_map: &'t Mapper, + content: &'t KvReaderFieldId, +} + +impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> { + fn iter_top_level_fields(&self) -> impl Iterator> { + let mut it = self.content.iter(); + + std::iter::from_fn(move || { + let (fid, value) = it.next()?; + + let res = (|| { + let value = + serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?; + + let name = self.fields_ids_map.name(fid).ok_or( + InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId { + field_id: fid, + process: "getting current document", + }), + )?; + Ok((name, value)) + })(); + + Some(res) + }) + } +} + +impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> { + pub fn new( + docid: DocumentId, + rtxn: &'t RoTxn, + index: &'t Index, + db_fields_ids_map: &'t Mapper, + ) -> Result> { + index.documents.get(rtxn, &docid).map_err(crate::Error::from).map(|reader| { + reader.map(|reader| Self { fields_ids_map: db_fields_ids_map, content: reader }) + }) + } + + fn field_from_fid(&self, fid: FieldId) -> Result> { + Ok(self + .content + .get(fid) + .map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson)) + .transpose()?) 
+ } +} + +#[derive(Clone, Copy)] +pub struct DocumentFromVersions<'doc> { + versions: Versions<'doc>, +} + +impl<'doc> DocumentFromVersions<'doc> { + pub fn new(versions: Versions<'doc>) -> Self { + Self { versions } + } +} + +impl<'doc> Document<'doc> for DocumentFromVersions<'doc> { + fn iter_top_level_fields(&self) -> impl Iterator> { + match &self.versions { + Versions::Single(version) => either::Either::Left(version.iter_top_level_fields()), + Versions::Multiple(versions) => { + let mut seen_fields = BTreeSet::new(); + let mut it = versions.iter().rev().flat_map(|version| version.iter()).copied(); + either::Either::Right(std::iter::from_fn(move || loop { + let (name, value) = it.next()?; + + if seen_fields.contains(name) { + continue; + } + seen_fields.insert(name); + return Some(Ok((name, value))); + })) + } + } + } +} + +// used in document from payload +impl<'doc> Document<'doc> for &'doc [Entry<'doc>] { + fn iter_top_level_fields(&self) -> impl Iterator>> { + self.iter().copied().map(|(k, v)| Ok((k, v))) + } +} + +pub struct MergedDocument<'doc, 't, Mapper: FieldIdMapper> { + new_doc: DocumentFromVersions<'doc>, + db: Option>, +} + +impl<'doc, 't, Mapper: FieldIdMapper> MergedDocument<'doc, 't, Mapper> { + pub fn new( + new_doc: DocumentFromVersions<'doc>, + db: Option>, + ) -> Self { + Self { new_doc, db } + } + + pub fn with_db( + docid: DocumentId, + rtxn: &'t RoTxn, + index: &'t Index, + db_fields_ids_map: &'t Mapper, + new_doc: DocumentFromVersions<'doc>, + ) -> Result { + let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?; + Ok(Self { new_doc, db }) + } + + pub fn without_db(new_doc: DocumentFromVersions<'doc>) -> Self { + Self { new_doc, db: None } + } +} + +impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d> + for MergedDocument<'doc, 't, Mapper> +{ + fn iter_top_level_fields(&self) -> impl Iterator> { + let mut new_doc_it = self.new_doc.iter_top_level_fields(); + let mut db_it = self.db.iter().flat_map(|db| db.iter_top_level_fields()); + + std::iter::from_fn(move || { + let mut seen_fields = BTreeSet::new(); + if let Some(next) = new_doc_it.next() { + if let Ok((name, _)) = next { + seen_fields.insert(name); + } + return Some(next); + } + loop { + match db_it.next()? { + Ok((name, value)) => { + if seen_fields.contains(name) { + continue; + } + return Some(Ok((name, value))); + } + Err(err) => return Some(Err(err)), + } + } + }) + } +} + +impl<'doc, D> Document<'doc> for &D +where + D: Document<'doc>, +{ + fn iter_top_level_fields(&self) -> impl Iterator> { + D::iter_top_level_fields(self) + } +} + +/// Turn this document into an obkv, whose fields are indexed by the provided `FieldIdMapper`. +/// +/// The produced obkv is suitable for storing into the documents DB, meaning: +/// +/// - It contains the contains of `_vectors` that are not configured as an embedder +/// - It contains all the top-level fields of the document, with their raw JSON value as value. +/// +/// # Panics +/// +/// - If the document contains a top-level field that is not present in `fields_ids_map`. 
+/// +pub fn write_to_obkv<'s, 'a, 'b>( + document: &'s impl Document<'s>, + vector_document: Option<()>, + fields_ids_map: &'a impl FieldIdMapper, + mut document_buffer: &'a mut Vec, +) -> Result<&'a KvReaderFieldId> +where + 's: 'a, + 's: 'b, +{ + // will be used in 'inject_vectors + let vectors_value: Box; + + document_buffer.clear(); + let mut unordered_field_buffer = Vec::new(); + unordered_field_buffer.clear(); + + let mut writer = KvWriterFieldId::new(&mut document_buffer); + + for res in document.iter_top_level_fields() { + let (field_name, value) = res?; + let field_id = fields_ids_map.id(field_name).unwrap(); + unordered_field_buffer.push((field_id, value)); + } + + 'inject_vectors: { + let Some(vector_document) = vector_document else { break 'inject_vectors }; + + let Some(vectors_fid) = fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME) else { + break 'inject_vectors; + }; + /* + let mut vectors = BTreeMap::new(); + for (name, entry) in vector_document.iter_vectors() { + if entry.has_configured_embedder { + continue; // we don't write vectors with configured embedder in documents + } + vectors.insert( + name, + serde_json::json!({ + "regenerate": entry.regenerate, + // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object + "embeddings": entry.embeddings, + }), + ); + } + + vectors_value = serde_json::value::to_raw_value(&vectors).unwrap(); + unordered_field_buffer.push((vectors_fid, &vectors_value));*/ + } + + unordered_field_buffer.sort_by_key(|(fid, _)| *fid); + for (fid, value) in unordered_field_buffer.iter() { + writer.insert(*fid, value.get().as_bytes()).unwrap(); + } + + writer.finish().unwrap(); + Ok(KvReaderFieldId::from_slice(document_buffer)) +} diff --git a/milli/src/update/new/document_change.rs b/milli/src/update/new/document_change.rs index 7be8d1958..a789b32b7 100644 --- a/milli/src/update/new/document_change.rs +++ b/milli/src/update/new/document_change.rs @@ -1,35 +1,35 @@ use heed::RoTxn; -use obkv::KvReader; +use serde_json::value::RawValue; -use crate::update::new::KvReaderFieldId; -use crate::{DocumentId, FieldId, Index, Result}; +use super::document::{DocumentFromDb, DocumentFromVersions, MergedDocument}; +use crate::documents::FieldIdMapper; +use crate::{DocumentId, Index, Result}; -pub enum DocumentChange { +pub enum DocumentChange<'doc> { Deletion(Deletion), - Update(Update), - Insertion(Insertion), + Update(Update<'doc>), + Insertion(Insertion<'doc>), } pub struct Deletion { - pub docid: DocumentId, - pub external_document_id: String, - current: Box, + docid: DocumentId, + external_document_id: String, } -pub struct Update { - pub docid: DocumentId, - pub external_document_id: String, - current: Box, - pub new: Box, +pub struct Update<'doc> { + docid: DocumentId, + external_document_id: String, + new: DocumentFromVersions<'doc>, + has_deletion: bool, } -pub struct Insertion { - pub docid: DocumentId, - pub external_document_id: String, - pub new: Box, +pub struct Insertion<'doc> { + docid: DocumentId, + external_document_id: String, + new: DocumentFromVersions<'doc>, } -impl DocumentChange { +impl<'doc> DocumentChange<'doc> { pub fn docid(&self) -> DocumentId { match &self { Self::Deletion(inner) => inner.docid(), @@ -37,15 +37,19 @@ impl DocumentChange { Self::Insertion(inner) => inner.docid(), } } + + pub fn external_docid(&self) -> &str { + match self { + DocumentChange::Deletion(deletion) => deletion.external_document_id(), + DocumentChange::Update(update) => update.external_document_id(), + 
DocumentChange::Insertion(insertion) => insertion.external_document_id(), + } + } } impl Deletion { - pub fn create( - docid: DocumentId, - external_document_id: String, - current: Box, - ) -> Self { - Self { docid, external_document_id, current } + pub fn create(docid: DocumentId, external_document_id: String) -> Self { + Self { docid, external_document_id } } pub fn docid(&self) -> DocumentId { @@ -56,21 +60,23 @@ impl Deletion { &self.external_document_id } - // TODO shouldn't we use the one in self? - pub fn current<'a>( + pub fn current<'a, Mapper: FieldIdMapper>( &self, rtxn: &'a RoTxn, index: &'a Index, - ) -> Result>> { - index.documents.get(rtxn, &self.docid).map_err(crate::Error::from) + mapper: &'a Mapper, + ) -> Result> { + Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( + crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, + )?) } } -impl Insertion { +impl<'doc> Insertion<'doc> { pub fn create( docid: DocumentId, external_document_id: String, - new: Box, + new: DocumentFromVersions<'doc>, ) -> Self { Insertion { docid, external_document_id, new } } @@ -82,20 +88,19 @@ impl Insertion { pub fn external_document_id(&self) -> &str { &self.external_document_id } - - pub fn new(&self) -> &KvReader { - self.new.as_ref() + pub fn new(&self) -> DocumentFromVersions<'doc> { + self.new } } -impl Update { +impl<'doc> Update<'doc> { pub fn create( docid: DocumentId, external_document_id: String, - current: Box, - new: Box, + new: DocumentFromVersions<'doc>, + has_deletion: bool, ) -> Self { - Update { docid, external_document_id, current, new } + Update { docid, new, external_document_id, has_deletion } } pub fn docid(&self) -> DocumentId { @@ -105,16 +110,39 @@ impl Update { pub fn external_document_id(&self) -> &str { &self.external_document_id } - - pub fn current<'a>( + pub fn current<'a, Mapper: FieldIdMapper>( &self, rtxn: &'a RoTxn, index: &'a Index, - ) -> Result>> { - index.documents.get(rtxn, &self.docid).map_err(crate::Error::from) + mapper: &'a Mapper, + ) -> Result> { + Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( + crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, + )?) 
} - pub fn new(&self) -> &KvReader { - self.new.as_ref() + pub fn updated(&self) -> DocumentFromVersions<'doc> { + self.new + } + + pub fn new<'a, Mapper: FieldIdMapper>( + &self, + rtxn: &'a RoTxn, + index: &'a Index, + mapper: &'a Mapper, + ) -> Result> { + if self.has_deletion { + Ok(MergedDocument::without_db(self.new)) + } else { + MergedDocument::with_db(self.docid, rtxn, index, mapper, self.new) + } } } + +pub type Entry<'doc> = (&'doc str, &'doc RawValue); + +#[derive(Clone, Copy)] +pub enum Versions<'doc> { + Single(&'doc [Entry<'doc>]), + Multiple(&'doc [&'doc [Entry<'doc>]]), +} diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 8ca9a8b20..a3f05ce0e 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -2,46 +2,90 @@ use std::cell::RefCell; use std::collections::HashSet; use std::fmt::Debug; use std::fs::File; -use std::sync::Arc; +use std::ops::DerefMut as _; +use bumpalo::Bump; use grenad::{MergeFunction, Merger}; use heed::RoTxn; -use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; +use rayon::iter::{ParallelBridge as _, ParallelIterator as _}; use serde_json::Value; -use thread_local::ThreadLocal; use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::extract::DocidsExtractor; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::indexer::document_changes::{ + for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, + IndexingContext, ThreadLocal, +}; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{ - DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH, -}; +use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; + +pub struct FacetedExtractorData<'extractor> { + attributes_to_extract: &'extractor [&'extractor str], + grenad_parameters: GrenadParameters, + max_memory: Option, +} + +impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> { + type Data = FullySend>>; + + fn init_data( + &self, + _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, + ) -> Result { + Ok(FullySend(RefCell::new(CboCachedSorter::new( + // TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + self.grenad_parameters.chunk_compression_type, + self.grenad_parameters.chunk_compression_level, + self.grenad_parameters.max_nb_chunks, + self.max_memory, + ), + )))) + } + + fn process( + &self, + change: DocumentChange, + context: &crate::update::new::indexer::document_changes::DocumentChangeContext, + ) -> Result<()> { + FacetedDocidsExtractor::extract_document_change( + &context, + self.attributes_to_extract, + change, + ) + } +} + pub struct FacetedDocidsExtractor; impl FacetedDocidsExtractor { fn extract_document_change( - rtxn: &RoTxn, - index: &Index, - buffer: &mut Vec, - fields_ids_map: &mut GlobalFieldsIdsMap, + context: &DocumentChangeContext< + FullySend>>, + >, attributes_to_extract: &[&str], - cached_sorter: &mut CboCachedSorter, document_change: DocumentChange, ) -> Result<()> { + let index = &context.index; + let rtxn = &context.txn; + let mut new_fields_ids_map 
= context.new_fields_ids_map.borrow_mut(); + let mut cached_sorter = context.data.0.borrow_mut(); match document_change { DocumentChange::Deletion(inner) => extract_document_facets( attributes_to_extract, - inner.current(rtxn, index)?.unwrap(), - fields_ids_map, + inner.current(rtxn, index, context.db_fields_ids_map)?, + new_fields_ids_map.deref_mut(), &mut |fid, value| { Self::facet_fn_with_options( - buffer, - cached_sorter, + &context.doc_alloc, + cached_sorter.deref_mut(), CboCachedSorter::insert_del_u32, inner.docid(), fid, @@ -52,12 +96,12 @@ impl FacetedDocidsExtractor { DocumentChange::Update(inner) => { extract_document_facets( attributes_to_extract, - inner.current(rtxn, index)?.unwrap(), - fields_ids_map, + inner.current(rtxn, index, context.db_fields_ids_map)?, + new_fields_ids_map.deref_mut(), &mut |fid, value| { Self::facet_fn_with_options( - buffer, - cached_sorter, + &context.doc_alloc, + cached_sorter.deref_mut(), CboCachedSorter::insert_del_u32, inner.docid(), fid, @@ -68,12 +112,12 @@ impl FacetedDocidsExtractor { extract_document_facets( attributes_to_extract, - inner.new(), - fields_ids_map, + inner.new(rtxn, index, context.db_fields_ids_map)?, + new_fields_ids_map.deref_mut(), &mut |fid, value| { Self::facet_fn_with_options( - buffer, - cached_sorter, + &context.doc_alloc, + cached_sorter.deref_mut(), CboCachedSorter::insert_add_u32, inner.docid(), fid, @@ -85,11 +129,11 @@ impl FacetedDocidsExtractor { DocumentChange::Insertion(inner) => extract_document_facets( attributes_to_extract, inner.new(), - fields_ids_map, + new_fields_ids_map.deref_mut(), &mut |fid, value| { Self::facet_fn_with_options( - buffer, - cached_sorter, + &context.doc_alloc, + cached_sorter.deref_mut(), CboCachedSorter::insert_add_u32, inner.docid(), fid, @@ -101,7 +145,7 @@ impl FacetedDocidsExtractor { } fn facet_fn_with_options( - buffer: &mut Vec, + doc_alloc: &Bump, cached_sorter: &mut CboCachedSorter, cache_fn: impl Fn(&mut CboCachedSorter, &[u8], u32) -> grenad::Result<(), MF::Error>, docid: DocumentId, @@ -113,9 +157,9 @@ impl FacetedDocidsExtractor { MF::Error: Debug, grenad::Error: Into, { + let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc); // Exists // key: fid - buffer.clear(); buffer.push(FacetKind::Exists as u8); buffer.extend_from_slice(&fid.to_be_bytes()); cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)?; @@ -197,58 +241,38 @@ fn truncate_str(s: &str) -> &str { impl DocidsExtractor for FacetedDocidsExtractor { #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] - fn run_extraction( - index: &Index, - fields_ids_map: &GlobalFieldsIdsMap, - indexer: GrenadParameters, - document_changes: impl IntoParallelIterator< - Item = std::result::Result>, - >, + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + grenad_parameters: GrenadParameters, + document_changes: &DC, + indexing_context: IndexingContext<'fid, 'indexer, 'index>, + extractor_allocs: &mut ThreadLocal>>, ) -> Result> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = grenad_parameters.max_memory_by_thread(); + + let index = indexing_context.index; let rtxn = index.read_txn()?; let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); + let datastore = ThreadLocal::new(); { let span = tracing::trace_span!(target: 
"indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_arc_for_each_try_init( - || { - thread_local.get_or_try(|| { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = CboCachedSorter::new( - /// TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ); - Ok((rtxn, RefCell::new((fields_ids_map.clone(), Vec::new(), cache)))) - }) - }, - |(rtxn, rc), document_change| { - let (fields_ids_map, buffer, cached_sorter) = &mut *rc.borrow_mut(); - Self::extract_document_change( - rtxn, - index, - buffer, - fields_ids_map, - &attributes_to_extract, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }, + + let extractor = FacetedExtractorData { + attributes_to_extract: &attributes_to_extract, + grenad_parameters, + max_memory, + }; + for_each_document_change( + document_changes, + &extractor, + indexing_context, + extractor_allocs, + &datastore, )?; } { @@ -257,11 +281,11 @@ impl DocidsExtractor for FacetedDocidsExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = thread_local + let readers: Vec<_> = datastore .into_iter() .par_bridge() - .map(|(_, rc)| { - let (_, _, cached_sorter) = rc.into_inner(); + .map(|cached_sorter| { + let cached_sorter = cached_sorter.0.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/extract/faceted/facet_document.rs b/milli/src/update/new/extract/faceted/facet_document.rs index 4525e866f..cf8984f9c 100644 --- a/milli/src/update/new/extract/faceted/facet_document.rs +++ b/milli/src/update/new/extract/faceted/facet_document.rs @@ -1,24 +1,17 @@ use serde_json::Value; +use crate::update::new::document::Document; use crate::update::new::extract::perm_json_p; -use crate::update::new::KvReaderFieldId; use crate::{FieldId, GlobalFieldsIdsMap, InternalError, Result, UserError}; -pub fn extract_document_facets( +pub fn extract_document_facets<'doc>( attributes_to_extract: &[&str], - obkv: &KvReaderFieldId, + document: impl Document<'doc>, field_id_map: &mut GlobalFieldsIdsMap, facet_fn: &mut impl FnMut(FieldId, &Value) -> Result<()>, ) -> Result<()> { - let mut field_name = String::new(); - for (field_id, field_bytes) in obkv { - let Some(field_name) = field_id_map.name(field_id).map(|s| { - field_name.clear(); - field_name.push_str(s); - &field_name - }) else { - unreachable!("field id not found in field id map"); - }; + for res in document.iter_top_level_fields() { + let (field_name, value) = res?; let mut tokenize_field = |name: &str, value: &Value| match field_id_map.id_or_insert(name) { Some(field_id) => facet_fn(field_id, value), @@ -28,7 +21,7 @@ pub fn extract_document_facets( // if the current field is searchable or contains a searchable attribute if perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]) { // parse json. - match serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)? { + match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? 
{ Value::Object(object) => perm_json_p::seek_leaf_values_in_object( &object, Some(attributes_to_extract), diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index c12634563..1c86d80af 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -3,26 +3,24 @@ mod faceted; mod lru; mod searchable; +use std::cell::RefCell; use std::fs::File; -use std::sync::Arc; +use bumpalo::Bump; pub use faceted::*; use grenad::Merger; -use rayon::iter::IntoParallelIterator; pub use searchable::*; -use super::DocumentChange; +use super::indexer::document_changes::{DocumentChanges, FullySend, IndexingContext, ThreadLocal}; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{Error, GlobalFieldsIdsMap, Index, Result}; +use crate::Result; pub trait DocidsExtractor { - fn run_extraction( - index: &Index, - fields_ids_map: &GlobalFieldsIdsMap, - indexer: GrenadParameters, - document_changes: impl IntoParallelIterator< - Item = std::result::Result>, - >, + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + grenad_parameters: GrenadParameters, + document_changes: &DC, + indexing_context: IndexingContext<'fid, 'indexer, 'index>, + extractor_allocs: &mut ThreadLocal>>, ) -> Result>; } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index dde969614..82bb0ec86 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -2,17 +2,23 @@ use std::cell::RefCell; use std::collections::HashMap; use std::fs::File; use std::num::NonZero; +use std::ops::DerefMut as _; use std::sync::Arc; +use bumpalo::Bump; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; use rayon::iter::IntoParallelIterator; -use thread_local::ThreadLocal; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::SearchableExtractor; +use crate::update::new::document::Document; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; +use crate::update::new::indexer::document_changes::{ + for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, + IndexingContext, ThreadLocal, +}; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -23,7 +29,7 @@ use crate::{ const MAX_COUNTED_WORDS: usize = 30; -struct WordDocidsCachedSorters { +pub struct WordDocidsCachedSorters { word_fid_docids: CboCachedSorter, word_docids: CboCachedSorter, exact_word_docids: CboCachedSorter, @@ -301,18 +307,47 @@ impl WordDocidsMergerBuilders { } } +pub struct WordDocidsExtractorData<'extractor> { + tokenizer: &'extractor DocumentTokenizer<'extractor>, + grenad_parameters: GrenadParameters, + max_memory: Option, +} + +impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> { + type Data = FullySend>; + + fn init_data( + &self, + _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, + ) -> Result { + Ok(FullySend(RefCell::new(WordDocidsCachedSorters::new( + self.grenad_parameters, + self.max_memory, + // TODO use a better value + 200_000.try_into().unwrap(), + )))) + } + + fn process( + &self, + change: DocumentChange, + context: &crate::update::new::indexer::document_changes::DocumentChangeContext, + 
) -> Result<()> { + WordDocidsExtractors::extract_document_change(context, self.tokenizer, change) + } +} + pub struct WordDocidsExtractors; impl WordDocidsExtractors { - pub fn run_extraction( - index: &Index, - fields_ids_map: &GlobalFieldsIdsMap, - indexer: GrenadParameters, - document_changes: impl IntoParallelIterator< - Item = std::result::Result>, - >, + pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + grenad_parameters: GrenadParameters, + document_changes: &DC, + indexing_context: IndexingContext<'fid, 'indexer, 'index>, + extractor_allocs: &mut ThreadLocal>>, ) -> Result { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = grenad_parameters.max_memory_by_thread(); + let index = indexing_context.index; let rtxn = index.read_txn()?; let stop_words = index.stop_words(&rtxn)?; @@ -342,38 +377,25 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); + let datastore = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_arc_for_each_try_init( - || { - thread_local.get_or_try(|| { - let rtxn = index.read_txn().map_err(Error::from)?; - let fields_ids_map = fields_ids_map.clone(); - let cache = WordDocidsCachedSorters::new( - indexer, - max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - ); - Ok((rtxn, &document_tokenizer, RefCell::new((fields_ids_map, cache)))) - }) - }, - |(rtxn, document_tokenizer, rc), document_change| { - let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); - Self::extract_document_change( - rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }, + + let extractor = WordDocidsExtractorData { + tokenizer: &document_tokenizer, + grenad_parameters, + max_memory, + }; + + for_each_document_change( + document_changes, + &extractor, + indexing_context, + extractor_allocs, + &datastore, )?; } @@ -382,8 +404,7 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - for (_, _, rc) in thread_local.into_iter() { - let (_, cache) = rc.into_inner(); + for cache in datastore.into_iter().map(|cache| cache.0.into_inner()) { builder.add_sorters(cache)?; } @@ -392,13 +413,17 @@ impl WordDocidsExtractors { } fn extract_document_change( - rtxn: &RoTxn, - index: &Index, + context: &DocumentChangeContext>>, document_tokenizer: &DocumentTokenizer, - fields_ids_map: &mut GlobalFieldsIdsMap, - cached_sorter: &mut WordDocidsCachedSorters, document_change: DocumentChange, ) -> Result<()> { + let index = &context.index; + let rtxn = &context.txn; + let mut cached_sorter = context.data.0.borrow_mut(); + let cached_sorter = cached_sorter.deref_mut(); + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut(); + let new_fields_ids_map = new_fields_ids_map.deref_mut(); + let exact_attributes = index.exact_attributes(rtxn)?; let is_exact_attribute = |fname: &str| exact_attributes.iter().any(|attr| contained_in(fname, attr)); @@ -418,8 +443,8 @@ impl WordDocidsExtractors { .map_err(crate::Error::from) }; document_tokenizer.tokenize_document( - inner.current(rtxn, index)?.unwrap(), - fields_ids_map, + inner.current(rtxn, index, context.db_fields_ids_map)?, + 
new_fields_ids_map, &mut token_fn, )?; } @@ -437,8 +462,8 @@ impl WordDocidsExtractors { .map_err(crate::Error::from) }; document_tokenizer.tokenize_document( - inner.current(rtxn, index)?.unwrap(), - fields_ids_map, + inner.current(rtxn, index, context.db_fields_ids_map)?, + new_fields_ids_map, &mut token_fn, )?; @@ -454,7 +479,11 @@ impl WordDocidsExtractors { ) .map_err(crate::Error::from) }; - document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?; + document_tokenizer.tokenize_document( + inner.new(rtxn, index, context.db_fields_ids_map)?, + new_fields_ids_map, + &mut token_fn, + )?; } DocumentChange::Insertion(inner) => { let mut token_fn = |fname: &str, fid, pos, word: &str| { @@ -469,7 +498,11 @@ impl WordDocidsExtractors { ) .map_err(crate::Error::from) }; - document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?; + document_tokenizer.tokenize_document( + inner.new(), + new_fields_ids_map, + &mut token_fn, + )?; } } diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index 7d3655be8..d47ab606c 100644 --- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -1,13 +1,17 @@ +use std::cell::RefCell; use std::collections::VecDeque; use std::rc::Rc; +use bumpalo::Bump; use heed::RoTxn; use obkv::KvReader; use super::tokenize_document::DocumentTokenizer; use super::SearchableExtractor; use crate::proximity::{index_proximity, MAX_DISTANCE}; +use crate::update::new::document::Document; use crate::update::new::extract::cache::CboCachedSorter; +use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend}; use crate::update::new::DocumentChange; use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{FieldId, GlobalFieldsIdsMap, Index, Result}; @@ -28,27 +32,39 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { // This method is reimplemented to count the number of words in the document in each field // and to store the docids of the documents that have a number of words in a given field equal to or under than MAX_COUNTED_WORDS. 
fn extract_document_change( - rtxn: &RoTxn, - index: &Index, + context: &DocumentChangeContext< + FullySend>>, + >, document_tokenizer: &DocumentTokenizer, - fields_ids_map: &mut GlobalFieldsIdsMap, - cached_sorter: &mut CboCachedSorter, document_change: DocumentChange, ) -> Result<()> { - let mut key_buffer = Vec::new(); - let mut del_word_pair_proximity = Vec::new(); - let mut add_word_pair_proximity = Vec::new(); + let doc_alloc = &context.doc_alloc; + + let index = context.index; + let rtxn = &context.txn; + + let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc); + let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc); + let mut add_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc); + + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut(); + let new_fields_ids_map = &mut *new_fields_ids_map; + + let mut cached_sorter = context.data.0.borrow_mut(); + let cached_sorter = &mut *cached_sorter; + + // is a vecdequeue, and will be smol, so can stay on the heap for now let mut word_positions: VecDeque<(Rc, u16)> = VecDeque::with_capacity(MAX_DISTANCE as usize); let docid = document_change.docid(); match document_change { DocumentChange::Deletion(inner) => { - let document = inner.current(rtxn, index)?.unwrap(); + let document = inner.current(rtxn, index, context.db_fields_ids_map)?; process_document_tokens( document, document_tokenizer, - fields_ids_map, + new_fields_ids_map, &mut word_positions, &mut |(w1, w2), prox| { del_word_pair_proximity.push(((w1, w2), prox)); @@ -56,21 +72,21 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { )?; } DocumentChange::Update(inner) => { - let document = inner.current(rtxn, index)?.unwrap(); + let document = inner.current(rtxn, index, context.db_fields_ids_map)?; process_document_tokens( document, document_tokenizer, - fields_ids_map, + new_fields_ids_map, &mut word_positions, &mut |(w1, w2), prox| { del_word_pair_proximity.push(((w1, w2), prox)); }, )?; - let document = inner.new(); + let document = inner.new(rtxn, index, context.db_fields_ids_map)?; process_document_tokens( document, document_tokenizer, - fields_ids_map, + new_fields_ids_map, &mut word_positions, &mut |(w1, w2), prox| { add_word_pair_proximity.push(((w1, w2), prox)); @@ -82,7 +98,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { process_document_tokens( document, document_tokenizer, - fields_ids_map, + new_fields_ids_map, &mut word_positions, &mut |(w1, w2), prox| { add_word_pair_proximity.push(((w1, w2), prox)); @@ -108,7 +124,12 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { } } -fn build_key<'a>(prox: u8, w1: &str, w2: &str, key_buffer: &'a mut Vec) -> &'a [u8] { +fn build_key<'a>( + prox: u8, + w1: &str, + w2: &str, + key_buffer: &'a mut bumpalo::collections::Vec, +) -> &'a [u8] { key_buffer.clear(); key_buffer.push(prox); key_buffer.extend_from_slice(w1.as_bytes()); @@ -131,8 +152,8 @@ fn word_positions_into_word_pair_proximity( Ok(()) } -fn process_document_tokens( - document: &KvReader, +fn process_document_tokens<'doc>( + document: impl Document<'doc>, document_tokenizer: &DocumentTokenizer, fields_ids_map: &mut GlobalFieldsIdsMap, word_positions: &mut VecDeque<(Rc, u16)>, diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index a261efda3..758b3b6a1 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -4,40 +4,81 @@ mod 
tokenize_document; use std::cell::RefCell; use std::fs::File; -use std::sync::Arc; +use std::marker::PhantomData; +use std::ops::DerefMut; +use bumpalo::Bump; pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; use grenad::Merger; use heed::RoTxn; -use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; -use thread_local::ThreadLocal; +use rayon::iter::{ParallelBridge, ParallelIterator}; use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::indexer::document_changes::{ + for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, + IndexingContext, ThreadLocal, +}; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; +use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; -pub trait SearchableExtractor { - fn run_extraction( - index: &Index, - fields_ids_map: &GlobalFieldsIdsMap, - indexer: GrenadParameters, - document_changes: impl IntoParallelIterator< - Item = std::result::Result>, - >, +pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> { + tokenizer: &'extractor DocumentTokenizer<'extractor>, + grenad_parameters: GrenadParameters, + max_memory: Option, + _ex: PhantomData, +} + +impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> + for SearchableExtractorData<'extractor, EX> +{ + type Data = FullySend>>; + + fn init_data( + &self, + _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, + ) -> Result { + Ok(FullySend(RefCell::new(CboCachedSorter::new( + // TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + self.grenad_parameters.chunk_compression_type, + self.grenad_parameters.chunk_compression_level, + self.grenad_parameters.max_nb_chunks, + self.max_memory, + ), + )))) + } + + fn process( + &self, + change: DocumentChange, + context: &crate::update::new::indexer::document_changes::DocumentChangeContext, + ) -> Result<()> { + EX::extract_document_change(context, self.tokenizer, change) + } +} + +pub trait SearchableExtractor: Sized + Sync { + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + grenad_parameters: GrenadParameters, + document_changes: &DC, + indexing_context: IndexingContext<'fid, 'indexer, 'index>, + extractor_allocs: &mut ThreadLocal>>, ) -> Result> { - let max_memory = indexer.max_memory_by_thread(); + let max_memory = grenad_parameters.max_memory_by_thread(); - let rtxn = index.read_txn()?; - let stop_words = index.stop_words(&rtxn)?; - let allowed_separators = index.allowed_separators(&rtxn)?; + let rtxn = indexing_context.index.read_txn()?; + let stop_words = indexing_context.index.stop_words(&rtxn)?; + let allowed_separators = indexing_context.index.allowed_separators(&rtxn)?; let allowed_separators: Option> = allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect()); - let dictionary = index.dictionary(&rtxn)?; + let dictionary = indexing_context.index.dictionary(&rtxn)?; let dictionary: Option> = dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); let builder = tokenizer_builder( @@ -47,10 
+88,10 @@ pub trait SearchableExtractor { ); let tokenizer = builder.into_tokenizer(); - let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; - let attributes_to_skip = Self::attributes_to_skip(&rtxn, index)?; + let attributes_to_extract = Self::attributes_to_extract(&rtxn, indexing_context.index)?; + let attributes_to_skip = Self::attributes_to_skip(&rtxn, indexing_context.index)?; let localized_attributes_rules = - index.localized_attributes_rules(&rtxn)?.unwrap_or_default(); + indexing_context.index.localized_attributes_rules(&rtxn)?.unwrap_or_default(); let document_tokenizer = DocumentTokenizer { tokenizer: &tokenizer, @@ -60,48 +101,26 @@ pub trait SearchableExtractor { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); + let extractor_data: SearchableExtractorData = SearchableExtractorData { + tokenizer: &document_tokenizer, + grenad_parameters, + max_memory, + _ex: PhantomData, + }; + + let datastore = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_arc_for_each_try_init( - || { - thread_local.get_or_try(|| { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = CboCachedSorter::new( - /// TODO use a better value - 1_000_000.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ); - Ok(( - rtxn, - &document_tokenizer, - RefCell::new((fields_ids_map.clone(), cache)), - )) - }) - }, - |(rtxn, document_tokenizer, rc), document_change| { - let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); - Self::extract_document_change( - rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }, - )?; + for_each_document_change( + document_changes, + &extractor_data, + indexing_context, + extractor_allocs, + &datastore, + ); } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); @@ -109,11 +128,14 @@ pub trait SearchableExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = thread_local + let readers: Vec<_> = datastore .into_iter() .par_bridge() - .map(|(_, _, rc)| { - let (_, cached_sorter) = rc.into_inner(); + .map(|cache_entry| { + let cached_sorter: FullySend< + RefCell>, + > = cache_entry; + let cached_sorter = cached_sorter.0.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) @@ -122,16 +144,16 @@ pub trait SearchableExtractor { for reader in readers { builder.extend(reader?); } + Ok(builder.build()) } } fn extract_document_change( - rtxn: &RoTxn, - index: &Index, + context: &DocumentChangeContext< + FullySend>>, + >, document_tokenizer: &DocumentTokenizer, - fields_ids_map: &mut GlobalFieldsIdsMap, - cached_sorter: &mut CboCachedSorter, document_change: DocumentChange, ) -> Result<()>; @@ -142,14 +164,17 @@ pub trait SearchableExtractor { } impl DocidsExtractor for T { - fn run_extraction( - index: &Index, - fields_ids_map: &GlobalFieldsIdsMap, - indexer: GrenadParameters, - document_changes: impl IntoParallelIterator< - Item = std::result::Result>, - >, + fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>( + grenad_parameters: 
GrenadParameters, + document_changes: &DC, + indexing_context: IndexingContext<'fid, 'indexer, 'index>, + extractor_allocs: &mut ThreadLocal>>, ) -> Result> { - Self::run_extraction(index, fields_ids_map, indexer, document_changes) + Self::run_extraction( + grenad_parameters, + document_changes, + indexing_context, + extractor_allocs, + ) } } diff --git a/milli/src/update/new/extract/searchable/tokenize_document.rs b/milli/src/update/new/extract/searchable/tokenize_document.rs index fda619013..71585c8d2 100644 --- a/milli/src/update/new/extract/searchable/tokenize_document.rs +++ b/milli/src/update/new/extract/searchable/tokenize_document.rs @@ -4,6 +4,7 @@ use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder}; use serde_json::Value; use crate::proximity::MAX_DISTANCE; +use crate::update::new::document::Document; use crate::update::new::extract::perm_json_p::{ seek_leaf_values_in_array, seek_leaf_values_in_object, select_field, }; @@ -22,22 +23,16 @@ pub struct DocumentTokenizer<'a> { } impl<'a> DocumentTokenizer<'a> { - pub fn tokenize_document( + pub fn tokenize_document<'doc>( &self, - obkv: &KvReaderFieldId, + document: impl Document<'doc>, field_id_map: &mut GlobalFieldsIdsMap, token_fn: &mut impl FnMut(&str, FieldId, u16, &str) -> Result<()>, ) -> Result<()> { let mut field_position = HashMap::new(); - let mut field_name = String::new(); - for (field_id, field_bytes) in obkv { - let Some(field_name) = field_id_map.name(field_id).map(|s| { - field_name.clear(); - field_name.push_str(s); - &field_name - }) else { - unreachable!("field id not found in field id map"); - }; + + for entry in document.iter_top_level_fields() { + let (field_name, value) = entry?; let mut tokenize_field = |name: &str, value: &Value| { let Some(field_id) = field_id_map.id_or_insert(name) else { @@ -94,7 +89,7 @@ impl<'a> DocumentTokenizer<'a> { // if the current field is searchable or contains a searchable attribute if select_field(field_name, self.attribute_to_extract, self.attribute_to_skip) { // parse json. - match serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)? { + match serde_json::to_value(value).map_err(InternalError::SerdeJson)? 
{ Value::Object(object) => seek_leaf_values_in_object( &object, self.attribute_to_extract, @@ -174,10 +169,13 @@ pub fn tokenizer_builder<'a>( #[cfg(test)] mod test { + use bumpalo::Bump; use charabia::TokenizerBuilder; use meili_snap::snapshot; use obkv::KvReader; + use raw_collections::RawMap; use serde_json::json; + use serde_json::value::RawValue; use super::*; use crate::FieldsIdsMap; @@ -186,40 +184,25 @@ mod test { fn test_tokenize_document() { let mut fields_ids_map = FieldsIdsMap::new(); - let field_1 = json!({ - "name": "doggo", - "age": 10, - }); - - let field_2 = json!({ + let document = json!({ + "doggo": { "name": "doggo", + "age": 10,}, + "catto": { "catto": { "name": "pesti", "age": 23, } + }, + "doggo.name": ["doggo", "catto"], + "not-me": "UNSEARCHABLE", + "me-nether": {"nope": "unsearchable"} }); - let field_3 = json!(["doggo", "catto"]); - let field_4 = json!("UNSEARCHABLE"); - let field_5 = json!({"nope": "unsearchable"}); - - let mut obkv = obkv::KvWriter::memory(); - let field_1_id = fields_ids_map.insert("doggo").unwrap(); - let field_1 = serde_json::to_string(&field_1).unwrap(); - obkv.insert(field_1_id, field_1.as_bytes()).unwrap(); - let field_2_id = fields_ids_map.insert("catto").unwrap(); - let field_2 = serde_json::to_string(&field_2).unwrap(); - obkv.insert(field_2_id, field_2.as_bytes()).unwrap(); - let field_3_id = fields_ids_map.insert("doggo.name").unwrap(); - let field_3 = serde_json::to_string(&field_3).unwrap(); - obkv.insert(field_3_id, field_3.as_bytes()).unwrap(); - let field_4_id = fields_ids_map.insert("not-me").unwrap(); - let field_4 = serde_json::to_string(&field_4).unwrap(); - obkv.insert(field_4_id, field_4.as_bytes()).unwrap(); - let field_5_id = fields_ids_map.insert("me-nether").unwrap(); - let field_5 = serde_json::to_string(&field_5).unwrap(); - obkv.insert(field_5_id, field_5.as_bytes()).unwrap(); - let value = obkv.into_inner().unwrap(); - let obkv = KvReader::from_slice(value.as_slice()); + let _field_1_id = fields_ids_map.insert("doggo").unwrap(); + let _field_2_id = fields_ids_map.insert("catto").unwrap(); + let _field_3_id = fields_ids_map.insert("doggo.name").unwrap(); + let _field_4_id = fields_ids_map.insert("not-me").unwrap(); + let _field_5_id = fields_ids_map.insert("me-nether").unwrap(); let mut tb = TokenizerBuilder::default(); let document_tokenizer = DocumentTokenizer { @@ -234,11 +217,23 @@ mod test { let mut global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); let mut words = std::collections::BTreeMap::new(); + + let document = document.to_string(); + + let bump = Bump::new(); + let document: &RawValue = serde_json::from_str(&document).unwrap(); + let document = RawMap::from_raw_value(document, &bump).unwrap(); + let document = document.into_bump_slice(); + document_tokenizer - .tokenize_document(obkv, &mut global_fields_ids_map, &mut |_fname, fid, pos, word| { - words.insert([fid, pos], word.to_string()); - Ok(()) - }) + .tokenize_document( + document, + &mut global_fields_ids_map, + &mut |_fname, fid, pos, word| { + words.insert([fid, pos], word.to_string()); + Ok(()) + }, + ) .unwrap(); snapshot!(format!("{:#?}", words), @r###" diff --git a/milli/src/update/new/indexer/de.rs b/milli/src/update/new/indexer/de.rs new file mode 100644 index 000000000..749588c86 --- /dev/null +++ b/milli/src/update/new/indexer/de.rs @@ -0,0 +1,163 @@ +use bumpalo::Bump; +use serde_json::value::RawValue; + +use crate::documents::{validate_document_id_str, DocumentIdExtractionError, PrimaryKey}; +use 
crate::fields_ids_map::MutFieldIdMapper; +use crate::{FieldId, UserError}; + +// visits a document to fill the top level fields of the field id map and retrieve the external document id. +pub struct DocumentVisitor<'p, 'indexer, Mapper: MutFieldIdMapper> { + fields_ids_map: &'p mut Mapper, + primary_key: &'p PrimaryKey<'p>, + indexer: &'indexer Bump, +} + +impl<'p, 'indexer, Mapper: MutFieldIdMapper> DocumentVisitor<'p, 'indexer, Mapper> { + pub fn new( + fields_ids_map: &'p mut Mapper, + primary_key: &'p PrimaryKey<'p>, + indexer: &'indexer Bump, + ) -> Self { + Self { fields_ids_map, primary_key, indexer } + } +} + +impl<'de, 'p, 'indexer: 'de, Mapper: MutFieldIdMapper> serde::de::Visitor<'de> + for DocumentVisitor<'p, 'indexer, Mapper> +{ + type Value = std::result::Result<&'de str, DocumentIdExtractionError>; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "a map") + } + + fn visit_map(mut self, mut map: A) -> std::result::Result + where + A: serde::de::MapAccess<'de>, + { + let mut docid = None; + while let Some((fid, fields_ids_map)) = + map.next_key_seed(FieldIdMapSeed(self.fields_ids_map))? + { + use serde::de::Deserializer as _; + self.fields_ids_map = fields_ids_map; + /// FIXME unwrap => too many fields + let fid = fid.unwrap(); + + match self.primary_key { + PrimaryKey::Flat { name, field_id } => { + let value: &'de RawValue = map.next_value()?; + if fid == *field_id { + let value = match value + .deserialize_any(DocumentIdVisitor(self.indexer)) + .map_err(|_err| { + DocumentIdExtractionError::InvalidDocumentId( + UserError::InvalidDocumentId { + document_id: serde_json::to_value(value).unwrap(), + }, + ) + }) { + Ok(Ok(value)) => value, + Ok(Err(err)) | Err(err) => return Ok(Err(err)), + }; + if let Some(_previous_value) = docid.replace(value) { + return Ok(Err(DocumentIdExtractionError::TooManyDocumentIds(2))); + } + } + } + PrimaryKey::Nested { name } => todo!(), + } + } + Ok(match docid { + Some(docid) => Ok(docid), + None => Err(DocumentIdExtractionError::MissingDocumentId), + }) + } +} + +struct FieldIdMapSeed<'a, Mapper: MutFieldIdMapper>(&'a mut Mapper); + +impl<'de, 'a, Mapper: MutFieldIdMapper> serde::de::DeserializeSeed<'de> + for FieldIdMapSeed<'a, Mapper> +{ + type Value = (Option, &'a mut Mapper); + + fn deserialize(self, deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct FieldIdMapVisitor<'a, Mapper: MutFieldIdMapper>(&'a mut Mapper); + impl<'de, 'a, Mapper: MutFieldIdMapper> serde::de::Visitor<'de> for FieldIdMapVisitor<'a, Mapper> { + type Value = (Option, &'a mut Mapper); + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "expecting a string") + } + fn visit_borrowed_str(self, v: &'de str) -> std::result::Result + where + E: serde::de::Error, + { + Ok((self.0.insert(v), self.0)) + } + + fn visit_str(self, v: &str) -> std::result::Result + where + E: serde::de::Error, + { + Ok((self.0.insert(v), self.0)) + } + } + deserializer.deserialize_str(FieldIdMapVisitor(self.0)) + } +} + +struct DocumentIdVisitor<'indexer>(&'indexer Bump); + +impl<'de, 'indexer: 'de> serde::de::Visitor<'de> for DocumentIdVisitor<'indexer> { + type Value = std::result::Result<&'de str, DocumentIdExtractionError>; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "an integer or a string") + } + + fn visit_borrowed_str(self, v: &'de str) -> std::result::Result + where + E: serde::de::Error, + 
{ + Ok(validate_document_id_str(v).ok_or_else(|| { + DocumentIdExtractionError::InvalidDocumentId(UserError::InvalidDocumentId { + document_id: serde_json::Value::String(v.to_owned()), + }) + })) + } + + fn visit_str(self, v: &str) -> std::result::Result + where + E: serde::de::Error, + { + let v = self.0.alloc_str(v); + self.visit_borrowed_str(v) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + use std::fmt::Write as _; + + let mut out = bumpalo::collections::String::new_in(&self.0); + write!(&mut out, "{v}"); + Ok(Ok(out.into_bump_str())) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + use std::fmt::Write as _; + + let mut out = bumpalo::collections::String::new_in(&self.0); + write!(&mut out, "{v}"); + Ok(Ok(out.into_bump_str())) + } +} diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs new file mode 100644 index 000000000..8bab9903f --- /dev/null +++ b/milli/src/update/new/indexer/document_changes.rs @@ -0,0 +1,378 @@ +use std::cell::{Cell, RefCell}; +use std::sync::{Arc, RwLock}; + +use bumpalo::Bump; +use heed::RoTxn; +use raw_collections::alloc::RefBump; +use rayon::iter::IndexedParallelIterator; + +use super::super::document_change::DocumentChange; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result}; + +/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`]. +/// +/// The primary example of such a type is `&T`, with `T: !Sync`. +/// +/// In the authors' understanding, a type can be `!Send` for two distinct reasons: +/// +/// 1. Because it contains data that *genuinely* cannot be moved between threads, such as thread-local data. +/// 2. Because sending the type would allow concurrent access to a `!Sync` type, which is undefined behavior. +/// +/// `MostlySend` exists to be used in bounds where you need a type whose data is **not** *attached* to a thread +/// because you might access it from a different thread, but where you will never access the type **concurrently** from +/// multiple threads. +/// +/// Like [`Send`], `MostlySend` assumes properties on types that cannot be verified by the compiler, which is why implementing +/// this trait is unsafe. +/// +/// # Safety +/// +/// Implementers of this trait promises that the following properties hold on the implementing type: +/// +/// 1. Its data can be accessed from any thread and will be the same regardless of the thread accessing it. +/// 2. Any operation that can be performed on the type does not depend on the thread that executes it. +/// +/// As these properties are subtle and are not generally tracked by the Rust type system, great care should be taken before +/// implementing `MostlySend` on a type, especially a foreign type. +/// +/// - An example of a type that verifies (1) and (2) is [`std::rc::Rc`] (when `T` is `Send` and `Sync`). +/// - An example of a type that doesn't verify (1) is thread-local data. +/// - An example of a type that doesn't verify (2) is [`std::sync::MutexGuard`]: a lot of mutex implementations require that +/// a lock is returned to the operating system on the same thread that initially locked the mutex, failing to uphold this +/// invariant will cause Undefined Behavior +/// (see last § in [the nomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html)). 
+/// +/// It is **always safe** to implement this trait on a type that is `Send`, but no placeholder impl is provided due to limitations in +/// coherency. Use the [`FullySend`] wrapper in this situation. +pub unsafe trait MostlySend {} + +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct FullySend(pub T); + +// SAFETY: a type **fully** send is always mostly send as well. +unsafe impl MostlySend for FullySend where T: Send {} + +impl FullySend { + pub fn into(self) -> T { + self.0 + } +} + +impl From for FullySend { + fn from(value: T) -> Self { + Self(value) + } +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct MostlySendWrapper(T); + +impl MostlySendWrapper { + /// # Safety + /// + /// - (P1) Users of this type will never access the type concurrently from multiple threads without synchronization + unsafe fn new(t: T) -> Self { + Self(t) + } + + fn new_send(t: T) -> Self + where + T: Send, + { + Self(t) + } + + fn get(&self) -> T + where + T: Copy, + { + self.0 + } + + fn as_ref(&self) -> &T { + &self.0 + } + + fn as_mut(&mut self) -> &mut T { + &mut self.0 + } + + fn into_inner(self) -> T { + self.0 + } +} + +/// # Safety +/// +/// 1. `T` is [`MostlySend`], so by its safety contract it can be accessed by any thread and all of its operations are available +/// from any thread. +/// 2. (P1) of `MostlySendWrapper::new` forces the user to never access the value from multiple threads concurrently. +unsafe impl Send for MostlySendWrapper {} + +/// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s. +pub struct ThreadLocal { + inner: thread_local::ThreadLocal>, + // FIXME: this should be necessary + //_no_send: PhantomData<*mut ()>, +} + +impl ThreadLocal { + pub fn new() -> Self { + Self { inner: thread_local::ThreadLocal::new() } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { inner: thread_local::ThreadLocal::with_capacity(capacity) } + } + + pub fn clear(&mut self) { + self.inner.clear() + } + + pub fn get(&self) -> Option<&T> { + self.inner.get().map(|t| t.as_ref()) + } + + pub fn get_or(&self, create: F) -> &T + where + F: FnOnce() -> T, + { + self.inner.get_or(|| unsafe { MostlySendWrapper::new(create()) }).as_ref() + } + + pub fn get_or_try(&self, create: F) -> std::result::Result<&T, E> + where + F: FnOnce() -> std::result::Result, + { + self.inner + .get_or_try(|| unsafe { Ok(MostlySendWrapper::new(create()?)) }) + .map(MostlySendWrapper::as_ref) + } + + pub fn get_or_default(&self) -> &T + where + T: Default, + { + self.inner.get_or_default().as_ref() + } + + pub fn iter_mut(&mut self) -> IterMut { + IterMut(self.inner.iter_mut()) + } +} + +impl IntoIterator for ThreadLocal { + type Item = T; + + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + IntoIter(self.inner.into_iter()) + } +} + +pub struct IterMut<'a, T: MostlySend>(thread_local::IterMut<'a, MostlySendWrapper>); + +impl<'a, T: MostlySend> Iterator for IterMut<'a, T> { + type Item = &'a mut T; + + fn next(&mut self) -> Option { + self.0.next().map(|t| t.as_mut()) + } +} + +pub struct IntoIter(thread_local::IntoIter>); + +impl Iterator for IntoIter { + type Item = T; + + fn next(&mut self) -> Option { + self.0.next().map(|t| t.into_inner()) + } +} + +pub struct DocumentChangeContext< + 'doc, // covariant lifetime of a single `process` call + 'extractor: 'doc, // invariant lifetime of the extractor_allocs + 'fid: 'doc, // invariant lifetime of the new_fields_ids_map + 'indexer: 'doc, 
// covariant lifetime of objects that outlive a single `process` call + T: MostlySend, +> { + /// The index we're indexing in + pub index: &'indexer Index, + /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents + /// inside of the DB. + pub db_fields_ids_map: &'indexer FieldsIdsMap, + /// A transaction providing data from the DB before all indexing operations + pub txn: RoTxn<'indexer>, + + /// Global field id map that is up to date with the current state of the indexing process. + /// + /// - Inserting a field will take a lock + /// - Retrieving a field may take a lock as well + pub new_fields_ids_map: &'doc std::cell::RefCell>, + + /// Data allocated in this allocator is cleared between each call to `process`. + pub doc_alloc: Bump, + + /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills. + pub extractor_alloc: RefBump<'extractor>, + + /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents + doc_allocs: &'doc ThreadLocal>>, + + /// Extractor-specific data + pub data: &'doc T, +} + +impl< + 'doc, // covariant lifetime of a single `process` call + 'data: 'doc, // invariant on T lifetime of the datastore + 'extractor: 'doc, // invariant lifetime of extractor_allocs + 'fid: 'doc, // invariant lifetime of fields ids map + 'indexer: 'doc, // covariant lifetime of objects that survive a `process` call + T: MostlySend, + > DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T> +{ + pub fn new( + index: &'indexer Index, + db_fields_ids_map: &'indexer FieldsIdsMap, + new_fields_ids_map: &'fid RwLock, + extractor_allocs: &'extractor ThreadLocal>>, + doc_allocs: &'doc ThreadLocal>>, + datastore: &'data ThreadLocal, + fields_ids_map_store: &'doc ThreadLocal>>>, + init_data: F, + ) -> Result + where + F: FnOnce(RefBump<'extractor>) -> Result, + { + let doc_alloc = + doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024)))); + let doc_alloc = doc_alloc.0.take(); + let fields_ids_map = fields_ids_map_store + .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(&new_fields_ids_map)).into()); + + let fields_ids_map = &fields_ids_map.0; + let extractor_alloc = extractor_allocs.get_or_default(); + + let extractor_alloc = RefBump::new(extractor_alloc.0.borrow()); + + let data = datastore.get_or_try(|| init_data(RefBump::clone(&extractor_alloc)))?; + + let txn = index.read_txn()?; + Ok(DocumentChangeContext { + index, + txn, + db_fields_ids_map, + new_fields_ids_map: fields_ids_map, + doc_alloc, + extractor_alloc, + data, + doc_allocs, + }) + } +} + +/// An internal iterator (i.e. 
using `foreach`) of `DocumentChange`s +pub trait Extractor<'extractor>: Sync { + type Data: MostlySend; + + fn init_data<'doc>(&'doc self, extractor_alloc: RefBump<'extractor>) -> Result; + + fn process<'doc>( + &'doc self, + change: DocumentChange<'doc>, + context: &'doc DocumentChangeContext, + ) -> Result<()>; +} + +pub trait DocumentChanges<'pl // lifetime of the underlying payload +>: Sync { + type Item; + + fn iter(&self) -> impl IndexedParallelIterator; + + fn item_to_document_change<'doc, // lifetime of a single `process` call + T: MostlySend>( + &'doc self, + context: &'doc DocumentChangeContext, + item: Self::Item, + ) -> Result> where 'pl: 'doc // the payload must survive the process calls + ; +} + +#[derive(Clone, Copy)] +pub struct IndexingContext< + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation + 'index, // covariant lifetime of the index +> { + pub index: &'index Index, + pub db_fields_ids_map: &'indexer FieldsIdsMap, + pub new_fields_ids_map: &'fid RwLock, + pub doc_allocs: &'indexer ThreadLocal>>, + pub fields_ids_map_store: &'indexer ThreadLocal>>>, +} + +pub fn for_each_document_change< + 'pl, // covariant lifetime of the underlying payload + 'extractor, // invariant lifetime of extractor_alloc + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing + 'data, // invariant on EX::Data lifetime of datastore + 'index, // covariant lifetime of the index + EX, + DC: DocumentChanges<'pl>, +>( + document_changes: &DC, + extractor: &EX, + IndexingContext { + index, + db_fields_ids_map, + new_fields_ids_map, + doc_allocs, + fields_ids_map_store, + }: IndexingContext<'fid, 'indexer, 'index>, + extractor_allocs: &'extractor mut ThreadLocal>>, + datastore: &'data ThreadLocal, +) -> Result<()> +where + EX: Extractor<'extractor>, +{ + // Clean up and reuse the extractor allocs + for extractor_alloc in extractor_allocs.iter_mut() { + extractor_alloc.0.get_mut().reset(); + } + + let pi = document_changes.iter(); + pi.try_arc_for_each_try_init( + || { + DocumentChangeContext::new( + index, + db_fields_ids_map, + new_fields_ids_map, + extractor_allocs, + doc_allocs, + datastore, + fields_ids_map_store, + move |index_alloc| extractor.init_data(index_alloc), + ) + }, + |context, item| { + // Clean up and reuse the document-specific allocator + context.doc_alloc.reset(); + + let change = + document_changes.item_to_document_change(context, item).map_err(Arc::new)?; + + let res = extractor.process(change, context).map_err(Arc::new); + + // send back the doc_alloc in the pool + context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); + + res + }, + ) +} diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index 9dbc4e52d..cafc59221 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -1,14 +1,14 @@ -use std::sync::Arc; - -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; +use bumpalo::collections::CollectIn; +use bumpalo::Bump; +use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; use roaring::RoaringBitmap; -use super::DocumentChanges; +use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend}; use crate::documents::PrimaryKey; use crate::index::db_name::EXTERNAL_DOCUMENTS_IDS; use 
crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, DocumentChange}; -use crate::{Error, FieldsIdsMap, Index, InternalError, Result}; +use crate::{DocumentId, InternalError, Result}; pub struct DocumentDeletion { pub to_delete: RoaringBitmap, @@ -22,38 +22,163 @@ impl DocumentDeletion { pub fn delete_documents_by_docids(&mut self, docids: RoaringBitmap) { self.to_delete |= docids; } -} -impl<'p> DocumentChanges<'p> for DocumentDeletion { - type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>); - - fn document_changes( + pub fn into_changes<'indexer>( self, - _fields_ids_map: &mut FieldsIdsMap, - param: Self::Parameter, - ) -> Result< - impl IndexedParallelIterator>> - + Clone - + 'p, - > { - let (index, fields_ids_map, primary_key) = param; - let to_delete: Vec<_> = self.to_delete.into_iter().collect(); - Ok(to_delete.into_par_iter().try_map_try_init( - || index.read_txn().map_err(crate::Error::from), - |rtxn, docid| { - let current = index.document(rtxn, docid)?; - let external_document_id = primary_key - .document_id(current, fields_ids_map)? - .map_err(|_| InternalError::DatabaseMissingEntry { - db_name: EXTERNAL_DOCUMENTS_IDS, - key: None, - })?; - Ok(DocumentChange::Deletion(Deletion::create( - docid, - external_document_id, - current.boxed(), - ))) - }, - )) + indexer: &'indexer Bump, + primary_key: PrimaryKey<'indexer>, + ) -> DocumentDeletionChanges<'indexer> { + let to_delete: bumpalo::collections::Vec<_> = + self.to_delete.into_iter().collect_in(indexer); + + let to_delete = to_delete.into_bump_slice(); + + DocumentDeletionChanges { to_delete, primary_key } + } +} + +pub struct DocumentDeletionChanges<'indexer> { + to_delete: &'indexer [DocumentId], + primary_key: PrimaryKey<'indexer>, +} + +impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { + type Item = DocumentId; + + fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator { + self.to_delete.into_par_iter().copied() + } + + fn item_to_document_change< + 'doc, // lifetime of a single `process` call + T: MostlySend, + >( + &'doc self, + context: &'doc DocumentChangeContext, + docid: Self::Item, + ) -> Result> + where + 'pl: 'doc, // the payload must survive the process calls + { + let current = context.index.document(&context.txn, docid)?; + let new_fields_ids_map = context.new_fields_ids_map.borrow(); + let new_fields_ids_map = new_fields_ids_map.local_map(); + let external_document_id = + self.primary_key.document_id(current, new_fields_ids_map)?.map_err(|_| { + InternalError::DatabaseMissingEntry { db_name: EXTERNAL_DOCUMENTS_IDS, key: None } + })?; + Ok(DocumentChange::Deletion(Deletion::create(docid, external_document_id))) + } +} + +// TODO: implement Allocator for Ref<'bump, Bump> + +#[cfg(test)] +mod test { + use std::cell::RefCell; + use std::marker::PhantomData; + use std::sync::RwLock; + + use bumpalo::Bump; + use raw_collections::alloc::RefBump; + + use crate::index::tests::TempIndex; + use crate::update::new::indexer::document_changes::{ + for_each_document_change, DocumentChangeContext, Extractor, IndexingContext, MostlySend, + ThreadLocal, + }; + use crate::update::new::indexer::DocumentDeletion; + use crate::update::new::DocumentChange; + use crate::DocumentId; + + #[test] + fn test_deletions() { + struct DeletionWithData<'extractor> { + deleted: RefCell< + hashbrown::HashSet< + DocumentId, + hashbrown::hash_map::DefaultHashBuilder, + RefBump<'extractor>, + >, + >, + } + + unsafe impl<'extractor> MostlySend for 
DeletionWithData<'extractor> {} + + struct TrackDeletion<'extractor>(PhantomData<&'extractor ()>); + + impl<'extractor> Extractor<'extractor> for TrackDeletion<'extractor> { + type Data = DeletionWithData<'extractor>; + + fn init_data( + &self, + extractor_alloc: raw_collections::alloc::RefBump<'extractor>, + ) -> crate::Result { + let deleted = RefCell::new(hashbrown::HashSet::new_in(extractor_alloc)); + Ok(DeletionWithData { deleted }) + } + + fn process( + &self, + change: DocumentChange, + context: &DocumentChangeContext, + ) -> crate::Result<()> { + context.data.deleted.borrow_mut().insert(change.docid()); + Ok(()) + } + } + + let mut deletions = DocumentDeletion::new(); + deletions.delete_documents_by_docids(vec![0, 2, 42].into_iter().collect()); + let indexer = Bump::new(); + + let index = TempIndex::new(); + + let rtxn = index.read_txn().unwrap(); + + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let fields_ids_map = RwLock::new(db_fields_ids_map.clone()); + + let fields_ids_map_store = ThreadLocal::new(); + + let mut extractor_allocs = ThreadLocal::new(); + let doc_allocs = ThreadLocal::new(); + + let deletion_tracker = TrackDeletion(PhantomData); + + let changes = deletions + .into_changes(&indexer, crate::documents::PrimaryKey::Flat { name: "id", field_id: 0 }); + + let context = IndexingContext { + index: &index, + db_fields_ids_map: &db_fields_ids_map, + new_fields_ids_map: &fields_ids_map, + doc_allocs: &doc_allocs, + fields_ids_map_store: &fields_ids_map_store, + }; + + for _ in 0..3 { + let datastore = ThreadLocal::new(); + + for_each_document_change( + &changes, + &deletion_tracker, + context, + &mut extractor_allocs, + &datastore, + ) + .unwrap(); + + for (index, data) in datastore.into_iter().enumerate() { + println!("deleted by {index}: {:?}", data.deleted.borrow()); + } + for alloc in extractor_allocs.iter_mut() { + let alloc = &mut alloc.0; + alloc.get_mut().reset(); + } + } + drop(deletion_tracker); + drop(changes); + drop(rtxn); } } diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 7341f4e5c..7978fc46c 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -1,19 +1,18 @@ -use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; - -use heed::types::Bytes; +use bumpalo::collections::CollectIn; +use bumpalo::Bump; use heed::RoTxn; use memmap2::Mmap; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; +use rayon::iter::IntoParallelIterator; +use serde_json::value::RawValue; use IndexDocumentsMethod as Idm; use super::super::document_change::DocumentChange; -use super::super::{CowStr, TopLevelMap}; -use super::DocumentChanges; +use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend}; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; -use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; +use crate::update::new::document::DocumentFromVersions; +use crate::update::new::document_change::Versions; +use crate::update::new::indexer::de::DocumentVisitor; +use crate::update::new::{Deletion, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; @@ -22,9 +21,14 @@ pub struct DocumentOperation<'pl> { index_documents_method: 
IndexDocumentsMethod, } +pub struct DocumentOperationChanges<'pl> { + docids_version_offsets: &'pl [(&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]))], + index_documents_method: IndexDocumentsMethod, +} + pub enum Payload<'pl> { Addition(&'pl [u8]), - Deletion(Vec), + Deletion(&'pl [&'pl str]), } pub struct PayloadStats { @@ -33,7 +37,7 @@ pub struct PayloadStats { } #[derive(Clone)] -enum InnerDocOp<'pl> { +pub enum InnerDocOp<'pl> { Addition(DocumentOffset<'pl>), Deletion, } @@ -61,83 +65,89 @@ impl<'pl> DocumentOperation<'pl> { Ok(PayloadStats { bytes: payload.len() as u64, document_count }) } - pub fn delete_documents(&mut self, to_delete: Vec) { + pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) { self.operations.push(Payload::Deletion(to_delete)) } -} -impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { - type Parameter = (&'p Index, &'p RoTxn<'p>, &'p PrimaryKey<'p>); - - fn document_changes( + pub fn into_changes( self, - fields_ids_map: &mut FieldsIdsMap, - param: Self::Parameter, - ) -> Result< - impl IndexedParallelIterator>> - + Clone - + 'p, - > { - let (index, rtxn, primary_key) = param; + indexer: &'pl Bump, + index: &Index, + rtxn: &RoTxn, + primary_key: &PrimaryKey, + new_fields_ids_map: &mut FieldsIdsMap, + ) -> Result> { + use serde::de::Deserializer; + // will contain nodes from the intermediate hashmap + let document_changes_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1 MiB let documents_ids = index.documents_ids(rtxn)?; let mut available_docids = AvailableIds::new(&documents_ids); - let mut docids_version_offsets = HashMap::, _>::new(); + let mut docids_version_offsets = + hashbrown::HashMap::<&'pl str, _, _, _>::new_in(&document_changes_alloc); for operation in self.operations { match operation { Payload::Addition(payload) => { let mut iter = - serde_json::Deserializer::from_slice(payload).into_iter::(); + serde_json::Deserializer::from_slice(payload).into_iter::<&RawValue>(); /// TODO manage the error let mut previous_offset = 0; - while let Some(document) = iter.next().transpose().unwrap() { - // TODO Fetch all document fields to fill the fields ids map - document.0.keys().for_each(|key| { - fields_ids_map.insert(key.as_ref()); - }); + while let Some(document) = + iter.next().transpose().map_err(UserError::SerdeJson)? + { + let res = document + .deserialize_map(DocumentVisitor::new( + new_fields_ids_map, + primary_key, + indexer, + )) + .map_err(UserError::SerdeJson)?; - // TODO we must manage the TooManyDocumentIds,InvalidDocumentId - // we must manage the unwrap - let external_document_id = - match primary_key.document_id_from_top_level_map(&document)? 
{ - Ok(document_id) => Ok(document_id), - Err(DocumentIdExtractionError::InvalidDocumentId(e)) => Err(e), - Err(DocumentIdExtractionError::MissingDocumentId) => { - Err(UserError::MissingDocumentId { - primary_key: primary_key.name().to_string(), - document: document.try_into().unwrap(), - }) - } - Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { - Err(UserError::TooManyDocumentIds { - primary_key: primary_key.name().to_string(), - document: document.try_into().unwrap(), - }) - } - }?; + let external_document_id = match res { + Ok(document_id) => Ok(document_id), + Err(DocumentIdExtractionError::InvalidDocumentId(e)) => Err(e), + Err(DocumentIdExtractionError::MissingDocumentId) => { + Err(UserError::MissingDocumentId { + primary_key: primary_key.name().to_string(), + document: serde_json::from_str(document.get()).unwrap(), + }) + } + Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { + Err(UserError::TooManyDocumentIds { + primary_key: primary_key.name().to_string(), + document: serde_json::from_str(document.get()).unwrap(), + }) + } + }?; let current_offset = iter.byte_offset(); let document_operation = InnerDocOp::Addition(DocumentOffset { content: &payload[previous_offset..current_offset], }); - match docids_version_offsets.get_mut(external_document_id.as_ref()) { + match docids_version_offsets.get_mut(external_document_id) { None => { - let docid = match index + let (docid, is_new) = match index .external_documents_ids() .get(rtxn, &external_document_id)? { - Some(docid) => docid, - None => available_docids - .next() - .ok_or(Error::UserError(UserError::DocumentLimitReached))?, + Some(docid) => (docid, false), + None => ( + available_docids.next().ok_or(Error::UserError( + UserError::DocumentLimitReached, + ))?, + true, + ), }; docids_version_offsets.insert( external_document_id, - (docid, vec![document_operation]), + ( + (docid, is_new), + bumpalo::vec![in indexer; document_operation], + ), ); } Some((_, offsets)) => { @@ -163,21 +173,27 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { } Payload::Deletion(to_delete) => { for external_document_id in to_delete { - match docids_version_offsets.get_mut(external_document_id.as_str()) { + match docids_version_offsets.get_mut(external_document_id) { None => { - let docid = match index + let (docid, is_new) = match index .external_documents_ids() - .get(rtxn, &external_document_id)? + .get(rtxn, external_document_id)? { - Some(docid) => docid, - None => available_docids - .next() - .ok_or(Error::UserError(UserError::DocumentLimitReached))?, + Some(docid) => (docid, false), + None => ( + available_docids.next().ok_or(Error::UserError( + UserError::DocumentLimitReached, + ))?, + true, + ), }; docids_version_offsets.insert( - CowStr(external_document_id.into()), - (docid, vec![InnerDocOp::Deletion]), + external_document_id, + ( + (docid, is_new), + bumpalo::vec![in indexer; InnerDocOp::Deletion], + ), ); } Some((_, offsets)) => { @@ -190,10 +206,11 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { } } - /// TODO is it the best way to provide FieldsIdsMap to the parallel iterator? 
- let fields_ids_map = fields_ids_map.clone(); // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone - let mut docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect(); + let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = docids_version_offsets + .drain() + .map(|(item, (docid, v))| (item, (docid, v.into_bump_slice()))) + .collect_in(indexer); // Reorder the offsets to make sure we iterate on the file sequentially let sort_function_key = match self.index_documents_method { Idm::ReplaceDocuments => MergeDocumentForReplacement::sort_key, @@ -202,43 +219,61 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { // And finally sort them docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); + let docids_version_offsets = docids_version_offsets.into_bump_slice(); + Ok(DocumentOperationChanges { + docids_version_offsets, + index_documents_method: self.index_documents_method, + }) + } +} - Ok(docids_version_offsets.into_par_iter().try_map_try_init( - || index.read_txn().map_err(Error::from), - move |rtxn, (external_docid, (internal_docid, operations))| { - let document_merge_function = match self.index_documents_method { - Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, - Idm::UpdateDocuments => MergeDocumentForUpdates::merge, - }; +impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> { + type Item = &'pl (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>])); - document_merge_function( - rtxn, - index, - &fields_ids_map, - internal_docid, - external_docid.to_string(), // TODO do not clone - &operations, - ) - }, - )) + fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator { + self.docids_version_offsets.into_par_iter() + } + + fn item_to_document_change<'doc, T: MostlySend + 'doc>( + &'doc self, + context: &'doc DocumentChangeContext, + item: Self::Item, + ) -> Result> + where + 'pl: 'doc, + { + let document_merge_function = match self.index_documents_method { + Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, + Idm::UpdateDocuments => MergeDocumentForUpdates::merge, + }; + + let (external_doc, ((internal_docid, is_new), operations)) = *item; + + let change = document_merge_function( + internal_docid, + external_doc, + is_new, + &context.doc_alloc, + operations, + )?; + Ok(change) } } trait MergeChanges { - /// Wether the payloads in the list of operations are useless or not. + /// Whether the payloads in the list of operations are useless or not. const USELESS_PREVIOUS_CHANGES: bool; /// Returns a key that is used to order the payloads the right way. fn sort_key(docops: &[InnerDocOp]) -> usize; - fn merge( - rtxn: &RoTxn, - index: &Index, - fields_ids_map: &FieldsIdsMap, + fn merge<'doc>( docid: DocumentId, - external_docid: String, - operations: &[InnerDocOp], - ) -> Result; + external_docid: &'doc str, + is_new: bool, + doc_alloc: &'doc Bump, + operations: &'doc [InnerDocOp], + ) -> Result>; } struct MergeDocumentForReplacement; @@ -258,48 +293,42 @@ impl MergeChanges for MergeDocumentForReplacement { /// Returns only the most recent version of a document based on the updates from the payloads. /// /// This function is only meant to be used when doing a replacement and not an update. 
- fn merge( - rtxn: &RoTxn, - index: &Index, - fields_ids_map: &FieldsIdsMap, + fn merge<'doc>( docid: DocumentId, - external_docid: String, - operations: &[InnerDocOp], - ) -> Result { - let current = index.documents.remap_data_type::().get(rtxn, &docid)?; - let current: Option<&KvReaderFieldId> = current.map(Into::into); - + external_doc: &'doc str, + is_new: bool, + doc_alloc: &'doc Bump, + operations: &'doc [InnerDocOp], + ) -> Result> { match operations.last() { Some(InnerDocOp::Addition(DocumentOffset { content })) => { - let map: TopLevelMap = serde_json::from_slice(content).unwrap(); - let mut document_entries = Vec::new(); - for (key, v) in map.0 { - let id = fields_ids_map.id(key.as_ref()).unwrap(); - document_entries.push((id, v)); - } + let document = serde_json::from_slice(content).unwrap(); + let document = raw_collections::RawMap::from_raw_value(document, doc_alloc) + .map_err(UserError::SerdeJson)?; - document_entries.sort_unstable_by_key(|(id, _)| *id); + let document = document.into_bump_slice(); + let document = DocumentFromVersions::new(Versions::Single(document)); - let mut writer = KvWriterFieldId::memory(); - document_entries - .into_iter() - .for_each(|(id, value)| writer.insert(id, value.get()).unwrap()); - let new = writer.into_boxed(); - - match current { - Some(current) => { - let update = Update::create(docid, external_docid, current.boxed(), new); - Ok(DocumentChange::Update(update)) - } - None => { - Ok(DocumentChange::Insertion(Insertion::create(docid, external_docid, new))) - } + if is_new { + Ok(DocumentChange::Insertion(Insertion::create( + docid, + external_doc.to_owned(), + document, + ))) + } else { + Ok(DocumentChange::Update(Update::create( + docid, + external_doc.to_owned(), + document, + true, + ))) } } Some(InnerDocOp::Deletion) => { - let deletion = match current { - Some(current) => Deletion::create(docid, external_docid, current.boxed()), - None => todo!("Do that with Louis"), + let deletion = if is_new { + Deletion::create(docid, external_doc.to_owned()) + } else { + todo!("Do that with Louis") }; Ok(DocumentChange::Deletion(deletion)) } @@ -326,18 +355,13 @@ impl MergeChanges for MergeDocumentForUpdates { /// in the grenad update files and merges them to generate a new boxed obkv. /// /// This function is only meant to be used when doing an update and not a replacement. - fn merge( - rtxn: &RoTxn, - index: &Index, - fields_ids_map: &FieldsIdsMap, + fn merge<'doc>( docid: DocumentId, - external_docid: String, - operations: &[InnerDocOp], - ) -> Result { - let mut document = BTreeMap::<_, Cow<_>>::new(); - let current = index.documents.remap_data_type::().get(rtxn, &docid)?; - let current: Option<&KvReaderFieldId> = current.map(Into::into); - + external_docid: &'doc str, + is_new: bool, + doc_alloc: &'doc Bump, + operations: &'doc [InnerDocOp], + ) -> Result> { if operations.is_empty() { unreachable!("We must not have empty set of operations on a document"); } @@ -345,24 +369,20 @@ impl MergeChanges for MergeDocumentForUpdates { let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion)); let operations = &operations[last_deletion.map_or(0, |i| i + 1)..]; - // If there was a deletion we must not start - // from the original document but from scratch. 
- if last_deletion.is_none() { - if let Some(current) = current { - current.into_iter().for_each(|(k, v)| { - document.insert(k, v.into()); - }); - } - } + let has_deletion = last_deletion.is_some(); if operations.is_empty() { - let deletion = match current { - Some(current) => Deletion::create(docid, external_docid, current.boxed()), - None => todo!("Do that with Louis"), + let deletion = if !is_new { + Deletion::create(docid, external_docid.to_owned()) + } else { + todo!("Do that with Louis") }; + return Ok(DocumentChange::Deletion(deletion)); } + let mut versions = bumpalo::collections::Vec::with_capacity_in(operations.len(), doc_alloc); + for operation in operations { let DocumentOffset { content } = match operation { InnerDocOp::Addition(offset) => offset, @@ -371,26 +391,35 @@ impl MergeChanges for MergeDocumentForUpdates { } }; - let map: TopLevelMap = serde_json::from_slice(content).unwrap(); - for (key, v) in map.0 { - let id = fields_ids_map.id(key.as_ref()).unwrap(); - document.insert(id, v.get().as_bytes().to_vec().into()); - } + let document = serde_json::from_slice(content).unwrap(); + let document = raw_collections::RawMap::from_raw_value(document, doc_alloc) + .map_err(UserError::SerdeJson)?; + + let document = document.into_bump_slice(); + versions.push(document); } - let mut writer = KvWriterFieldId::memory(); - document.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap()); - let new = writer.into_boxed(); + let versions = versions.into_bump_slice(); + let versions = match versions { + [single] => Versions::Single(*single), + versions => Versions::Multiple(versions), + }; - match current { - Some(current) => { - let update = Update::create(docid, external_docid, current.boxed(), new); - Ok(DocumentChange::Update(update)) - } - None => { - let insertion = Insertion::create(docid, external_docid, new); - Ok(DocumentChange::Insertion(insertion)) - } + let document = DocumentFromVersions::new(versions); + + if is_new { + Ok(DocumentChange::Insertion(Insertion::create( + docid, + external_docid.to_owned(), + document, + ))) + } else { + Ok(DocumentChange::Update(Update::create( + docid, + external_docid.to_owned(), + document, + has_deletion, + ))) } } } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index f231527f6..673cd402e 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -1,7 +1,12 @@ +use std::cell::RefCell; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder}; use big_s::S; +use bumpalo::Bump; +use document_changes::{ + for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, ThreadLocal, +}; pub use document_deletion::DocumentDeletion; pub use document_operation::DocumentOperation; use heed::{RoTxn, RwTxn}; @@ -11,6 +16,7 @@ use rayon::ThreadPool; pub use update_by_function::UpdateByFunction; use super::channel::*; +use super::document::write_to_obkv; use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::extract::*; use super::merger::{merge_grenad_entries, FacetFieldIdsDelta}; @@ -18,32 +24,75 @@ use super::word_fst_builder::PrefixDelta; use super::words_prefix_docids::{ compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, }; -use super::{StdResult, TopLevelMap}; +use super::{extract, StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::facet::FacetType; use crate::update::new::channel::ExtractorSender; -use 
crate::update::settings::InnerIndexSettings; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; -use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::update::settings::InnerIndexSettings; use crate::update::{FacetsUpdateBulk, GrenadParameters}; +use crate::{fields_ids_map, Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +mod de; +pub mod document_changes; mod document_deletion; mod document_operation; mod partial_dump; mod update_by_function; -pub trait DocumentChanges<'p> { - type Parameter: 'p; +struct DocumentExtractor<'a> { + document_sender: &'a DocumentSender<'a>, +} - fn document_changes( - self, - fields_ids_map: &mut FieldsIdsMap, - param: Self::Parameter, - ) -> Result< - impl IndexedParallelIterator>> - + Clone - + 'p, - >; +impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { + type Data = FullySend<()>; + + fn init_data( + &self, + extractor_alloc: raw_collections::alloc::RefBump<'extractor>, + ) -> Result { + Ok(FullySend(())) + } + + fn process( + &self, + change: DocumentChange, + context: &document_changes::DocumentChangeContext, + ) -> Result<()> { + let mut document_buffer = Vec::new(); + + let new_fields_ids_map = context.new_fields_ids_map.borrow(); + let new_fields_ids_map = &*new_fields_ids_map; + let new_fields_ids_map = new_fields_ids_map.local_map(); + + let external_docid = change.external_docid().to_owned(); + + // document but we need to create a function that collects and compresses documents. + match change { + DocumentChange::Deletion(deletion) => { + let docid = deletion.docid(); + self.document_sender.delete(docid, external_docid).unwrap(); + } + /// TODO: change NONE by SOME(vector) when implemented + DocumentChange::Update(update) => { + let docid = update.docid(); + let content = + update.new(&context.txn, context.index, &context.db_fields_ids_map)?; + let content = + write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?; + self.document_sender.insert(docid, external_docid, content.boxed()).unwrap(); + } + DocumentChange::Insertion(insertion) => { + let docid = insertion.docid(); + let content = insertion.new(); + let content = + write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?; + self.document_sender.insert(docid, external_docid, content.boxed()).unwrap(); + // extracted_dictionary_sender.send(self, dictionary: &[u8]); + } + } + Ok(()) + } } /// This is the main function of this crate. @@ -51,25 +100,34 @@ pub trait DocumentChanges<'p> { /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. 
/// /// TODO return stats -pub fn index( +pub fn index<'pl, 'indexer, 'index, DC>( wtxn: &mut RwTxn, - index: &Index, - fields_ids_map: FieldsIdsMap, + index: &'index Index, + db_fields_ids_map: &'indexer FieldsIdsMap, + new_fields_ids_map: FieldsIdsMap, pool: &ThreadPool, - document_changes: PI, + document_changes: &DC, ) -> Result<()> where - PI: IndexedParallelIterator>> - + Send - + Clone, + DC: DocumentChanges<'pl>, { let (merger_sender, writer_receiver) = merger_writer_channel(10_000); // This channel acts as a rendezvous point to ensure that we are one task ahead let (extractor_sender, merger_receiver) = extractors_merger_channels(4); - let fields_ids_map_lock = RwLock::new(fields_ids_map); - let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); - let global_fields_ids_map_clone = global_fields_ids_map.clone(); + let new_fields_ids_map = RwLock::new(new_fields_ids_map); + + let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads()); + let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); + let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); + + let indexing_context = IndexingContext { + index, + db_fields_ids_map, + new_fields_ids_map: &new_fields_ids_map, + doc_allocs: &doc_allocs, + fields_ids_map_store: &fields_ids_map_store, + }; thread::scope(|s| { let indexer_span = tracing::Span::current(); @@ -78,26 +136,12 @@ where pool.in_place_scope(|_s| { let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); let _entered = span.enter(); - let document_changes = document_changes.into_par_iter(); // document but we need to create a function that collects and compresses documents. let document_sender = extractor_sender.document_sender(); - document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>( - |result| { - match result? 
{ - DocumentChange::Deletion(Deletion { docid, external_document_id, ..}) => { - document_sender.delete(docid, external_document_id).unwrap(); - } - DocumentChange::Update(Update { docid, external_document_id, new, ..}) => { - document_sender.insert(docid, external_document_id, new).unwrap(); - } - DocumentChange::Insertion(Insertion { docid, external_document_id, new, ..}) => { - document_sender.insert(docid, external_document_id, new).unwrap(); - // extracted_dictionary_sender.send(self, dictionary: &[u8]); - } - } - Ok(()) - })?; + let document_extractor = DocumentExtractor { document_sender: &document_sender}; + let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); + for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?; document_sender.finish().unwrap(); @@ -112,13 +156,14 @@ where let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let _entered = span.enter(); extract_and_send_docids::< + _, FacetedDocidsExtractor, FacetDocids, >( - index, - &global_fields_ids_map, grenad_parameters, - document_changes.clone(), + document_changes, + indexing_context, + &mut extractor_allocs, &extractor_sender, )?; } @@ -133,7 +178,7 @@ where exact_word_docids, word_position_docids, fid_word_count_docids, - } = WordDocidsExtractors::run_extraction(index, &global_fields_ids_map, grenad_parameters, document_changes.clone())?; + } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?; extractor_sender.send_searchable::(word_docids).unwrap(); extractor_sender.send_searchable::(word_fid_docids).unwrap(); extractor_sender.send_searchable::(exact_word_docids).unwrap(); @@ -145,13 +190,14 @@ where let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let _entered = span.enter(); extract_and_send_docids::< + _, WordPairProximityDocidsExtractor, WordPairProximityDocids, >( - index, - &global_fields_ids_map, grenad_parameters, - document_changes.clone(), + document_changes, + indexing_context, + &mut extractor_allocs, &extractor_sender, )?; } @@ -180,6 +226,8 @@ where }) })?; + let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); + let indexer_span = tracing::Span::current(); // TODO manage the errors correctly let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || { @@ -192,7 +240,7 @@ where merger_sender, &rtxn, index, - global_fields_ids_map_clone, + global_fields_ids_map, ) })?; @@ -223,7 +271,10 @@ where Ok(()) as Result<_> })?; - let fields_ids_map = fields_ids_map_lock.into_inner().unwrap(); + drop(indexing_context); + drop(fields_ids_map_store); + + let fields_ids_map = new_fields_ids_map.into_inner().unwrap(); index.put_fields_ids_map(wtxn, &fields_ids_map)?; // used to update the localized and weighted maps while sharing the update code with the settings pipeline. 
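Note on the new extraction flow wired up above: the `Extractor` + `DocumentChanges` + `for_each_document_change` trio replaces the previous "parallel iterator of `Result<DocumentChange>`" plumbing. As a rough illustration, here is a minimal extractor sketch modeled on the `test_deletions` test added in document_deletion.rs; it only tallies the docids each thread sees, and the module paths plus the name `CountingExtractor` are illustrative, not part of the patch.

```rust
// Minimal sketch of the new extractor API, modeled on the `test_deletions` test added
// in document_deletion.rs. Paths and the helper name `CountingExtractor` are
// illustrative; the traits themselves are the ones introduced by this patch.
use std::cell::RefCell;

use raw_collections::alloc::RefBump;

use crate::update::new::indexer::document_changes::{
    DocumentChangeContext, Extractor, FullySend,
};
use crate::update::new::DocumentChange;
use crate::{DocumentId, Result};

struct CountingExtractor;

impl<'extractor> Extractor<'extractor> for CountingExtractor {
    // `RefCell<Vec<_>>` is `Send`, so wrapping it in `FullySend` satisfies the
    // `MostlySend` bound without an unsafe impl.
    type Data = FullySend<RefCell<Vec<DocumentId>>>;

    fn init_data(&self, _extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
        // Long-lived per-thread state could instead be allocated in `_extractor_alloc`,
        // as the deletion test does with `hashbrown::HashSet::new_in`.
        Ok(FullySend(RefCell::new(Vec::new())))
    }

    fn process(
        &self,
        change: DocumentChange,
        context: &DocumentChangeContext<Self::Data>,
    ) -> Result<()> {
        // Per-document scratch data belongs in `context.doc_alloc`, which is reset
        // between `process` calls; here we only record which docid was visited.
        context.data.0.borrow_mut().push(change.docid());
        Ok(())
    }
}
```

Driving it mirrors the test: build an `IndexingContext`, one `ThreadLocal` for the extractor allocators and one for the datastore, then call `for_each_document_change(&changes, &CountingExtractor, context, &mut extractor_allocs, &datastore)`; the per-thread results can be drained afterwards with `datastore.into_iter()`, just as the searchable extractor does when building its merger.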
@@ -284,14 +335,23 @@ fn compute_facet_level_database( /// TODO: GrenadParameters::default() should be removed in favor a passed parameter /// TODO: manage the errors correctly /// TODO: we must have a single trait that also gives the extractor type -fn extract_and_send_docids( - index: &Index, - fields_ids_map: &GlobalFieldsIdsMap, - indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>>, +fn extract_and_send_docids< + 'pl, + 'fid, + 'indexer, + 'index, + DC: DocumentChanges<'pl>, + E: DocidsExtractor, + D: MergerOperationType, +>( + grenad_parameters: GrenadParameters, + document_changes: &DC, + indexing_context: IndexingContext<'fid, 'indexer, 'index>, + extractor_allocs: &mut ThreadLocal>>, sender: &ExtractorSender, ) -> Result<()> { - let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?; + let merger = + E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?; sender.send_searchable::(merger).unwrap(); Ok(()) } diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 08b97b931..527f5c751 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -1,13 +1,17 @@ -use std::sync::Arc; +use std::ops::DerefMut; use rayon::iter::IndexedParallelIterator; +use serde::Deserializer; +use serde_json::value::RawValue; -use super::DocumentChanges; +use super::de::DocumentVisitor; +use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend}; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; -use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; -use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; -use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; +use crate::update::new::document::DocumentFromVersions; +use crate::update::new::document_change::Versions; +use crate::update::new::{DocumentChange, Insertion}; +use crate::{Error, InternalError, Result, UserError}; pub struct PartialDump { iter: I, @@ -17,69 +21,81 @@ impl PartialDump { pub fn new_from_jsonlines(iter: I) -> Self { PartialDump { iter } } -} -impl<'p, I> DocumentChanges<'p> for PartialDump -where - I: IndexedParallelIterator + Clone + 'p, -{ - type Parameter = (&'p FieldsIdsMap, &'p ConcurrentAvailableIds, &'p PrimaryKey<'p>); - - /// Note for future self: - /// - the field ids map must already be valid so you must have to generate it beforehand. - /// - We should probably expose another method that generates the fields ids map from an iterator of JSON objects. - /// - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items). 
- fn document_changes( + pub fn into_changes<'index>( self, - _fields_ids_map: &mut FieldsIdsMap, - param: Self::Parameter, - ) -> Result< - impl IndexedParallelIterator>> - + Clone - + 'p, - > { - let (fields_ids_map, concurrent_available_ids, primary_key) = param; - - Ok(self.iter.try_map_try_init( - || Ok(()), - |_, object| { - let docid = match concurrent_available_ids.next() { - Some(id) => id, - None => return Err(Error::UserError(UserError::DocumentLimitReached)), - }; - - let mut writer = KvWriterFieldId::memory(); - object.iter().for_each(|(key, value)| { - let key = fields_ids_map.id(key).unwrap(); - /// TODO better error management - let value = serde_json::to_vec(&value).unwrap(); - /// TODO it is not ordered - writer.insert(key, value).unwrap(); - }); - - let document = writer.into_boxed(); - let external_docid = match primary_key.document_id(&document, fields_ids_map)? { - Ok(document_id) => Ok(document_id), - Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => { - Err(user_error) - } - Err(DocumentIdExtractionError::MissingDocumentId) => { - Err(UserError::MissingDocumentId { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { - Err(UserError::TooManyDocumentIds { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - }?; - - let insertion = Insertion::create(docid, external_docid, document); - Ok(DocumentChange::Insertion(insertion)) - }, - )) + concurrent_available_ids: &'index ConcurrentAvailableIds, + primary_key: &'index PrimaryKey, + ) -> PartialDumpChanges<'index, I> { + /// Note for future self: + /// - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items). 
+ PartialDumpChanges { iter: self.iter, concurrent_available_ids, primary_key } + } +} + +pub struct PartialDumpChanges<'doc, I> { + iter: I, + concurrent_available_ids: &'doc ConcurrentAvailableIds, + primary_key: &'doc PrimaryKey<'doc>, +} + +impl<'index, Iter> DocumentChanges<'index> for PartialDumpChanges<'index, Iter> +where + Iter: IndexedParallelIterator> + Clone + Sync + 'index, +{ + type Item = Box; + + fn iter(&self) -> impl IndexedParallelIterator { + self.iter.clone() + } + + fn item_to_document_change<'doc, T: MostlySend + 'doc>( + &'doc self, + context: &'doc DocumentChangeContext, + document: Self::Item, + ) -> Result> + where + 'index: 'doc, + { + let doc_alloc = &context.doc_alloc; + let docid = match self.concurrent_available_ids.next() { + Some(id) => id, + None => return Err(Error::UserError(UserError::DocumentLimitReached)), + }; + + let mut fields_ids_map = context.new_fields_ids_map.borrow_mut(); + let fields_ids_map = fields_ids_map.deref_mut(); + + let res = document + .deserialize_map(DocumentVisitor::new(fields_ids_map, self.primary_key, &doc_alloc)) + .map_err(UserError::SerdeJson)?; + + let external_document_id = match res { + Ok(document_id) => Ok(document_id), + Err(DocumentIdExtractionError::InvalidDocumentId(e)) => Err(e), + Err(DocumentIdExtractionError::MissingDocumentId) => { + Err(UserError::MissingDocumentId { + primary_key: self.primary_key.name().to_string(), + document: serde_json::from_str(document.get()).unwrap(), + }) + } + Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { + Err(UserError::TooManyDocumentIds { + primary_key: self.primary_key.name().to_string(), + document: serde_json::from_str(document.get()).unwrap(), + }) + } + }?; + let document = doc_alloc.alloc_str(document.get()); + let document: &RawValue = unsafe { std::mem::transmute(document) }; + + let document = raw_collections::RawMap::from_raw_value(document, doc_alloc) + .map_err(InternalError::SerdeJson)?; + + let document = document.into_bump_slice(); + let document = DocumentFromVersions::new(Versions::Single(document)); + + let insertion = Insertion::create(docid, external_document_id.to_owned(), document); + Ok(DocumentChange::Insertion(insertion)) } } diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index d6d532433..9bff15b5c 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -1,25 +1,33 @@ -use std::sync::Arc; +use rayon::iter::IntoParallelIterator; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; - -use super::DocumentChanges; -use crate::update::new::DocumentChange; -use crate::{Error, FieldsIdsMap, Result}; +use super::document_changes::{DocumentChangeContext, DocumentChanges}; +use crate::Result; pub struct UpdateByFunction; -impl<'p> DocumentChanges<'p> for UpdateByFunction { - type Parameter = (); - - fn document_changes( - self, - _fields_ids_map: &mut FieldsIdsMap, - _param: Self::Parameter, - ) -> Result< - impl IndexedParallelIterator>> - + Clone - + 'p, - > { - Ok((0..100).into_par_iter().map(|_| todo!())) +impl UpdateByFunction { + pub fn into_changes(self) -> UpdateByFunctionChanges { + UpdateByFunctionChanges + } +} + +pub struct UpdateByFunctionChanges; + +impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges { + type Item = u32; + + fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator { + (0..100).into_par_iter() + } + + fn item_to_document_change<'doc, T: 
super::document_changes::MostlySend + 'doc>( + &self, + _context: &'doc DocumentChangeContext, + _item: Self::Item, + ) -> Result> + where + 'index: 'doc, + { + todo!() } } diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 9751be66c..524608801 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -3,10 +3,10 @@ use std::io::{self}; use bincode::ErrorKind; use grenad::Merger; +use hashbrown::HashSet; use heed::types::Bytes; use heed::{Database, RoTxn}; use roaring::RoaringBitmap; -use std::collections::HashSet; use super::channel::*; use super::extract::FacetKind; @@ -149,17 +149,8 @@ pub fn merge_grenad_entries( let current = index.documents.remap_data_type::().get(rtxn, &docid)?; let current: Option<&KvReaderFieldId> = current.map(Into::into); let change = match current { - Some(current) => DocumentChange::Update(Update::create( - docid, - external_id, - current.boxed(), - document, - )), - None => DocumentChange::Insertion(Insertion::create( - docid, - external_id, - document, - )), + Some(current) => DocumentChange::Update(todo!()), + None => DocumentChange::Insertion(todo!()), }; geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; } @@ -174,12 +165,7 @@ pub fn merge_grenad_entries( sender.documents().delete(docid, external_id.clone()).unwrap(); if let Some(geo_extractor) = geo_extractor.as_mut() { - let current = index.document(rtxn, docid)?; - let change = DocumentChange::Deletion(Deletion::create( - docid, - external_id, - current.boxed(), - )); + let change = DocumentChange::Deletion(Deletion::create(docid, todo!())); geo_extractor.manage_change(&mut global_fields_ids_map, &change)?; } } diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 4a83529dc..37ccc75cd 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -5,6 +5,7 @@ use super::del_add::DelAdd; use crate::FieldId; mod channel; +pub mod document; mod document_change; mod extract; pub mod indexer;
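For reference, the two call sites of the new `DocumentVisitor` (document_operation.rs and partial_dump.rs) follow the same pattern: deserialize a raw JSON map with the visitor to register top-level field names and pull out the external document id, then map the typed extraction errors to user errors. A condensed, hedged sketch of that pattern, written as if it lived inside the `indexer` module (the `de` module is private) and using the illustrative helper name `external_id_of`, could look like this:

```rust
// Condensed sketch of the DocumentVisitor pattern used by document_operation.rs and
// partial_dump.rs. The helper name `external_id_of` is illustrative only.
use bumpalo::Bump;
use serde::de::Deserializer as _;
use serde_json::value::RawValue;

use super::de::DocumentVisitor;
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
use crate::{FieldsIdsMap, Result, UserError};

fn external_id_of<'doc>(
    raw_document: &'doc RawValue,
    new_fields_ids_map: &mut FieldsIdsMap,
    primary_key: &PrimaryKey,
    bump: &'doc Bump,
) -> Result<&'doc str> {
    // The visitor walks the top-level map, inserts every field name it encounters into
    // the fields ids map, and returns either the external document id or a typed
    // extraction error. Integer ids are rendered into `bump`, string ids are validated.
    let res = raw_document
        .deserialize_map(DocumentVisitor::new(new_fields_ids_map, primary_key, bump))
        .map_err(UserError::SerdeJson)?;

    let external_document_id = match res {
        Ok(document_id) => Ok(document_id),
        Err(DocumentIdExtractionError::InvalidDocumentId(e)) => Err(e),
        Err(DocumentIdExtractionError::MissingDocumentId) => Err(UserError::MissingDocumentId {
            primary_key: primary_key.name().to_string(),
            document: serde_json::from_str(raw_document.get()).unwrap(),
        }),
        Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
            Err(UserError::TooManyDocumentIds {
                primary_key: primary_key.name().to_string(),
                document: serde_json::from_str(raw_document.get()).unwrap(),
            })
        }
    }?;

    Ok(external_document_id)
}
```

document_operation.rs applies this per payload entry while also tracking `(docid, is_new)` pairs, and partial_dump.rs applies it per dumped document before wrapping the raw value in a `DocumentFromVersions`.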