Merge pull request #192 from meilisearch/dumps-tasks

Dumps tasks
marin 2021-05-25 15:49:15 +02:00 committed by GitHub
commit cbcf50960f
2 changed files with 162 additions and 133 deletions

View File

@@ -1,28 +1,27 @@
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
use crate::helpers::compression;
use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata};
+use async_stream::stream;
use chrono::Utc;
-use log::{error, info, warn};
+use futures::stream::StreamExt;
+use log::{error, info};
use std::{
    collections::HashSet,
    path::{Path, PathBuf},
    sync::Arc,
};
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;

-pub struct DumpActor<UuidResolver, Index, Update> {
-    inbox: mpsc::Receiver<DumpMsg>,
-    inner: InnerDump<UuidResolver, Index, Update>,
-}
+pub const CONCURRENT_DUMP_MSG: usize = 10;

-#[derive(Clone)]
-struct InnerDump<UuidResolver, Index, Update> {
-    pub uuid_resolver: UuidResolver,
-    pub index: Index,
-    pub update: Update,
-    pub dump_path: PathBuf,
-    pub dump_info: Arc<Mutex<Option<DumpInfo>>>,
+pub struct DumpActor<UuidResolver, Index, Update> {
+    inbox: Option<mpsc::Receiver<DumpMsg>>,
+    uuid_resolver: UuidResolver,
+    index: Index,
+    update: Update,
+    dump_path: PathBuf,
+    dump_info: Arc<RwLock<Option<DumpInfo>>>,
}

/// Generate uid from creation date
@@ -44,144 +43,93 @@ where
        dump_path: impl AsRef<Path>,
    ) -> Self {
        Self {
-            inbox,
-            inner: InnerDump {
-                uuid_resolver,
-                index,
-                update,
-                dump_path: dump_path.as_ref().into(),
-                dump_info: Arc::new(Mutex::new(None)),
-            },
+            inbox: Some(inbox),
+            uuid_resolver,
+            index,
+            update,
+            dump_path: dump_path.as_ref().into(),
+            dump_info: Arc::new(RwLock::new(None)),
        }
    }

    pub async fn run(mut self) {
-        use DumpMsg::*;
        info!("Started dump actor.");

-        loop {
-            match self.inbox.recv().await {
-                Some(CreateDump { ret }) => {
-                    let _ = ret.send(self.inner.clone().handle_create_dump().await);
-                }
-                Some(DumpInfo { ret, uid }) => {
-                    let _ = ret.send(self.inner.handle_dump_info(uid).await);
-                }
-                None => break,
-            }
-        }
+        let mut inbox = self
+            .inbox
+            .take()
+            .expect("Dump Actor must have a inbox at this point.");
+
+        let stream = stream! {
+            loop {
+                match inbox.recv().await {
+                    Some(msg) => yield msg,
+                    None => break,
+                }
+            }
+        };
+
+        stream
+            .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
+            .await;

        error!("Dump actor stopped.");
    }
-}

-impl<UuidResolver, Index, Update> InnerDump<UuidResolver, Index, Update>
-where
-    UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
-    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
-    Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
-{
-    async fn handle_create_dump(self) -> DumpResult<DumpInfo> {
+    async fn handle_message(&self, msg: DumpMsg) {
+        use DumpMsg::*;
+
+        match msg {
+            CreateDump { ret } => {
+                let _ = self.handle_create_dump(ret).await;
+            }
+            DumpInfo { ret, uid } => {
+                let _ = ret.send(self.handle_dump_info(uid).await);
+            }
+        }
+    }
+
+    async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
        if self.is_running().await {
-            return Err(DumpError::DumpAlreadyRunning);
+            ret.send(Err(DumpError::DumpAlreadyRunning))
+                .expect("Dump actor is dead");
+            return;
        }
        let uid = generate_uid();
        let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
-        *self.dump_info.lock().await = Some(info.clone());
+        *self.dump_info.write().await = Some(info.clone());

-        let this = self.clone();
+        ret.send(Ok(info)).expect("Dump actor is dead");

-        tokio::task::spawn(async move {
-            match this.perform_dump(uid).await {
-                Ok(()) => {
-                    if let Some(ref mut info) = *self.dump_info.lock().await {
-                        info.done();
-                    } else {
-                        warn!("dump actor was in an inconsistant state");
-                    }
-                    info!("Dump succeed");
-                }
-                Err(e) => {
-                    if let Some(ref mut info) = *self.dump_info.lock().await {
-                        info.with_error(e.to_string());
-                    } else {
-                        warn!("dump actor was in an inconsistant state");
-                    }
-                    error!("Dump failed: {}", e);
-                }
-            };
-        });
+        let dump_info = self.dump_info.clone();

-        Ok(info)
-    }
+        let task_result = tokio::task::spawn(perform_dump(
+            self.dump_path.clone(),
+            self.uuid_resolver.clone(),
+            self.index.clone(),
+            self.update.clone(),
+            uid.clone(),
+        ))
+        .await;

-    async fn perform_dump(self, uid: String) -> anyhow::Result<()> {
-        info!("Performing dump.");
-        let dump_dir = self.dump_path.clone();
-        tokio::fs::create_dir_all(&dump_dir).await?;
-        let temp_dump_dir =
-            tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
-        let temp_dump_path = temp_dump_dir.path().to_owned();
-        let uuids = self.uuid_resolver.list().await?;
-        // maybe we could just keep the vec as-is
-        let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
-        if uuids.is_empty() {
-            return Ok(());
-        }
-        let indexes = self.list_indexes().await?;
-        // we create one directory by index
-        for meta in indexes.iter() {
-            tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
-        }
-        let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
-        metadata.to_path(&temp_dump_path).await?;
-        self.update.dump(uuids, temp_dump_path.clone()).await?;
-        let dump_dir = self.dump_path.clone();
-        let dump_path = self.dump_path.join(format!("{}.dump", uid));
-        let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
-            let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
-            let temp_dump_file_path = temp_dump_file.path().to_owned();
-            compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
-            temp_dump_file.persist(&dump_path)?;
-            Ok(dump_path)
-        })
-        .await??;
-        info!("Created dump in {:?}.", dump_path);
-        Ok(())
-    }

-    async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
-        let uuids = self.uuid_resolver.list().await?;
-        let mut ret = Vec::new();
-        for (uid, uuid) in uuids {
-            let meta = self.index.get_index_meta(uuid).await?;
-            let meta = IndexMetadata {
-                uuid,
-                name: uid.clone(),
-                uid,
-                meta,
-            };
-            ret.push(meta);
-        }
-        Ok(ret)
+        match task_result {
+            Ok(Ok(())) => {
+                (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done();
+                info!("Dump succeed");
+            }
+            Ok(Err(e)) => {
+                (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string());
+                error!("Dump failed: {}", e);
+            }
+            Err(_) => {
+                error!("Dump panicked. Dump status set to failed");
+                *dump_info.write().await = Some(DumpInfo::new(uid, DumpStatus::Failed));
+            }
+        };
    }

    async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
-        match &*self.dump_info.lock().await {
+        match &*self.dump_info.read().await {
            None => self.dump_from_fs(uid).await,
            Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await,
            Some(info) => Ok(info.clone()),
@@ -198,7 +146,7 @@ where
    async fn is_running(&self) -> bool {
        matches!(
-            *self.dump_info.lock().await,
+            *self.dump_info.read().await,
            Some(DumpInfo {
                status: DumpStatus::InProgress,
                ..
@@ -206,3 +154,85 @@ where
        )
    }
}
+
+async fn perform_dump<UuidResolver, Index, Update>(
+    dump_path: PathBuf,
+    uuid_resolver: UuidResolver,
+    index: Index,
+    update: Update,
+    uid: String,
+) -> anyhow::Result<()>
+where
+    UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
+    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
+    Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
+{
+    info!("Performing dump.");
+
+    let dump_dir = dump_path.clone();
+    tokio::fs::create_dir_all(&dump_dir).await?;
+    let temp_dump_dir =
+        tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
+    let temp_dump_path = temp_dump_dir.path().to_owned();
+
+    let uuids = uuid_resolver.list().await?;
+    // maybe we could just keep the vec as-is
+    let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
+
+    if uuids.is_empty() {
+        return Ok(());
+    }
+
+    let indexes = list_indexes(&uuid_resolver, &index).await?;
+
+    // we create one directory by index
+    for meta in indexes.iter() {
+        tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
+    }
+
+    let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
+    metadata.to_path(&temp_dump_path).await?;
+
+    update.dump(uuids, temp_dump_path.clone()).await?;
+
+    let dump_dir = dump_path.clone();
+    let dump_path = dump_path.join(format!("{}.dump", uid));
+    let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
+        let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
+        let temp_dump_file_path = temp_dump_file.path().to_owned();
+        compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
+        temp_dump_file.persist(&dump_path)?;
+        Ok(dump_path)
+    })
+    .await??;
+
+    info!("Created dump in {:?}.", dump_path);
+
+    Ok(())
+}
+
+async fn list_indexes<UuidResolver, Index>(
+    uuid_resolver: &UuidResolver,
+    index: &Index,
+) -> anyhow::Result<Vec<IndexMetadata>>
+where
+    UuidResolver: uuid_resolver::UuidResolverHandle,
+    Index: index_actor::IndexActorHandle,
+{
+    let uuids = uuid_resolver.list().await?;
+
+    let mut ret = Vec::new();
+
+    for (uid, uuid) in uuids {
+        let meta = index.get_index_meta(uuid).await?;
+        let meta = IndexMetadata {
+            uuid,
+            name: uid.clone(),
+            uid,
+            meta,
+        };
+        ret.push(meta);
+    }
+
+    Ok(ret)
+}
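
Note on the new `run` loop above: instead of awaiting each message inline, it wraps the `mpsc` inbox in a stream and drives message handling with `for_each_concurrent`, capped at `CONCURRENT_DUMP_MSG`. Below is a minimal, self-contained sketch of that pattern outside the actor, assuming the same crates the diff already uses (`tokio` with the macros feature, `futures`, `async-stream`); the `u32` message type, the channel capacity, and the print-only handler are placeholders for illustration.

use async_stream::stream;
use futures::stream::StreamExt;
use tokio::sync::mpsc;

const CONCURRENT_DUMP_MSG: usize = 10;

#[tokio::main]
async fn main() {
    let (tx, mut inbox) = mpsc::channel::<u32>(100);

    // Producer: send a few messages, then drop the sender so the stream ends.
    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
    });

    // Turn the receiver into a Stream, as the new `run` loop does with `stream!`.
    let stream = stream! {
        loop {
            match inbox.recv().await {
                Some(msg) => yield msg,
                None => break,
            }
        }
    };

    // Handle up to CONCURRENT_DUMP_MSG messages at the same time.
    stream
        .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| async move {
            println!("handling message {}", msg);
        })
        .await;
}

Because messages may now be handled concurrently, the actor no longer answers `CreateDump` by returning a value from the loop: each handler receives a `oneshot::Sender` and replies through it, while the shared status lives behind `Arc<RwLock<Option<DumpInfo>>>`.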

View File

@@ -13,7 +13,6 @@ use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize};
-use serde_json::json;
use tempfile::TempDir;
use thiserror::Error;
use uuid::Uuid;
@@ -128,8 +127,8 @@ pub enum DumpStatus {
pub struct DumpInfo {
    pub uid: String,
    pub status: DumpStatus,
-    #[serde(skip_serializing_if = "Option::is_none", flatten)]
-    pub error: Option<serde_json::Value>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
}

impl DumpInfo {
@@ -143,7 +142,7 @@ impl DumpInfo {
    pub fn with_error(&mut self, error: String) {
        self.status = DumpStatus::Failed;
-        self.error = Some(json!(error));
+        self.error = Some(error);
    }

    pub fn done(&mut self) {
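
The second file narrows `DumpInfo::error` from a flattened `serde_json::Value` to a plain `Option<String>` that is simply omitted when the dump did not fail. A small sketch of how such a struct serializes either way; the `status` field is reduced to a `String` here purely for illustration (the real type is the `DumpStatus` enum), and the uid value is made up.

use serde::Serialize;

// Reduced stand-in for the DumpInfo shape after this change.
#[derive(Debug, Serialize)]
struct DumpInfo {
    uid: String,
    status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}

fn main() -> Result<(), serde_json::Error> {
    let ok = DumpInfo {
        uid: "20210525-154915".into(),
        status: "done".into(),
        error: None,
    };
    // Prints: {"uid":"20210525-154915","status":"done"}
    println!("{}", serde_json::to_string(&ok)?);

    let failed = DumpInfo {
        uid: "20210525-154915".into(),
        status: "failed".into(),
        error: Some("dump panicked".into()),
    };
    // Prints: {"uid":"20210525-154915","status":"failed","error":"dump panicked"}
    println!("{}", serde_json::to_string(&failed)?);
    Ok(())
}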