diff --git a/http-ui/public/updates-script.js b/http-ui/public/updates-script.js index 5d439a7f5..bb91de313 100644 --- a/http-ui/public/updates-script.js +++ b/http-ui/public/updates-script.js @@ -78,6 +78,12 @@ $(window).on('load', function () { const content = $(`#${id} .updateStatus.content`); content.html('processed ' + JSON.stringify(status.meta)); } + + if (status.type == "Aborted") { + const id = 'update-' + status.update_id; + const content = $(`#${id} .updateStatus.content`); + content.html('aborted ' + JSON.stringify(status.meta)); + } } }); diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 80402f0a0..62d3d75bd 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -189,6 +189,18 @@ enum UpdateStatus { Pending { update_id: u64, meta: M }, Progressing { update_id: u64, meta: P }, Processed { update_id: u64, meta: N }, + Aborted { update_id: u64, meta: M }, +} + +impl UpdateStatus { + fn update_id(&self) -> u64 { + match self { + UpdateStatus::Pending { update_id, .. } => *update_id, + UpdateStatus::Progressing { update_id, .. } => *update_id, + UpdateStatus::Processed { update_id, .. } => *update_id, + UpdateStatus::Aborted { update_id, .. 
} => *update_id, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -473,12 +485,16 @@ async fn main() -> anyhow::Result<()> { .and(warp::path!("updates")) .map(move |header: String| { let update_store = update_store_cloned.clone(); - let mut updates = update_store.iter_metas(|processed, pending| { + let mut updates = update_store.iter_metas(|processed, aborted, pending| { let mut updates = Vec::>::new(); for result in processed { let (uid, meta) = result?; updates.push(UpdateStatus::Processed { update_id: uid.get(), meta }); } + for result in aborted { + let (uid, meta) = result?; + updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta }); + } for result in pending { let (uid, meta) = result?; updates.push(UpdateStatus::Pending { update_id: uid.get(), meta }); @@ -486,9 +502,9 @@ async fn main() -> anyhow::Result<()> { Ok(updates) }).unwrap(); - if header.contains("text/html") { - updates.reverse(); + updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse()); + if header.contains("text/html") { // We retrieve the database size. 
let db_size = File::open(lmdb_path_cloned.clone()) .unwrap() @@ -798,6 +814,31 @@ async fn main() -> anyhow::Result<()> { warp::reply() }); + let update_store_cloned = update_store.clone(); + let update_status_sender_cloned = update_status_sender.clone(); + let abort_update_id_route = warp::filters::method::delete() + .and(warp::path!("update" / u64)) + .map(move |update_id: u64| { + if let Some(meta) = update_store_cloned.abort_update(update_id).unwrap() { + let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta }); + eprintln!("update {} aborted", update_id); + } + warp::reply() + }); + + let update_store_cloned = update_store.clone(); + let update_status_sender_cloned = update_status_sender.clone(); + let abort_pending_updates_route = warp::filters::method::delete() + .and(warp::path!("updates")) + .map(move || { + let updates = update_store_cloned.abort_pendings().unwrap(); + for (update_id, meta) in updates { + let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta }); + eprintln!("update {} aborted", update_id); + } + warp::reply() + }); + let update_ws_route = warp::ws() .and(warp::path!("updates" / "ws")) .map(move |ws: warp::ws::Ws| { @@ -844,6 +885,8 @@ async fn main() -> anyhow::Result<()> { .or(indexing_csv_route) .or(indexing_json_route) .or(indexing_json_stream_route) + .or(abort_update_id_route) + .or(abort_pending_updates_route) .or(clearing_route) .or(change_settings_route) .or(change_facet_levels_route) diff --git a/http-ui/templates/updates.html b/http-ui/templates/updates.html index 271394c92..514a006b3 100644 --- a/http-ui/templates/updates.html +++ b/http-ui/templates/updates.html @@ -72,6 +72,15 @@ + {% when UpdateStatus::Aborted with { update_id, meta } %} +
  • +
      +
    1. +
      update id
      {{ update_id }}
      +
      update status
      aborted
      +
    2. +
    +
  • {% else %} {% endmatch %} {% endfor %} diff --git a/src/update/update_store.rs b/src/update/update_store.rs index de07f1e21..d21d4b054 100644 --- a/src/update/update_store.rs +++ b/src/update/update_store.rs @@ -1,5 +1,6 @@ use std::path::Path; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use crossbeam_channel::Sender; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; @@ -14,7 +15,9 @@ pub struct UpdateStore { pending_meta: Database, SerdeJson>, pending: Database, ByteSlice>, processed_meta: Database, SerdeJson>, + aborted_meta: Database, SerdeJson>, notification_sender: Sender<()>, + processing_update_id: Arc, } impl UpdateStore { @@ -29,11 +32,12 @@ impl UpdateStore { M: for<'a> Deserialize<'a>, N: Serialize, { - options.max_dbs(3); + options.max_dbs(4); let env = options.open(path)?; let pending_meta = env.create_database(Some("pending-meta"))?; let pending = env.create_database(Some("pending"))?; let processed_meta = env.create_database(Some("processed-meta"))?; + let aborted_meta = env.create_database(Some("aborted-meta"))?; let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); // Send a first notification to trigger the process. @@ -44,7 +48,9 @@ impl UpdateStore { pending, pending_meta, processed_meta, + aborted_meta, notification_sender, + processing_update_id: Arc::new(AtomicU64::new(u64::MAX)), }); let update_store_cloned = update_store.clone(); @@ -67,20 +73,27 @@ impl UpdateStore { /// Returns the new biggest id to use to store the new update. fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { let last_pending = self.pending_meta - .as_polymorph() - .last::<_, OwnedType, DecodeIgnore>(txn)? + .remap_data_type::() + .last(txn)? .map(|(k, _)| k.get()); - if let Some(last_id) = last_pending { - return Ok(last_id + 1); - } - let last_processed = self.processed_meta - .as_polymorph() - .last::<_, OwnedType, DecodeIgnore>(txn)? + .remap_data_type::() + .last(txn)? 
 .map(|(k, _)| k.get()); - match last_processed { + let last_aborted = self.aborted_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_update_id = [last_pending, last_processed, last_aborted] + .iter() + .copied() + .flatten() + .max(); + + match last_update_id { Some(last_id) => Ok(last_id + 1), None => Ok(0), } @@ -134,7 +147,10 @@ impl UpdateStore { .expect("associated update content"); // Process the pending update using the provided user function. - let new_meta = (f)(first_id.get(), first_meta, first_content)?; + self.processing_update_id.store(first_id.get(), Ordering::Relaxed); + let result = (f)(first_id.get(), first_meta, first_content); + self.processing_update_id.store(u64::MAX, Ordering::Relaxed); + let new_meta = result?; drop(rtxn); // Once the pending update have been successfully processed @@ -152,8 +168,18 @@ impl UpdateStore { } } - /// Execute the user defined function with both meta-store iterators, the first - /// iterator is the *processed* meta one and the secind is the *pending* meta one. + /// The id of the update that is currently being processed, + /// `None` if no update is being processed. + pub fn processing_update_id(&self) -> Option { + match self.processing_update_id.load(Ordering::Relaxed) { + u64::MAX => None, + update_id => Some(update_id), + } + } + + /// Execute the user defined function with the meta-store iterators, the first + /// iterator is the *processed* meta one, the second the *aborted* meta one + /// and the last is the *pending* meta one. pub fn iter_metas(&self, mut f: F) -> heed::Result where M: for<'a> Deserialize<'a>, @@ -161,19 +187,21 @@ impl UpdateStore { F: for<'a> FnMut( heed::RoIter<'a, OwnedType, SerdeJson>, heed::RoIter<'a, OwnedType, SerdeJson>, + heed::RoIter<'a, OwnedType, SerdeJson>, ) -> heed::Result, { let rtxn = self.env.read_txn()?; - // We get both the pending and processed meta iterators. + // We get the pending, processed and aborted meta iterators. 
 let processed_iter = self.processed_meta.iter(&rtxn)?; + let aborted_iter = self.aborted_meta.iter(&rtxn)?; let pending_iter = self.pending_meta.iter(&rtxn)?; // We execute the user defined function with both iterators. - (f)(processed_iter, pending_iter) + (f)(processed_iter, aborted_iter, pending_iter) } - /// Returns the update associated meta or `None` if the update deosn't exist. + /// Returns the update associated meta or `None` if the update doesn't exist. pub fn meta(&self, update_id: u64) -> heed::Result>> where M: for<'a> Deserialize<'a>, @@ -186,10 +214,76 @@ impl UpdateStore { return Ok(Some(UpdateStatusMeta::Pending(meta))); } - match self.processed_meta.get(&rtxn, &key)? { - Some(meta) => Ok(Some(UpdateStatusMeta::Processed(meta))), - None => Ok(None), + if let Some(meta) = self.processed_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatusMeta::Processed(meta))); } + + if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatusMeta::Aborted(meta))); + } + + Ok(None) + } + + /// Aborts an update, an aborted update content is deleted and + /// the meta of it is moved into the aborted updates database. + /// + /// Trying to abort an update that is currently being processed, an update + /// that has already been processed or which doesn't actually exist, will + /// return `None`. + pub fn abort_update(&self, update_id: u64) -> heed::Result> + where M: Serialize + for<'a> Deserialize<'a>, + { + let mut wtxn = self.env.write_txn()?; + let key = BEU64::new(update_id); + + // We cannot abort an update that is currently being processed. + if self.processing_update_id() == Some(update_id) { + return Ok(None); + } + + let meta = match self.pending_meta.get(&wtxn, &key)? 
{ + Some(meta) => meta, + None => return Ok(None), + }; + + self.aborted_meta.put(&mut wtxn, &key, &meta)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + + wtxn.commit()?; + + Ok(Some(meta)) + } + + /// Aborts all the pending updates, and not the one being currently processed. + /// Returns the update metas and ids that were successfully aborted. + pub fn abort_pendings(&self) -> heed::Result> + where M: Serialize + for<'a> Deserialize<'a>, + { + let mut wtxn = self.env.write_txn()?; + let processing_update_id = self.processing_update_id(); + + let mut aborted_updates = Vec::new(); + for result in self.pending_meta.iter(&wtxn)? { + let (key, meta) = result?; + let id = key.get(); + if processing_update_id == Some(id) { + continue; + } + aborted_updates.push((id, meta)); + } + + for (id, meta) in &aborted_updates { + let key = BEU64::new(*id); + self.aborted_meta.put(&mut wtxn, &key, &meta)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + } + + wtxn.commit()?; + + Ok(aborted_updates) } } @@ -197,6 +291,7 @@ impl UpdateStore { pub enum UpdateStatusMeta { Pending(M), Processed(N), + Aborted(M), } #[cfg(test)]