[WIP] rebase on main

This commit is contained in:
tamo 2021-05-10 20:25:09 +02:00
parent c3552cecdf
commit efca63f9ce
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
10 changed files with 381 additions and 87 deletions

View File

@ -4,8 +4,7 @@ use std::sync::Arc;
use sha2::Digest; use sha2::Digest;
use crate::index::{Checked, Settings}; use crate::index::{Checked, Settings};
use crate::index_controller::{IndexController, IndexStats, Stats}; use crate::index_controller::{IndexController, IndexStats, Stats, DumpInfo, IndexMetadata, IndexSettings};
use crate::index_controller::{IndexMetadata, IndexSettings};
use crate::option::Opt; use crate::option::Opt;
pub mod search; pub mod search;
@ -108,8 +107,12 @@ impl Data {
Ok(self.index_controller.get_all_stats().await?) Ok(self.index_controller.get_all_stats().await?)
} }
pub async fn dump(&self) -> anyhow::Result<String> { pub async fn create_dump(&self) -> anyhow::Result<DumpInfo> {
Ok(self.index_controller.dump(self.options.dumps_dir.clone()).await?) Ok(self.index_controller.create_dump().await?)
}
pub async fn dump_status(&self, uid: String) -> anyhow::Result<DumpInfo> {
Ok(self.index_controller.dump_info(uid).await?)
} }
#[inline] #[inline]

View File

