mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 17:11:15 +08:00
Change DocumentsBatchReader to access cursor and index at same time
Otherwise it is not possible to iterate over all documents while using the fields index at the same time.
This commit is contained in:
parent
ab1571cdec
commit
fc9f3f31e7
@ -216,9 +216,9 @@ mod test {
|
|||||||
assert_eq!(builder.documents_count(), 2);
|
assert_eq!(builder.documents_count(), 2);
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
assert_eq!(index.len(), 3);
|
assert_eq!(index.len(), 3);
|
||||||
|
|
||||||
let document = cursor.next_document().unwrap().unwrap();
|
let document = cursor.next_document().unwrap().unwrap();
|
||||||
@ -240,9 +240,9 @@ mod test {
|
|||||||
assert_eq!(builder.documents_count(), 2);
|
assert_eq!(builder.documents_count(), 2);
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
assert_eq!(index.len(), 2);
|
assert_eq!(index.len(), 2);
|
||||||
|
|
||||||
let document = cursor.next_document().unwrap().unwrap();
|
let document = cursor.next_document().unwrap().unwrap();
|
||||||
@ -264,9 +264,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
|
|
||||||
@ -292,9 +292,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
@ -319,9 +319,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
@ -346,9 +346,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
@ -373,9 +373,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
@ -400,9 +400,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
@ -427,9 +427,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
@ -454,9 +454,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
@ -482,8 +482,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, _) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
assert!(cursor.next_document().is_err());
|
assert!(cursor.next_document().is_err());
|
||||||
}
|
}
|
||||||
@ -498,9 +499,9 @@ mod test {
|
|||||||
builder.append_csv(csv).unwrap();
|
builder.append_csv(csv).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut cursor =
|
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
let index = cursor.documents_batch_index().clone();
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
let val = obkv_to_object(&doc, &index).map(Value::from).unwrap();
|
||||||
|
@ -56,14 +56,13 @@ impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// This method returns a forward cursor over the enriched documents.
|
/// This method returns a forward cursor over the enriched documents.
|
||||||
pub fn into_cursor(self) -> EnrichedDocumentsBatchCursor<R> {
|
pub fn into_cursor_and_fields_index(
|
||||||
|
self,
|
||||||
|
) -> (EnrichedDocumentsBatchCursor<R>, DocumentsBatchIndex) {
|
||||||
let EnrichedDocumentsBatchReader { documents, primary_key, mut external_ids } = self;
|
let EnrichedDocumentsBatchReader { documents, primary_key, mut external_ids } = self;
|
||||||
|
let (documents, fields_index) = documents.into_cursor_and_fields_index();
|
||||||
external_ids.reset();
|
external_ids.reset();
|
||||||
EnrichedDocumentsBatchCursor {
|
(EnrichedDocumentsBatchCursor { documents, primary_key, external_ids }, fields_index)
|
||||||
documents: documents.into_cursor(),
|
|
||||||
primary_key,
|
|
||||||
external_ids,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,23 +79,9 @@ pub struct EnrichedDocumentsBatchCursor<R> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<R> EnrichedDocumentsBatchCursor<R> {
|
impl<R> EnrichedDocumentsBatchCursor<R> {
|
||||||
pub fn into_reader(self) -> EnrichedDocumentsBatchReader<R> {
|
|
||||||
let EnrichedDocumentsBatchCursor { documents, primary_key, external_ids } = self;
|
|
||||||
EnrichedDocumentsBatchReader {
|
|
||||||
documents: documents.into_reader(),
|
|
||||||
primary_key,
|
|
||||||
external_ids,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn primary_key(&self) -> &str {
|
pub fn primary_key(&self) -> &str {
|
||||||
&self.primary_key
|
&self.primary_key
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
|
|
||||||
self.documents.documents_batch_index()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Resets the cursor to be able to read from the start again.
|
/// Resets the cursor to be able to read from the start again.
|
||||||
pub fn reset(&mut self) {
|
pub fn reset(&mut self) {
|
||||||
self.documents.reset();
|
self.documents.reset();
|
||||||
|
@ -203,10 +203,11 @@ mod test {
|
|||||||
builder.append_json_object(value.as_object().unwrap()).unwrap();
|
builder.append_json_object(value.as_object().unwrap()).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut documents =
|
let (mut documents, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
|
.into_cursor_and_fields_index();
|
||||||
|
|
||||||
assert_eq!(documents.documents_batch_index().iter().count(), 5);
|
assert_eq!(index.iter().count(), 5);
|
||||||
let reader = documents.next_document().unwrap().unwrap();
|
let reader = documents.next_document().unwrap().unwrap();
|
||||||
assert_eq!(reader.iter().count(), 5);
|
assert_eq!(reader.iter().count(), 5);
|
||||||
assert!(documents.next_document().unwrap().is_none());
|
assert!(documents.next_document().unwrap().is_none());
|
||||||
@ -226,9 +227,10 @@ mod test {
|
|||||||
builder.append_json_object(doc2.as_object().unwrap()).unwrap();
|
builder.append_json_object(doc2.as_object().unwrap()).unwrap();
|
||||||
let vector = builder.into_inner().unwrap();
|
let vector = builder.into_inner().unwrap();
|
||||||
|
|
||||||
let mut documents =
|
let (mut documents, index) = DocumentsBatchReader::from_reader(io::Cursor::new(vector))
|
||||||
DocumentsBatchReader::from_reader(io::Cursor::new(vector)).unwrap().into_cursor();
|
.unwrap()
|
||||||
assert_eq!(documents.documents_batch_index().iter().count(), 2);
|
.into_cursor_and_fields_index();
|
||||||
|
assert_eq!(index.iter().count(), 2);
|
||||||
let reader = documents.next_document().unwrap().unwrap();
|
let reader = documents.next_document().unwrap().unwrap();
|
||||||
assert_eq!(reader.iter().count(), 1);
|
assert_eq!(reader.iter().count(), 1);
|
||||||
assert!(documents.next_document().unwrap().is_some());
|
assert!(documents.next_document().unwrap().is_some());
|
||||||
@ -243,7 +245,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
}]);
|
}]);
|
||||||
|
|
||||||
let mut cursor = docs_reader.into_cursor();
|
let (mut cursor, _) = docs_reader.into_cursor_and_fields_index();
|
||||||
let doc = cursor.next_document().unwrap().unwrap();
|
let doc = cursor.next_document().unwrap().unwrap();
|
||||||
let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap();
|
let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap();
|
||||||
assert_eq!(nested, json!({ "toto": ["hello"] }));
|
assert_eq!(nested, json!({ "toto": ["hello"] }));
|
||||||
|
@ -17,6 +17,10 @@ pub struct DocumentsBatchReader<R> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
|
impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
|
||||||
|
pub fn new(cursor: DocumentsBatchCursor<R>, fields_index: DocumentsBatchIndex) -> Self {
|
||||||
|
Self { cursor: cursor.cursor, fields_index }
|
||||||
|
}
|
||||||
|
|
||||||
/// Construct a `DocumentsReader` from a reader.
|
/// Construct a `DocumentsReader` from a reader.
|
||||||
///
|
///
|
||||||
/// It first retrieves the index, then moves to the first document. Use the `into_cursor`
|
/// It first retrieves the index, then moves to the first document. Use the `into_cursor`
|
||||||
@ -46,30 +50,20 @@ impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// This method returns a forward cursor over the documents.
|
/// This method returns a forward cursor over the documents.
|
||||||
pub fn into_cursor(self) -> DocumentsBatchCursor<R> {
|
pub fn into_cursor_and_fields_index(self) -> (DocumentsBatchCursor<R>, DocumentsBatchIndex) {
|
||||||
let DocumentsBatchReader { cursor, fields_index } = self;
|
let DocumentsBatchReader { cursor, fields_index } = self;
|
||||||
let mut cursor = DocumentsBatchCursor { cursor, fields_index };
|
let mut cursor = DocumentsBatchCursor { cursor };
|
||||||
cursor.reset();
|
cursor.reset();
|
||||||
cursor
|
(cursor, fields_index)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A forward cursor over the documents in a `DocumentsBatchReader`.
|
/// A forward cursor over the documents in a `DocumentsBatchReader`.
|
||||||
pub struct DocumentsBatchCursor<R> {
|
pub struct DocumentsBatchCursor<R> {
|
||||||
cursor: grenad::ReaderCursor<R>,
|
cursor: grenad::ReaderCursor<R>,
|
||||||
fields_index: DocumentsBatchIndex,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R> DocumentsBatchCursor<R> {
|
impl<R> DocumentsBatchCursor<R> {
|
||||||
pub fn into_reader(self) -> DocumentsBatchReader<R> {
|
|
||||||
let DocumentsBatchCursor { cursor, fields_index, .. } = self;
|
|
||||||
DocumentsBatchReader { cursor, fields_index }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
|
|
||||||
&self.fields_index
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Resets the cursor to be able to read from the start again.
|
/// Resets the cursor to be able to read from the start again.
|
||||||
pub fn reset(&mut self) {
|
pub fn reset(&mut self) {
|
||||||
self.cursor.reset();
|
self.cursor.reset();
|
||||||
|
@ -27,8 +27,8 @@ pub fn enrich_documents_batch<R: Read + Seek>(
|
|||||||
autogenerate_docids: bool,
|
autogenerate_docids: bool,
|
||||||
reader: DocumentsBatchReader<R>,
|
reader: DocumentsBatchReader<R>,
|
||||||
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
|
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
|
||||||
let mut cursor = reader.into_cursor();
|
let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
|
||||||
let mut documents_batch_index = cursor.documents_batch_index().clone();
|
|
||||||
let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;
|
let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;
|
||||||
let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH];
|
let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH];
|
||||||
|
|
||||||
@ -103,9 +103,10 @@ pub fn enrich_documents_batch<R: Read + Seek>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let external_ids = writer_into_reader(external_ids)?;
|
let external_ids = writer_into_reader(external_ids)?;
|
||||||
|
let primary_key_name = primary_key.name().to_string();
|
||||||
let reader = EnrichedDocumentsBatchReader::new(
|
let reader = EnrichedDocumentsBatchReader::new(
|
||||||
cursor.into_reader(),
|
DocumentsBatchReader::new(cursor, documents_batch_index),
|
||||||
primary_key.name().to_string(),
|
primary_key_name,
|
||||||
external_ids,
|
external_ids,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
@ -146,11 +146,11 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
R: Read + Seek,
|
R: Read + Seek,
|
||||||
F: Fn(UpdateIndexingStep) + Sync,
|
F: Fn(UpdateIndexingStep) + Sync,
|
||||||
{
|
{
|
||||||
let mut cursor = reader.into_cursor();
|
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
|
||||||
let fields_index = cursor.documents_batch_index();
|
|
||||||
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
||||||
|
|
||||||
let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?;
|
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
|
||||||
|
|
||||||
let primary_key = cursor.primary_key().to_string();
|
let primary_key = cursor.primary_key().to_string();
|
||||||
let primary_key_id =
|
let primary_key_id =
|
||||||
|
Loading…
Reference in New Issue
Block a user