diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 0f4bf3589..2b489451b 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -178,6 +178,7 @@ impl Index { let indexing_callback = |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step); + let gzipped = false; let addition = match content { Some(content) if gzipped => { diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 27906a1a8..4097f31aa 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -194,9 +194,10 @@ where Ok(()) } - async fn handle_dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> { + async fn handle_dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); + println!("starting dump"); tokio::task::spawn_blocking(move || -> anyhow::Result<()> { update_store.dump(&uuids, path.to_path_buf(), index_handle)?; Ok(()) diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index a497a3c5c..cc5ba9757 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -71,7 +71,7 @@ where receiver.await.expect("update actor killed.") } - async fn dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> { + async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Dump { uuids, path, ret }; let _ = self.sender.send(msg).await; diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index 4103ca121..37df2af32 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -32,7 +32,7 @@ pub enum UpdateMsg { ret: oneshot::Sender>, }, Dump { - uuids: HashSet<(String, Uuid)>, + uuids: HashSet, path: PathBuf, ret: oneshot::Sender>, }, diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index a0c498e92..8cd77e252 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -1,7 +1,7 @@ mod actor; mod handle_impl; mod message; -mod update_store; +mod store; use std::{collections::HashSet, path::PathBuf}; @@ -16,7 +16,7 @@ use actor::UpdateActor; use message::UpdateMsg; pub use handle_impl::UpdateActorHandleImpl; -pub use update_store::{UpdateStore, UpdateStoreInfo}; +pub use store::{UpdateStore, UpdateStoreInfo}; pub type Result = std::result::Result; type PayloadData = std::result::Result; @@ -62,7 +62,7 @@ pub trait UpdateActorHandle { async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; async fn snapshot(&self, uuid: HashSet, path: PathBuf) -> Result<()>; - async fn dump(&self, uuid: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()>; + async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()>; async fn get_info(&self) -> Result; async fn update( &self, diff --git a/meilisearch-http/src/index_controller/update_actor/store/codec.rs b/meilisearch-http/src/index_controller/update_actor/store/codec.rs new file mode 100644 index 000000000..e07b52eec --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/store/codec.rs @@ -0,0 +1,86 @@ +use std::{borrow::Cow, convert::TryInto, mem::size_of}; + +use heed::{BytesDecode, BytesEncode}; +use uuid::Uuid; + +pub struct NextIdCodec; + +pub enum NextIdKey { + Global, + Index(Uuid), +} + +impl<'a> BytesEncode<'a> for NextIdCodec { + type EItem = NextIdKey; + + fn bytes_encode(item: &'a Self::EItem) -> Option> { + match item { + NextIdKey::Global => Some(Cow::Borrowed(b"__global__")), + NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())), + } + } +} + +pub struct PendingKeyCodec; + +impl<'a> BytesEncode<'a> for PendingKeyCodec { + type EItem = (u64, Uuid, u64); + + fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option> { + let mut bytes = Vec::with_capacity(size_of::()); + bytes.extend_from_slice(&global_id.to_be_bytes()); + bytes.extend_from_slice(uuid.as_bytes()); + bytes.extend_from_slice(&update_id.to_be_bytes()); + Some(Cow::Owned(bytes)) + } +} + +impl<'a> BytesDecode<'a> for PendingKeyCodec { + type DItem = (u64, Uuid, u64); + + fn bytes_decode(bytes: &'a [u8]) -> Option { + let global_id_bytes = bytes.get(0..size_of::())?.try_into().ok()?; + let global_id = u64::from_be_bytes(global_id_bytes); + + let uuid_bytes = bytes + .get(size_of::()..(size_of::() + size_of::()))? + .try_into() + .ok()?; + let uuid = Uuid::from_bytes(uuid_bytes); + + let update_id_bytes = bytes + .get((size_of::() + size_of::())..)? + .try_into() + .ok()?; + let update_id = u64::from_be_bytes(update_id_bytes); + + Some((global_id, uuid, update_id)) + } +} + +pub struct UpdateKeyCodec; + +impl<'a> BytesEncode<'a> for UpdateKeyCodec { + type EItem = (Uuid, u64); + + fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option> { + let mut bytes = Vec::with_capacity(size_of::()); + bytes.extend_from_slice(uuid.as_bytes()); + bytes.extend_from_slice(&update_id.to_be_bytes()); + Some(Cow::Owned(bytes)) + } +} + +impl<'a> BytesDecode<'a> for UpdateKeyCodec { + type DItem = (Uuid, u64); + + fn bytes_decode(bytes: &'a [u8]) -> Option { + let uuid_bytes = bytes.get(0..size_of::())?.try_into().ok()?; + let uuid = Uuid::from_bytes(uuid_bytes); + + let update_id_bytes = bytes.get(size_of::()..)?.try_into().ok()?; + let update_id = u64::from_be_bytes(update_id_bytes); + + Some((uuid, update_id)) + } +} diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs new file mode 100644 index 000000000..8b75f9e5d --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs @@ -0,0 +1,146 @@ +use std::{ + collections::HashSet, + fs::{copy, create_dir_all, File}, + io::Write, + path::{Path, PathBuf}, +}; + +use anyhow::Context; +use heed::RoTxn; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::{State, codec::UpdateKeyCodec}; +use super::UpdateStore; +use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus}; + +#[derive(Serialize, Deserialize)] +struct UpdateEntry { + uuid: Uuid, + update: UpdateStatus, +} + +impl UpdateStore { + pub fn dump( + &self, + uuids: &HashSet, + path: PathBuf, + handle: impl IndexActorHandle, + ) -> anyhow::Result<()> { + let state_lock = self.state.write(); + state_lock.swap(State::Dumping); + + // txn must *always* be acquired after state lock, or it will dead lock. + let txn = self.env.write_txn()?; + + let dump_path = path.join("updates"); + create_dir_all(&dump_path)?; + + self.dump_updates(&txn, uuids, &dump_path)?; + + let fut = dump_indexes(uuids, handle, &path); + tokio::runtime::Handle::current().block_on(fut)?; + + state_lock.swap(State::Idle); + + Ok(()) + } + + fn dump_updates( + &self, + txn: &RoTxn, + uuids: &HashSet, + path: impl AsRef, + ) -> anyhow::Result<()> { + let dump_data_path = path.as_ref().join("data.jsonl"); + let mut dump_data_file = File::create(dump_data_path)?; + + let update_files_path = path.as_ref().join("update_files"); + create_dir_all(&update_files_path)?; + + self.dump_pending(&txn, uuids, &mut dump_data_file, &update_files_path)?; + self.dump_completed(&txn, uuids, &mut dump_data_file)?; + + Ok(()) + } + + fn dump_pending( + &self, + txn: &RoTxn, + uuids: &HashSet, + mut file: &mut File, + update_files_path: impl AsRef, + ) -> anyhow::Result<()> { + let pendings = self.pending_queue.iter(txn)?.lazily_decode_data(); + + for pending in pendings { + let ((_, uuid, _), data) = pending?; + if uuids.contains(&uuid) { + let mut update = data.decode()?; + + if let Some(content) = update.content.take() { + update.content = Some(dump_update_file(content, &update_files_path)?); + } + + let update_json = UpdateEntry { + uuid, + update: update.into(), + }; + + serde_json::to_writer(&mut file, &update_json)?; + file.write(b"\n")?; + } + } + + Ok(()) + } + + fn dump_completed( + &self, + txn: &RoTxn, + uuids: &HashSet, + mut file: &mut File, + ) -> anyhow::Result<()> { + let updates = self + .updates + .iter(txn)? + .remap_key_type::() + .lazily_decode_data(); + + for update in updates { + let ((uuid, _), data) = update?; + if uuids.contains(&uuid) { + let update = data.decode()?.into(); + + let update_json = UpdateEntry { uuid, update }; + + serde_json::to_writer(&mut file, &update_json)?; + file.write(b"\n")?; + } + } + + Ok(()) + } +} + +async fn dump_indexes(uuids: &HashSet, handle: impl IndexActorHandle, path: impl AsRef)-> anyhow::Result<()> { + for uuid in uuids { + handle.dump(*uuid, path.as_ref().to_owned()).await?; + } + + Ok(()) +} + +fn dump_update_file( + file_path: impl AsRef, + dump_path: impl AsRef, +) -> anyhow::Result { + let filename: PathBuf = file_path + .as_ref() + .file_name() + .context("invalid update file name")? + .into(); + let dump_file_path = dump_path.as_ref().join(&filename); + copy(file_path, dump_file_path)?; + Ok(filename) +} diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs similarity index 79% rename from meilisearch-http/src/index_controller/update_actor/update_store.rs rename to meilisearch-http/src/index_controller/update_actor/store/mod.rs index d22be0bd4..52bd8d62a 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -1,23 +1,25 @@ +mod dump; +mod codec; + use std::collections::{BTreeMap, HashSet}; -use std::convert::TryInto; use std::fs::{copy, create_dir_all, remove_file, File}; -use std::mem::size_of; use std::path::Path; use std::sync::Arc; -use std::{borrow::Cow, path::PathBuf}; use anyhow::Context; use arc_swap::ArcSwap; use futures::StreamExt; use heed::types::{ByteSlice, OwnedType, SerdeJson}; use heed::zerocopy::U64; -use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; +use heed::{CompactionOption, Database, Env, EnvOpenOptions}; use log::error; use parking_lot::{Mutex, MutexGuard}; use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; +use codec::*; + use super::UpdateMeta; use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult}; use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; @@ -25,13 +27,6 @@ use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, Ind #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; -struct NextIdCodec; - -enum NextIdKey { - Global, - Index(Uuid), -} - pub struct UpdateStoreInfo { /// Size of the update store in bytes. pub size: u64, @@ -45,13 +40,13 @@ pub struct StateLock { data: ArcSwap, } -struct StateLockGuard<'a> { +pub struct StateLockGuard<'a> { _lock: MutexGuard<'a, ()>, state: &'a StateLock, } impl StateLockGuard<'_> { - fn swap(&self, state: State) -> Arc { + pub fn swap(&self, state: State) -> Arc { self.state.data.swap(Arc::new(state)) } } @@ -63,11 +58,11 @@ impl StateLock { Self { lock, data } } - fn read(&self) -> Arc { + pub fn read(&self) -> Arc { self.data.load().clone() } - fn write(&self) -> StateLockGuard { + pub fn write(&self) -> StateLockGuard { let _lock = self.lock.lock(); let state = &self; StateLockGuard { _lock, state } @@ -82,81 +77,6 @@ pub enum State { Dumping, } -impl<'a> BytesEncode<'a> for NextIdCodec { - type EItem = NextIdKey; - - fn bytes_encode(item: &'a Self::EItem) -> Option> { - match item { - NextIdKey::Global => Some(Cow::Borrowed(b"__global__")), - NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())), - } - } -} - -struct PendingKeyCodec; - -impl<'a> BytesEncode<'a> for PendingKeyCodec { - type EItem = (u64, Uuid, u64); - - fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option> { - let mut bytes = Vec::with_capacity(size_of::()); - bytes.extend_from_slice(&global_id.to_be_bytes()); - bytes.extend_from_slice(uuid.as_bytes()); - bytes.extend_from_slice(&update_id.to_be_bytes()); - Some(Cow::Owned(bytes)) - } -} - -impl<'a> BytesDecode<'a> for PendingKeyCodec { - type DItem = (u64, Uuid, u64); - - fn bytes_decode(bytes: &'a [u8]) -> Option { - let global_id_bytes = bytes.get(0..size_of::())?.try_into().ok()?; - let global_id = u64::from_be_bytes(global_id_bytes); - - let uuid_bytes = bytes - .get(size_of::()..(size_of::() + size_of::()))? - .try_into() - .ok()?; - let uuid = Uuid::from_bytes(uuid_bytes); - - let update_id_bytes = bytes - .get((size_of::() + size_of::())..)? - .try_into() - .ok()?; - let update_id = u64::from_be_bytes(update_id_bytes); - - Some((global_id, uuid, update_id)) - } -} - -struct UpdateKeyCodec; - -impl<'a> BytesEncode<'a> for UpdateKeyCodec { - type EItem = (Uuid, u64); - - fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option> { - let mut bytes = Vec::with_capacity(size_of::()); - bytes.extend_from_slice(uuid.as_bytes()); - bytes.extend_from_slice(&update_id.to_be_bytes()); - Some(Cow::Owned(bytes)) - } -} - -impl<'a> BytesDecode<'a> for UpdateKeyCodec { - type DItem = (Uuid, u64); - - fn bytes_decode(bytes: &'a [u8]) -> Option { - let uuid_bytes = bytes.get(0..size_of::())?.try_into().ok()?; - let uuid = Uuid::from_bytes(uuid_bytes); - - let update_id_bytes = bytes.get(size_of::()..)?.try_into().ok()?; - let update_id = u64::from_be_bytes(update_id_bytes); - - Some((uuid, update_id)) - } -} - #[derive(Clone)] pub struct UpdateStore { pub env: Env, @@ -174,7 +94,7 @@ pub struct UpdateStore { /// | 16-bytes | 8-bytes | updates: Database>, /// Indicates the current state of the update store, - state: Arc, + pub state: Arc, /// Wake up the loop when a new event occurs. notification_sender: mpsc::Sender<()>, } @@ -364,6 +284,7 @@ impl UpdateStore { let processing = pending.processing(); // Acquire the state lock and set the current state to processing. + // txn must *always* be acquired after state lock, or it will dead lock. let state = self.state.write(); state.swap(State::Processing(index_uuid, processing.clone())); @@ -580,78 +501,6 @@ impl UpdateStore { Ok(()) } - pub fn dump( - &self, - uuids: &HashSet<(String, Uuid)>, - path: PathBuf, - handle: impl IndexActorHandle, - ) -> anyhow::Result<()> { - use std::io::prelude::*; - let state_lock = self.state.write(); - state_lock.swap(State::Dumping); - - let txn = self.env.write_txn()?; - - for (index_uid, index_uuid) in uuids.iter() { - let file = File::create(path.join(index_uid).join("updates.jsonl"))?; - let mut file = std::io::BufWriter::new(file); - - let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); - for entry in pendings { - let ((_, uuid, _), pending) = entry?; - if &uuid == index_uuid { - let mut update: UpdateStatus = pending.decode()?.into(); - if let Some(path) = update.content_path_mut() { - *path = path.file_name().expect("update path can't be empty").into(); - } - serde_json::to_writer(&mut file, &update)?; - file.write_all(b"\n")?; - } - } - - let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?; - for entry in updates { - let (_, update) = entry?; - let mut update = update.clone(); - if let Some(path) = update.content_path_mut() { - *path = path.file_name().expect("update path can't be empty").into(); - } - serde_json::to_writer(&mut file, &update)?; - file.write_all(b"\n")?; - } - } - - let update_files_path = path.join("update_files"); - create_dir_all(&update_files_path)?; - - let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); - - for entry in pendings { - let ((_, uuid, _), pending) = entry?; - if uuids.iter().any(|(_, id)| id == &uuid) { - if let Some(path) = pending.decode()?.content_path() { - let name = path.file_name().unwrap(); - let to = update_files_path.join(name); - copy(path, to)?; - } - } - } - - // Perform the dump of each index concurently. Only a third of the capabilities of - // the index actor at a time not to put too much pressure on the index actor - let path = &path; - - let mut stream = futures::stream::iter(uuids.iter()) - .map(|(uid, uuid)| handle.dump(*uuid, path.clone())) - .buffer_unordered(CONCURRENT_INDEX_MSG / 3); - - Handle::current().block_on(async { - while let Some(res) = stream.next().await { - res?; - } - Ok(()) - }) - } pub fn get_info(&self) -> anyhow::Result { let mut size = self.env.size();