@ -0,0 +1,200 @@
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
use crate::helpers::compression;
use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata};
use chrono::Utc;
use log::{error, info, warn};
use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
/// Actor serving dump requests: it reads [`DumpMsg`] messages from its inbox
/// and answers each of them using its inner state.
pub struct DumpActor<UuidResolver, Index, Update> {
/// Receiving half of the channel on which dump requests arrive.
inbox: mpsc::Receiver<DumpMsg>,
/// Handles and shared dump state used to answer the requests.
inner: InnerDump<UuidResolver, Index, Update>,
}
/// Cloneable state of the dump actor; a clone is handed to the background
/// task spawned for each dump.
#[derive(Clone)]
struct InnerDump<UuidResolver, Index, Update> {
/// Handle used to list the known `(uid, uuid)` index pairs.
pub uuid_resolver: UuidResolver,
/// Handle used to fetch the metadata of each index.
pub index: Index,
/// Handle asked to dump the listed indexes into the temporary dump directory.
pub update: Update,
/// Directory in which the `<uid>.dump` archives are written.
pub dump_path: PathBuf,
/// Information about the most recently requested dump, if any; shared
/// between all the clones of this state.
pub dump_info: Arc<Mutex<Option<DumpInfo>>>,
}
/// Builds a dump uid out of the current UTC date and time.
///
/// The uid is the current instant rendered with the `%Y%m%d-%H%M%S%3f`
/// chrono format string.
fn generate_uid() -> String {
    let now = Utc::now();
    let formatted = now.format("%Y%m%d-%H%M%S%3f");
    formatted.to_string()
}
impl<UuidResolver, Index, Update> DumpActor<UuidResolver, Index, Update>
where
UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
{
pub fn new(
inbox: mpsc::Receiver<DumpMsg>,
uuid_resolver: UuidResolver,
index: Index,
update: Update,
dump_path: impl AsRef<Path>,
) -> Self {
Self {
inbox,
inner: InnerDump {
uuid_resolver,
index,
update,
dump_path: dump_path.as_ref().into(),
dump_info: Arc::new(Mutex::new(None)),
},
}
}
pub async fn run(mut self) {
use DumpMsg::*;
info!("Started dump actor.");
loop {
match self.inbox.recv().await {
Some(CreateDump { ret }) => {
let _ = ret.send(self.inner.clone().handle_create_dump().await);
}
Some(DumpInfo { ret, uid }) => {
let _ = ret.send(self.inner.handle_dump_info(uid).await);
}
None => break,
}
}
error!("Dump actor stopped.");
}
}
impl<UuidResolver, Index, Update> InnerDump<UuidResolver, Index, Update>
where
    UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
    Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
    Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
{
    /// Registers a new in-progress dump and performs it in a background task.
    ///
    /// Returns the [`DumpInfo`] of the freshly registered dump right away; the
    /// shared dump information is updated by the background task once the dump
    /// has succeeded or failed.
    ///
    /// # Errors
    ///
    /// Returns [`DumpError::DumpAlreadyRunning`] when the recorded dump is
    /// still in progress.
    async fn handle_create_dump(self) -> DumpResult<DumpInfo> {
        if self.is_running().await {
            return Err(DumpError::DumpAlreadyRunning);
        }

        let uid = generate_uid();
        let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
        *self.dump_info.lock().await = Some(info.clone());

        // `this` is consumed by `perform_dump`; `self` is kept around to
        // record the outcome in the shared dump information afterwards.
        let this = self.clone();
        tokio::task::spawn(async move {
            match this.perform_dump(uid).await {
                Ok(()) => {
                    if let Some(ref mut info) = *self.dump_info.lock().await {
                        info.done();
                    } else {
                        warn!("dump actor was in an inconsistant state");
                    }
                    info!("Dump succeed");
                }
                Err(e) => {
                    if let Some(ref mut info) = *self.dump_info.lock().await {
                        info.with_error(e.to_string());
                    } else {
                        warn!("dump actor was in an inconsistant state");
                    }
                    error!("Dump failed: {}", e);
                }
            };
        });

        Ok(info)
    }

    /// Writes the dump identified by `uid` as `<dump_path>/<uid>.dump`.
    ///
    /// The dump is first assembled in a temporary directory created inside
    /// `dump_path`, then compressed into a temporary file which is finally
    /// persisted under its definitive name. When there is no index at all,
    /// nothing is written and `Ok(())` is returned.
    async fn perform_dump(self, uid: String) -> anyhow::Result<()> {
        info!("Performing dump.");

        let dump_dir = self.dump_path.clone();
        tokio::fs::create_dir_all(&dump_dir).await?;
        let temp_dump_dir =
            tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
        let temp_dump_path = temp_dump_dir.path().to_owned();

        // The indexes are listed exactly once: the metadata, the per-index
        // directories and the set handed to the update actor must all describe
        // the same indexes, even if one is created or deleted while the dump
        // is being performed.
        let indexes = self.list_indexes().await?;
        let uuids: HashSet<(String, Uuid)> = indexes
            .iter()
            .map(|meta| (meta.uid.clone(), meta.uuid))
            .collect();

        if uuids.is_empty() {
            return Ok(());
        }

        // we create one directory by index
        for meta in indexes.iter() {
            tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
        }

        let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
        metadata.to_path(&temp_dump_path).await?;

        self.update.dump(uuids, temp_dump_path.clone()).await?;

        let dump_dir = self.dump_path.clone();
        let dump_path = self.dump_path.join(format!("{}.dump", uid));
        // Compressing and persisting the archive is blocking work, hence the
        // dedicated blocking task.
        let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
            let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
            let temp_dump_file_path = temp_dump_file.path().to_owned();

            compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
            temp_dump_file.persist(&dump_path)?;
            Ok(dump_path)
        })
        .await??;

        info!("Created dump in {:?}.", dump_path);

        Ok(())
    }

    /// Returns the metadata of every index known by the uuid resolver, in the
    /// order in which the resolver lists them.
    async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
        let uuids = self.uuid_resolver.list().await?;

        let mut ret = Vec::new();

        for (uid, uuid) in uuids {
            let meta = self.index.get_index_meta(uuid).await?;
            let meta = IndexMetadata {
                uuid,
                name: uid.clone(),
                uid,
                meta,
            };
            ret.push(meta);
        }

        Ok(ret)
    }

    /// Returns the information of the dump identified by `uid`.
    ///
    /// # Errors
    ///
    /// Returns [`DumpError::DumpDoesNotExist`] when no dump is recorded or
    /// when the recorded dump has another uid.
    async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
        let current = self.dump_info.lock().await;
        match &*current {
            Some(info) if info.uid == uid => Ok(info.clone()),
            _ => Err(DumpError::DumpDoesNotExist(uid)),
        }
    }

    /// Tells whether the recorded dump, if any, is still in progress.
    async fn is_running(&self) -> bool {
        matches!(
            *self.dump_info.lock().await,
            Some(DumpInfo {
                status: DumpStatus::InProgress,
                ..
            })
        )
    }
}

