From fc9f3f31e74edda92ad1beb31226e9cb100781e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?= Date: Mon, 18 Jul 2022 16:08:01 +0200 Subject: [PATCH] Change DocumentsBatchReader to access cursor and index at same time Otherwise it is not possible to iterate over all documents while using the fields index at the same time. --- milli/src/documents/builder.rs | 71 ++++++++++--------- milli/src/documents/enriched.rs | 25 ++----- milli/src/documents/mod.rs | 16 +++-- milli/src/documents/reader.rs | 20 ++---- milli/src/update/index_documents/enrich.rs | 9 +-- milli/src/update/index_documents/transform.rs | 6 +- 6 files changed, 65 insertions(+), 82 deletions(-) diff --git a/milli/src/documents/builder.rs b/milli/src/documents/builder.rs index bb9d6aa68..dc027e1b7 100644 --- a/milli/src/documents/builder.rs +++ b/milli/src/documents/builder.rs @@ -216,9 +216,9 @@ mod test { assert_eq!(builder.documents_count(), 2); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); assert_eq!(index.len(), 3); let document = cursor.next_document().unwrap().unwrap(); @@ -240,9 +240,9 @@ mod test { assert_eq!(builder.documents_count(), 2); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); assert_eq!(index.len(), 2); let document = cursor.next_document().unwrap().unwrap(); @@ -264,9 +264,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -292,9 +292,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -319,9 +319,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -346,9 +346,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -373,9 +373,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -400,9 +400,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -427,9 +427,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -454,9 +454,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); @@ -482,8 +482,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); + let (mut cursor, _) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); assert!(cursor.next_document().is_err()); } @@ -498,9 +499,9 @@ mod test { builder.append_csv(csv).unwrap(); let vector = builder.into_inner().unwrap(); - let mut cursor = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); - let index = cursor.documents_batch_index().clone(); + let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let val = obkv_to_object(&doc, &index).map(Value::from).unwrap(); diff --git a/milli/src/documents/enriched.rs b/milli/src/documents/enriched.rs index 4f45a891a..fa21c0f87 100644 --- a/milli/src/documents/enriched.rs +++ b/milli/src/documents/enriched.rs @@ -56,14 +56,13 @@ impl EnrichedDocumentsBatchReader { } /// This method returns a forward cursor over the enriched documents. - pub fn into_cursor(self) -> EnrichedDocumentsBatchCursor { + pub fn into_cursor_and_fields_index( + self, + ) -> (EnrichedDocumentsBatchCursor, DocumentsBatchIndex) { let EnrichedDocumentsBatchReader { documents, primary_key, mut external_ids } = self; + let (documents, fields_index) = documents.into_cursor_and_fields_index(); external_ids.reset(); - EnrichedDocumentsBatchCursor { - documents: documents.into_cursor(), - primary_key, - external_ids, - } + (EnrichedDocumentsBatchCursor { documents, primary_key, external_ids }, fields_index) } } @@ -80,23 +79,9 @@ pub struct EnrichedDocumentsBatchCursor { } impl EnrichedDocumentsBatchCursor { - pub fn into_reader(self) -> EnrichedDocumentsBatchReader { - let EnrichedDocumentsBatchCursor { documents, primary_key, external_ids } = self; - EnrichedDocumentsBatchReader { - documents: documents.into_reader(), - primary_key, - external_ids, - } - } - pub fn primary_key(&self) -> &str { &self.primary_key } - - pub fn documents_batch_index(&self) -> &DocumentsBatchIndex { - self.documents.documents_batch_index() - } - /// Resets the cursor to be able to read from the start again. pub fn reset(&mut self) { self.documents.reset(); diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs index c5ff7a120..e766e29cf 100644 --- a/milli/src/documents/mod.rs +++ b/milli/src/documents/mod.rs @@ -203,10 +203,11 @@ mod test { builder.append_json_object(value.as_object().unwrap()).unwrap(); let vector = builder.into_inner().unwrap(); - let mut documents = - DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor(); + let (mut documents, index) = DocumentsBatchReader::from_reader(Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); - assert_eq!(documents.documents_batch_index().iter().count(), 5); + assert_eq!(index.iter().count(), 5); let reader = documents.next_document().unwrap().unwrap(); assert_eq!(reader.iter().count(), 5); assert!(documents.next_document().unwrap().is_none()); @@ -226,9 +227,10 @@ mod test { builder.append_json_object(doc2.as_object().unwrap()).unwrap(); let vector = builder.into_inner().unwrap(); - let mut documents = - DocumentsBatchReader::from_reader(io::Cursor::new(vector)).unwrap().into_cursor(); - assert_eq!(documents.documents_batch_index().iter().count(), 2); + let (mut documents, index) = DocumentsBatchReader::from_reader(io::Cursor::new(vector)) + .unwrap() + .into_cursor_and_fields_index(); + assert_eq!(index.iter().count(), 2); let reader = documents.next_document().unwrap().unwrap(); assert_eq!(reader.iter().count(), 1); assert!(documents.next_document().unwrap().is_some()); @@ -243,7 +245,7 @@ mod test { } }]); - let mut cursor = docs_reader.into_cursor(); + let (mut cursor, _) = docs_reader.into_cursor_and_fields_index(); let doc = cursor.next_document().unwrap().unwrap(); let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap(); assert_eq!(nested, json!({ "toto": ["hello"] })); diff --git a/milli/src/documents/reader.rs b/milli/src/documents/reader.rs index 70b8b0131..a8a4c662d 100644 --- a/milli/src/documents/reader.rs +++ b/milli/src/documents/reader.rs @@ -17,6 +17,10 @@ pub struct DocumentsBatchReader { } impl DocumentsBatchReader { + pub fn new(cursor: DocumentsBatchCursor, fields_index: DocumentsBatchIndex) -> Self { + Self { cursor: cursor.cursor, fields_index } + } + /// Construct a `DocumentsReader` from a reader. /// /// It first retrieves the index, then moves to the first document. Use the `into_cursor` @@ -46,30 +50,20 @@ impl DocumentsBatchReader { } /// This method returns a forward cursor over the documents. - pub fn into_cursor(self) -> DocumentsBatchCursor { + pub fn into_cursor_and_fields_index(self) -> (DocumentsBatchCursor, DocumentsBatchIndex) { let DocumentsBatchReader { cursor, fields_index } = self; - let mut cursor = DocumentsBatchCursor { cursor, fields_index }; + let mut cursor = DocumentsBatchCursor { cursor }; cursor.reset(); - cursor + (cursor, fields_index) } } /// A forward cursor over the documents in a `DocumentsBatchReader`. pub struct DocumentsBatchCursor { cursor: grenad::ReaderCursor, - fields_index: DocumentsBatchIndex, } impl DocumentsBatchCursor { - pub fn into_reader(self) -> DocumentsBatchReader { - let DocumentsBatchCursor { cursor, fields_index, .. } = self; - DocumentsBatchReader { cursor, fields_index } - } - - pub fn documents_batch_index(&self) -> &DocumentsBatchIndex { - &self.fields_index - } - /// Resets the cursor to be able to read from the start again. pub fn reset(&mut self) { self.cursor.reset(); diff --git a/milli/src/update/index_documents/enrich.rs b/milli/src/update/index_documents/enrich.rs index 51495c598..7c9a016d8 100644 --- a/milli/src/update/index_documents/enrich.rs +++ b/milli/src/update/index_documents/enrich.rs @@ -27,8 +27,8 @@ pub fn enrich_documents_batch( autogenerate_docids: bool, reader: DocumentsBatchReader, ) -> Result, UserError>> { - let mut cursor = reader.into_cursor(); - let mut documents_batch_index = cursor.documents_batch_index().clone(); + let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index(); + let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?; let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH]; @@ -103,9 +103,10 @@ pub fn enrich_documents_batch( } let external_ids = writer_into_reader(external_ids)?; + let primary_key_name = primary_key.name().to_string(); let reader = EnrichedDocumentsBatchReader::new( - cursor.into_reader(), - primary_key.name().to_string(), + DocumentsBatchReader::new(cursor, documents_batch_index), + primary_key_name, external_ids, )?; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index d03a803fd..0de90924a 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -146,11 +146,11 @@ impl<'a, 'i> Transform<'a, 'i> { R: Read + Seek, F: Fn(UpdateIndexingStep) + Sync, { - let mut cursor = reader.into_cursor(); - let fields_index = cursor.documents_batch_index(); + let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); + let external_documents_ids = self.index.external_documents_ids(wtxn)?; - let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?; + let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; let primary_key = cursor.primary_key().to_string(); let primary_key_id =