From 6a3f625e11bc9ab41a2bd01a306b3c8a63ea65cd Mon Sep 17 00:00:00 2001 From: mpostma Date: Sat, 16 Jan 2021 15:09:48 +0100 Subject: [PATCH] WIP: refactor IndexController change the architecture of the index controller to allow it to own an index store. --- Cargo.lock | 40 ++ Cargo.toml | 1 + src/data/mod.rs | 47 +- src/data/search.rs | 4 +- src/data/updates.rs | 66 ++- src/index_controller/index_store.rs | 255 ++++++++++ src/index_controller/mod.rs | 293 +++++------- src/index_controller/update_store/mod.rs | 49 ++ src/lib.rs | 2 +- src/option.rs | 52 +- src/routes/document.rs | 2 +- src/routes/index.rs | 66 +-- src/routes/settings/mod.rs | 14 +- src/updates/mod.rs | 12 +- src/updates/update_store.rs | 581 +++++++++++++++++++++++ 15 files changed, 1197 insertions(+), 287 deletions(-) create mode 100644 src/index_controller/index_store.rs create mode 100644 src/index_controller/update_store/mod.rs create mode 100644 src/updates/update_store.rs diff --git a/Cargo.lock b/Cargo.lock index e0457b4d5..0282bfe92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,15 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" +dependencies = [ + "lazy_static", + "regex", +] + [[package]] name = "actix-codec" version = "0.3.0" @@ -1639,6 +1649,7 @@ dependencies = [ "mime", "obkv", "once_cell", + "ouroboros", "page_size", "rand 0.7.3", "rayon", @@ -1941,6 +1952,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ouroboros" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "069fb33e127cabdc8ad6a287eed9719b85c612d36199777f6dc41ad91f7be41a" +dependencies = [ + "ouroboros_macro", + "stable_deref_trait", +] + +[[package]] +name = "ouroboros_macro" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad938cc920f299d6dce91e43d3ce316e785f4aa4bc4243555634dc2967098fc6" +dependencies = [ + "Inflector", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "page_size" version = "0.4.2" @@ -2771,6 +2805,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "standback" version = "0.2.13" diff --git a/Cargo.toml b/Cargo.toml index c06847253..26a3dc1b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ whoami = "1.0.0" dashmap = "4.0.2" page_size = "0.4.2" obkv = "0.1.1" +ouroboros = "0.8.0" [dependencies.sentry] default-features = false diff --git a/src/data/mod.rs b/src/data/mod.rs index 4600531bf..4258278d2 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -8,17 +8,16 @@ use std::sync::Arc; use sha2::Digest; -use crate::{option::Opt, updates::Settings}; -use crate::updates::UpdateQueue; -use crate::index_controller::IndexController; +use crate::{option::Opt, index_controller::Settings}; +use crate::index_controller::{IndexStore, UpdateStore}; #[derive(Clone)] pub struct Data { - inner: Arc, + inner: Arc>, } impl Deref for Data { - type Target = DataInner; + type Target = DataInner; fn deref(&self) -> 
&Self::Target { &self.inner @@ -26,8 +25,8 @@ impl Deref for Data { } #[derive(Clone)] -pub struct DataInner { - pub indexes: Arc>, +pub struct DataInner { + pub indexes: Arc, api_keys: ApiKeys, options: Opt, } @@ -58,11 +57,10 @@ impl ApiKeys { impl Data { pub fn new(options: Opt) -> anyhow::Result { - let db_size = options.max_mdb_size.get_bytes() as usize; - let indexes = IndexController::new(&options.db_path)?; - let indexes = Arc::new(indexes); - - let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?); + let path = options.db_path.clone(); + let index_store = IndexStore::new(&path)?; + let index_controller = UpdateStore::new(index_store); + let indexes = Arc::new(index_controller); let mut api_keys = ApiKeys { master: options.clone().master_key, @@ -72,31 +70,28 @@ impl Data { api_keys.generate_missing_api_keys(); - let inner = DataInner { indexes, options, update_queue, api_keys }; + let inner = DataInner { indexes, options, api_keys }; let inner = Arc::new(inner); Ok(Data { inner }) } - pub fn settings>(&self, _index: S) -> anyhow::Result { - let txn = self.indexes.env.read_txn()?; - let fields_map = self.indexes.fields_ids_map(&txn)?; - println!("fields_map: {:?}", fields_map); + pub fn settings>(&self, index_uid: S) -> anyhow::Result { + let index = self.indexes + .get(&index_uid)? + .ok_or_else(|| anyhow::anyhow!("Index {} does not exist.", index_uid.as_ref()))?; - let displayed_attributes = self.indexes - .displayed_fields(&txn)? + let displayed_attributes = index + .displayed_fields()? .map(|fields| fields.into_iter().map(String::from).collect()) .unwrap_or_else(|| vec!["*".to_string()]); - let searchable_attributes = self.indexes - .searchable_fields(&txn)? - .map(|fields| fields - .into_iter() - .map(String::from) - .collect()) + let searchable_attributes = index + .searchable_fields()? + .map(|fields| fields.into_iter().map(String::from).collect()) .unwrap_or_else(|| vec!["*".to_string()]); - let faceted_attributes = self.indexes.faceted_fields(&txn)? + let faceted_attributes = index.faceted_fields()? .into_iter() .map(|(k, v)| (k, v.to_string())) .collect(); diff --git a/src/data/search.rs b/src/data/search.rs index 69029d8a9..c30345073 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -107,10 +107,10 @@ impl Data { pub fn search>(&self, index: S, search_query: SearchQuery) -> anyhow::Result { let start = Instant::now(); let index = self.indexes - .get(index)? + .get(&index)? .ok_or_else(|| Error::OpenIndex(format!("Index {} doesn't exists.", index.as_ref())))?; - let Results { found_words, documents_ids, nb_hits, limit, .. } = index.search(search_query)?; + let Results { found_words, documents_ids, nb_hits, limit, .. 
} = index.search(&search_query)?; let fields_ids_map = index.fields_ids_map()?; diff --git a/src/data/updates.rs b/src/data/updates.rs index 0654f6fd3..507223d41 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -1,18 +1,20 @@ use std::ops::Deref; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; +//use milli::update_store::UpdateStatus; use async_compression::tokio_02::write::GzipEncoder; use futures_util::stream::StreamExt; use tokio::io::AsyncWriteExt; -use milli::update::{IndexDocumentsMethod, UpdateFormat}; -use milli::update_store::UpdateStatus; use super::Data; -use crate::updates::{UpdateMeta, UpdateResult, UpdateStatusResponse, Settings}; +use crate::index_controller::IndexController; +use crate::index_controller::{UpdateStatusResponse, Settings}; + impl Data { pub async fn add_documents( &self, - _index: S, + index: S, method: IndexDocumentsMethod, format: UpdateFormat, mut stream: impl futures::Stream> + Unpin, @@ -20,7 +22,7 @@ impl Data { where B: Deref, E: std::error::Error + Send + Sync + 'static, - S: AsRef, + S: AsRef + Send + Sync + 'static, { let file = tokio::task::spawn_blocking(tempfile::tempfile).await?; let file = tokio::fs::File::from_std(file?); @@ -37,43 +39,39 @@ impl Data { let file = file.into_std().await; let mmap = unsafe { memmap::Mmap::map(&file)? }; - let meta = UpdateMeta::DocumentsAddition { method, format }; - - let queue = self.update_queue.clone(); - let update = tokio::task::spawn_blocking(move || queue.register_update(meta, &mmap[..])).await??; - + let indexes = self.indexes.clone(); + let update = tokio::task::spawn_blocking(move ||indexes.add_documents(index, method, format, &mmap[..])).await??; Ok(update.into()) } - pub async fn update_settings>( + pub async fn update_settings + Send + Sync + 'static>( &self, - _index: S, + index: S, settings: Settings ) -> anyhow::Result { - let meta = UpdateMeta::Settings(settings); - let queue = self.update_queue.clone(); - let update = tokio::task::spawn_blocking(move || queue.register_update(meta, &[])).await??; + let indexes = self.indexes.clone(); + let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??; Ok(update.into()) } - #[inline] - pub fn get_update_status(&self, _index: &str, uid: u64) -> anyhow::Result>> { - self.update_queue.get_update_status(uid) - } + //#[inline] + //pub fn get_update_status>(&self, _index: S, uid: u64) -> anyhow::Result>> { + //self.indexes.get_update_status(uid) + //} - pub fn get_updates_status(&self, _index: &str) -> anyhow::Result>> { - let result = self.update_queue.iter_metas(|processing, processed, pending, aborted, failed| { - let mut metas = processing - .map(UpdateStatus::from) - .into_iter() - .chain(processed.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) - .chain(pending.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) - .chain(aborted.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) - .chain(failed.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) - .collect::>(); - metas.sort_by(|a, b| a.id().cmp(&b.id())); - Ok(metas) - })?; - Ok(result) - } + //pub fn get_updates_status(&self, _index: &str) -> anyhow::Result>> { + //let result = self.update_queue.iter_metas(|processing, processed, pending, aborted, failed| { + //let mut metas = processing + //.map(UpdateStatus::from) + //.into_iter() + //.chain(processed.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) + //.chain(pending.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) + 
//.chain(aborted.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) + //.chain(failed.filter_map(|i| Some(i.ok()?.1)).map(UpdateStatus::from)) + //.collect::>(); + //metas.sort_by(|a, b| a.id().cmp(&b.id())); + //Ok(metas) + //})?; + //Ok(result) + //} } diff --git a/src/index_controller/index_store.rs b/src/index_controller/index_store.rs new file mode 100644 index 000000000..6ba6621c3 --- /dev/null +++ b/src/index_controller/index_store.rs @@ -0,0 +1,255 @@ +use std::fs::File; +use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::collections::HashMap; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use dashmap::DashMap; +use heed::types::{Str, SerdeBincode}; +use heed::{EnvOpenOptions, Env, Database}; +use milli::{Index, FieldsIdsMap, SearchResult, FieldId, facet::FacetType}; +use serde::{Serialize, Deserialize}; +use ouroboros::self_referencing; + +use crate::data::SearchQuery; + +const CONTROLLER_META_FILENAME: &str = "index_controller_meta"; +const INDEXES_CONTROLLER_FILENAME: &str = "indexes_db"; +const INDEXES_DB_NAME: &str = "indexes_db"; + + +#[derive(Debug, Serialize, Deserialize)] +struct IndexStoreMeta { + open_options: EnvOpenOptions, + created_at: DateTime, +} + +impl IndexStoreMeta { + fn from_path(path: impl AsRef) -> Result> { + let mut path = path.as_ref().to_path_buf(); + path.push(CONTROLLER_META_FILENAME); + if path.exists() { + let mut file = File::open(path)?; + let mut buffer = Vec::new(); + let n = file.read_to_end(&mut buffer)?; + let meta: IndexStoreMeta = serde_json::from_slice(&buffer[..n])?; + Ok(Some(meta)) + } else { + Ok(None) + } + } + + fn to_path(self, path: impl AsRef) -> Result<()> { + let mut path = path.as_ref().to_path_buf(); + path.push(CONTROLLER_META_FILENAME); + if path.exists() { + Err(anyhow::anyhow!("Index controller metadata already exists")) + } else { + let mut file = File::create(path)?; + let json = serde_json::to_vec(&self)?; + file.write_all(&json)?; + Ok(()) + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct IndexMetadata { + created_at: DateTime, + open_options: EnvOpenOptions, + uuid: String, +} + +impl IndexMetadata { + fn open_index(self, path: impl AsRef) -> Result { + // create a path in the form "db_path/indexes/index_id" + let mut path = path.as_ref().to_path_buf(); + path.push("indexes"); + path.push(&self.uuid); + Ok(Index::new(self.open_options, path)?) + } +} + + +#[self_referencing] +pub struct IndexView { + pub index: Arc, + #[borrows(index)] + #[covariant] + pub txn: heed::RoTxn<'this>, + uuid: String, +} + +impl IndexView { + pub fn search(&self, search_query: &SearchQuery) -> Result { + self.with(|this| { + let mut search = this.index.search(&this.txn); + if let Some(query) = &search_query.q { + search.query(query); + } + + if let Some(offset) = search_query.offset { + search.offset(offset); + } + + let limit = search_query.limit; + search.limit(limit); + + Ok(search.execute()?) + }) + } + + #[inline] + pub fn fields_ids_map(&self) -> Result { + self.with(|this| Ok(this.index.fields_ids_map(&this.txn)?)) + + } + + #[inline] + pub fn displayed_fields_ids(&self) -> Result>> { + self.with(|this| Ok(this.index.displayed_fields_ids(&this.txn)?)) + } + + #[inline] + pub fn displayed_fields(&self) -> Result>> { + self.with(|this| Ok(this.index + .displayed_fields(&this.txn)? 
+ .map(|fields| fields.into_iter().map(String::from).collect()))) + } + + #[inline] + pub fn searchable_fields(&self) -> Result>> { + self.with(|this| Ok(this.index + .searchable_fields(&this.txn)? + .map(|fields| fields.into_iter().map(String::from).collect()))) + } + + #[inline] + pub fn faceted_fields(&self) -> Result> { + self.with(|this| Ok(this.index.faceted_fields(&this.txn)?)) + } + + pub fn documents(&self, ids: &[u32]) -> Result)>> { + let txn = self.borrow_txn(); + let index = self.borrow_index(); + Ok(index.documents(txn, ids.into_iter().copied())?) + } + + //pub async fn add_documents( + //&self, + //method: IndexDocumentsMethod, + //format: UpdateFormat, + //mut stream: impl futures::Stream> + Unpin, + //) -> anyhow::Result + //where + //B: Deref, + //E: std::error::Error + Send + Sync + 'static, + //{ + //let file = tokio::task::spawn_blocking(tempfile::tempfile).await?; + //let file = tokio::fs::File::from_std(file?); + //let mut encoder = GzipEncoder::new(file); + + //while let Some(result) = stream.next().await { + //let bytes = &*result?; + //encoder.write_all(&bytes[..]).await?; + //} + + //encoder.shutdown().await?; + //let mut file = encoder.into_inner(); + //file.sync_all().await?; + //let file = file.into_std().await; + //let mmap = unsafe { memmap::Mmap::map(&file)? }; + + //let meta = UpdateMeta::DocumentsAddition { method, format }; + + //let index = self.index.clone(); + //let queue = self.update_store.clone(); + //let update = tokio::task::spawn_blocking(move || queue.register_update(index, meta, &mmap[..])).await??; + //Ok(update.into()) + //} +} + +pub struct IndexStore { + path: PathBuf, + env: Env, + indexes_db: Database>, + indexes: DashMap)>, +} + +impl IndexStore { + /// Open the index store from the meta found at path, and create a new one if no meta is + /// found. + pub fn new(path: impl AsRef) -> Result { + // If index store metadata is present, we return the env; otherwise, we create new + // metadata from scratch before returning a new env. + let path = path.as_ref().to_path_buf(); + let env = match IndexStoreMeta::from_path(&path)? { + Some(meta) => meta.open_options.open(INDEXES_CONTROLLER_FILENAME)?, + None => { + let mut open_options = EnvOpenOptions::new(); + open_options.map_size(page_size::get() * 1000); + let env = open_options.open(INDEXES_CONTROLLER_FILENAME)?; + let created_at = Utc::now(); + let meta = IndexStoreMeta { open_options: open_options.clone(), created_at }; + meta.to_path(&path)?; + env + } + }; + let indexes = DashMap::new(); + let indexes_db = match env.open_database(Some(INDEXES_DB_NAME))? { + Some(indexes_db) => indexes_db, + None => env.create_database(Some(INDEXES_DB_NAME))?, + }; + + Ok(Self { env, indexes, indexes_db, path }) + } + + pub fn get_or_create>(&self, _name: S) -> Result { + todo!() + } + + /// Get an index with read access to the db. Indexes are lazily loaded, meaning that we first + /// check for the index's existence in the indexes map; if it isn't there, the indexes db is + /// checked for metadata with which to load the index. + pub fn get>(&self, name: S) -> Result> { + match self.indexes.get(name.as_ref()) { + Some(entry) => { + let index = entry.1.clone(); + let uuid = entry.0.clone(); + let view = IndexView::try_new(index, |index| index.read_txn(), uuid)?; + Ok(Some(view)) + } + None => { + let txn = self.env.read_txn()?; + match self.indexes_db.get(&txn, name.as_ref())?
{ + Some(meta) => { + let uuid = meta.uuid.clone(); + let index = Arc::new(meta.open_index(&self.path)?); + self.indexes.insert(name.as_ref().to_owned(), (uuid.clone(), index.clone())); + let view = IndexView::try_new(index, |index| index.read_txn(), uuid)?; + Ok(Some(view)) + } + None => Ok(None) + } + } + } + } + + pub fn get_mut>(&self, _name: S) -> Result> { + todo!() + } + + pub async fn delete_index>(&self, _name:S) -> Result<()> { + todo!() + } + + pub async fn list_indices(&self) -> Result> { + todo!() + } + + pub async fn rename_index(&self, _old: &str, _new: &str) -> Result<()> { + todo!() + } +} diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index a3b29879a..c927a6c5b 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -1,196 +1,145 @@ -use std::fs::File; -use std::io::{Read, Write}; -use std::path::{Path, PathBuf}; +mod index_store; +mod update_store; + +pub use index_store::IndexStore; +pub use update_store::UpdateStore; + +use std::num::NonZeroUsize; +use std::ops::Deref; +use std::collections::HashMap; use anyhow::Result; -use chrono::{DateTime, Utc}; -use dashmap::DashMap; -use dashmap::mapref::one::Ref; -use heed::types::{Str, SerdeBincode}; -use heed::{EnvOpenOptions, Env, Database}; -use milli::{Index, FieldsIdsMap, SearchResult, FieldId}; -use serde::{Serialize, Deserialize}; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; +use milli::update_store::{Processed, Processing, Failed, Pending, Aborted}; +use serde::{Serialize, Deserialize, de::Deserializer}; -use crate::data::SearchQuery; +pub type UpdateStatusResponse = UpdateStatus; -const CONTROLLER_META_FILENAME: &str = "index_controller_meta"; -const INDEXES_CONTROLLER_FILENAME: &str = "indexes_db"; -const INDEXES_DB_NAME: &str = "indexes_db"; - -pub trait UpdateStore {} - -pub struct IndexController { - path: PathBuf, - update_store: U, - env: Env, - indexes_db: Database>, - indexes: DashMap, +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum UpdateMeta { + DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat }, + ClearDocuments, + Settings(Settings), + Facets(Facets), } -#[derive(Debug, Serialize, Deserialize)] -struct IndexControllerMeta { - open_options: EnvOpenOptions, - created_at: DateTime, +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +pub struct Facets { + pub level_group_size: Option, + pub min_level_size: Option, } -impl IndexControllerMeta { - fn from_path(path: impl AsRef) -> Result> { - let mut path = path.as_ref().to_path_buf(); - path.push(CONTROLLER_META_FILENAME); - if path.exists() { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - let n = file.read_to_end(&mut buffer)?; - let meta: IndexControllerMeta = serde_json::from_slice(&buffer[..n])?; - Ok(Some(meta)) - } else { - Ok(None) - } - } +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type")] +pub enum UpdateStatus { + Pending { update_id: u64, meta: Pending }, + Progressing { update_id: u64, meta: P }, + Processed { update_id: u64, meta: Processed }, + Aborted { update_id: u64, meta: Aborted }, +} - fn to_path(self, path: impl AsRef) -> Result<()> { - let mut path = path.as_ref().to_path_buf(); - path.push(CONTROLLER_META_FILENAME); - if path.exists() { - Err(anyhow::anyhow!("Index controller metadata already exists")) - } else { - let mut file = File::create(path)?; - let json = serde_json::to_vec(&self)?; - file.write_all(&json)?; - Ok(()) +fn 
deserialize_some<'de, T, D>(deserializer: D) -> Result, D::Error> +where T: Deserialize<'de>, + D: Deserializer<'de> +{ + // Any value that is present deserializes to `Some`, so an explicit JSON `null` becomes + // `Some(None)`: this is how `Settings` distinguishes a missing field from a null one. + Deserialize::deserialize(deserializer).map(Some) +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +pub struct Settings { + #[serde( + default, + deserialize_with = "deserialize_some", + skip_serializing_if = "Option::is_none", + )] + pub displayed_attributes: Option>>, + + #[serde( + default, + deserialize_with = "deserialize_some", + skip_serializing_if = "Option::is_none", + )] + pub searchable_attributes: Option>>, + + #[serde(default)] + pub faceted_attributes: Option>>, + + #[serde( + default, + deserialize_with = "deserialize_some", + skip_serializing_if = "Option::is_none", + )] + pub criteria: Option>>, +} + +impl Settings { + pub fn cleared() -> Self { + Self { + displayed_attributes: Some(None), + searchable_attributes: Some(None), + faceted_attributes: Some(None), + criteria: Some(None), + } + } } - -#[derive(Debug, Serialize, Deserialize)] -struct IndexMetadata { - created_at: DateTime, - open_options: EnvOpenOptions, - id: String, +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum UpdateResult { + //DocumentsAddition(DocumentAdditionResult), + Other, } -impl IndexMetadata { - fn open_index(&self, path: impl AsRef) -> Result { - // create a path in the form "db_path/indexes/index_id" - let mut path = path.as_ref().to_path_buf(); - path.push("indexes"); - path.push(&self.id); - Ok(Index::new(self.open_options, path)?) - } -} +/// The `IndexController` is in charge of the access to the underlying indices. It splits the logic +/// between read access, which it provides directly, and write access, which the implementer must +/// provide. This allows the implementer to define the behaviour of write accesses to the indices, +/// and to abstract the scheduling of the updates. The implementer must be able to provide an +/// instance of `IndexStore`. +pub trait IndexController: Deref { -struct IndexView<'a, U> { - txn: heed::RoTxn<'a>, - index: Ref<'a, String, Index>, - update_store: &'a U, -} + /* + * Write operations + * + * The logic for the write operations needs to be provided by the implementer, since they can + * be made asynchronous, thanks to an update store for example. + * + * */ -impl<'a, U: UpdateStore> IndexView<'a, U> { - pub fn search(&self, search_query: SearchQuery) -> Result { - let mut search = self.index.search(&self.txn); - if let Some(query) = &search_query.q { - search.query(query); - } + /// Perform a document addition on the database. If the provided index does not exist, it will be + /// created when the addition is applied to the index. + fn add_documents>( + &self, + index: S, + method: IndexDocumentsMethod, + format: UpdateFormat, + data: &[u8], + ) -> anyhow::Result; - if let Some(offset) = search_query.offset { - search.offset(offset); - } + /// Updates an index's settings. If the index does not exist, it will be created when the update + /// is applied to the index. + fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; - let limit = search_query.limit; - search.limit(limit); + /// Create an index with the given `index_uid`. + fn create_index>(&self, index_uid: S) -> Result<()>; - Ok(search.execute()?) - } + /// Delete the index with the given `index_uid`, attempting to close it beforehand. + fn delete_index>(&self, index_uid: S) -> Result<()>; - pub fn fields_ids_map(&self) -> Result { - Ok(self.index.fields_ids_map(&self.txn)?)
- } + /// Swap two indexes, concretely, it simply swaps the index the names point to. + fn swap_indices, S2: AsRef>(&self, index1_uid: S1, index2_uid: S2) -> Result<()>; - pub fn fields_displayed_fields_ids(&self) -> Result>> { - Ok(self.index.displayed_fields_ids(&self.txn)?) - } - - pub fn documents(&self, ids: Vec) -> Result)>> { - Ok(self.index.documents(&self.txn, ids)?) - } -} - -impl IndexController { - /// Open the index controller from meta found at path, and create a new one if no meta is - /// found. - pub fn new(path: impl AsRef, update_store: U) -> Result { - // If index controller metadata is present, we return the env, otherwise, we create a new - // metadata from scratch before returning a new env. - let path = path.as_ref().to_path_buf(); - let env = match IndexControllerMeta::from_path(&path)? { - Some(meta) => meta.open_options.open(INDEXES_CONTROLLER_FILENAME)?, - None => { - let open_options = EnvOpenOptions::new() - .map_size(page_size::get() * 1000); - let env = open_options.open(INDEXES_CONTROLLER_FILENAME)?; - let created_at = Utc::now(); - let meta = IndexControllerMeta { open_options: open_options.clone(), created_at }; - meta.to_path(path)?; - env - } - }; - let indexes = DashMap::new(); - let indexes_db = match env.open_database(Some(INDEXES_DB_NAME))? { - Some(indexes_db) => indexes_db, - None => env.create_database(Some(INDEXES_DB_NAME))?, - }; - - Ok(Self { env, indexes, indexes_db, update_store, path }) - } - - pub fn get_or_create>(&mut self, name: S) -> Result> { - todo!() - } - - /// Get an index with read access to the db. The index are lazily loaded, meaning that we first - /// check for its exixtence in the indexes map, and if it doesn't exist, the index db is check - /// for metadata to launch the index. - pub fn get>(&self, name: S) -> Result>> { - let update_store = &self.update_store; - match self.indexes.get(name.as_ref()) { - Some(index) => { - let txn = index.read_txn()?; - Ok(Some(IndexView { index, update_store, txn })) - } - None => { - let txn = self.env.read_txn()?; - match self.indexes_db.get(&txn, name.as_ref())? { - Some(meta) => { - let index = meta.open_index(self.path)?; - self.indexes.insert(name.as_ref().to_owned(), index); - // TODO: create index view - match self.indexes.get(name.as_ref()) { - Some(index) => { - let txn = index.read_txn()?; - Ok(Some(IndexView { index, txn, update_store })) - } - None => Ok(None) - } - } - None => Ok(None) - } - } - } - } - - pub fn get_mut>(&self, name: S) -> Result>> { - todo!() - } - - pub async fn delete_index>(&self, name:S) -> Result<()> { - todo!() - } - - pub async fn list_indices(&self) -> Result> { - todo!() - } - - pub async fn rename_index(&self, old: &str, new: &str) -> Result<()> { + /// Apply an update to the given index. 
This method can be called when an update is ready to be + /// processed + fn handle_update>( + &self, + _index: S, + _update_id: u64, + _meta: Processing, + _content: &[u8] + ) -> Result, Failed> { todo!() } } + diff --git a/src/index_controller/update_store/mod.rs b/src/index_controller/update_store/mod.rs new file mode 100644 index 000000000..84db2f63d --- /dev/null +++ b/src/index_controller/update_store/mod.rs @@ -0,0 +1,49 @@ +use std::ops::Deref; + +use super::{IndexStore, IndexController}; + +pub struct UpdateStore { + index_store: IndexStore, +} + +impl Deref for UpdateStore { + type Target = IndexStore; + + fn deref(&self) -> &Self::Target { + &self.index_store + } +} + +impl UpdateStore { + pub fn new(index_store: IndexStore) -> Self { + Self { index_store } + } +} + +impl IndexController for UpdateStore { + fn add_documents>( + &self, + _index: S, + _method: milli::update::IndexDocumentsMethod, + _format: milli::update::UpdateFormat, + _data: &[u8], + ) -> anyhow::Result { + todo!() + } + + fn update_settings>(&self, _index_uid: S, _settings: crate::index_controller::Settings) -> anyhow::Result { + todo!() + } + + fn create_index>(&self, _index_uid: S) -> anyhow::Result<()> { + todo!() + } + + fn delete_index>(&self, _index_uid: S) -> anyhow::Result<()> { + todo!() + } + + fn swap_indices, S2: AsRef>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> { + todo!() + } +} diff --git a/src/lib.rs b/src/lib.rs index f5dd79b0b..df9381914 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ pub mod error; pub mod helpers; pub mod option; pub mod routes; -mod updates; +//mod updates; mod index_controller; use actix_http::Error; diff --git a/src/option.rs b/src/option.rs index f9e98f4fa..f280553e1 100644 --- a/src/option.rs +++ b/src/option.rs @@ -9,10 +9,60 @@ use rustls::{ AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, NoClientAuth, RootCertStore, }; +use grenad::CompressionType; use structopt::StructOpt; -use crate::updates::IndexerOpts; +#[derive(Debug, Clone, StructOpt)] +pub struct IndexerOpts { + /// The amount of documents to skip before printing + /// a log regarding the indexing advancement. + #[structopt(long, default_value = "100000")] // 100k + pub log_every_n: usize, + /// MTBL max number of chunks in bytes. + #[structopt(long)] + pub max_nb_chunks: Option, + + /// The maximum amount of memory to use for the MTBL buffer. It is recommended + /// to use something like 80%-90% of the available memory. + /// + /// It is automatically split by the number of jobs e.g. if you use 7 jobs + /// and 7 GB of max memory, each thread will use a maximum of 1 GB. + #[structopt(long, default_value = "7 GiB")] + pub max_memory: Byte, + + /// Size of the linked hash map cache when indexing. + /// The bigger it is, the faster the indexing is but the more memory it takes. + #[structopt(long, default_value = "500")] + pub linked_hash_map_size: usize, + + /// The name of the compression algorithm to use when compressing intermediate + /// chunks during indexing documents. + /// + /// Choosing a fast algorithm will make the indexing faster but may consume more memory. + #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] + pub chunk_compression_type: CompressionType, + + /// The level of compression of the chosen algorithm. 
+ #[structopt(long, requires = "chunk-compression-type")] + pub chunk_compression_level: Option, + + /// The number of bytes to remove from the beginning of the chunks while reading/sorting + /// or merging them. + /// + /// File fusing must only be enabled on file systems that support `FALLOC_FL_COLLAPSE_RANGE` + /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` flag is set. + #[structopt(long, default_value = "4 GiB")] + pub chunk_fusing_shrink_size: Byte, + + /// Enable chunk fusing or not; this reduces the amount of disk space used by a factor of 2. + #[structopt(long)] + pub enable_chunk_fusing: bool, + + /// Number of parallel jobs for indexing, defaults to # of CPUs. + #[structopt(long)] + pub indexing_jobs: Option, +} const POSSIBLE_ENV: [&str; 2] = ["development", "production"]; #[derive(Debug, Clone, StructOpt)] diff --git a/src/routes/document.rs b/src/routes/document.rs index 6c5f93991..aeec0e5df 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -122,7 +122,7 @@ async fn add_documents_json( ) -> Result { let addition_result = data .add_documents( - &path.index_uid, + path.into_inner().index_uid, IndexDocumentsMethod::UpdateDocuments, UpdateFormat::Json, body diff --git a/src/routes/index.rs b/src/routes/index.rs index 515e771e1..fa4ae0679 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -1,7 +1,7 @@ use actix_web::{delete, get, post, put}; use actix_web::{web, HttpResponse}; use chrono::{DateTime, Utc}; -use log::error; +//use log::error; use serde::{Deserialize, Serialize}; use crate::Data; @@ -94,8 +94,8 @@ async fn delete_index( #[derive(Deserialize)] struct UpdateParam { - index_uid: String, - update_id: u64, + _index_uid: String, + _update_id: u64, } #[get( @@ -103,39 +103,41 @@ struct UpdateParam { wrap = "Authentication::Private" )] async fn get_update_status( - data: web::Data, - path: web::Path, + _data: web::Data, + _path: web::Path, ) -> Result { - let result = data.get_update_status(&path.index_uid, path.update_id); - match result { - Ok(Some(meta)) => { - let json = serde_json::to_string(&meta).unwrap(); - Ok(HttpResponse::Ok().body(json)) - } - Ok(None) => { - todo!() - } - Err(e) => { - error!("{}", e); - todo!() - } - } + todo!() + //let result = data.get_update_status(&path.index_uid, path.update_id); + //match result { + //Ok(Some(meta)) => { + //let json = serde_json::to_string(&meta).unwrap(); + //Ok(HttpResponse::Ok().body(json)) + //} + //Ok(None) => { + //todo!() + //} + //Err(e) => { + //error!("{}", e); + //todo!() + //} + //} } #[get("/indexes/{index_uid}/updates", wrap = "Authentication::Private")] async fn get_all_updates_status( - data: web::Data, - path: web::Path, + _data: web::Data, + _path: web::Path, ) -> Result { - let result = data.get_updates_status(&path.index_uid); - match result { - Ok(metas) => { - let json = serde_json::to_string(&metas).unwrap(); - Ok(HttpResponse::Ok().body(json)) - } - Err(e) => { - error!("{}", e); - todo!() - } - } + todo!() + //let result = data.get_updates_status(&path.index_uid); + //match result { + //Ok(metas) => { + //let json = serde_json::to_string(&metas).unwrap(); + //Ok(HttpResponse::Ok().body(json)) + //} + //Err(e) => { + //error!("{}", e); + //todo!() + //} + //} } diff --git a/src/routes/settings/mod.rs b/src/routes/settings/mod.rs index d54ad469c..56c1d34a0 100644 --- a/src/routes/settings/mod.rs +++ b/src/routes/settings/mod.rs @@ -3,7 +3,7 @@ use log::error; use crate::Data; use crate::error::ResponseError; -use crate::updates::Settings;
+use crate::index_controller::Settings; use crate::helpers::Authentication; #[macro_export] @@ -15,19 +15,19 @@ macro_rules! make_setting_route { use crate::data; use crate::error::ResponseError; use crate::helpers::Authentication; - use crate::updates::Settings; + use crate::index_controller::Settings; #[actix_web::delete($route, wrap = "Authentication::Private")] pub async fn delete( data: web::Data, index_uid: web::Path, ) -> Result { - use crate::updates::Settings; + use crate::index_controller::Settings; let settings = Settings { $attr: Some(None), ..Default::default() }; - match data.update_settings(index_uid.as_ref(), settings).await { + match data.update_settings(index_uid.into_inner(), settings).await { Ok(update_status) => { let json = serde_json::to_string(&update_status).unwrap(); Ok(HttpResponse::Ok().body(json)) @@ -50,7 +50,7 @@ macro_rules! make_setting_route { ..Default::default() }; - match data.update_settings(index_uid.as_ref(), settings).await { + match data.update_settings(index_uid.into_inner(), settings).await { Ok(update_status) => { let json = serde_json::to_string(&update_status).unwrap(); Ok(HttpResponse::Ok().body(json)) @@ -141,7 +141,7 @@ async fn update_all( index_uid: web::Path, body: web::Json, ) -> Result { - match data.update_settings(index_uid.as_ref(), body.into_inner()).await { + match data.update_settings(index_uid.into_inner(), body.into_inner()).await { Ok(update_result) => { let json = serde_json::to_string(&update_result).unwrap(); Ok(HttpResponse::Ok().body(json)) @@ -176,7 +176,7 @@ async fn delete_all( index_uid: web::Path, ) -> Result { let settings = Settings::cleared(); - match data.update_settings(index_uid.as_ref(), settings).await { + match data.update_settings(index_uid.into_inner(), settings).await { Ok(update_result) => { let json = serde_json::to_string(&update_result).unwrap(); Ok(HttpResponse::Ok().body(json)) diff --git a/src/updates/mod.rs b/src/updates/mod.rs index faefe7804..7e0802595 100644 --- a/src/updates/mod.rs +++ b/src/updates/mod.rs @@ -1,10 +1,10 @@ mod settings; +mod update_store; pub use settings::{Settings, Facets}; use std::io; use std::sync::Arc; -use std::ops::Deref; use std::fs::create_dir_all; use std::collections::HashMap; @@ -55,16 +55,6 @@ pub struct UpdateQueue { inner: Arc>, } -impl crate::index_controller::UpdateStore for UpdateQueue {} - -impl Deref for UpdateQueue { - type Target = Arc>; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - #[derive(Debug, Clone, StructOpt)] pub struct IndexerOpts { /// The amount of documents to skip before printing diff --git a/src/updates/update_store.rs b/src/updates/update_store.rs new file mode 100644 index 000000000..f750fc38b --- /dev/null +++ b/src/updates/update_store.rs @@ -0,0 +1,581 @@ +use std::path::Path; +use std::sync::{Arc, RwLock}; + +use crossbeam_channel::Sender; +use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; +use heed::{EnvOpenOptions, Env, Database}; +use serde::{Serialize, Deserialize}; +use chrono::{DateTime, Utc}; + +type BEU64 = heed::zerocopy::U64; + +#[derive(Clone)] +pub struct UpdateStore { + env: Env, + pending_meta: Database, SerdeJson>>, + pending: Database, ByteSlice>, + processed_meta: Database, SerdeJson>>, + failed_meta: Database, SerdeJson>>, + aborted_meta: Database, SerdeJson>>, + processing: Arc>>>, + notification_sender: Sender<()>, +} + +pub trait UpdateHandler { + fn handle_update(&mut self, update_id: u64, meta: Processing, content: &[u8]) -> Result, Failed>; +} + +impl UpdateHandler for F +where 
F: FnMut(u64, Processing, &[u8]) -> Result, Failed> + Send + 'static { + fn handle_update(&mut self, update_id: u64, meta: Processing, content: &[u8]) -> Result, Failed> { + self(update_id, meta, content) + } +} + +impl UpdateStore { + pub fn open( + size: Option, + path: P, + mut update_handler: U, + ) -> heed::Result>> + where + P: AsRef, + U: UpdateHandler + Send + 'static, + M: for<'a> Deserialize<'a> + Serialize + Send + Sync + Clone, + N: Serialize, + E: Serialize, + { + let mut options = EnvOpenOptions::new(); + if let Some(size) = size { + options.map_size(size); + } + options.max_dbs(5); + + let env = options.open(path)?; + let pending_meta = env.create_database(Some("pending-meta"))?; + let pending = env.create_database(Some("pending"))?; + let processed_meta = env.create_database(Some("processed-meta"))?; + let aborted_meta = env.create_database(Some("aborted-meta"))?; + let failed_meta = env.create_database(Some("failed-meta"))?; + let processing = Arc::new(RwLock::new(None)); + + let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); + // Send a first notification to trigger the process. + let _ = notification_sender.send(()); + + let update_store = Arc::new(UpdateStore { + env, + pending, + pending_meta, + processed_meta, + aborted_meta, + notification_sender, + failed_meta, + processing, + }); + + let update_store_cloned = update_store.clone(); + std::thread::spawn(move || { + // Block and wait for something to process. + for () in notification_receiver { + loop { + match update_store_cloned.process_pending_update(&mut update_handler) { + Ok(Some(_)) => (), + Ok(None) => break, + Err(e) => eprintln!("error while processing update: {}", e), + } + } + } + }); + + Ok(update_store) + } + + /// Returns the next id to use to store a new update. + fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { + let last_pending = self.pending_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_processed = self.processed_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_aborted = self.aborted_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_update_id = [last_pending, last_processed, last_aborted] + .iter() + .copied() + .flatten() + .max(); + + match last_update_id { + Some(last_id) => Ok(last_id + 1), + None => Ok(0), + } + } + + /// Registers the update content in the pending store and the meta + /// into the pending-meta store. Returns the new unique update id. + pub fn register_update(&self, meta: M, content: &[u8]) -> heed::Result> + where M: Serialize, + { + let mut wtxn = self.env.write_txn()?; + + // We ask the update store to give us a new update id; this is safe: + // no other update can have the same id because we open a write txn before + // asking for the id and registering it, so any other update registration + // is forced to wait for a new write txn. + let update_id = self.new_update_id(&wtxn)?; + let update_key = BEU64::new(update_id); + + let meta = Pending::new(meta, update_id); + self.pending_meta.put(&mut wtxn, &update_key, &meta)?; + self.pending.put(&mut wtxn, &update_key, content)?; + + wtxn.commit()?; + + if let Err(e) = self.notification_sender.try_send(()) { + assert!(!e.is_disconnected(), "update notification channel is disconnected"); + } + Ok(meta) + } + /// Executes the user provided function on the next pending update (the one with the lowest id).
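+ /// Update ids are stored as big-endian `u64` keys (`BEU64`), so LMDB's key order matches registration order and the first pending entry is always the oldest one.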
+ /// This is asynchronous as it lets the user process the update with a read-only txn and + /// only writes the result meta to the processed-meta store *after* it has been processed. + fn process_pending_update(&self, handler: &mut U) -> heed::Result> + where + U: UpdateHandler, + M: for<'a> Deserialize<'a> + Serialize + Clone, + N: Serialize, + E: Serialize, + { + // Create a read transaction to be able to retrieve the pending update in order. + let rtxn = self.env.read_txn()?; + let first_meta = self.pending_meta.first(&rtxn)?; + + // If there is a pending update we process it, and we only keep + // a reader while processing it, not a writer. + match first_meta { + Some((first_id, pending)) => { + let first_content = self.pending + .get(&rtxn, &first_id)? + .expect("associated update content"); + + // We change the state of the update from pending to processing before we pass it + // to the update handler. The processing store is not persistent, to be able to + // recover from a failure. + let processing = pending.processing(); + self.processing + .write() + .unwrap() + .replace(processing.clone()); + // Process the pending update using the provided user function. + let result = handler.handle_update(first_id.get(), processing, first_content); + drop(rtxn); + + // Once the pending update has been successfully processed + // we must remove the content from the pending and processing stores and + // write the *new* meta to the processed-meta store and commit. + let mut wtxn = self.env.write_txn()?; + self.processing + .write() + .unwrap() + .take(); + self.pending_meta.delete(&mut wtxn, &first_id)?; + self.pending.delete(&mut wtxn, &first_id)?; + match result { + Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?, + Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?, + } + wtxn.commit()?; + + Ok(Some(())) + }, + None => Ok(None) + } + } + + /// The id and metadata of the update that is currently being processed, + /// `None` if no update is being processed. + pub fn processing_update(&self) -> heed::Result)>> + where M: for<'a> Deserialize<'a>, + { + let rtxn = self.env.read_txn()?; + match self.pending_meta.first(&rtxn)? { + Some((key, meta)) => Ok(Some((key.get(), meta))), + None => Ok(None), + } + } + + /// Execute the user defined function with the meta-store iterators: the currently + /// processing update (if any) first, then the *processed*, *aborted*, *pending* + /// and *failed* meta iterators, in that order. + pub fn iter_metas(&self, mut f: F) -> heed::Result + where + M: for<'a> Deserialize<'a> + Clone, + N: for<'a> Deserialize<'a>, + F: for<'a> FnMut( + Option>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + ) -> heed::Result, + { + let rtxn = self.env.read_txn()?; + + // We get the processed, aborted, pending and failed meta iterators. + let processed_iter = self.processed_meta.iter(&rtxn)?; + let aborted_iter = self.aborted_meta.iter(&rtxn)?; + let pending_iter = self.pending_meta.iter(&rtxn)?; + let processing = self.processing.read().unwrap().clone(); + let failed_iter = self.failed_meta.iter(&rtxn)?; + + // We execute the user defined function with all the iterators. + (f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter) + } + + /// Returns the update's associated meta, or `None` if the update doesn't exist.
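+ ///
+ /// A minimal polling sketch (hedged: `store` is assumed to be an open `UpdateStore` with
+ /// `String` metas, as in the tests below; `update_id` is an id previously returned by
+ /// `register_update`):
+ ///
+ /// ```ignore
+ /// match store.meta(update_id)? {
+ ///     Some(status) => println!("update {} is {:?}", status.id(), status),
+ ///     None => println!("update {} does not exist", update_id),
+ /// }
+ /// ```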
+ pub fn meta(&self, update_id: u64) -> heed::Result>> + where + M: for<'a> Deserialize<'a> + Clone, + N: for<'a> Deserialize<'a>, + E: for<'a> Deserialize<'a>, + { + let rtxn = self.env.read_txn()?; + let key = BEU64::new(update_id); + + if let Some(ref meta) = *self.processing.read().unwrap() { + if meta.id() == update_id { + return Ok(Some(UpdateStatus::Processing(meta.clone()))); + } + } + + if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Pending(meta))); + } + + if let Some(meta) = self.processed_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Processed(meta))); + } + + if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Aborted(meta))); + } + + if let Some(meta) = self.failed_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Failed(meta))); + } + + Ok(None) + } + + /// Aborts an update: the aborted update's content is deleted and + /// its meta is moved into the aborted updates database. + /// + /// Trying to abort an update that is currently being processed, an update + /// that has already been processed, or an update that doesn't actually exist, will + /// return `None`. + pub fn abort_update(&self, update_id: u64) -> heed::Result>> + where M: Serialize + for<'a> Deserialize<'a>, + { + let mut wtxn = self.env.write_txn()?; + let key = BEU64::new(update_id); + + // We cannot abort an update that is currently being processed. + if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) { + return Ok(None); + } + + let pending = match self.pending_meta.get(&wtxn, &key)? { + Some(meta) => meta, + None => return Ok(None), + }; + + let aborted = pending.abort(); + + self.aborted_meta.put(&mut wtxn, &key, &aborted)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + + wtxn.commit()?; + + Ok(Some(aborted)) + } + + /// Aborts all the pending updates, but not the one currently being processed. + /// Returns the update metas and ids that were successfully aborted. + pub fn abort_pendings(&self) -> heed::Result)>> + where M: Serialize + for<'a> Deserialize<'a>, + { + let mut wtxn = self.env.write_txn()?; + let mut aborted_updates = Vec::new(); + + // We skip the first pending update as it is currently being processed.
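+ // (Note: `pending_meta` is keyed by `BEU64`, so `iter` yields updates in ascending id
+ // order; `skip(1)` therefore drops exactly the oldest pending update, which is assumed
+ // to be the one in processing.)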
+ for result in self.pending_meta.iter(&wtxn)?.skip(1) { + let (key, pending) = result?; + let id = key.get(); + aborted_updates.push((id, pending.abort())); + } + + for (id, aborted) in &aborted_updates { + let key = BEU64::new(*id); + self.aborted_meta.put(&mut wtxn, &key, &aborted)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + } + + wtxn.commit()?; + + Ok(aborted_updates) + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +pub struct Pending { + update_id: u64, + meta: M, + enqueued_at: DateTime, +} + +impl Pending { + fn new(meta: M, update_id: u64) -> Self { + Self { + enqueued_at: Utc::now(), + meta, + update_id, + } + } + + pub fn processing(self) -> Processing { + Processing { + from: self, + started_processing_at: Utc::now(), + } + } + + pub fn abort(self) -> Aborted { + Aborted { + from: self, + aborted_at: Utc::now(), + } + } + + pub fn meta(&self) -> &M { + &self.meta + } + + pub fn id(&self) -> u64 { + self.update_id + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +pub struct Processed { + success: N, + processed_at: DateTime, + #[serde(flatten)] + from: Processing, +} + +impl Processed { + fn id(&self) -> u64 { + self.from.id() + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +pub struct Processing { + #[serde(flatten)] + from: Pending, + started_processing_at: DateTime, +} + +impl Processing { + pub fn id(&self) -> u64 { + self.from.id() + } + + pub fn meta(&self) -> &M { + self.from.meta() + } + + pub fn process(self, meta: N) -> Processed { + Processed { + success: meta, + from: self, + processed_at: Utc::now(), + } + } + + pub fn fail(self, error: E) -> Failed { + Failed { + from: self, + error, + failed_at: Utc::now(), + } + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +pub struct Aborted { + #[serde(flatten)] + from: Pending, + aborted_at: DateTime, +} + +impl Aborted { + fn id(&self) -> u64 { + self.from.id() + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] +pub struct Failed { + #[serde(flatten)] + from: Processing, + error: E, + failed_at: DateTime, +} + +impl Failed { + fn id(&self) -> u64 { + self.from.id() + } +} + +#[derive(Debug, PartialEq, Eq, Hash, Serialize)] +#[serde(tag = "status")] +pub enum UpdateStatus { + Processing(Processing), + Pending(Pending), + Processed(Processed), + Aborted(Aborted), + Failed(Failed), +} + +impl UpdateStatus { + pub fn id(&self) -> u64 { + match self { + UpdateStatus::Processing(u) => u.id(), + UpdateStatus::Pending(u) => u.id(), + UpdateStatus::Processed(u) => u.id(), + UpdateStatus::Aborted(u) => u.id(), + UpdateStatus::Failed(u) => u.id(), + } + } +} + +impl From> for UpdateStatus { + fn from(other: Pending) -> Self { + Self::Pending(other) + } +} + +impl From> for UpdateStatus { + fn from(other: Aborted) -> Self { + Self::Aborted(other) + } +} + +impl From> for UpdateStatus { + fn from(other: Processed) -> Self { + Self::Processed(other) + } +} + +impl From> for UpdateStatus { + fn from(other: Processing) -> Self { + Self::Processing(other) + } +} + +impl From> for UpdateStatus { + fn from(other: Failed) -> Self { + Self::Failed(other) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::{Duration, Instant}; + + #[test] + fn simple() { + let dir = tempfile::tempdir().unwrap(); + let update_store = UpdateStore::open(None, dir, |_id, meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { + let 
new_meta = meta.meta().to_string() + " processed"; + let processed = meta.process(new_meta); + Ok(processed) + }).unwrap(); + + let meta = String::from("kiki"); + let update = update_store.register_update(meta, &[]).unwrap(); + thread::sleep(Duration::from_millis(100)); + let meta = update_store.meta(update.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "kiki processed"); + } else { + panic!() + } + } + + #[test] + #[ignore] + fn long_running_update() { + let dir = tempfile::tempdir().unwrap(); + let update_store = UpdateStore::open(None, dir, |_id, meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { + thread::sleep(Duration::from_millis(400)); + let new_meta = meta.meta().to_string() + " processed"; + let processed = meta.process(new_meta); + Ok(processed) + }).unwrap(); + + let before_register = Instant::now(); + + let meta = String::from("kiki"); + let update_kiki = update_store.register_update(meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + let meta = String::from("coco"); + let update_coco = update_store.register_update(meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + let meta = String::from("cucu"); + let update_cucu = update_store.register_update(meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + thread::sleep(Duration::from_millis(400 * 3 + 100)); + + let meta = update_store.meta(update_kiki.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "kiki processed"); + } else { + panic!() + } + + let meta = update_store.meta(update_coco.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "coco processed"); + } else { + panic!() + } + + let meta = update_store.meta(update_cucu.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "cucu processed"); + } else { + panic!() + } + } +}