View File

@ -0,0 +1,41 @@
use std::path::{Path};
use actix_web::web::Bytes;
use tokio::sync::{mpsc, oneshot};
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};
/// Cloneable handle used to send requests to a running [`DumpActor`].
#[derive(Clone)]
pub struct DumpActorHandleImpl {
/// Sending half of the channel read by the actor.
sender: mpsc::Sender<DumpMsg>,
}
#[async_trait::async_trait]
impl DumpActorHandle for DumpActorHandleImpl {
    /// Asks the dump actor to start a new dump and waits for its answer.
    ///
    /// # Panics
    ///
    /// Panics if the reply channel is closed before an answer is received,
    /// which means the dump actor is no longer running.
    async fn create_dump(&self) -> DumpResult<DumpInfo> {
        let (ret, receiver) = oneshot::channel();
        let msg = DumpMsg::CreateDump { ret };
        // A failed send drops `ret`, which is then reported by the `expect`
        // below.
        let _ = self.sender.send(msg).await;
        receiver.await.expect("DumpActor has been killed")
    }

    /// Asks the dump actor for the information of the dump identified by
    /// `uid` and waits for its answer.
    ///
    /// # Panics
    ///
    /// Panics if the reply channel is closed before an answer is received,
    /// which means the dump actor is no longer running.
    async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
        let (ret, receiver) = oneshot::channel();
        let msg = DumpMsg::DumpInfo { ret, uid };
        // A failed send drops `ret`, which is then reported by the `expect`
        // below.
        let _ = self.sender.send(msg).await;
        receiver.await.expect("DumpActor has been killed")
    }
}
impl DumpActorHandleImpl {
    /// Spawns a [`DumpActor`] writing its dumps in `path` and returns a handle
    /// connected to it.
    ///
    /// The requests go through a channel with a capacity of 10 messages.
    pub fn new(
        path: impl AsRef<Path>,
        uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
        index: crate::index_controller::index_actor::IndexActorHandleImpl,
        update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
    ) -> anyhow::Result<Self> {
        let (sender, inbox) = mpsc::channel(10);

        let dump_actor = DumpActor::new(inbox, uuid_resolver, index, update, path);
        // The actor runs on its own task for as long as a sender is alive.
        tokio::task::spawn(dump_actor.run());

        Ok(Self { sender })
    }
}

View File

@ -0,0 +1,15 @@
use tokio::sync::oneshot;
use super::{DumpResult, DumpInfo};
/// Requests understood by the dump actor; each carries the channel on which
/// the answer is sent back.
pub enum DumpMsg {
/// Start the creation of a new dump.
CreateDump {
/// Channel on which the result of the request is sent back.
ret: oneshot::Sender<DumpResult<DumpInfo>>,
},
/// Fetch the information of an existing dump.
DumpInfo {
/// Uid of the dump whose information is requested.
uid: String,
/// Channel on which the result of the request is sent back.
ret: oneshot::Sender<DumpResult<DumpInfo>>,
},
}

View File

