From 1b671d4302e0effdcd70e0f2cf98c3121756fa81 Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Tue, 11 May 2021 12:18:10 +0200 Subject: [PATCH] fix-snapshot --- .github/workflows/rust.yml | 1 + .../index_controller/update_actor/actor.rs | 23 +---------- .../update_actor/update_store.rs | 39 +++++++++++++++++-- meilisearch-http/tests/snapshot/mod.rs | 1 - 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ee23eb487..8020ead32 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,6 +1,7 @@ name: Rust on: + workflow_dispatch: pull_request: push: # trying and staging branches are for Bors config diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index e47edc5bc..b26ba4b8e 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -3,17 +3,15 @@ use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::sync::Arc; -use futures::StreamExt; use log::info; use oxidized_json_checker::JsonChecker; use tokio::fs; use tokio::io::AsyncWriteExt; -use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; -use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}; +use crate::index_controller::index_actor::{IndexActorHandle}; use crate::index_controller::{UpdateMeta, UpdateStatus}; pub struct UpdateActor { @@ -207,25 +205,8 @@ where async fn handle_snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); - tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - update_store.snapshot(&uuids, &path)?; - // Perform the snapshot of each index concurently. 
Only a third of the capabilities of - // the index actor at a time not to put too much pressure on the index actor - let path = &path; - let handle = &index_handle; - - let mut stream = futures::stream::iter(uuids.iter()) - .map(|&uuid| handle.snapshot(uuid, path.clone())) - .buffer_unordered(CONCURRENT_INDEX_MSG / 3); - - Handle::current().block_on(async { - while let Some(res) = stream.next().await { - res?; - } - Ok(()) - }) - }) + tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) .await .map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?; diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index 6a916af33..4e7acc7cf 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use anyhow::Context; use arc_swap::ArcSwap; +use futures::StreamExt; use heed::types::{ByteSlice, OwnedType, SerdeJson}; use heed::zerocopy::U64; use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; @@ -17,8 +18,11 @@ use tokio::sync::mpsc; use uuid::Uuid; use super::UpdateMeta; -use crate::helpers::EnvSizer; -use crate::index_controller::{IndexActorHandle, updates::*}; +use crate::index_controller::{updates::*, IndexActorHandle}; +use crate::{ + helpers::EnvSizer, + index_controller::index_actor::{IndexResult, CONCURRENT_INDEX_MSG}, +}; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; @@ -202,7 +206,14 @@ impl UpdateStore { .try_send(()) .expect("Failed to init update store"); - let update_store = Arc::new(UpdateStore { env, pending_queue, next_update_id, updates, state, notification_sender }); + let update_store = Arc::new(UpdateStore { + env, + pending_queue, + next_update_id, + updates, + state, + notification_sender, + }); // We need a weak reference 
so we can take ownership on the arc later when we // want to close the index. @@ -464,7 +475,12 @@ impl UpdateStore { Ok(()) } - pub fn snapshot(&self, uuids: &HashSet, path: impl AsRef) -> anyhow::Result<()> { + pub fn snapshot( + &self, + uuids: &HashSet, + path: impl AsRef, + handle: impl IndexActorHandle + Clone, + ) -> anyhow::Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); @@ -496,6 +512,21 @@ impl UpdateStore { } } + let path = &path.as_ref().to_path_buf(); + let handle = &handle; + // Perform the snapshot of each index concurrently. Only a third of the capabilities of + // the index actor at a time not to put too much pressure on the index actor + let mut stream = futures::stream::iter(uuids.iter()) + .map(move |uuid| handle.snapshot(*uuid, path.clone())) + .buffer_unordered(CONCURRENT_INDEX_MSG / 3); + + Handle::current().block_on(async { + while let Some(res) = stream.next().await { + res?; + } + Ok(()) as IndexResult<()> + })?; + Ok(()) } diff --git a/meilisearch-http/tests/snapshot/mod.rs b/meilisearch-http/tests/snapshot/mod.rs index caed293e6..b5602c508 100644 --- a/meilisearch-http/tests/snapshot/mod.rs +++ b/meilisearch-http/tests/snapshot/mod.rs @@ -7,7 +7,6 @@ use tokio::time::sleep; use meilisearch_http::Opt; -#[ignore] #[actix_rt::test] async fn perform_snapshot() { let temp = tempfile::tempdir_in(".").unwrap();