keep the same rtxn to compute the filters on the documents and to stream the documents later on

This commit is contained in:
Tamo 2024-04-18 15:51:46 +02:00
parent 8e6ffbfc6f
commit c85d1752dd

View File

@ -1,4 +1,5 @@
use std::io::{ErrorKind, Write};
use std::pin::Pin;
use actix_web::http::header::CONTENT_TYPE;
use actix_web::web::Data;
@ -250,7 +251,7 @@ impl Write for Writer2Streamer {
}
pub fn stream(
data: impl Serialize + Send + Sync + 'static,
data: impl Serialize + Send + 'static,
) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, anyhow::Error>>(1);
@ -626,20 +627,31 @@ fn some_documents<'a, 't: 'a>(
pub struct DocumentsStreamer {
attributes_to_retrieve: Option<Vec<String>>,
documents: RoaringBitmap,
index: Index,
// safety: The `rtxn` contains a reference to the index thus:
// - The `rtxn` MUST BE dropped before the index.
// - The index MUST BE `Pin`ned in RAM and never moved.
rtxn: Option<RoTxn<'static>>,
index: Pin<Box<Index>>,
pub total_documents: u64,
}
impl Drop for DocumentsStreamer {
fn drop(&mut self) {
// safety: we drop the rtxn before the index
self.rtxn = None;
}
}
impl Serialize for DocumentsStreamer {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let rtxn = self.index.read_txn().unwrap();
let rtxn = self.rtxn.as_ref().unwrap();
let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap();
let documents = some_documents(&self.index, &rtxn, self.documents.iter()).unwrap();
let documents = some_documents(&self.index, rtxn, self.documents.iter()).unwrap();
for document in documents {
let document = document.unwrap();
let document = match self.attributes_to_retrieve {
@ -663,7 +675,10 @@ fn retrieve_documents(
filter: Option<Value>,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<DocumentsStreamer, ResponseError> {
// safety: The index MUST NOT move while we hold the `rtxn` on it
let index = Box::pin(index);
let rtxn = index.read_txn()?;
let filter = &filter;
let filter = if let Some(filter) = filter {
parse_filter(filter)
@ -682,12 +697,15 @@ fn retrieve_documents(
} else {
index.documents_ids(&rtxn)?
};
drop(rtxn);
Ok(DocumentsStreamer {
total_documents: candidates.len(),
attributes_to_retrieve,
documents: candidates.into_iter().skip(offset).take(limit).collect(),
// safety: It is safe to make the lifetime in the Rtxn static because it points to the index right below.
// The index is `Pin`ned on the RAM and won't move even if the structure is moved.
// The `rtxn` is held in an `Option`, so we're able to drop it before dropping the index.
rtxn: Some(unsafe { std::mem::transmute(rtxn) }),
index,
})
}