@ -1,23 +1,48 @@
mod v1; mod v1;
mod v2; mod v2;
mod handle_impl;
mod actor;
mod message;
use std::{collections::HashSet, fs::{File}, path::{Path, PathBuf}, sync::Arc}; use std::{
fs::File,
path::Path,
sync::Arc,
};
#[cfg(test)]
use mockall::automock;
use anyhow::bail; use anyhow::bail;
use chrono::Utc; use thiserror::Error;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::{error, info}; use log::{error, info};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::task::spawn_blocking;
use tokio::fs;
use uuid::Uuid;
use super::{IndexController, IndexMetadata, update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle}; use super::IndexMetadata;
use crate::helpers::compression;
use crate::index::Index; use crate::index::Index;
use crate::index_controller::uuid_resolver; use crate::index_controller::uuid_resolver;
use crate::helpers::compression;
pub use handle_impl::*;
pub use actor::DumpActor;
pub use message::DumpMsg;
/// Result type whose error is a [`DumpError`].
pub type DumpResult<T> = std::result::Result<T, DumpError>;
/// Errors reported by the dump actor and its handle.
#[derive(Error, Debug)]
pub enum DumpError {
/// Any other error, converted from an `anyhow::Error`.
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
/// Error converted from a `heed::Error`.
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
/// A dump was requested while another one is still in progress.
#[error("dump already running")]
DumpAlreadyRunning,
/// No dump with the given uid is known.
#[error("dump `{0}` does not exist")]
DumpDoesNotExist(String),
}
#[derive(Debug, Serialize, Deserialize, Copy, Clone)] #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum DumpVersion { enum DumpVersion {
@ -29,7 +54,12 @@ impl DumpVersion {
const CURRENT: Self = Self::V2; const CURRENT: Self = Self::V2;
/// Select the good importation function from the `DumpVersion` of metadata /// Select the good importation function from the `DumpVersion` of metadata
pub fn import_index(self, size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> { pub fn import_index(
self,
size: usize,
dump_path: &Path,
index_path: &Path,
) -> anyhow::Result<()> {
match self { match self {
Self::V1 => v1::import_index(size, dump_path, index_path), Self::V1 => v1::import_index(size, dump_path, index_path),
Self::V2 => v2::import_index(size, dump_path, index_path), Self::V2 => v2::import_index(size, dump_path, index_path),
@ -37,6 +67,19 @@ impl DumpVersion {
} }
} }
/// Interface used to talk to the dump actor.
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait DumpActorHandle {
/// Start the creation of a dump
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
async fn create_dump(&self) -> DumpResult<DumpInfo>;
/// Return the status of an already created dump
/// Implementation: [handle_impl::DumpActorHandleImpl::dump_info]
async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo>;
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Metadata { pub struct Metadata {
@ -74,66 +117,46 @@ impl Metadata {
} }
} }
/// Generate uid from creation date #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
fn generate_uid() -> String { #[serde(rename_all = "snake_case")]
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() pub enum DumpStatus {
Done,
InProgress,
Failed,
} }
pub async fn perform_dump(index_controller: &IndexController, dump_path: PathBuf) -> anyhow::Result<String> { #[derive(Debug, Serialize, Clone)]
info!("Performing dump."); #[serde(rename_all = "camelCase")]
pub struct DumpInfo {
pub uid: String,
pub status: DumpStatus,
#[serde(skip_serializing_if = "Option::is_none", flatten)]
pub error: Option<serde_json::Value>,
}
let dump_dir = dump_path.clone(); impl DumpInfo {
let uid = generate_uid(); pub fn new(uid: String, status: DumpStatus) -> Self {
fs::create_dir_all(&dump_dir).await?; Self {
let temp_dump_dir = spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; uid,
let temp_dump_path = temp_dump_dir.path().to_owned(); status,
error: None,
let uuids = index_controller.uuid_resolver.list().await?; }
// maybe we could just keep the vec as-is
let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
if uuids.is_empty() {
return Ok(uid);
} }
let indexes = index_controller.list_indexes().await?; pub fn with_error(&mut self, error: String) {
self.status = DumpStatus::Failed;
// we create one directory by index self.error = Some(json!(error));
for meta in indexes.iter() {
tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
} }
let metadata = Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); pub fn done(&mut self) {
metadata.to_path(&temp_dump_path).await?; self.status = DumpStatus::Done;
}
index_controller.update_handle.dump(uuids, temp_dump_path.clone()).await?; pub fn dump_already_in_progress(&self) -> bool {
let dump_dir = dump_path.clone(); self.status == DumpStatus::InProgress
let dump_path = dump_path.join(format!("{}.dump", uid)); }
let dump_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
let temp_dump_file_path = temp_dump_file.path().to_owned();
compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(uid)
} }
/*
/// Write Settings in `settings.json` file at provided `dir_path`
fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> {
let path = dir_path.join("settings.json");
let file = File::create(path)?;
serde_json::to_writer(file, settings)?;
Ok(())
}
*/
pub fn load_dump( pub fn load_dump(
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
@ -185,12 +208,18 @@ pub fn load_dump(
let index_path = db_path.join(&format!("indexes/index-{}", uuid)); let index_path = db_path.join(&format!("indexes/index-{}", uuid));
// let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db // let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db
info!("Importing dump from {} into {}...", dump_path.display(), index_path.display()); info!(
metadata.dump_version.import_index(size, &dump_path, &index_path).unwrap(); "Importing dump from {} into {}...",
dump_path.display(),
index_path.display()
);
metadata
.dump_version
.import_index(size, &dump_path, &index_path)
.unwrap();
info!("Dump importation from {} succeed", dump_path.display()); info!("Dump importation from {} succeed", dump_path.display());
} }
info!("Dump importation from {} succeed", dump_path.display()); info!("Dump importation from {} succeed", dump_path.display());
Ok(()) Ok(())
} }

View File

@ -315,8 +315,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
/// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the /// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the
/// documents and all the settings. /// documents and all the settings.
async fn handle_dump(&self, uid: &str, uuid: Uuid, path: PathBuf) -> IndexResult<()> { async fn handle_dump(&self, uid: &str, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
use tokio::fs::create_dir_all;
use std::io::prelude::*; use std::io::prelude::*;
use tokio::fs::create_dir_all;
create_dir_all(&path) create_dir_all(&path)
.await .await
@ -348,7 +348,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
file.write_all(b"\n")?; file.write_all(b"\n")?;
} }
// then we dump all the settings // then we dump all the settings
let file = File::create(settings_path)?; let file = File::create(settings_path)?;
let mut file = std::io::BufWriter::new(file); let mut file = std::io::BufWriter::new(file);
@ -357,7 +356,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
file.write_all(serde_json::to_string(&settings)?.as_bytes())?; file.write_all(serde_json::to_string(&settings)?.as_bytes())?;
file.write_all(b"\n")?; file.write_all(b"\n")?;
Ok(()) Ok(())
}) })
.await .await

