From 1cb64caae46b4b5988f9f5c3fd8dec5d32dba068 Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Sat, 29 May 2021 00:08:17 +0200 Subject: [PATCH] dump content is now only uuid --- .../index_controller/update_actor/actor.rs | 8 +- .../update_actor/store/dump.rs | 77 +++++++++---------- .../update_actor/store/mod.rs | 50 +++++++----- .../src/index_controller/updates.rs | 67 +--------------- 4 files changed, 75 insertions(+), 127 deletions(-) diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 4097f31aa..40bba4e2b 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -117,7 +117,7 @@ where if file_len != 0 { file.flush().await?; let file = file.into_std().await; - Some((file, path)) + Some((file, update_file_id)) } else { // empty update, delete the empty file. fs::remove_file(&path).await?; @@ -133,7 +133,7 @@ where use std::io::{copy, sink, BufReader, Seek}; // If the payload is empty, ignore the check. - let path = if let Some((mut file, path)) = file_path { + let update_uuid = if let Some((mut file, uuid)) = file_path { // set the file back to the beginning file.seek(SeekFrom::Start(0))?; // Check that the json payload is valid: @@ -145,14 +145,14 @@ where file.seek(SeekFrom::Start(0))?; let _: serde_json::Value = serde_json::from_reader(file)?; } - Some(path) + Some(uuid) } else { None }; // The payload is valid, we can register it to the update store. let status = update_store - .register_update(meta, path, uuid) + .register_update(meta, update_uuid, uuid) .map(UpdateStatus::Enqueued)?; Ok(status) }) diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs index 1f36931d1..ec7aeea87 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/dump.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs @@ -1,12 +1,17 @@ -use std::{collections::HashSet, fs::{copy, create_dir_all, File}, io::{BufRead, BufReader, Write}, path::{Path, PathBuf}}; +use std::{ + collections::HashSet, + fs::{create_dir_all, File}, + io::{BufRead, BufReader, Write}, + path::{Path, PathBuf}, +}; use anyhow::Context; use heed::{EnvOpenOptions, RoTxn}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{State, codec::UpdateKeyCodec}; use super::UpdateStore; +use super::{codec::UpdateKeyCodec, State}; use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus}; #[derive(Serialize, Deserialize)] @@ -50,10 +55,10 @@ impl UpdateStore { let dump_data_path = path.as_ref().join("data.jsonl"); let mut dump_data_file = File::create(dump_data_path)?; - let update_files_path = path.as_ref().join("update_files"); + let update_files_path = path.as_ref().join(super::UPDATE_DIR); create_dir_all(&update_files_path)?; - self.dump_pending(&txn, uuids, &mut dump_data_file, &update_files_path)?; + self.dump_pending(&txn, uuids, &mut dump_data_file, &path)?; self.dump_completed(&txn, uuids, &mut dump_data_file)?; Ok(()) @@ -64,19 +69,24 @@ impl UpdateStore { txn: &RoTxn, uuids: &HashSet, mut file: &mut File, - update_files_path: impl AsRef, + dst_update_files: impl AsRef, ) -> anyhow::Result<()> { let pendings = self.pending_queue.iter(txn)?.lazily_decode_data(); for pending in pendings { let ((_, uuid, _), data) = pending?; if uuids.contains(&uuid) { - let mut update = data.decode()?; + let update = data.decode()?; - if let Some(content) = update.content.take() { - update.content = Some(dump_update_file(content, &update_files_path)?); + if let Some(ref update_uuid) = update.content { + let src = dbg!(super::update_uuid_to_file_path(&self.path, *update_uuid)); + let dst = dbg!(super::update_uuid_to_file_path(&dst_update_files, *update_uuid)); + assert!(src.exists()); + dbg!(std::fs::copy(src, dst))?; } + println!("copied files"); + let update_json = UpdateEntry { uuid, update: update.into(), @@ -117,18 +127,20 @@ impl UpdateStore { Ok(()) } - pub fn load_dump(src: impl AsRef, dst: impl AsRef, db_size: u64) -> anyhow::Result<()> { - let dst_updates_path = dst.as_ref().join("updates/"); - create_dir_all(&dst_updates_path)?; - let dst_update_files_path = dst_updates_path.join("update_files/"); - create_dir_all(&dst_update_files_path)?; + pub fn load_dump( + src: impl AsRef, + dst: impl AsRef, + db_size: u64, + ) -> anyhow::Result<()> { + let dst_update_path = dst.as_ref().join("updates/"); + create_dir_all(&dst_update_path)?; + let mut options = EnvOpenOptions::new(); options.map_size(db_size as usize); - let (store, _) = UpdateStore::new(options, &dst_updates_path)?; + let (store, _) = UpdateStore::new(options, &dst_update_path)?; let src_update_path = src.as_ref().join("updates"); - let src_update_files_path = src_update_path.join("update_files"); let update_data = File::open(&src_update_path.join("data.jsonl"))?; let mut update_data = BufReader::new(update_data); @@ -138,15 +150,7 @@ impl UpdateStore { match update_data.read_line(&mut line) { Ok(0) => break, Ok(_) => { - let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?; - - if let Some(path) = update.content_path_mut() { - let dst_file_path = dst_update_files_path.join(&path); - let src_file_path = src_update_files_path.join(&path); - *path = dst_update_files_path.join(&path); - std::fs::copy(src_file_path, dst_file_path)?; - } - + let UpdateEntry { uuid, update } = serde_json::from_str(&line)?; store.register_raw_updates(&mut wtxn, update, uuid)?; } _ => break, @@ -154,30 +158,25 @@ impl UpdateStore { line.clear(); } + + let dst_update_files_path = dst_update_path.join("update_files/"); + let src_update_files_path = src_update_path.join("update_files/"); + std::fs::copy(src_update_files_path, dst_update_files_path)?; + wtxn.commit()?; Ok(()) } } -async fn dump_indexes(uuids: &HashSet, handle: impl IndexActorHandle, path: impl AsRef)-> anyhow::Result<()> { +async fn dump_indexes( + uuids: &HashSet, + handle: impl IndexActorHandle, + path: impl AsRef, +) -> anyhow::Result<()> { for uuid in uuids { handle.dump(*uuid, path.as_ref().to_owned()).await?; } Ok(()) } - -fn dump_update_file( - file_path: impl AsRef, - dump_path: impl AsRef, -) -> anyhow::Result { - let filename: PathBuf = file_path - .as_ref() - .file_name() - .context("invalid update file name")? - .into(); - let dump_file_path = dump_path.as_ref().join(&filename); - copy(file_path, dump_file_path)?; - Ok(filename) -} diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 661b712ac..6910d5144 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -1,12 +1,11 @@ pub mod dump; mod codec; -use std::collections::{BTreeMap, HashSet}; +use std::{collections::{BTreeMap, HashSet}, path::PathBuf}; use std::fs::{copy, create_dir_all, remove_file, File}; use std::path::Path; use std::sync::Arc; -use anyhow::Context; use arc_swap::ArcSwap; use futures::StreamExt; use heed::types::{ByteSlice, OwnedType, SerdeJson}; @@ -27,6 +26,8 @@ use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, Ind #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; +const UPDATE_DIR: &'static str = "update_files"; + pub struct UpdateStoreInfo { /// Size of the update store in bytes. pub size: u64, @@ -97,6 +98,7 @@ pub struct UpdateStore { pub state: Arc, /// Wake up the loop when a new event occurs. notification_sender: mpsc::Sender<()>, + path: PathBuf, } impl UpdateStore { @@ -106,7 +108,7 @@ impl UpdateStore { ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { options.max_dbs(5); - let env = options.open(path)?; + let env = options.open(&path)?; let pending_queue = env.create_database(Some("pending-queue"))?; let next_update_id = env.create_database(Some("next-update-id"))?; let updates = env.create_database(Some("updates"))?; @@ -123,6 +125,7 @@ impl UpdateStore { updates, state, notification_sender, + path: path.as_ref().to_owned(), }, notification_receiver, )) @@ -165,7 +168,7 @@ impl UpdateStore { match res { Ok(Some(_)) => (), Ok(None) => break, - Err(e) => error!("error while processing update: {}", e), + Err(e) => panic!("error while processing update: {}", e), } } // the ownership on the arc has been taken, we need to exit. @@ -217,13 +220,13 @@ impl UpdateStore { pub fn register_update( &self, meta: UpdateMeta, - content: Option>, + content: Option, index_uuid: Uuid, ) -> heed::Result { let mut txn = self.env.write_txn()?; let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; - let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned())); + let meta = Enqueued::new(meta, update_id, content); self.pending_queue .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?; @@ -290,9 +293,9 @@ impl UpdateStore { state.swap(State::Processing(index_uuid, processing.clone())); let file = match content_path { - Some(ref path) => { - let file = File::open(path) - .with_context(|| format!("file at path: {:?}", &content_path))?; + Some(uuid) => { + let path = update_uuid_to_file_path(&self.path, uuid); + let file = File::open(path)?; Some(file) } None => None, @@ -308,7 +311,8 @@ impl UpdateStore { self.pending_queue .delete(&mut wtxn, &(global_id, index_uuid, update_id))?; - if let Some(path) = content_path { + if let Some(uuid) = content_path { + let path = update_uuid_to_file_path(&self.path, uuid); remove_file(&path)?; } @@ -408,7 +412,7 @@ impl UpdateStore { pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> { let mut txn = self.env.write_txn()?; // Contains all the content file paths that we need to be removed if the deletion was successful. - let mut paths_to_remove = Vec::new(); + let mut uuids_to_remove = Vec::new(); let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data(); @@ -416,8 +420,8 @@ impl UpdateStore { if uuid == index_uuid { pendings.del_current()?; let mut pending = pending.decode()?; - if let Some(path) = pending.content.take() { - paths_to_remove.push(path); + if let Some(update_uuid) = pending.content.take() { + uuids_to_remove.push(update_uuid); } } } @@ -437,7 +441,9 @@ impl UpdateStore { txn.commit()?; - paths_to_remove.iter().for_each(|path| { + uuids_to_remove.iter() + .map(|uuid| update_uuid_to_file_path(&self.path, *uuid)) + .for_each(|path| { let _ = remove_file(path); }); @@ -468,7 +474,7 @@ impl UpdateStore { // create db snapshot self.env.copy_to_path(&db_path, CompactionOption::Enabled)?; - let update_files_path = update_path.join("update_files"); + let update_files_path = update_path.join(UPDATE_DIR); create_dir_all(&update_files_path)?; let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); @@ -476,10 +482,9 @@ impl UpdateStore { for entry in pendings { let ((_, uuid, _), pending) = entry?; if uuids.contains(&uuid) { - if let Some(path) = pending.decode()?.content_path() { - let name = path.file_name().unwrap(); - let to = update_files_path.join(name); - copy(path, to)?; + if let Enqueued { content: Some(uuid), .. } = pending.decode()? { + let path = update_uuid_to_file_path(&self.path, uuid); + copy(path, &update_files_path)?; } } } @@ -508,7 +513,8 @@ impl UpdateStore { let txn = self.env.read_txn()?; for entry in self.pending_queue.iter(&txn)? { let (_, pending) = entry?; - if let Some(path) = pending.content_path() { + if let Enqueued { content: Some(uuid), .. } = pending { + let path = update_uuid_to_file_path(&self.path, uuid); size += File::open(path)?.metadata()?.len(); } } @@ -521,6 +527,10 @@ impl UpdateStore { } } +fn update_uuid_to_file_path(root: impl AsRef, uuid: Uuid) -> PathBuf { + root.as_ref().join(UPDATE_DIR).join(format!("update_{}", uuid)) +} + #[cfg(test)] mod test { use super::*; diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 31f0005f8..0aacf9b6c 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -1,8 +1,7 @@ -use std::path::{Path, PathBuf}; - use chrono::{DateTime, Utc}; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; +use uuid::Uuid; use crate::index::{Checked, Settings}; @@ -34,11 +33,11 @@ pub struct Enqueued { pub update_id: u64, pub meta: UpdateMeta, pub enqueued_at: DateTime, - pub content: Option, + pub content: Option, } impl Enqueued { - pub fn new(meta: UpdateMeta, update_id: u64, content: Option) -> Self { + pub fn new(meta: UpdateMeta, update_id: u64, content: Option) -> Self { Self { enqueued_at: Utc::now(), meta, @@ -68,14 +67,6 @@ impl Enqueued { pub fn id(&self) -> u64 { self.update_id } - - pub fn content_path(&self) -> Option<&Path> { - self.content.as_deref() - } - - pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> { - self.content.as_mut() - } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -91,14 +82,6 @@ impl Processed { pub fn id(&self) -> u64 { self.from.id() } - - pub fn content_path(&self) -> Option<&Path> { - self.from.content_path() - } - - pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> { - self.from.content_path_mut() - } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -118,14 +101,6 @@ impl Processing { self.from.meta() } - pub fn content_path(&self) -> Option<&Path> { - self.from.content_path() - } - - pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> { - self.from.content_path_mut() - } - pub fn process(self, success: UpdateResult) -> Processed { Processed { success, @@ -155,14 +130,6 @@ impl Aborted { pub fn id(&self) -> u64 { self.from.id() } - - pub fn content_path(&self) -> Option<&Path> { - self.from.content_path() - } - - pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> { - self.from.content_path_mut() - } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -178,14 +145,6 @@ impl Failed { pub fn id(&self) -> u64 { self.from.id() } - - pub fn content_path(&self) -> Option<&Path> { - self.from.content_path() - } - - pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> { - self.from.content_path_mut() - } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -215,26 +174,6 @@ impl UpdateStatus { _ => None, } } - - pub fn content_path(&self) -> Option<&Path> { - match self { - UpdateStatus::Processing(u) => u.content_path(), - UpdateStatus::Processed(u) => u.content_path(), - UpdateStatus::Aborted(u) => u.content_path(), - UpdateStatus::Failed(u) => u.content_path(), - UpdateStatus::Enqueued(u) => u.content_path(), - } - } - - pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> { - match self { - UpdateStatus::Processing(u) => u.content_path_mut(), - UpdateStatus::Processed(u) => u.content_path_mut(), - UpdateStatus::Aborted(u) => u.content_path_mut(), - UpdateStatus::Failed(u) => u.content_path_mut(), - UpdateStatus::Enqueued(u) => u.content_path_mut(), - } - } } impl From for UpdateStatus {