From c85d1752dd3937ffdfc8f86f16108bfa9388aaac Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 18 Apr 2024 15:51:46 +0200 Subject: [PATCH] keep the same rtxn to compute the filters on the documents and to stream the documents later on --- meilisearch/src/routes/indexes/documents.rs | 28 +++++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 78af7a098..9d34fcdfe 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -1,4 +1,5 @@ use std::io::{ErrorKind, Write}; +use std::pin::Pin; use actix_web::http::header::CONTENT_TYPE; use actix_web::web::Data; @@ -250,7 +251,7 @@ impl Write for Writer2Streamer { } pub fn stream( - data: impl Serialize + Send + Sync + 'static, + data: impl Serialize + Send + 'static, ) -> impl Stream> { let (sender, receiver) = tokio::sync::mpsc::channel::>(1); @@ -626,20 +627,31 @@ fn some_documents<'a, 't: 'a>( pub struct DocumentsStreamer { attributes_to_retrieve: Option>, documents: RoaringBitmap, - index: Index, + // safety: The `rtxn` contains a reference to the index thus: + // - The `rtxn` MUST BE dropped before the index. + // - The index MUST BE `Pin`ned in RAM and never moved. + rtxn: Option>, + index: Pin>, pub total_documents: u64, } +impl Drop for DocumentsStreamer { + fn drop(&mut self) { + // safety: we drop the rtxn before the index + self.rtxn = None; + } +} + impl Serialize for DocumentsStreamer { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { - let rtxn = self.index.read_txn().unwrap(); + let rtxn = self.rtxn.as_ref().unwrap(); let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap(); - let documents = some_documents(&self.index, &rtxn, self.documents.iter()).unwrap(); + let documents = some_documents(&self.index, rtxn, self.documents.iter()).unwrap(); for document in documents { let document = document.unwrap(); let document = match self.attributes_to_retrieve { @@ -663,7 +675,10 @@ fn retrieve_documents( filter: Option, attributes_to_retrieve: Option>, ) -> Result { + // safety: The index MUST NOT move while we hold the `rtxn` on it + let index = Box::pin(index); let rtxn = index.read_txn()?; + let filter = &filter; let filter = if let Some(filter) = filter { parse_filter(filter) @@ -682,12 +697,15 @@ fn retrieve_documents( } else { index.documents_ids(&rtxn)? }; - drop(rtxn); Ok(DocumentsStreamer { total_documents: candidates.len(), attributes_to_retrieve, documents: candidates.into_iter().skip(offset).take(limit).collect(), + // safety: It is safe to make the lifetime in the Rtxn static because it points to the index right below. + // The index is `Pin`ned on the RAM and won't move even if the structure is moved. + // The `rtxn` is held in an `Option`, so we're able to drop it before dropping the index. + rtxn: Some(unsafe { std::mem::transmute(rtxn) }), index, }) }