View File

@ -1,4 +1,4 @@
use std::{collections::BTreeMap, path::PathBuf}; use std::collections::BTreeMap;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -15,6 +15,8 @@ use tokio::time::sleep;
use uuid::Uuid; use uuid::Uuid;
pub use updates::*; pub use updates::*;
pub use dump_actor::{DumpInfo, DumpStatus};
use dump_actor::DumpActorHandle;
use index_actor::IndexActorHandle; use index_actor::IndexActorHandle;
use snapshot::{SnapshotService, load_snapshot}; use snapshot::{SnapshotService, load_snapshot};
use update_actor::UpdateActorHandle; use update_actor::UpdateActorHandle;
@ -23,11 +25,11 @@ use uuid_resolver::{UuidError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt; use crate::option::Opt;
use self::dump::load_dump; use dump_actor::load_dump;
mod index_actor; mod index_actor;
mod snapshot; mod snapshot;
mod dump; mod dump_actor;
mod update_actor; mod update_actor;
mod update_handler; mod update_handler;
mod updates; mod updates;
@ -63,10 +65,12 @@ pub struct IndexStats {
pub fields_distribution: FieldsDistribution, pub fields_distribution: FieldsDistribution,
} }
#[derive(Clone)]
pub struct IndexController { pub struct IndexController {
uuid_resolver: uuid_resolver::UuidResolverHandleImpl, uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
index_handle: index_actor::IndexActorHandleImpl, index_handle: index_actor::IndexActorHandleImpl,
update_handle: update_actor::UpdateActorHandleImpl<Bytes>, update_handle: update_actor::UpdateActorHandleImpl<Bytes>,
dump_handle: dump_actor::DumpActorHandleImpl,
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -108,6 +112,7 @@ impl IndexController {
&path, &path,
update_store_size, update_store_size,
)?; )?;
let dump_handle = dump_actor::DumpActorHandleImpl::new(&options.dumps_dir, uuid_resolver.clone(), index_handle.clone(), update_handle.clone())?;
if options.schedule_snapshot { if options.schedule_snapshot {
let snapshot_service = SnapshotService::new( let snapshot_service = SnapshotService::new(
@ -129,6 +134,7 @@ impl IndexController {
uuid_resolver, uuid_resolver,
index_handle, index_handle,
update_handle, update_handle,
dump_handle,
}) })
} }
@ -378,13 +384,6 @@ impl IndexController {
Ok(stats) Ok(stats)
} }
pub async fn dump(&self, path: PathBuf) -> anyhow::Result<String> {
eprintln!("index_controller::mod called");
let res = dump::perform_dump(self, path).await?;
eprintln!("index_controller::mod finished");
Ok(res)
}
pub async fn get_all_stats(&self) -> anyhow::Result<Stats> { pub async fn get_all_stats(&self) -> anyhow::Result<Stats> {
let update_infos = self.update_handle.get_info().await?; let update_infos = self.update_handle.get_info().await?;
let mut database_size = self.get_uuids_size().await? + update_infos.size; let mut database_size = self.get_uuids_size().await? + update_infos.size;
@ -410,6 +409,14 @@ impl IndexController {
indexes, indexes,
}) })
} }
/// Asks the dump actor to start a new dump and returns its information.
pub async fn create_dump(&self) -> anyhow::Result<DumpInfo> {
    let info = self.dump_handle.create_dump().await?;
    Ok(info)
}
/// Asks the dump actor for the information of the dump identified by `uid`.
pub async fn dump_info(&self, uid: String) -> anyhow::Result<DumpInfo> {
    let info = self.dump_handle.dump_info(uid).await?;
    Ok(info)
}
} }
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T { pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {

View File

@ -7,18 +7,17 @@ use crate::helpers::Authentication;
use crate::Data; use crate::Data;
pub fn services(cfg: &mut web::ServiceConfig) { pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(trigger_dump) cfg.service(create_dump)
.service(get_dump_status); .service(get_dump_status);
} }
#[post("/dumps", wrap = "Authentication::Private")] #[post("/dumps", wrap = "Authentication::Private")]
async fn trigger_dump( async fn create_dump(
data: web::Data<Data>, data: web::Data<Data>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
eprintln!("dump started"); let res = data.create_dump().await?;
let res = data.dump().await?;
Ok(HttpResponse::Ok().body(res)) Ok(HttpResponse::Ok().json(res))
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -29,13 +28,15 @@ struct DumpStatusResponse {
#[derive(Deserialize)] #[derive(Deserialize)]
struct DumpParam { struct DumpParam {
_dump_uid: String, dump_uid: String,
} }
#[get("/dumps/{dump_uid}/status", wrap = "Authentication::Private")] #[get("/dumps/{dump_uid}/status", wrap = "Authentication::Private")]
async fn get_dump_status( async fn get_dump_status(
_data: web::Data<Data>, data: web::Data<Data>,
_path: web::Path<DumpParam>, path: web::Path<DumpParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
todo!() let res = data.dump_status(path.dump_uid.clone()).await?;
Ok(HttpResponse::Ok().json(res))
} }