From e9c95f66231d5059f65d8bd13be977611bd593e6 Mon Sep 17 00:00:00 2001
From: mpostma
Date: Thu, 28 Jan 2021 19:43:54 +0100
Subject: [PATCH] remove useless files

---
 Cargo.lock                                |   2 +-
 src/index_controller/_update_store/mod.rs |  17 -
 src/index_controller/index_store.rs       |  74 ---
 src/updates/mod.rs                        | 318 ------------
 src/updates/settings.rs                   |  61 ---
 src/updates/update_store.rs               | 581 ----------------------
 6 files changed, 1 insertion(+), 1052 deletions(-)
 delete mode 100644 src/index_controller/_update_store/mod.rs
 delete mode 100644 src/index_controller/index_store.rs
 delete mode 100644 src/updates/mod.rs
 delete mode 100644 src/updates/settings.rs
 delete mode 100644 src/updates/update_store.rs

diff --git a/Cargo.lock b/Cargo.lock
index fc102336a..4471f5127 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1628,7 +1628,7 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
 
 [[package]]
 name = "meilisearch-error"
-version = "0.18.1"
+version = "0.18.0"
 dependencies = [
  "actix-http",
 ]
diff --git a/src/index_controller/_update_store/mod.rs b/src/index_controller/_update_store/mod.rs
deleted file mode 100644
index ef8711ded..000000000
--- a/src/index_controller/_update_store/mod.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use std::sync::Arc;
-
-use heed::Env;
-
-use super::IndexStore;
-
-pub struct UpdateStore {
-    env: Env,
-    index_store: Arc<IndexStore>,
-}
-
-impl UpdateStore {
-    pub fn new(env: Env, index_store: Arc<IndexStore>) -> anyhow::Result<Self> {
-        Ok(Self { env, index_store })
-    }
-}
-
diff --git a/src/index_controller/index_store.rs b/src/index_controller/index_store.rs
deleted file mode 100644
index 6652086f9..000000000
--- a/src/index_controller/index_store.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-use std::sync::Arc;
-use std::collections::HashMap;
-
-use anyhow::Result;
-use milli::{Index, FieldsIdsMap, SearchResult, FieldId, facet::FacetType};
-use ouroboros::self_referencing;
-
-use crate::data::SearchQuery;
-
-#[self_referencing]
-pub struct IndexView {
-    pub index: Arc<Index>,
-    #[borrows(index)]
-    #[covariant]
-    pub txn: heed::RoTxn<'this>,
-    uuid: String,
-}
-
-impl IndexView {
-    pub fn search(&self, search_query: &SearchQuery) -> Result<SearchResult> {
-        self.with(|this| {
-            let mut search = this.index.search(&this.txn);
-            if let Some(query) = &search_query.q {
-                search.query(query);
-            }
-
-            if let Some(offset) = search_query.offset {
-                search.offset(offset);
-            }
-
-            let limit = search_query.limit;
-            search.limit(limit);
-
-            Ok(search.execute()?)
-        })
-    }
-
-    #[inline]
-    pub fn fields_ids_map(&self) -> Result<FieldsIdsMap> {
-        self.with(|this| Ok(this.index.fields_ids_map(&this.txn)?))
-    }
-
-    #[inline]
-    pub fn displayed_fields_ids(&self) -> Result<Option<Vec<FieldId>>> {
-        self.with(|this| Ok(this.index.displayed_fields_ids(&this.txn)?))
-    }
-
-    #[inline]
-    pub fn displayed_fields(&self) -> Result<Option<Vec<String>>> {
-        self.with(|this| Ok(this.index
-            .displayed_fields(&this.txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect())))
-    }
-
-    #[inline]
-    pub fn searchable_fields(&self) -> Result<Option<Vec<String>>> {
-        self.with(|this| Ok(this.index
-            .searchable_fields(&this.txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect())))
-    }
-
-    #[inline]
-    pub fn faceted_fields(&self) -> Result<HashMap<String, FacetType>> {
-        self.with(|this| Ok(this.index.faceted_fields(&this.txn)?))
-    }
-
-    pub fn documents(&self, ids: &[u32]) -> Result<Vec<(u32, obkv::KvReader<'_>)>> {
-        let txn = self.borrow_txn();
-        let index = self.borrow_index();
-        Ok(index.documents(txn, ids.into_iter().copied())?)
-    }
-}
-
diff --git a/src/updates/mod.rs b/src/updates/mod.rs
deleted file mode 100644
index 7e0802595..000000000
--- a/src/updates/mod.rs
+++ /dev/null
@@ -1,318 +0,0 @@
-mod settings;
-mod update_store;
-
-pub use settings::{Settings, Facets};
-
-use std::io;
-use std::sync::Arc;
-use std::fs::create_dir_all;
-use std::collections::HashMap;
-
-use anyhow::Result;
-use byte_unit::Byte;
-use flate2::read::GzDecoder;
-use grenad::CompressionType;
-use log::info;
-use milli::Index;
-use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod, DocumentAdditionResult};
-use milli::update_store::{UpdateStore, UpdateHandler as Handler, UpdateStatus, Processing, Processed, Failed};
-use rayon::ThreadPool;
-use serde::{Serialize, Deserialize};
-use structopt::StructOpt;
-
-use crate::option::Opt;
-
-pub type UpdateStatusResponse = UpdateStatus<UpdateMeta, UpdateResult, String>;
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(tag = "type")]
-pub enum UpdateMeta {
-    DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat },
-    ClearDocuments,
-    Settings(Settings),
-    Facets(Facets),
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(tag = "type")]
-pub enum UpdateMetaProgress {
-    DocumentsAddition {
-        step: usize,
-        total_steps: usize,
-        current: usize,
-        total: Option<usize>,
-    },
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum UpdateResult {
-    DocumentsAddition(DocumentAdditionResult),
-    Other,
-}
-
-#[derive(Clone)]
-pub struct UpdateQueue {
-    inner: Arc<UpdateStore<UpdateMeta, UpdateResult, String>>,
-}
-
-#[derive(Debug, Clone, StructOpt)]
-pub struct IndexerOpts {
-    /// The number of documents to process before printing
-    /// a log line reporting indexing progress.
-    #[structopt(long, default_value = "100000")] // 100k
-    pub log_every_n: usize,
-
-    /// The maximum number of MTBL chunks.
-    #[structopt(long)]
-    pub max_nb_chunks: Option<usize>,
-
-    /// The maximum amount of memory to use for the MTBL buffer. It is recommended
-    /// to use something like 80%-90% of the available memory.
-    ///
-    /// It is automatically split by the number of jobs, e.g. if you use 7 jobs
-    /// and 7 GB of max memory, each thread will use a maximum of 1 GB.
-    #[structopt(long, default_value = "7 GiB")]
-    pub max_memory: Byte,
-
-    /// Size of the linked hash map cache when indexing.
-    /// The bigger it is, the faster the indexing is, but the more memory it takes.
-    #[structopt(long, default_value = "500")]
-    pub linked_hash_map_size: usize,
-
-    /// The name of the compression algorithm to use when compressing intermediate
-    /// chunks while indexing documents.
-    ///
-    /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
-    #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
-    pub chunk_compression_type: CompressionType,
-
-    /// The level of compression of the chosen algorithm.
-    #[structopt(long, requires = "chunk-compression-type")]
-    pub chunk_compression_level: Option<u32>,
-
-    /// The number of bytes to remove from the beginning of the chunks while reading/sorting
-    /// or merging them.
-    ///
-    /// File fusing must only be enabled on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`
-    /// operation (i.e. ext4 and XFS). It only takes effect if `enable-chunk-fusing` is set.
-    #[structopt(long, default_value = "4 GiB")]
-    pub chunk_fusing_shrink_size: Byte,
-
-    /// Enable or disable chunk fusing; this reduces the amount of disk space used by a factor of 2.
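-    /// Chunk fusing requires a file system that supports `FALLOC_FL_COLLAPSE_RANGE`
-    /// (see `chunk-fusing-shrink-size` above).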
-    #[structopt(long)]
-    pub enable_chunk_fusing: bool,
-
-    /// Number of parallel jobs for indexing, defaults to # of CPUs.
-    #[structopt(long)]
-    pub indexing_jobs: Option<usize>,
-}
-
-struct UpdateHandler {
-    indexes: Arc<Index>,
-    max_nb_chunks: Option<usize>,
-    chunk_compression_level: Option<u32>,
-    thread_pool: ThreadPool,
-    log_frequency: usize,
-    max_memory: usize,
-    linked_hash_map_size: usize,
-    chunk_compression_type: CompressionType,
-    chunk_fusing_shrink_size: u64,
-}
-
-impl UpdateHandler {
-    fn new(
-        opt: &IndexerOpts,
-        indexes: Arc<Index>,
-    ) -> Result<Self> {
-        let thread_pool = rayon::ThreadPoolBuilder::new()
-            .num_threads(opt.indexing_jobs.unwrap_or(0))
-            .build()?;
-        Ok(Self {
-            indexes,
-            max_nb_chunks: opt.max_nb_chunks,
-            chunk_compression_level: opt.chunk_compression_level,
-            thread_pool,
-            log_frequency: opt.log_every_n,
-            max_memory: opt.max_memory.get_bytes() as usize,
-            linked_hash_map_size: opt.linked_hash_map_size,
-            chunk_compression_type: opt.chunk_compression_type,
-            chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
-        })
-    }
-
-    fn update_builder(&self, update_id: u64) -> UpdateBuilder {
-        // We prepare the update by using the update builder.
-        let mut update_builder = UpdateBuilder::new(update_id);
-        if let Some(max_nb_chunks) = self.max_nb_chunks {
-            update_builder.max_nb_chunks(max_nb_chunks);
-        }
-        if let Some(chunk_compression_level) = self.chunk_compression_level {
-            update_builder.chunk_compression_level(chunk_compression_level);
-        }
-        update_builder.thread_pool(&self.thread_pool);
-        update_builder.log_every_n(self.log_frequency);
-        update_builder.max_memory(self.max_memory);
-        update_builder.linked_hash_map_size(self.linked_hash_map_size);
-        update_builder.chunk_compression_type(self.chunk_compression_type);
-        update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
-        update_builder
-    }
-
-    fn update_documents(
-        &self,
-        format: UpdateFormat,
-        method: IndexDocumentsMethod,
-        content: &[u8],
-        update_builder: UpdateBuilder,
-    ) -> Result<UpdateResult> {
-        // We must use the write transaction of the update here.
-        let mut wtxn = self.indexes.write_txn()?;
-        let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes);
-        builder.update_format(format);
-        builder.index_documents_method(method);
-
-        let gzipped = true;
-        let reader = if gzipped {
-            Box::new(GzDecoder::new(content))
-        } else {
-            Box::new(content) as Box<dyn io::Read>
-        };
-
-        let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
-
-        match result {
-            Ok(addition_result) => wtxn
-                .commit()
-                .and(Ok(UpdateResult::DocumentsAddition(addition_result)))
-                .map_err(Into::into),
-            Err(e) => Err(e.into()),
-        }
-    }
-
-    fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<UpdateResult> {
-        // We must use the write transaction of the update here.
-        let mut wtxn = self.indexes.write_txn()?;
-        let builder = update_builder.clear_documents(&mut wtxn, &self.indexes);
-
-        match builder.execute() {
-            Ok(_count) => wtxn
-                .commit()
-                .and(Ok(UpdateResult::Other))
-                .map_err(Into::into),
-            Err(e) => Err(e.into()),
-        }
-    }
-
-    fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> Result<UpdateResult> {
-        // We must use the write transaction of the update here.
-        let mut wtxn = self.indexes.write_txn()?;
-        let mut builder = update_builder.settings(&mut wtxn, &self.indexes);
-
-        // We transpose the settings JSON struct into real setting updates.
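-        // Each setting is an `Option<Option<_>>`: the outer `Option` tells whether
-        // the field was present in the payload at all, the inner one whether the
-        // setting must be set to a new value (`Some`) or reset to its default (`None`).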
-        if let Some(ref names) = settings.searchable_attributes {
-            match names {
-                Some(names) => builder.set_searchable_fields(names.clone()),
-                None => builder.reset_searchable_fields(),
-            }
-        }
-
-        if let Some(ref names) = settings.displayed_attributes {
-            match names {
-                Some(names) => builder.set_displayed_fields(names.clone()),
-                None => builder.reset_displayed_fields(),
-            }
-        }
-
-        if let Some(ref facet_types) = settings.faceted_attributes {
-            let facet_types = facet_types.clone().unwrap_or_default();
-            builder.set_faceted_fields(facet_types);
-        }
-
-        if let Some(ref criteria) = settings.criteria {
-            match criteria {
-                Some(criteria) => builder.set_criteria(criteria.clone()),
-                None => builder.reset_criteria(),
-            }
-        }
-
-        let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
-
-        match result {
-            Ok(()) => wtxn
-                .commit()
-                .and(Ok(UpdateResult::Other))
-                .map_err(Into::into),
-            Err(e) => Err(e.into()),
-        }
-    }
-
-    fn update_facets(&self, levels: &Facets, update_builder: UpdateBuilder) -> Result<UpdateResult> {
-        // We must use the write transaction of the update here.
-        let mut wtxn = self.indexes.write_txn()?;
-        let mut builder = update_builder.facets(&mut wtxn, &self.indexes);
-        if let Some(value) = levels.level_group_size {
-            builder.level_group_size(value);
-        }
-        if let Some(value) = levels.min_level_size {
-            builder.min_level_size(value);
-        }
-        match builder.execute() {
-            Ok(()) => wtxn
-                .commit()
-                .and(Ok(UpdateResult::Other))
-                .map_err(Into::into),
-            Err(e) => Err(e.into()),
-        }
-    }
-}
-
-impl Handler<UpdateMeta, UpdateResult, String> for UpdateHandler {
-    fn handle_update(
-        &mut self,
-        update_id: u64,
-        meta: Processing<UpdateMeta>,
-        content: &[u8],
-    ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
-        use UpdateMeta::*;
-
-        let update_builder = self.update_builder(update_id);
-
-        let result = match meta.meta() {
-            DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder),
-            ClearDocuments => self.clear_documents(update_builder),
-            Settings(settings) => self.update_settings(settings, update_builder),
-            Facets(levels) => self.update_facets(levels, update_builder),
-        };
-
-        match result {
-            Ok(result) => Ok(meta.process(result)),
-            Err(e) => Err(meta.fail(e.to_string())),
-        }
-    }
-}
-
-impl UpdateQueue {
-    pub fn new(
-        opt: &Opt,
-        indexes: Arc<Index>,
-    ) -> Result<Self> {
-        let handler = UpdateHandler::new(&opt.indexer_options, indexes)?;
-        let size = opt.max_udb_size.get_bytes() as usize;
-        let path = opt.db_path.join("updates.mdb");
-        create_dir_all(&path)?;
-        let inner = UpdateStore::open(
-            Some(size),
-            path,
-            handler,
-        )?;
-        Ok(Self { inner })
-    }
-
-    #[inline]
-    pub fn get_update_status(&self, update_id: u64) -> Result<Option<UpdateStatusResponse>> {
-        Ok(self.inner.meta(update_id)?)
-    }
-}
diff --git a/src/updates/settings.rs b/src/updates/settings.rs
deleted file mode 100644
index 91381edc5..000000000
--- a/src/updates/settings.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-use std::num::NonZeroUsize;
-use std::collections::HashMap;
-
-use serde::{Serialize, Deserialize, de::Deserializer};
-
-// Any value that is present is considered to be `Some`, including `null`.
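-// This is what lets `Settings` tell a missing field (`None`) apart from an
-// explicit `null` (`Some(None)`), the latter meaning that the setting must be reset.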
-fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
-where T: Deserialize<'de>,
-      D: Deserializer<'de>
-{
-    Deserialize::deserialize(deserializer).map(Some)
-}
-
-#[derive(Debug, Clone, Default, Serialize, Deserialize)]
-#[serde(deny_unknown_fields)]
-#[serde(rename_all = "camelCase")]
-pub struct Settings {
-    #[serde(
-        default,
-        deserialize_with = "deserialize_some",
-        skip_serializing_if = "Option::is_none",
-    )]
-    pub displayed_attributes: Option<Option<Vec<String>>>,
-
-    #[serde(
-        default,
-        deserialize_with = "deserialize_some",
-        skip_serializing_if = "Option::is_none",
-    )]
-    pub searchable_attributes: Option<Option<Vec<String>>>,
-
-    #[serde(default)]
-    pub faceted_attributes: Option<Option<HashMap<String, String>>>,
-
-    #[serde(
-        default,
-        deserialize_with = "deserialize_some",
-        skip_serializing_if = "Option::is_none",
-    )]
-    pub criteria: Option<Option<Vec<String>>>,
-}
-
-impl Settings {
-    pub fn cleared() -> Self {
-        Self {
-            displayed_attributes: Some(None),
-            searchable_attributes: Some(None),
-            faceted_attributes: Some(None),
-            criteria: Some(None),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(deny_unknown_fields)]
-#[serde(rename_all = "camelCase")]
-pub struct Facets {
-    pub level_group_size: Option<NonZeroUsize>,
-    pub min_level_size: Option<NonZeroUsize>,
-}
diff --git a/src/updates/update_store.rs b/src/updates/update_store.rs
deleted file mode 100644
index f750fc38b..000000000
--- a/src/updates/update_store.rs
+++ /dev/null
@@ -1,581 +0,0 @@
-use std::path::Path;
-use std::sync::{Arc, RwLock};
-
-use crossbeam_channel::Sender;
-use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
-use heed::{EnvOpenOptions, Env, Database};
-use serde::{Serialize, Deserialize};
-use chrono::{DateTime, Utc};
-
-type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
-
-#[derive(Clone)]
-pub struct UpdateStore<M, N, E> {
-    env: Env,
-    pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>,
-    pending: Database<OwnedType<BEU64>, ByteSlice>,
-    processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
-    failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
-    aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
-    processing: Arc<RwLock<Option<Processing<M>>>>,
-    notification_sender: Sender<()>,
-}
-
-pub trait UpdateHandler<M, N, E> {
-    fn handle_update(&mut self, update_id: u64, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>>;
-}
-
-impl<M, N, E, F> UpdateHandler<M, N, E> for F
-where F: FnMut(u64, Processing<M>, &[u8]) -> Result<Processed<M, N>, Failed<M, E>> + Send + 'static {
-    fn handle_update(&mut self, update_id: u64, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>> {
-        self(update_id, meta, content)
-    }
-}
-
-impl<M: 'static, N: 'static, E: 'static> UpdateStore<M, N, E> {
-    pub fn open<P, U>(
-        size: Option<usize>,
-        path: P,
-        mut update_handler: U,
-    ) -> heed::Result<Arc<UpdateStore<M, N, E>>>
-    where
-        P: AsRef<Path>,
-        U: UpdateHandler<M, N, E> + Send + 'static,
-        M: for<'a> Deserialize<'a> + Serialize + Send + Sync + Clone,
-        N: Serialize,
-        E: Serialize,
-    {
-        let mut options = EnvOpenOptions::new();
-        if let Some(size) = size {
-            options.map_size(size);
-        }
-        options.max_dbs(5);
-
-        let env = options.open(path)?;
-        let pending_meta = env.create_database(Some("pending-meta"))?;
-        let pending = env.create_database(Some("pending"))?;
-        let processed_meta = env.create_database(Some("processed-meta"))?;
-        let aborted_meta = env.create_database(Some("aborted-meta"))?;
-        let failed_meta = env.create_database(Some("failed-meta"))?;
-        let processing = Arc::new(RwLock::new(None));
-
-        let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1);
-        // Send a first notification to trigger the process.
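-        // The channel is deliberately bounded to a single message: one pending
-        // notification is enough to cover any number of updates registered in
-        // the meantime, so additional sends can safely be dropped.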
-        let _ = notification_sender.send(());
-
-        let update_store = Arc::new(UpdateStore {
-            env,
-            pending,
-            pending_meta,
-            processed_meta,
-            aborted_meta,
-            notification_sender,
-            failed_meta,
-            processing,
-        });
-
-        let update_store_cloned = update_store.clone();
-        std::thread::spawn(move || {
-            // Block and wait for something to process.
-            for () in notification_receiver {
-                loop {
-                    match update_store_cloned.process_pending_update(&mut update_handler) {
-                        Ok(Some(_)) => (),
-                        Ok(None) => break,
-                        Err(e) => eprintln!("error while processing update: {}", e),
-                    }
-                }
-            }
-        });
-
-        Ok(update_store)
-    }
-
-    /// Returns the next id to use to store a new update.
-    fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
-        let last_pending = self.pending_meta
-            .remap_data_type::<DecodeIgnore>()
-            .last(txn)?
-            .map(|(k, _)| k.get());
-
-        let last_processed = self.processed_meta
-            .remap_data_type::<DecodeIgnore>()
-            .last(txn)?
-            .map(|(k, _)| k.get());
-
-        let last_aborted = self.aborted_meta
-            .remap_data_type::<DecodeIgnore>()
-            .last(txn)?
-            .map(|(k, _)| k.get());
-
-        let last_update_id = [last_pending, last_processed, last_aborted]
-            .iter()
-            .copied()
-            .flatten()
-            .max();
-
-        match last_update_id {
-            Some(last_id) => Ok(last_id + 1),
-            None => Ok(0),
-        }
-    }
-
-    /// Registers the update content in the pending store and the meta
-    /// into the pending-meta store. Returns the new unique update id.
-    pub fn register_update(&self, meta: M, content: &[u8]) -> heed::Result<Pending<M>>
-    where M: Serialize,
-    {
-        let mut wtxn = self.env.write_txn()?;
-
-        // We ask the update store to give us a new update id; this is safe:
-        // no other update can get the same id because we open the write txn
-        // before asking for the id and registering it, so any other registration
-        // is forced to wait for a new write txn.
-        let update_id = self.new_update_id(&wtxn)?;
-        let update_key = BEU64::new(update_id);
-
-        let meta = Pending::new(meta, update_id);
-        self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
-        self.pending.put(&mut wtxn, &update_key, content)?;
-
-        wtxn.commit()?;
-
-        if let Err(e) = self.notification_sender.try_send(()) {
-            assert!(!e.is_disconnected(), "update notification channel is disconnected");
-        }
-        Ok(meta)
-    }
-
-    /// Executes the user provided function on the next pending update (the one with the lowest id).
-    /// This is asynchronous as it lets the user process the update with a read-only txn and
-    /// only writes the result meta to the processed-meta store *after* it has been processed.
-    fn process_pending_update<U>(&self, handler: &mut U) -> heed::Result<Option<()>>
-    where
-        U: UpdateHandler<M, N, E>,
-        M: for<'a> Deserialize<'a> + Serialize + Clone,
-        N: Serialize,
-        E: Serialize,
-    {
-        // Create a read transaction to be able to retrieve the pending updates in order.
-        let rtxn = self.env.read_txn()?;
-        let first_meta = self.pending_meta.first(&rtxn)?;
-
-        // If there is a pending update we process it, keeping only
-        // a reader while doing so, not a writer.
-        match first_meta {
-            Some((first_id, pending)) => {
-                let first_content = self.pending
-                    .get(&rtxn, &first_id)?
-                    .expect("associated update content");
-
-                // We change the state of the update from pending to processing before we
-                // pass it to the update handler. The processing store is not persistent,
-                // to be able to recover from a failure.
-                let processing = pending.processing();
-                self.processing
-                    .write()
-                    .unwrap()
-                    .replace(processing.clone());
-                // Process the pending update using the provided user function.
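-                // Only the read transaction is held while the handler runs; it is
-                // dropped right below, before the write transaction that records
-                // the result is opened, so registrations are never blocked by it.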
-                let result = handler.handle_update(first_id.get(), processing, first_content);
-                drop(rtxn);
-
-                // Once the pending update has been processed we remove its content
-                // from the pending and processing stores and write the *new* meta
-                // to the processed-meta (or failed-meta) store before committing.
-                let mut wtxn = self.env.write_txn()?;
-                self.processing
-                    .write()
-                    .unwrap()
-                    .take();
-                self.pending_meta.delete(&mut wtxn, &first_id)?;
-                self.pending.delete(&mut wtxn, &first_id)?;
-                match result {
-                    Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,
-                    Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?,
-                }
-                wtxn.commit()?;
-
-                Ok(Some(()))
-            },
-            None => Ok(None),
-        }
-    }
-
-    /// The id and metadata of the update that is currently being processed,
-    /// `None` if no update is being processed.
-    pub fn processing_update(&self) -> heed::Result<Option<(u64, Pending<M>)>>
-    where M: for<'a> Deserialize<'a>,
-    {
-        let rtxn = self.env.read_txn()?;
-        match self.pending_meta.first(&rtxn)? {
-            Some((key, meta)) => Ok(Some((key.get(), meta))),
-            None => Ok(None),
-        }
-    }
-
-    /// Executes the user defined function with the meta-store iterators: the
-    /// currently processing update first, then the *processed*, *aborted*,
-    /// *pending* and *failed* meta iterators, in that order.
-    pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
-    where
-        M: for<'a> Deserialize<'a> + Clone,
-        N: for<'a> Deserialize<'a>,
-        F: for<'a> FnMut(
-            Option<Processing<M>>,
-            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
-            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
-            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<M>>>,
-            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
-        ) -> heed::Result<T>,
-    {
-        let rtxn = self.env.read_txn()?;
-
-        // We get the processed, aborted, pending and failed meta iterators.
-        let processed_iter = self.processed_meta.iter(&rtxn)?;
-        let aborted_iter = self.aborted_meta.iter(&rtxn)?;
-        let pending_iter = self.pending_meta.iter(&rtxn)?;
-        let processing = self.processing.read().unwrap().clone();
-        let failed_iter = self.failed_meta.iter(&rtxn)?;
-
-        // We execute the user defined function with all the iterators.
-        (f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter)
-    }
-
-    /// Returns the update associated meta or `None` if the update doesn't exist.
-    pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<M, N, E>>>
-    where
-        M: for<'a> Deserialize<'a> + Clone,
-        N: for<'a> Deserialize<'a>,
-        E: for<'a> Deserialize<'a>,
-    {
-        let rtxn = self.env.read_txn()?;
-        let key = BEU64::new(update_id);
-
-        if let Some(ref meta) = *self.processing.read().unwrap() {
-            if meta.id() == update_id {
-                return Ok(Some(UpdateStatus::Processing(meta.clone())));
-            }
-        }
-
-        if let Some(meta) = self.pending_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Pending(meta)));
-        }
-
-        if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Processed(meta)));
-        }
-
-        if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Aborted(meta)));
-        }
-
-        if let Some(meta) = self.failed_meta.get(&rtxn, &key)? {
-            return Ok(Some(UpdateStatus::Failed(meta)));
-        }
-
-        Ok(None)
-    }
-
-    /// Aborts an update: the aborted update's content is deleted and
-    /// its meta is moved into the aborted updates database.
-    ///
-    /// Trying to abort an update that is currently being processed, an update
-    /// that has already been processed, or one which doesn't actually exist will
-    /// return `None`.
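-    ///
-    /// The abort is atomic: the meta is moved and the content is deleted within
-    /// a single write transaction.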
-    pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>>
-    where M: Serialize + for<'a> Deserialize<'a>,
-    {
-        let mut wtxn = self.env.write_txn()?;
-        let key = BEU64::new(update_id);
-
-        // We cannot abort an update that is currently being processed.
-        if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) {
-            return Ok(None);
-        }
-
-        let pending = match self.pending_meta.get(&wtxn, &key)? {
-            Some(meta) => meta,
-            None => return Ok(None),
-        };
-
-        let aborted = pending.abort();
-
-        self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
-        self.pending_meta.delete(&mut wtxn, &key)?;
-        self.pending.delete(&mut wtxn, &key)?;
-
-        wtxn.commit()?;
-
-        Ok(Some(aborted))
-    }
-
-    /// Aborts all the pending updates except the one currently being processed.
-    /// Returns the ids and metas of the updates that were successfully aborted.
-    pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>>
-    where M: Serialize + for<'a> Deserialize<'a>,
-    {
-        let mut wtxn = self.env.write_txn()?;
-        let mut aborted_updates = Vec::new();
-
-        // We skip the first pending update as it is currently being processed.
-        for result in self.pending_meta.iter(&wtxn)?.skip(1) {
-            let (key, pending) = result?;
-            let id = key.get();
-            aborted_updates.push((id, pending.abort()));
-        }
-
-        for (id, aborted) in &aborted_updates {
-            let key = BEU64::new(*id);
-            self.aborted_meta.put(&mut wtxn, &key, aborted)?;
-            self.pending_meta.delete(&mut wtxn, &key)?;
-            self.pending.delete(&mut wtxn, &key)?;
-        }
-
-        wtxn.commit()?;
-
-        Ok(aborted_updates)
-    }
-}
-
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
-pub struct Pending<M> {
-    update_id: u64,
-    meta: M,
-    enqueued_at: DateTime<Utc>,
-}
-
-impl<M> Pending<M> {
-    fn new(meta: M, update_id: u64) -> Self {
-        Self {
-            enqueued_at: Utc::now(),
-            meta,
-            update_id,
-        }
-    }
-
-    pub fn processing(self) -> Processing<M> {
-        Processing {
-            from: self,
-            started_processing_at: Utc::now(),
-        }
-    }
-
-    pub fn abort(self) -> Aborted<M> {
-        Aborted {
-            from: self,
-            aborted_at: Utc::now(),
-        }
-    }
-
-    pub fn meta(&self) -> &M {
-        &self.meta
-    }
-
-    pub fn id(&self) -> u64 {
-        self.update_id
-    }
-}
-
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
-pub struct Processed<M, N> {
-    success: N,
-    processed_at: DateTime<Utc>,
-    #[serde(flatten)]
-    from: Processing<M>,
-}
-
-impl<M, N> Processed<M, N> {
-    fn id(&self) -> u64 {
-        self.from.id()
-    }
-}
-
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
-pub struct Processing<M> {
-    #[serde(flatten)]
-    from: Pending<M>,
-    started_processing_at: DateTime<Utc>,
-}
-
-impl<M> Processing<M> {
-    pub fn id(&self) -> u64 {
-        self.from.id()
-    }
-
-    pub fn meta(&self) -> &M {
-        self.from.meta()
-    }
-
-    pub fn process<N>(self, meta: N) -> Processed<M, N> {
-        Processed {
-            success: meta,
-            from: self,
-            processed_at: Utc::now(),
-        }
-    }
-
-    pub fn fail<E>(self, error: E) -> Failed<M, E> {
-        Failed {
-            from: self,
-            error,
-            failed_at: Utc::now(),
-        }
-    }
-}
-
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
-pub struct Aborted<M> {
-    #[serde(flatten)]
-    from: Pending<M>,
-    aborted_at: DateTime<Utc>,
-}
-
-impl<M> Aborted<M> {
-    fn id(&self) -> u64 {
-        self.from.id()
-    }
-}
-
-#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
-pub struct Failed<M, E> {
-    #[serde(flatten)]
-    from: Processing<M>,
-    error: E,
-    failed_at: DateTime<Utc>,
-}
-
-impl<M, E> Failed<M, E> {
-    fn id(&self) -> u64 {
-        self.from.id()
-    }
-}
-
-#[derive(Debug, PartialEq, Eq, Hash, Serialize)]
-#[serde(tag = "status")]
-pub enum UpdateStatus<M, N, E> {
-    Processing(Processing<M>),
-    Pending(Pending<M>),
-    Processed(Processed<M, N>),
-    Aborted(Aborted<M>),
-    Failed(Failed<M, E>),
-}
-
-impl<M, N, E> UpdateStatus<M, N, E> {
-    pub fn id(&self) -> u64 {
-        match self {
-            UpdateStatus::Processing(u) => u.id(),
-            UpdateStatus::Pending(u) => u.id(),
-            UpdateStatus::Processed(u) => u.id(),
-            UpdateStatus::Aborted(u) => u.id(),
-            UpdateStatus::Failed(u) => u.id(),
-        }
-    }
-}
-
-impl<M, N, E> From<Pending<M>> for UpdateStatus<M, N, E> {
-    fn from(other: Pending<M>) -> Self {
-        Self::Pending(other)
-    }
-}
-
-impl<M, N, E> From<Aborted<M>> for UpdateStatus<M, N, E> {
-    fn from(other: Aborted<M>) -> Self {
-        Self::Aborted(other)
-    }
-}
-
-impl<M, N, E> From<Processed<M, N>> for UpdateStatus<M, N, E> {
-    fn from(other: Processed<M, N>) -> Self {
-        Self::Processed(other)
-    }
-}
-
-impl<M, N, E> From<Processing<M>> for UpdateStatus<M, N, E> {
-    fn from(other: Processing<M>) -> Self {
-        Self::Processing(other)
-    }
-}
-
-impl<M, N, E> From<Failed<M, E>> for UpdateStatus<M, N, E> {
-    fn from(other: Failed<M, E>) -> Self {
-        Self::Failed(other)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use std::thread;
-    use std::time::{Duration, Instant};
-
-    #[test]
-    fn simple() {
-        let dir = tempfile::tempdir().unwrap();
-        let update_store = UpdateStore::open(None, dir, |_id, meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
-            let new_meta = meta.meta().to_string() + " processed";
-            let processed = meta.process(new_meta);
-            Ok(processed)
-        }).unwrap();
-
-        let meta = String::from("kiki");
-        let update = update_store.register_update(meta, &[]).unwrap();
-        thread::sleep(Duration::from_millis(100));
-        let meta = update_store.meta(update.id()).unwrap().unwrap();
-        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
-            assert_eq!(success, "kiki processed");
-        } else {
-            panic!()
-        }
-    }
-
-    #[test]
-    #[ignore]
-    fn long_running_update() {
-        let dir = tempfile::tempdir().unwrap();
-        let update_store = UpdateStore::open(None, dir, |_id, meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
-            thread::sleep(Duration::from_millis(400));
-            let new_meta = meta.meta().to_string() + " processed";
-            let processed = meta.process(new_meta);
-            Ok(processed)
-        }).unwrap();
-
-        let before_register = Instant::now();
-
-        let meta = String::from("kiki");
-        let update_kiki = update_store.register_update(meta, &[]).unwrap();
-        assert!(before_register.elapsed() < Duration::from_millis(200));
-
-        let meta = String::from("coco");
-        let update_coco = update_store.register_update(meta, &[]).unwrap();
-        assert!(before_register.elapsed() < Duration::from_millis(200));
-
-        let meta = String::from("cucu");
-        let update_cucu = update_store.register_update(meta, &[]).unwrap();
-        assert!(before_register.elapsed() < Duration::from_millis(200));
-
-        thread::sleep(Duration::from_millis(400 * 3 + 100));
-
-        let meta = update_store.meta(update_kiki.id()).unwrap().unwrap();
-        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
-            assert_eq!(success, "kiki processed");
-        } else {
-            panic!()
-        }
-
-        let meta = update_store.meta(update_coco.id()).unwrap().unwrap();
-        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
-            assert_eq!(success, "coco processed");
-        } else {
-            panic!()
-        }
-
-        let meta = update_store.meta(update_cucu.id()).unwrap().unwrap();
-        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
-            assert_eq!(success, "cucu processed");
-        } else {
-            panic!()
-        }
-    }
-}