From f8d0f5265fea004057749d9b8c67897dce471f0c Mon Sep 17 00:00:00 2001
From: Alexey Shekhirin
Date: Tue, 4 May 2021 22:01:11 +0300
Subject: [PATCH] fix(update): fields distribution after documents merge

---
 milli/src/index.rs                            |  7 ++--
 milli/src/update/index_documents/transform.rs | 36 +++++---------------
 2 files changed, 13 insertions(+), 30 deletions(-)

diff --git a/milli/src/index.rs b/milli/src/index.rs
index 945567cdb..f222069f6 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -566,11 +566,11 @@ pub(crate) mod tests {
         let mut wtxn = index.write_txn().unwrap();
         let content = &br#"[
-            { "name": "kevin" },
-            { "name": "bob", "age": 20 }
+            { "id": 1, "name": "kevin" },
+            { "id": 2, "name": "bob", "age": 20 },
+            { "id": 2, "name": "bob", "age": 20 }
         ]"#[..];
         let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
-        builder.enable_autogenerate_docids();
         builder.update_format(UpdateFormat::Json);
         builder.execute(content, |_, _| ()).unwrap();
         wtxn.commit().unwrap();
 
@@ -579,6 +579,7 @@
 
         let fields_distribution = index.fields_distribution(&rtxn).unwrap();
         assert_eq!(fields_distribution, hashmap! {
+            "id".to_string() => 2,
             "name".to_string() => 2,
             "age".to_string() => 1,
         });
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index 308a24abc..e029a5135 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -1,5 +1,4 @@
 use std::borrow::Cow;
-use std::collections::HashMap;
 use std::fs::File;
 use std::io::{Read, Seek, SeekFrom};
 use std::iter::Peekable;
@@ -76,7 +75,6 @@ impl Transform<'_, '_> {
         F: Fn(UpdateIndexingStep) + Sync,
     {
         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
-        let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
         let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
 
         // Deserialize the whole batch of documents in memory.
@@ -106,7 +104,7 @@ impl Transform<'_, '_> {
             return Ok(TransformOutput {
                 primary_key,
                 fields_ids_map,
-                fields_distribution,
+                fields_distribution: self.index.fields_distribution(self.rtxn)?,
                 external_documents_ids: ExternalDocumentsIds::default(),
                 new_documents_ids: RoaringBitmap::new(),
                 replaced_documents_ids: RoaringBitmap::new(),
@@ -137,8 +135,6 @@ impl Transform<'_, '_> {
         let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
         let mut documents_count = 0;
 
-        let mut fields_ids_distribution = HashMap::new();
-
         for result in documents {
             let document = result?;
 
@@ -153,9 +149,7 @@ impl Transform<'_, '_> {
 
             // We prepare the fields ids map with the documents keys.
             for (key, _value) in &document {
-                let field_id = fields_ids_map.insert(&key).context("field id limit reached")?;
-
-                *fields_ids_distribution.entry(field_id).or_insert(0) += 1;
+                fields_ids_map.insert(&key).context("field id limit reached")?;
             }
 
             // We retrieve the user id from the document based on the primary key name,
@@ -198,11 +192,6 @@ impl Transform<'_, '_> {
             documents_count += 1;
         }
 
-        for (field_id, count) in fields_ids_distribution {
-            let field_name = fields_ids_map.name(field_id).unwrap();
-            *fields_distribution.entry(field_name.to_string()).or_default() += count;
-        }
-
         progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
             documents_seen: documents_count,
         });
@@ -213,7 +202,6 @@ impl Transform<'_, '_> {
             sorter,
             primary_key,
             fields_ids_map,
-            fields_distribution,
             documents_count,
             external_documents_ids,
             progress_callback,
@@ -226,7 +214,6 @@ impl Transform<'_, '_> {
         F: Fn(UpdateIndexingStep) + Sync,
     {
         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
-        let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
         let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
 
         let mut csv = csv::Reader::from_reader(reader);
@@ -284,8 +271,6 @@ impl Transform<'_, '_> {
         let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
         let mut documents_count = 0;
 
-        let mut fields_ids_distribution = HashMap::new();
-
         let mut record = csv::StringRecord::new();
         while csv.read_record(&mut record)? {
             obkv_buffer.clear();
@@ -324,8 +309,6 @@ impl Transform<'_, '_> {
                 json_buffer.clear();
                 serde_json::to_writer(&mut json_buffer, &field)?;
                 writer.insert(*field_id, &json_buffer)?;
-
-                *fields_ids_distribution.entry(*field_id).or_insert(0) += 1;
             }
 
             // We use the extracted/generated user id as the key for this document.
@@ -333,11 +316,6 @@ impl Transform<'_, '_> {
             documents_count += 1;
         }
 
-        for (field_id, count) in fields_ids_distribution {
-            let field_name = fields_ids_map.name(field_id).unwrap();
-            *fields_distribution.entry(field_name.to_string()).or_default() += count;
-        }
-
        progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
             documents_seen: documents_count,
         });
@@ -352,7 +330,6 @@ impl Transform<'_, '_> {
             sorter,
             primary_key_name,
             fields_ids_map,
-            fields_distribution,
             documents_count,
             external_documents_ids,
             progress_callback,
@@ -367,7 +344,6 @@ impl Transform<'_, '_> {
         sorter: grenad::Sorter,
         primary_key: String,
         fields_ids_map: FieldsIdsMap,
-        fields_distribution: FieldsDistribution,
         approximate_number_of_documents: usize,
         mut external_documents_ids: ExternalDocumentsIds<'_>,
         progress_callback: F,
@@ -376,6 +352,7 @@ impl Transform<'_, '_> {
         F: Fn(UpdateIndexingStep) + Sync,
     {
         let documents_ids = self.index.documents_ids(self.rtxn)?;
+        let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
         let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
 
         // Once we have sort and deduplicated the documents we write them into a final file.
@@ -396,7 +373,6 @@ impl Transform<'_, '_> {
         let mut documents_count = 0;
         let mut iter = sorter.into_iter()?;
         while let Some((external_id, update_obkv)) = iter.next()? {
-
             if self.log_every_n.map_or(false, |len| documents_count % len == 0) {
                 progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
                     documents_seen: documents_count,
@@ -438,6 +414,12 @@
             // We insert the document under the documents ids map into the final file.
             final_sorter.insert(docid.to_be_bytes(), obkv)?;
             documents_count += 1;
+
+            let reader = obkv::KvReader::new(obkv);
+            for (field_id, _) in reader.iter() {
+                let field_name = fields_ids_map.name(field_id).unwrap();
+                *fields_distribution.entry(field_name.to_string()).or_default() += 1;
+            }
         }
 
         progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
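Note (not part of the patch): the bug was that output_from_json and output_from_csv tallied fields_distribution once per incoming update, before merging, so two updates sharing a primary key (as the updated test does with id 2) double-counted every field. The patch moves the tally into output_from_sorter, where each merged obkv document is read back and counted exactly once. Below is a minimal, self-contained Rust sketch of the corrected counting order; Document and the fields_distribution helper here are illustrative stand-ins, not milli APIs, with plain HashMaps replacing obkv records and FieldsIdsMap.

    use std::collections::HashMap;

    // Illustrative stand-ins: a merged "document" is just the set of field
    // names it contains; the distribution maps a field name to the number
    // of documents that contain it.
    type Document = Vec<&'static str>;
    type FieldsDistribution = HashMap<String, u64>;

    // Count each field once per *merged* document, mirroring the patch:
    // the tally runs after deduplication, so the two updates that share
    // primary key 2 contribute a single document to the counts.
    fn fields_distribution(merged_docs: &[Document]) -> FieldsDistribution {
        let mut distribution = FieldsDistribution::new();
        for doc in merged_docs {
            for field in doc {
                *distribution.entry(field.to_string()).or_default() += 1;
            }
        }
        distribution
    }

    fn main() {
        // The three updates from the test merge into two documents.
        let merged = vec![vec!["id", "name"], vec!["id", "name", "age"]];
        let distribution = fields_distribution(&merged);
        assert_eq!(distribution["id"], 2);
        assert_eq!(distribution["name"], 2);
        assert_eq!(distribution["age"], 1);
        println!("{:?}", distribution);
    }

Run on the merged form of the test's three updates, the sketch yields id => 2, name => 2, age => 1, matching the new assertion in index.rs.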