From c83a33017e5224b4f7dfa713201b80b1070b69bc Mon Sep 17 00:00:00 2001
From: Tamo
Date: Wed, 29 Nov 2023 14:27:50 +0100
Subject: [PATCH] stream and chunk the data

---
 index-scheduler/src/lib.rs         | 73 +++++++++++++++++++++++++++----
 meilisearch/tests/tasks/webhook.rs |  2 +-
 2 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index bfaca3126..b5b061a50 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -34,6 +34,7 @@ pub type TaskId = u32;
 
 use std::collections::{BTreeMap, HashMap};
 use std::fs::File;
+use std::io::{self, BufReader, Read};
 use std::ops::{Bound, RangeBounds};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
@@ -1279,18 +1280,70 @@ impl IndexScheduler {
     /// Once the tasks changes have been commited we must send all the tasks that were updated to our webhook if there is one.
     fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
         if let Some(ref url) = self.webhook_url {
-            let rtxn = self.env.read_txn()?;
-
-            // on average a task takes ~50 bytes
-            let mut buffer = Vec::with_capacity(updated.len() as usize * 50);
-
-            for id in updated {
-                let task = self.get_task(&rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
-                let _ = serde_json::to_writer(&mut buffer, &TaskView::from_task(&task));
-                buffer.push(b'\n');
+            /// Streams the updated tasks as newline-delimited JSON, one task at a time,
+            /// so the whole payload never has to be held in memory.
+            struct TaskReader<'a, 'b> {
+                rtxn: &'a RoTxn<'a>,
+                index_scheduler: &'a IndexScheduler,
+                tasks: &'b mut roaring::bitmap::Iter<'b>,
+                /// The serialized form of the task currently being sent.
+                buffer: Vec<u8>,
+                /// How many bytes of `buffer` have already been handed to the caller.
+                written: usize,
             }
 
-            let reader = GzEncoder::new(&buffer[..], Compression::default());
+            impl<'a, 'b> Read for TaskReader<'a, 'b> {
+                fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+                    // nothing is pending: serialize the next task, or report EOF if none is left
+                    if self.buffer.is_empty() {
+                        match self.tasks.next() {
+                            None => return Ok(0),
+                            Some(task_id) => {
+                                let task = self
+                                    .index_scheduler
+                                    .get_task(self.rtxn, task_id)
+                                    .map_err(io::Error::other)?
+                                    .ok_or_else(|| io::Error::other(Error::CorruptedTaskQueue))?;
+
+                                serde_json::to_writer(
+                                    &mut self.buffer,
+                                    &TaskView::from_task(&task),
+                                )?;
+                                self.buffer.push(b'\n');
+                            }
+                        }
+                    }
+
+                    // Hand out as many pending bytes as fit in `buf`. `io::copy` cannot be used
+                    // here: it returns a `WriteZero` error when the destination slice is smaller
+                    // than the serialized task instead of doing a short read.
+                    let pending = &self.buffer[self.written..];
+                    let wrote = pending.len().min(buf.len());
+                    buf[..wrote].copy_from_slice(&pending[..wrote]);
+                    self.written += wrote;
+
+                    // we wrote everything and must refresh our buffer on the next call
+                    if self.written == self.buffer.len() {
+                        self.written = 0;
+                        self.buffer.clear();
+                    }
+
+                    Ok(wrote)
+                }
+            }
+
+            let rtxn = self.env.read_txn()?;
+
+            let task_reader = TaskReader {
+                rtxn: &rtxn,
+                index_scheduler: self,
+                tasks: &mut updated.into_iter(),
+                // only an initial guess: the buffer grows to fit each serialized task
+                buffer: Vec::with_capacity(50),
+                written: 0,
+            };
+
+            let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
             if let Err(e) = ureq::post(url).set("Content-Encoding", "gzip").send(reader) {
                 log::error!("While sending data to the webhook: {e}");
             }
diff --git a/meilisearch/tests/tasks/webhook.rs b/meilisearch/tests/tasks/webhook.rs
index 688d35e8b..6979ff294 100644
--- a/meilisearch/tests/tasks/webhook.rs
+++ b/meilisearch/tests/tasks/webhook.rs
@@ -45,7 +45,7 @@ struct WebhookHandle {
 
 async fn create_webhook_server() -> WebhookHandle {
     let mut log_builder = env_logger::Builder::new();
-    log_builder.parse_filters("debug");
+    log_builder.parse_filters("info");
     log_builder.init();
 
     let (sender, receiver) = mpsc::unbounded_channel();