diff --git a/Cargo.lock b/Cargo.lock index 13f023140..25acf9bda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1756,6 +1756,7 @@ dependencies = [ "async-stream", "async-trait", "byte-unit", + "bytemuck", "bytes 0.6.0", "cargo_toml", "chrono", diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index c5aef1a56..c1eb6b8dc 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -32,6 +32,7 @@ async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] } async-stream = "0.3.0" async-trait = "0.1.42" byte-unit = { version = "4.0.9", default-features = false, features = ["std"] } +bytemuck = "1.5.1" bytes = "0.6.0" chrono = { version = "0.4.19", features = ["serde"] } crossbeam-channel = "0.5.0" diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index f1895829b..efd827619 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -1,10 +1,13 @@ -use std::fs::File; -use std::fs::{copy, create_dir_all, remove_file}; +use std::borrow::Cow; +use std::convert::TryInto; +use std::fs::{copy, create_dir_all, remove_file, File}; +use std::mem::size_of; use std::path::{Path, PathBuf}; use std::sync::Arc; -use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; -use heed::{CompactionOption, Database, Env, EnvOpenOptions}; +use bytemuck::{Pod, Zeroable}; +use heed::types::{ByteSlice, DecodeIgnore, SerdeJson}; +use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; use parking_lot::{Mutex, RwLock}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -16,15 +19,54 @@ use crate::index_controller::updates::*; #[allow(clippy::upper_case_acronyms)] type BEU64 = heed::zerocopy::U64; +struct IndexUuidUpdateIdCodec; + +#[repr(C)] +#[derive(Copy, Clone)] +struct IndexUuidUpdateId(Uuid, BEU64); + +// Is Uuid really zeroable (semantically)? +unsafe impl Zeroable for IndexUuidUpdateId {} +unsafe impl Pod for IndexUuidUpdateId {} + +impl IndexUuidUpdateId { + fn new(uuid: Uuid, update_id: u64) -> Self { + Self(uuid, BEU64::new(update_id)) + } +} + +const UUID_SIZE: usize = size_of::(); +const U64_SIZE: usize = size_of::(); + +impl<'a> BytesEncode<'a> for IndexUuidUpdateIdCodec { + type EItem = IndexUuidUpdateId; + + fn bytes_encode(item: &'a Self::EItem) -> Option> { + let bytes = bytemuck::cast_ref::(item); + Some(Cow::Borrowed(&bytes[..])) + } +} + +impl<'a> BytesDecode<'a> for IndexUuidUpdateIdCodec { + type DItem = (Uuid, u64); + + fn bytes_decode(bytes: &'a [u8]) -> Option { + let bytes = bytes.try_into().ok()?; + let IndexUuidUpdateId(uuid, id) = + bytemuck::cast_ref::<[u8; UUID_SIZE + U64_SIZE], IndexUuidUpdateId>(bytes); + Some((*uuid, id.get())) + } +} + #[derive(Clone)] pub struct UpdateStore { pub env: Env, - pending_meta: Database, SerdeJson>>, - pending: Database, SerdeJson>, - processed_meta: Database, SerdeJson>>, - failed_meta: Database, SerdeJson>>, - aborted_meta: Database, SerdeJson>>, - processing: Arc>>>, + pending_meta: Database>>, + pending: Database>, + processed_meta: Database>>, + failed_meta: Database>>, + aborted_meta: Database>>, + processing: Arc)>>>, notification_sender: mpsc::Sender<()>, /// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is /// processing, while not preventing writes all together during an update @@ -34,6 +76,7 @@ pub struct UpdateStore { pub trait HandleUpdate { fn handle_update( &mut self, + index_uuid: Uuid, meta: Processing, content: File, ) -> anyhow::Result, Failed>>; @@ -41,14 +84,15 @@ pub trait HandleUpdate { impl HandleUpdate for F where - F: FnMut(Processing, File) -> anyhow::Result, Failed>>, + F: FnMut(Uuid, Processing, File) -> anyhow::Result, Failed>>, { fn handle_update( &mut self, + index_uuid: Uuid, meta: Processing, content: File, ) -> anyhow::Result, Failed>> { - self(meta, content) + self(index_uuid, meta, content) } } @@ -131,24 +175,35 @@ where } /// Returns the new biggest id to use to store the new update. - fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { + fn new_update_id(&self, txn: &heed::RoTxn, index_uuid: Uuid) -> heed::Result { + // TODO: this is a very inneficient process for finding the next update id for each index, + // and needs to be made better. let last_pending = self .pending_meta .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); + .prefix_iter(txn, index_uuid.as_bytes())? + .remap_key_type::() + .last() + .transpose()? + .map(|((_, id), _)| id); let last_processed = self .processed_meta .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); + .prefix_iter(txn, index_uuid.as_bytes())? + .remap_key_type::() + .last() + .transpose()? + .map(|((_, id), _)| id); let last_aborted = self .aborted_meta .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); + .prefix_iter(txn, index_uuid.as_bytes())? + .remap_key_type::() + .last() + .transpose()? + .map(|((_, id), _)| id); let last_update_id = [last_pending, last_processed, last_aborted] .iter() @@ -176,13 +231,16 @@ where // no other update can have the same id because we use a write txn before // asking for the id and registering it so other update registering // will be forced to wait for a new write txn. - let update_id = self.new_update_id(&wtxn)?; - let update_key = BEU64::new(update_id); + let update_id = self.new_update_id(&wtxn, index_uuid)?; + let meta = Enqueued::new(meta, update_id); + let key = IndexUuidUpdateId::new(index_uuid, update_id); + self.pending_meta + .remap_key_type::() + .put(&mut wtxn, &key, &meta)?; - let meta = Enqueued::new(meta, update_id, index_uuid); - self.pending_meta.put(&mut wtxn, &update_key, &meta)?; self.pending - .put(&mut wtxn, &update_key, &content.as_ref().to_owned())?; + .remap_key_type::() + .put(&mut wtxn, &key, &content.as_ref().to_owned())?; wtxn.commit()?; @@ -191,6 +249,7 @@ where .expect("Update store loop exited."); Ok(meta) } + /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. @@ -201,25 +260,31 @@ where let _lock = self.update_lock.lock(); // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; - let first_meta = self.pending_meta.first(&rtxn)?; + + let first_meta = self + .pending_meta + .remap_key_type::() + .first(&rtxn)?; // If there is a pending update we process and only keep // a reader while processing it, not a writer. match first_meta { - Some((first_id, pending)) => { + Some(((index_uuid, update_id), pending)) => { + let key = IndexUuidUpdateId::new(index_uuid, update_id); let content_path = self .pending - .get(&rtxn, &first_id)? + .remap_key_type::() + .get(&rtxn, &key)? .expect("associated update content"); // we change the state of the update from pending to processing before we pass it // to the update handler. Processing store is non persistent to be able recover // from a failure let processing = pending.processing(); - self.processing.write().replace(processing.clone()); + self.processing.write().replace((index_uuid, processing.clone())); let file = File::open(&content_path)?; // Process the pending update using the provided user function. - let result = handler.handle_update(processing, file)?; + let result = handler.handle_update(index_uuid, processing, file)?; drop(rtxn); // Once the pending update have been successfully processed @@ -227,12 +292,24 @@ where // write the *new* meta to the processed-meta store and commit. let mut wtxn = self.env.write_txn()?; self.processing.write().take(); - self.pending_meta.delete(&mut wtxn, &first_id)?; + self.pending_meta + .remap_key_type::() + .delete(&mut wtxn, &key)?; + remove_file(&content_path)?; - self.pending.delete(&mut wtxn, &first_id)?; + + self.pending + .remap_key_type::() + .delete(&mut wtxn, &key)?; match result { - Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?, - Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?, + Ok(processed) => self + .processed_meta + .remap_key_type::() + .put(&mut wtxn, &key, &processed)?, + Err(failed) => self + .failed_meta + .remap_key_type::() + .put(&mut wtxn, &key, &failed)?, } wtxn.commit()?; @@ -242,28 +319,30 @@ where } } - pub fn list(&self) -> anyhow::Result>> { + pub fn list(&self, index_uuid: Uuid) -> anyhow::Result>> { let rtxn = self.env.read_txn()?; let mut updates = Vec::new(); let processing = self.processing.read(); - if let Some(ref processing) = *processing { - let update = UpdateStatus::from(processing.clone()); - updates.push(update); + if let Some((uuid, ref processing)) = *processing { + if uuid == index_uuid { + let update = UpdateStatus::from(processing.clone()); + updates.push(update); + } } let pending = self .pending_meta - .iter(&rtxn)? + .prefix_iter(&rtxn, index_uuid.as_bytes())? .filter_map(Result::ok) - .filter_map(|(_, p)| (Some(p.id()) != processing.as_ref().map(|p| p.id())).then(|| p)) + .filter_map(|(_, p)| (Some(p.id()) != processing.as_ref().map(|p| p.1.id())).then(|| p)) .map(UpdateStatus::from); updates.extend(pending); let aborted = self .aborted_meta - .iter(&rtxn)? + .prefix_iter(&rtxn, index_uuid.as_bytes())? .filter_map(Result::ok) .map(|(_, p)| p) .map(UpdateStatus::from); @@ -294,29 +373,49 @@ where } /// Returns the update associated meta or `None` if the update doesn't exist. - pub fn meta(&self, update_id: u64) -> heed::Result>> { + pub fn meta( + &self, + index_uuid: Uuid, + update_id: u64, + ) -> heed::Result>> { let rtxn = self.env.read_txn()?; - let key = BEU64::new(update_id); + let key = IndexUuidUpdateId::new(index_uuid, update_id); - if let Some(ref meta) = *self.processing.read() { - if meta.id() == update_id { + if let Some((uuid, ref meta)) = *self.processing.read() { + if uuid == index_uuid && meta.id() == update_id { return Ok(Some(UpdateStatus::Processing(meta.clone()))); } } - if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { + if let Some(meta) = self + .pending_meta + .remap_key_type::() + .get(&rtxn, &key)? + { return Ok(Some(UpdateStatus::Enqueued(meta))); } - if let Some(meta) = self.processed_meta.get(&rtxn, &key)? { + if let Some(meta) = self + .processed_meta + .remap_key_type::() + .get(&rtxn, &key)? + { return Ok(Some(UpdateStatus::Processed(meta))); } - if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? { + if let Some(meta) = self + .aborted_meta + .remap_key_type::() + .get(&rtxn, &key)? + { return Ok(Some(UpdateStatus::Aborted(meta))); } - if let Some(meta) = self.failed_meta.get(&rtxn, &key)? { + if let Some(meta) = self + .failed_meta + .remap_key_type::() + .get(&rtxn, &key)? + { return Ok(Some(UpdateStatus::Failed(meta))); } @@ -330,25 +429,45 @@ where /// that as already been processed or which doesn't actually exist, will /// return `None`. #[allow(dead_code)] - pub fn abort_update(&self, update_id: u64) -> heed::Result>> { + pub fn abort_update( + &self, + index_uuid: Uuid, + update_id: u64, + ) -> heed::Result>> { let mut wtxn = self.env.write_txn()?; - let key = BEU64::new(update_id); + let key = IndexUuidUpdateId::new(index_uuid, update_id); // We cannot abort an update that is currently being processed. - if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) { + if self + .pending_meta + .remap_key_type::() + .first(&wtxn)? + .map(|((_, id), _)| id) + == Some(update_id) + { return Ok(None); } - let pending = match self.pending_meta.get(&wtxn, &key)? { + let pending = match self + .pending_meta + .remap_key_type::() + .get(&wtxn, &key)? + { Some(meta) => meta, None => return Ok(None), }; let aborted = pending.abort(); - self.aborted_meta.put(&mut wtxn, &key, &aborted)?; - self.pending_meta.delete(&mut wtxn, &key)?; - self.pending.delete(&mut wtxn, &key)?; + self.aborted_meta + .remap_key_type::() + .put(&mut wtxn, &key, &aborted)?; + self.pending_meta + .remap_key_type::() + .delete(&mut wtxn, &key)?; + self.pending + .remap_key_type::() + .delete(&mut wtxn, &key)?; wtxn.commit()?; @@ -358,22 +477,32 @@ where /// Aborts all the pending updates, and not the one being currently processed. /// Returns the update metas and ids that were successfully aborted. #[allow(dead_code)] - pub fn abort_pendings(&self) -> heed::Result)>> { + pub fn abort_pendings(&self, index_uuid: Uuid) -> heed::Result)>> { let mut wtxn = self.env.write_txn()?; let mut aborted_updates = Vec::new(); // We skip the first pending update as it is currently being processed. - for result in self.pending_meta.iter(&wtxn)?.skip(1) { - let (key, pending) = result?; - let id = key.get(); - aborted_updates.push((id, pending.abort())); + for result in self + .pending_meta + .prefix_iter(&wtxn, index_uuid.as_bytes())? + .remap_key_type::() + .skip(1) + { + let ((_, update_id), pending) = result?; + aborted_updates.push((update_id, pending.abort())); } for (id, aborted) in &aborted_updates { - let key = BEU64::new(*id); - self.aborted_meta.put(&mut wtxn, &key, &aborted)?; - self.pending_meta.delete(&mut wtxn, &key)?; - self.pending.delete(&mut wtxn, &key)?; + let key = IndexUuidUpdateId::new(index_uuid, *id); + self.aborted_meta + .remap_key_type::() + .put(&mut wtxn, &key, &aborted)?; + self.pending_meta + .remap_key_type::() + .delete(&mut wtxn, &key)?; + self.pending + .remap_key_type::() + .delete(&mut wtxn, &key)?; } wtxn.commit()?; diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 42712a396..60db5d3df 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -8,16 +8,14 @@ pub struct Enqueued { pub update_id: u64, pub meta: M, pub enqueued_at: DateTime, - pub index_uuid: Uuid, } impl Enqueued { - pub fn new(meta: M, update_id: u64, index_uuid: Uuid) -> Self { + pub fn new(meta: M, update_id: u64) -> Self { Self { enqueued_at: Utc::now(), meta, update_id, - index_uuid, } }