diff --git a/Cargo.lock b/Cargo.lock
index 0282bfe92..5ceaef691 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1137,6 +1137,17 @@ dependencies = [
  "wasi 0.9.0+wasi-snapshot-preview1",
 ]
 
+[[package]]
+name = "getrandom"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
+dependencies = [
+ "cfg-if 1.0.0",
+ "libc",
+ "wasi 0.10.0+wasi-snapshot-preview1",
+]
+
 [[package]]
 name = "gimli"
 version = "0.23.0"
@@ -1608,7 +1619,7 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
 
 [[package]]
 name = "meilisearch-error"
-version = "0.18.0"
+version = "0.18.1"
 dependencies = [
  "actix-http",
 ]
@@ -1669,6 +1680,7 @@ dependencies = [
  "tempfile",
  "tokio",
  "ureq",
+ "uuid",
  "vergen",
  "walkdir",
  "whoami",
@@ -2263,7 +2275,7 @@ version = "0.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
 dependencies = [
- "getrandom",
+ "getrandom 0.1.15",
  "libc",
  "rand_chacha",
  "rand_core 0.5.1",
@@ -2302,7 +2314,7 @@ version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
 dependencies = [
- "getrandom",
+ "getrandom 0.1.15",
 ]
 
 [[package]]
@@ -3365,11 +3377,11 @@ checksum = "9071ac216321a4470a69fb2b28cfc68dcd1a39acd877c8be8e014df6772d8efa"
 
 [[package]]
 name = "uuid"
-version = "0.8.1"
+version = "0.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
+checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
 dependencies = [
- "rand 0.7.3",
+ "getrandom 0.2.2",
  "serde",
 ]
 
diff --git a/Cargo.toml b/Cargo.toml
index 26a3dc1b8..eb2fca43d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -62,6 +62,7 @@ dashmap = "4.0.2"
 page_size = "0.4.2"
 obkv = "0.1.1"
 ouroboros = "0.8.0"
+uuid = "0.8.2"
 
 [dependencies.sentry]
 default-features = false
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 4258278d2..7494792bc 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -5,19 +5,20 @@ pub use search::{SearchQuery, SearchResult};
 
 use std::ops::Deref;
 use std::sync::Arc;
+use std::fs::create_dir_all;
 
 use sha2::Digest;
 
 use crate::{option::Opt, index_controller::Settings};
-use crate::index_controller::{IndexStore, UpdateStore};
+use crate::index_controller::{IndexController, LocalIndexController};
 
 #[derive(Clone)]
 pub struct Data {
-    inner: Arc<DataInner<UpdateStore>>,
+    inner: Arc<DataInner>,
 }
 
 impl Deref for Data {
-    type Target = DataInner<UpdateStore>;
+    type Target = DataInner;
 
     fn deref(&self) -> &Self::Target {
         &self.inner
@@ -25,8 +26,8 @@ impl Deref for Data {
 }
 
 #[derive(Clone)]
-pub struct DataInner<I> {
-    pub indexes: Arc<I>,
+pub struct DataInner {
+    pub index_controller: Arc<LocalIndexController>,
     api_keys: ApiKeys,
     options: Opt,
 }
@@ -58,8 +59,9 @@ impl ApiKeys {
 impl Data {
     pub fn new(options: Opt) -> anyhow::Result<Data> {
         let path = options.db_path.clone();
-        let index_store = IndexStore::new(&path)?;
-        let index_controller = UpdateStore::new(index_store);
+        let indexer_opts = options.indexer_options.clone();
+        create_dir_all(&path)?;
+        let index_controller = LocalIndexController::new(&path, indexer_opts)?;
         let indexes = Arc::new(index_controller);
 
         let mut api_keys = ApiKeys {
@@ -70,28 +72,31 @@ impl Data {
 
         api_keys.generate_missing_api_keys();
 
-        let inner = DataInner { indexes, options, api_keys };
+        let inner = DataInner { index_controller: indexes, options, api_keys };
         let inner = Arc::new(inner);
 
         Ok(Data { inner })
     }
 
     pub fn settings<S: AsRef<str>>(&self, index_uid: S) -> anyhow::Result<Settings> {
-        let index = self.indexes
-            .get(&index_uid)?
+        let index = self.index_controller
+            .index(&index_uid)?
             .ok_or_else(|| anyhow::anyhow!("Index {} does not exist.", index_uid.as_ref()))?;
 
+        let txn = index.read_txn()?;
+
         let displayed_attributes = index
-            .displayed_fields()?
+            .displayed_fields(&txn)?
             .map(|fields| fields.into_iter().map(String::from).collect())
             .unwrap_or_else(|| vec!["*".to_string()]);
 
         let searchable_attributes = index
-            .searchable_fields()?
+            .searchable_fields(&txn)?
             .map(|fields| fields.into_iter().map(String::from).collect())
             .unwrap_or_else(|| vec!["*".to_string()]);
 
-        let faceted_attributes = index.faceted_fields()?
+        let faceted_attributes = index
+            .faceted_fields(&txn)?
             .into_iter()
             .map(|(k, v)| (k, v.to_string()))
             .collect();
diff --git a/src/data/search.rs b/src/data/search.rs
index c30345073..246e3bdac 100644
--- a/src/data/search.rs
+++ b/src/data/search.rs
@@ -4,11 +4,11 @@ use std::time::Instant;
 
 use serde_json::{Value, Map};
 use serde::{Deserialize, Serialize};
-use milli::{SearchResult as Results, obkv_to_json};
+use milli::{Index, obkv_to_json, FacetCondition};
 use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
+use anyhow::bail;
 
-use crate::error::Error;
-
+use crate::index_controller::IndexController;
 use super::Data;
 
 const DEFAULT_SEARCH_LIMIT: usize = 20;
@@ -26,11 +26,68 @@ pub struct SearchQuery {
     pub attributes_to_retrieve: Option<Vec<String>>,
     pub attributes_to_crop: Option<Vec<String>>,
     pub crop_length: Option<usize>,
-    pub attributes_to_highlight: Option<Vec<String>>,
+    pub attributes_to_highlight: Option<HashSet<String>>,
     pub filters: Option<String>,
     pub matches: Option<bool>,
     pub facet_filters: Option<Value>,
     pub facets_distribution: Option<Vec<String>>,
+    pub facet_condition: Option<String>,
+}
+
+impl SearchQuery {
+    pub fn perform(&self, index: impl AsRef<Index>) -> anyhow::Result<SearchResult> {
+        let index = index.as_ref();
+
+        let before_search = Instant::now();
+        let rtxn = index.read_txn().unwrap();
+
+        let mut search = index.search(&rtxn);
+
+        if let Some(ref query) = self.q {
+            search.query(query);
+        }
+
+        if let Some(ref condition) = self.facet_condition {
+            if !condition.trim().is_empty() {
+                let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap();
+                search.facet_condition(condition);
+            }
+        }
+
+        if let Some(offset) = self.offset {
+            search.offset(offset);
+        }
+
+        let milli::SearchResult { documents_ids, found_words, nb_hits, limit } = search.execute()?;
+
+        let mut documents = Vec::new();
+        let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
+
+        let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
+            Some(fields) => fields,
+            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
+        };
+
+        let stop_words = fst::Set::default();
+        let highlighter = Highlighter::new(&stop_words);
+
+        for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
+            let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
+            if let Some(ref attributes_to_highlight) = self.attributes_to_highlight {
+                highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight);
+            }
+            documents.push(object);
+        }
+
+        Ok(SearchResult {
+            hits: documents,
+            nb_hits,
+            query: self.q.clone().unwrap_or_default(),
+            limit,
+            offset: self.offset.unwrap_or_default(),
+            processing_time_ms: before_search.elapsed().as_millis(),
+        })
+    }
 }
 
 #[derive(Serialize)]
@@ -105,45 +162,9 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
 
 impl Data {
     pub fn search<S: AsRef<str>>(&self, index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
-        let start = Instant::now();
-        let index = self.indexes
-            .get(&index)?
-            .ok_or_else(|| Error::OpenIndex(format!("Index {} doesn't exists.", index.as_ref())))?;
-
-        let Results { found_words, documents_ids, nb_hits, limit, .. } = index.search(&search_query)?;
-
-        let fields_ids_map = index.fields_ids_map()?;
-
-        let displayed_fields = match index.displayed_fields_ids()? {
-            Some(fields) => fields,
-            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
-        };
-
-        let attributes_to_highlight = match search_query.attributes_to_highlight {
-            Some(fields) => fields.iter().map(ToOwned::to_owned).collect(),
-            None => HashSet::new(),
-        };
-
-        let stop_words = fst::Set::default();
-        let highlighter = Highlighter::new(&stop_words);
-        let mut documents = Vec::new();
-        for (_id, obkv) in index.documents(&documents_ids)? {
-            let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
-            highlighter.highlight_record(&mut object, &found_words, &attributes_to_highlight);
-            documents.push(object);
+        match self.index_controller.index(&index)? {
+            Some(index) => Ok(search_query.perform(index)?),
+            None => bail!("index {:?} doesn't exist", index.as_ref()),
         }
-
-        let processing_time_ms = start.elapsed().as_millis();
-
-        let result = SearchResult {
-            hits: documents,
-            nb_hits,
-            query: search_query.q.unwrap_or_default(),
-            offset: search_query.offset.unwrap_or(0),
-            limit,
-            processing_time_ms,
-        };
-
-        Ok(result)
     }
 }
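Review note: the substance of this file's change is that search execution moves off `Data` and onto `SearchQuery::perform`, leaving `Data::search` as pure routing. A minimal, self-contained sketch of that routing shape (all types here are simplified stand-ins for the crate's real `Index`, `SearchQuery`, and controller; only `anyhow` is assumed as a dependency):

```rust
use anyhow::bail;

// Stand-ins for the crate's types, just to show the new routing shape:
// the controller only resolves names to index handles, and the query
// knows how to execute itself against whatever handle it is given.
struct Index { name: String }
struct SearchQuery { q: Option<String> }
#[derive(Debug)]
struct SearchResult { hits: Vec<String> }

impl SearchQuery {
    // Mirrors `SearchQuery::perform`: all execution logic lives on the query.
    fn perform(&self, index: &Index) -> anyhow::Result<SearchResult> {
        let q = self.q.clone().unwrap_or_default();
        Ok(SearchResult { hits: vec![format!("'{}' in {}", q, index.name)] })
    }
}

struct Controller { indexes: Vec<Index> }

impl Controller {
    // Mirrors `IndexController::index`: `None` means "no such index".
    fn index(&self, name: &str) -> anyhow::Result<Option<&Index>> {
        Ok(self.indexes.iter().find(|i| i.name == name))
    }
}

// Same shape as the new `Data::search`: look up, bail if absent, delegate.
fn search(ctrl: &Controller, name: &str, query: SearchQuery) -> anyhow::Result<SearchResult> {
    match ctrl.index(name)? {
        Some(index) => query.perform(index),
        None => bail!("index {:?} doesn't exist", name),
    }
}

fn main() -> anyhow::Result<()> {
    let ctrl = Controller { indexes: vec![Index { name: "movies".into() }] };
    let result = search(&ctrl, "movies", SearchQuery { q: Some("batman".into()) })?;
    println!("{:?}", result.hits);
    Ok(())
}
```

The `Option` return keeps "index missing" as a domain case for the caller to turn into an error, rather than burying it inside the lookup.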
diff --git a/src/data/updates.rs b/src/data/updates.rs
index 507223d41..d05617361 100644
--- a/src/data/updates.rs
+++ b/src/data/updates.rs
@@ -1,15 +1,12 @@
 use std::ops::Deref;
 
 use milli::update::{IndexDocumentsMethod, UpdateFormat};
-//use milli::update_store::UpdateStatus;
 use async_compression::tokio_02::write::GzipEncoder;
 use futures_util::stream::StreamExt;
 use tokio::io::AsyncWriteExt;
 
 use super::Data;
-use crate::index_controller::IndexController;
-use crate::index_controller::{UpdateStatusResponse, Settings};
-
+use crate::index_controller::{IndexController, UpdateStatusResponse, Settings};
 
 impl Data {
     pub async fn add_documents(
@@ ... @@
         let file = file.into_std().await;
         let mmap = unsafe { memmap::Mmap::map(&file)? };
-        let indexes = self.indexes.clone();
-        let update = tokio::task::spawn_blocking(move ||indexes.add_documents(index, method, format, &mmap[..])).await??;
+        let index_controller = self.index_controller.clone();
+        let update = tokio::task::spawn_blocking(move ||index_controller.add_documents(index, method, format, &mmap[..])).await??;
         Ok(update.into())
     }
 
@@ -49,7 +46,7 @@ impl Data {
         index: S,
         settings: Settings
     ) -> anyhow::Result<UpdateStatusResponse> {
-        let indexes = self.indexes.clone();
+        let indexes = self.index_controller.clone();
         let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??;
         Ok(update.into())
     }
diff --git a/src/index_controller/_update_store/mod.rs b/src/index_controller/_update_store/mod.rs
new file mode 100644
index 000000000..ef8711ded
--- /dev/null
+++ b/src/index_controller/_update_store/mod.rs
@@ -0,0 +1,17 @@
+use std::sync::Arc;
+
+use heed::Env;
+
+use super::IndexStore;
+
+pub struct UpdateStore {
+    env: Env,
+    index_store: Arc<IndexStore>,
+}
+
+impl UpdateStore {
+    pub fn new(env: Env, index_store: Arc<IndexStore>) -> anyhow::Result<Self> {
+        Ok(Self { env, index_store })
+    }
+}
+
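Review note: both handlers in `src/data/updates.rs` share the same handoff pattern: clone the `Arc`'d controller, move it into `tokio::task::spawn_blocking`, and use `??` to surface first the `JoinError` and then the task's own failure. A hedged, self-contained sketch of just that pattern (stand-in `Controller` type; assumes a recent tokio with the `rt-multi-thread` and `macros` features, plus `anyhow`):

```rust
use std::sync::Arc;

// Hypothetical stand-in for the index controller; the real one wraps LMDB
// and milli, whose work must not run on the async reactor threads.
struct Controller;

impl Controller {
    fn heavy_indexing(&self, payload: &[u8]) -> anyhow::Result<usize> {
        Ok(payload.len()) // placeholder for the real blocking work
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let controller = Arc::new(Controller);
    let payload = vec![0u8; 1024];

    // Clone the Arc so the blocking task owns its own handle, exactly like
    // `self.index_controller.clone()` in the diff above.
    let controller = controller.clone();
    let indexed = tokio::task::spawn_blocking(move || controller.heavy_indexing(&payload))
        .await??; // first `?` is the JoinError, second the task's own error

    println!("indexed {} bytes", indexed);
    Ok(())
}
```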
diff --git a/src/index_controller/index_store.rs b/src/index_controller/index_store.rs
index 6ba6621c3..6652086f9 100644
--- a/src/index_controller/index_store.rs
+++ b/src/index_controller/index_store.rs
@@ -1,78 +1,12 @@
-use std::fs::File;
-use std::io::{Read, Write};
-use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::collections::HashMap;
 
 use anyhow::Result;
-use chrono::{DateTime, Utc};
-use dashmap::DashMap;
-use heed::types::{Str, SerdeBincode};
-use heed::{EnvOpenOptions, Env, Database};
 use milli::{Index, FieldsIdsMap, SearchResult, FieldId, facet::FacetType};
-use serde::{Serialize, Deserialize};
 use ouroboros::self_referencing;
 
 use crate::data::SearchQuery;
 
-const CONTROLLER_META_FILENAME: &str = "index_controller_meta";
-const INDEXES_CONTROLLER_FILENAME: &str = "indexes_db";
-const INDEXES_DB_NAME: &str = "indexes_db";
-
-
-#[derive(Debug, Serialize, Deserialize)]
-struct IndexStoreMeta {
-    open_options: EnvOpenOptions,
-    created_at: DateTime<Utc>,
-}
-
-impl IndexStoreMeta {
-    fn from_path(path: impl AsRef<Path>) -> Result<Option<IndexStoreMeta>> {
-        let mut path = path.as_ref().to_path_buf();
-        path.push(CONTROLLER_META_FILENAME);
-        if path.exists() {
-            let mut file = File::open(path)?;
-            let mut buffer = Vec::new();
-            let n = file.read_to_end(&mut buffer)?;
-            let meta: IndexStoreMeta = serde_json::from_slice(&buffer[..n])?;
-            Ok(Some(meta))
-        } else {
-            Ok(None)
-        }
-    }
-
-    fn to_path(self, path: impl AsRef<Path>) -> Result<()> {
-        let mut path = path.as_ref().to_path_buf();
-        path.push(CONTROLLER_META_FILENAME);
-        if path.exists() {
-            Err(anyhow::anyhow!("Index controller metadata already exists"))
-        } else {
-            let mut file = File::create(path)?;
-            let json = serde_json::to_vec(&self)?;
-            file.write_all(&json)?;
-            Ok(())
-        }
-    }
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct IndexMetadata {
-    created_at: DateTime<Utc>,
-    open_options: EnvOpenOptions,
-    uuid: String,
-}
-
-impl IndexMetadata {
-    fn open_index(self, path: impl AsRef<Path>) -> Result<Index> {
-        // create a path in the form "db_path/indexes/index_id"
-        let mut path = path.as_ref().to_path_buf();
-        path.push("indexes");
-        path.push(&self.uuid);
-        Ok(Index::new(self.open_options, path)?)
-    }
-}
-
-
 #[self_referencing]
 pub struct IndexView {
     pub index: Arc<Index>,
@@ -136,120 +70,5 @@ impl IndexView {
         let index = self.borrow_index();
         Ok(index.documents(txn, ids.into_iter().copied())?)
     }
-
-    //pub async fn add_documents<B, E>(
-        //&self,
-        //method: IndexDocumentsMethod,
-        //format: UpdateFormat,
-        //mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
-    //) -> anyhow::Result<UpdateStatusResponse>
-    //where
-        //B: Deref<Target = [u8]>,
-        //E: std::error::Error + Send + Sync + 'static,
-    //{
-        //let file = tokio::task::spawn_blocking(tempfile::tempfile).await?;
-        //let file = tokio::fs::File::from_std(file?);
-        //let mut encoder = GzipEncoder::new(file);
-
-        //while let Some(result) = stream.next().await {
-            //let bytes = &*result?;
-            //encoder.write_all(&bytes[..]).await?;
-        //}
-
-        //encoder.shutdown().await?;
-        //let mut file = encoder.into_inner();
-        //file.sync_all().await?;
-        //let file = file.into_std().await;
-        //let mmap = unsafe { memmap::Mmap::map(&file)? };
-
-        //let meta = UpdateMeta::DocumentsAddition { method, format };
-
-        //let index = self.index.clone();
-        //let queue = self.update_store.clone();
-        //let update = tokio::task::spawn_blocking(move || queue.register_update(index, meta, &mmap[..])).await??;
-        //Ok(update.into())
-    //}
 }
 
-pub struct IndexStore {
-    path: PathBuf,
-    env: Env,
-    indexes_db: Database<Str, SerdeBincode<IndexMetadata>>,
-    indexes: DashMap<String, (String, Arc<Index>)>,
-}
-
-impl IndexStore {
-    /// Open the index controller from meta found at path, and create a new one if no meta is
-    /// found.
-    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
-        // If index controller metadata is present, we return the env, otherwise, we create a new
-        // metadata from scratch before returning a new env.
-        let path = path.as_ref().to_path_buf();
-        let env = match IndexStoreMeta::from_path(&path)? {
-            Some(meta) => meta.open_options.open(INDEXES_CONTROLLER_FILENAME)?,
-            None => {
-                let mut open_options = EnvOpenOptions::new();
-                open_options.map_size(page_size::get() * 1000);
-                let env = open_options.open(INDEXES_CONTROLLER_FILENAME)?;
-                let created_at = Utc::now();
-                let meta = IndexStoreMeta { open_options: open_options.clone(), created_at };
-                meta.to_path(&path)?;
-                env
-            }
-        };
-        let indexes = DashMap::new();
-        let indexes_db = match env.open_database(Some(INDEXES_DB_NAME))? {
-            Some(indexes_db) => indexes_db,
-            None => env.create_database(Some(INDEXES_DB_NAME))?,
-        };
-
-        Ok(Self { env, indexes, indexes_db, path })
-    }
-
-    pub fn get_or_create<S: AsRef<str>>(&self, _name: S) -> Result<IndexView> {
-        todo!()
-    }
-
-    /// Get an index with read access to the db. The index are lazily loaded, meaning that we first
-    /// check for its exixtence in the indexes map, and if it doesn't exist, the index db is check
-    /// for metadata to launch the index.
-    pub fn get<S: AsRef<str>>(&self, name: S) -> Result<Option<IndexView>> {
-        match self.indexes.get(name.as_ref()) {
-            Some(entry) => {
-                let index = entry.1.clone();
-                let uuid = entry.0.clone();
-                let view = IndexView::try_new(index, |index| index.read_txn(), uuid)?;
-                Ok(Some(view))
-            }
-            None => {
-                let txn = self.env.read_txn()?;
-                match self.indexes_db.get(&txn, name.as_ref())? {
-                    Some(meta) => {
-                        let uuid = meta.uuid.clone();
-                        let index = Arc::new(meta.open_index(&self.path)?);
-                        self.indexes.insert(name.as_ref().to_owned(), (uuid.clone(), index.clone()));
-                        let view = IndexView::try_new(index, |index| index.read_txn(), uuid)?;
-                        Ok(Some(view))
-                    }
-                    None => Ok(None)
-                }
-            }
-        }
-    }
-
-    pub fn get_mut<S: AsRef<str>>(&self, _name: S) -> Result<Option<IndexView>> {
-        todo!()
-    }
-
-    pub async fn delete_index<S: AsRef<str>>(&self, _name: S) -> Result<()> {
-        todo!()
-    }
-
-    pub async fn list_indices(&self) -> Result<Vec<(String, IndexMetadata)>> {
-        todo!()
-    }
-
-    pub async fn rename_index(&self, _old: &str, _new: &str) -> Result<()> {
-        todo!()
-    }
-}
diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs
new file mode 100644
index 000000000..bfe63459a
--- /dev/null
+++ b/src/index_controller/local_index_controller/index_store.rs
@@ -0,0 +1,188 @@
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use dashmap::DashMap;
+use dashmap::mapref::entry::Entry;
+use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn};
+use milli::Index;
+use rayon::ThreadPool;
+use uuid::Uuid;
+use serde::{Serialize, Deserialize};
+
+use super::update_store::UpdateStore;
+use super::update_handler::UpdateHandler;
+use crate::option::IndexerOpts;
+
+#[derive(Serialize, Deserialize, Debug)]
+struct IndexMeta {
+    update_size: usize,
+    index_size: usize,
+    uid: Uuid,
+}
+
+impl IndexMeta {
+    fn open(
+        &self,
+        path: impl AsRef<Path>,
+        thread_pool: Arc<ThreadPool>,
+        opt: &IndexerOpts,
+    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
+        let update_path = make_update_db_path(&path, &self.uid);
+        let index_path = make_index_db_path(&path, &self.uid);
+
+        let mut options = EnvOpenOptions::new();
+        options.map_size(self.index_size);
+        let index = Arc::new(Index::new(options, index_path)?);
+
+        let mut options = EnvOpenOptions::new();
+        options.map_size(self.update_size);
+        let handler = UpdateHandler::new(opt, index.clone(), thread_pool)?;
+        let update_store = UpdateStore::open(options, update_path, handler)?;
+        Ok((index, update_store))
+    }
+}
+
+pub struct IndexStore {
+    env: Env,
+    name_to_uid: DashMap<String, Uuid>,
+    name_to_uid_db: Database<Str, ByteSlice>,
+    uid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
+    uid_to_index_db: Database<ByteSlice, SerdeJson<IndexMeta>>,
+
+    thread_pool: Arc<ThreadPool>,
+    opt: IndexerOpts,
+}
+
+impl IndexStore {
+    pub fn new(path: impl AsRef<Path>, opt: IndexerOpts) -> anyhow::Result<Self> {
+        let env = EnvOpenOptions::new()
+            .map_size(4096 * 100)
+            .max_dbs(2)
+            .open(path)?;
+
+        let name_to_uid = DashMap::new();
+        let uid_to_index = DashMap::new();
+        let name_to_uid_db = open_or_create_database(&env, Some("name_to_uid"))?;
+        let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?;
+
+        let thread_pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(opt.indexing_jobs.unwrap_or(0))
+            .build()?;
+        let thread_pool = Arc::new(thread_pool);
+
+        Ok(Self {
+            env,
+            name_to_uid,
+            name_to_uid_db,
+            uid_to_index,
+            uid_to_index_db,
+
+            thread_pool,
+            opt,
+        })
+    }
+
+    fn index_uid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
+        match self.name_to_uid.entry(name.as_ref().to_string()) {
+            Entry::Vacant(entry) => {
+                match self.name_to_uid_db.get(txn, name.as_ref())? {
+                    Some(bytes) => {
+                        let uuid = Uuid::from_slice(bytes)?;
+                        entry.insert(uuid);
+                        Ok(Some(uuid))
+                    }
+                    None => Ok(None)
+                }
+            }
+            Entry::Occupied(entry) => Ok(Some(entry.get().clone())),
+        }
+    }
+
+    fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
+        match self.uid_to_index.entry(uid.clone()) {
+            Entry::Vacant(entry) => {
+                match self.uid_to_index_db.get(txn, uid.as_bytes())? {
+                    Some(meta) => {
+                        let path = self.env.path();
+                        let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.opt)?;
+                        entry.insert((index.clone(), updates.clone()));
+                        Ok(Some((index, updates)))
+                    },
+                    None => Ok(None)
+                }
+            }
+            Entry::Occupied(entry) => {
+                let (index, updates) = entry.get();
+                Ok(Some((index.clone(), updates.clone())))
+            }
+        }
+    }
+
+    fn _get_index(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
+        match self.index_uid(&txn, name)? {
+            Some(uid) => self.retrieve_index(&txn, uid),
+            None => Ok(None),
+        }
+    }
+
+    pub fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
+        let txn = self.env.read_txn()?;
+        self._get_index(&txn, name)
+    }
+
+    pub fn get_or_create_index(
+        &self,
+        name: impl AsRef<str>,
+        update_size: usize,
+        index_size: usize,
+    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
+        let mut txn = self.env.write_txn()?;
+        match self._get_index(&txn, name.as_ref())? {
+            Some(res) => Ok(res),
+            None => {
+                let uid = Uuid::new_v4();
+                // TODO: clean in case of error
+                Ok(self.create_index(&mut txn, uid, name, update_size, index_size)?)
+            },
+        }
+    }
+
+    fn create_index(
+        &self,
+        txn: &mut RwTxn,
+        uid: Uuid,
+        name: impl AsRef<str>,
+        update_size: usize,
+        index_size: usize,
+    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
+        let meta = IndexMeta { update_size, index_size, uid: uid.clone() };
+
+        self.name_to_uid_db.put(txn, name.as_ref(), uid.as_bytes())?;
+        self.uid_to_index_db.put(txn, uid.as_bytes(), &meta)?;
+
+        let path = self.env.path();
+        let (index, update_store) = meta.open(path, self.thread_pool.clone(), &self.opt)?;
+
+        self.name_to_uid.insert(name.as_ref().to_string(), uid);
+        self.uid_to_index.insert(uid, (index.clone(), update_store.clone()));
+
+        Ok((index, update_store))
+    }
+}
+
+fn open_or_create_database<K: 'static, V: 'static>(env: &Env, name: Option<&str>) -> anyhow::Result<Database<K, V>> {
+    match env.open_database::<K, V>(name)? {
+        Some(db) => Ok(db),
+        None => Ok(env.create_database::<K, V>(name)?),
+    }
+}
+
+fn make_update_db_path(path: impl AsRef<Path>, uid: &Uuid) -> PathBuf {
+    let mut path = path.as_ref().to_path_buf();
+    path.push(format!("update{}", uid));
+    path
+}
+
+fn make_index_db_path(path: impl AsRef<Path>, uid: &Uuid) -> PathBuf {
+    let mut path = path.as_ref().to_path_buf();
+    path.push(format!("index{}", uid));
+    path
+}
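Review note: `index_uid` and `retrieve_index` above both follow a two-level lookup: a concurrent in-memory map (`DashMap`) in front of the LMDB tables, populated on first miss so later lookups skip the database entirely. A reduced sketch of that discipline using only `std` (a plain `HashMap` stands in for the LMDB side, `RwLock<HashMap>` for the `DashMap`):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-ins: `db` plays the role of the LMDB table
// (name_to_uid_db) and `cache` the role of the DashMap in front of it.
struct NameResolver {
    cache: RwLock<HashMap<String, u128>>,
    db: HashMap<String, u128>,
}

impl NameResolver {
    // Same discipline as `IndexStore::index_uid`: serve from the in-memory
    // map when possible, otherwise read the persistent store and populate
    // the map so the next lookup never touches the database.
    fn resolve(&self, name: &str) -> Option<u128> {
        if let Some(uid) = self.cache.read().unwrap().get(name) {
            return Some(*uid);
        }
        let uid = self.db.get(name).copied()?;
        self.cache.write().unwrap().insert(name.to_string(), uid);
        Some(uid)
    }
}

fn main() {
    let mut db = HashMap::new();
    db.insert("movies".to_string(), 42u128);
    let resolver = NameResolver { cache: RwLock::new(HashMap::new()), db };
    assert_eq!(resolver.resolve("movies"), Some(42)); // cache miss, db hit
    assert_eq!(resolver.resolve("movies"), Some(42)); // cache hit
    assert_eq!(resolver.resolve("books"), None);      // unknown name
}
```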
diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs
new file mode 100644
index 000000000..dc26d0da8
--- /dev/null
+++ b/src/index_controller/local_index_controller/mod.rs
@@ -0,0 +1,57 @@
+mod update_store;
+mod index_store;
+mod update_handler;
+
+use index_store::IndexStore;
+
+use std::path::Path;
+use std::sync::Arc;
+
+use milli::Index;
+
+use crate::option::IndexerOpts;
+use super::IndexController;
+
+pub struct LocalIndexController {
+    indexes: IndexStore,
+}
+
+impl LocalIndexController {
+    pub fn new(path: impl AsRef<Path>, opt: IndexerOpts) -> anyhow::Result<Self> {
+        let indexes = IndexStore::new(path, opt)?;
+        Ok(Self { indexes })
+    }
+}
+
+impl IndexController for LocalIndexController {
+    fn add_documents<S: AsRef<str>>(
+        &self,
+        _index: S,
+        _method: milli::update::IndexDocumentsMethod,
+        _format: milli::update::UpdateFormat,
+        _data: &[u8],
+    ) -> anyhow::Result<super::UpdateStatusResponse> {
+        todo!()
+    }
+
+    fn update_settings<S: AsRef<str>>(&self, _index_uid: S, _settings: super::Settings) -> anyhow::Result<super::UpdateStatusResponse> {
+        todo!()
+    }
+
+    fn create_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
+        todo!()
+    }
+
+    fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
+        todo!()
+    }
+
+    fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
+        todo!()
+    }
+
+    fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>> {
+        let index = self.indexes.index(name)?.map(|(i, _)| i);
+        Ok(index)
+    }
+}
diff --git a/src/index_controller/local_index_controller/update_handler.rs b/src/index_controller/local_index_controller/update_handler.rs
new file mode 100644
index 000000000..fae3ad0ae
--- /dev/null
+++ b/src/index_controller/local_index_controller/update_handler.rs
@@ -0,0 +1,206 @@
+use std::io;
+use std::sync::Arc;
+use std::collections::HashMap;
+
+use anyhow::Result;
+use flate2::read::GzDecoder;
+use grenad::CompressionType;
+use log::info;
+use milli::Index;
+use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod};
+use rayon::ThreadPool;
+
+use crate::index_controller::updates::{Processing, Processed, Failed};
+use crate::index_controller::{UpdateResult, UpdateMeta, Settings, Facets};
+use crate::option::IndexerOpts;
+use super::update_store::HandleUpdate;
+
+pub struct UpdateHandler {
+    index: Arc<Index>,
+    max_nb_chunks: Option<usize>,
+    chunk_compression_level: Option<u32>,
+    thread_pool: Arc<ThreadPool>,
+    log_frequency: usize,
+    max_memory: usize,
+    linked_hash_map_size: usize,
+    chunk_compression_type: CompressionType,
+    chunk_fusing_shrink_size: u64,
+}
+
+impl UpdateHandler {
+    pub fn new(
+        opt: &IndexerOpts,
+        index: Arc<Index>,
+        thread_pool: Arc<ThreadPool>,
+    ) -> anyhow::Result<Self> {
+        Ok(Self {
+            index,
+            max_nb_chunks: opt.max_nb_chunks,
+            chunk_compression_level: opt.chunk_compression_level,
+            thread_pool,
+            log_frequency: opt.log_every_n,
+            max_memory: opt.max_memory.get_bytes() as usize,
+            linked_hash_map_size: opt.linked_hash_map_size,
+            chunk_compression_type: opt.chunk_compression_type,
+            chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
+        })
+    }
+
+    fn update_builder(&self, update_id: u64) -> UpdateBuilder {
+        // We prepare the update by using the update builder.
+        let mut update_builder = UpdateBuilder::new(update_id);
+        if let Some(max_nb_chunks) = self.max_nb_chunks {
+            update_builder.max_nb_chunks(max_nb_chunks);
+        }
+        if let Some(chunk_compression_level) = self.chunk_compression_level {
+            update_builder.chunk_compression_level(chunk_compression_level);
+        }
+        update_builder.thread_pool(&self.thread_pool);
+        update_builder.log_every_n(self.log_frequency);
+        update_builder.max_memory(self.max_memory);
+        update_builder.linked_hash_map_size(self.linked_hash_map_size);
+        update_builder.chunk_compression_type(self.chunk_compression_type);
+        update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
+        update_builder
+    }
+
+    fn update_documents(
+        &self,
+        format: UpdateFormat,
+        method: IndexDocumentsMethod,
+        content: &[u8],
+        update_builder: UpdateBuilder,
+    ) -> anyhow::Result<UpdateResult> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.index.write_txn()?;
+        let mut builder = update_builder.index_documents(&mut wtxn, &self.index);
+        builder.update_format(format);
+        builder.index_documents_method(method);
+
+        let gzipped = true;
+        let reader = if gzipped {
+            Box::new(GzDecoder::new(content))
+        } else {
+            Box::new(content) as Box<dyn io::Read>
+        };
+
+        let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
+
+        match result {
+            Ok(addition_result) => wtxn
+                .commit()
+                .and(Ok(UpdateResult::DocumentsAddition(addition_result)))
+                .map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+
+    fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.index.write_txn()?;
+        let builder = update_builder.clear_documents(&mut wtxn, &self.index);
+
+        match builder.execute() {
+            Ok(_count) => wtxn
+                .commit()
+                .and(Ok(UpdateResult::Other))
+                .map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+
+    fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.index.write_txn()?;
+        let mut builder = update_builder.settings(&mut wtxn, &self.index);
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(ref names) = settings.searchable_attributes {
+            match names {
+                Some(names) => builder.set_searchable_fields(names.clone()),
+                None => builder.reset_searchable_fields(),
+            }
+        }
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(ref names) = settings.displayed_attributes {
+            match names {
+                Some(names) => builder.set_displayed_fields(names.clone()),
+                None => builder.reset_displayed_fields(),
+            }
+        }
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(ref facet_types) = settings.faceted_attributes {
+            let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new());
+            builder.set_faceted_fields(facet_types);
+        }
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(ref criteria) = settings.criteria {
+            match criteria {
+                Some(criteria) => builder.set_criteria(criteria.clone()),
+                None => builder.reset_criteria(),
+            }
+        }
+
+        let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
+
+        match result {
+            Ok(()) => wtxn
+                .commit()
+                .and(Ok(UpdateResult::Other))
+                .map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+
+    fn update_facets(
+        &self,
+        levels: &Facets,
+        update_builder: UpdateBuilder
+    ) -> anyhow::Result<UpdateResult> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.index.write_txn()?;
+        let mut builder = update_builder.facets(&mut wtxn, &self.index);
+        if let Some(value) = levels.level_group_size {
+            builder.level_group_size(value);
+        }
+        if let Some(value) = levels.min_level_size {
+            builder.min_level_size(value);
+        }
+        match builder.execute() {
+            Ok(()) => wtxn
+                .commit()
+                .and(Ok(UpdateResult::Other))
+                .map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+}
+
+impl HandleUpdate for UpdateHandler {
+    fn handle_update(
+        &mut self,
+        update_id: u64,
+        meta: Processing<UpdateMeta>,
+        content: &[u8]
+    ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
+        use UpdateMeta::*;
+
+        let update_builder = self.update_builder(update_id);
+
+        let result = match meta.meta() {
+            DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder),
+            ClearDocuments => self.clear_documents(update_builder),
+            Settings(settings) => self.update_settings(settings, update_builder),
+            Facets(levels) => self.update_facets(levels, update_builder),
+        };
+
+        match result {
+            Ok(result) => Ok(meta.process(result)),
+            Err(e) => Err(meta.fail(e.to_string())),
+        }
+    }
+}
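Review note: `update_settings` above branches on a double `Option`: the outer level distinguishes "field absent, leave the setting untouched" from "field present", and the inner one distinguishes "set to these values" from "explicit `null`, reset to default". This matches the `deserialize_with` idiom that the `de::Deserializer` import in `src/index_controller/mod.rs` suggests. A reduced, runnable model of that tri-state (field names here are illustrative, not the real struct; requires `serde` with `derive` and `serde_json`):

```rust
use serde::{Deserialize, Deserializer};

// Reduced model of the settings payload: the outer Option is "was the
// field present at all", the inner one is "value or explicit null".
#[derive(Debug, Deserialize)]
struct Settings {
    #[serde(default, deserialize_with = "deserialize_some")]
    displayed_attributes: Option<Option<Vec<String>>>,
}

// Any present value (including null) deserializes to Some(..), so an
// explicit `null` becomes Some(None) while a missing field stays None.
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    Deserialize::deserialize(deserializer).map(Some)
}

fn main() {
    // present => Some(Some(..)): set_displayed_fields
    let s: Settings = serde_json::from_str(r#"{"displayed_attributes": ["title"]}"#).unwrap();
    assert_eq!(s.displayed_attributes, Some(Some(vec!["title".to_string()])));

    // explicit null => Some(None): reset_displayed_fields
    let s: Settings = serde_json::from_str(r#"{"displayed_attributes": null}"#).unwrap();
    assert_eq!(s.displayed_attributes, Some(None));

    // absent => None: the setting is left untouched
    let s: Settings = serde_json::from_str("{}").unwrap();
    assert_eq!(s.displayed_attributes, None);
}
```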
diff --git a/src/index_controller/local_index_controller/update_store.rs b/src/index_controller/local_index_controller/update_store.rs
new file mode 100644
index 000000000..94543b0a1
--- /dev/null
+++ b/src/index_controller/local_index_controller/update_store.rs
@@ -0,0 +1,311 @@
+use std::path::Path;
+use std::sync::{Arc, RwLock};
+
+use crossbeam_channel::Sender;
+use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
+use heed::{EnvOpenOptions, Env, Database};
+
+use crate::index_controller::updates::*;
+use crate::index_controller::{UpdateMeta, UpdateResult};
+
+type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
+
+#[derive(Clone)]
+pub struct UpdateStore {
+    env: Env,
+    pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<UpdateMeta>>>,
+    pending: Database<OwnedType<BEU64>, ByteSlice>,
+    processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<UpdateMeta, UpdateResult>>>,
+    failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<UpdateMeta, String>>>,
+    aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<UpdateMeta>>>,
+    processing: Arc<RwLock<Option<Processing<UpdateMeta>>>>,
+    notification_sender: Sender<()>,
+}
+
+pub trait HandleUpdate {
+    fn handle_update(
+        &mut self,
+        update_id: u64,
+        meta: Processing<UpdateMeta>,
+        content: &[u8],
+    ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>>;
+}
+
+impl UpdateStore {
+    pub fn open<P, U>(
+        mut options: EnvOpenOptions,
+        path: P,
+        mut update_handler: U,
+    ) -> heed::Result<Arc<UpdateStore>>
+    where
+        P: AsRef<Path>,
+        U: HandleUpdate + Send + 'static,
+    {
+        options.max_dbs(5);
+
+        let env = options.open(path)?;
+        let pending_meta = env.create_database(Some("pending-meta"))?;
+        let pending = env.create_database(Some("pending"))?;
+        let processed_meta = env.create_database(Some("processed-meta"))?;
+        let aborted_meta = env.create_database(Some("aborted-meta"))?;
+        let failed_meta = env.create_database(Some("failed-meta"))?;
+        let processing = Arc::new(RwLock::new(None));
+
+        let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1);
+        // Send a first notification to trigger the process.
+        let _ = notification_sender.send(());
+
+        let update_store = Arc::new(UpdateStore {
+            env,
+            pending,
+            pending_meta,
+            processed_meta,
+            aborted_meta,
+            notification_sender,
+            failed_meta,
+            processing,
+        });
+
+        let update_store_cloned = update_store.clone();
+        std::thread::spawn(move || {
+            // Block and wait for something to process.
+            for () in notification_receiver {
+                loop {
+                    match update_store_cloned.process_pending_update(&mut update_handler) {
+                        Ok(Some(_)) => (),
+                        Ok(None) => break,
+                        Err(e) => eprintln!("error while processing update: {}", e),
+                    }
+                }
+            }
+        });
+
+        Ok(update_store)
+    }
+
+    /// Returns the new biggest id to use to store the new update.
+    fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
+        let last_pending = self.pending_meta
+            .remap_data_type::<DecodeIgnore>()
+            .last(txn)?
+            .map(|(k, _)| k.get());
+
+        let last_processed = self.processed_meta
+            .remap_data_type::<DecodeIgnore>()
+            .last(txn)?
+            .map(|(k, _)| k.get());
+
+        let last_aborted = self.aborted_meta
+            .remap_data_type::<DecodeIgnore>()
+            .last(txn)?
+            .map(|(k, _)| k.get());
+
+        let last_update_id = [last_pending, last_processed, last_aborted]
+            .iter()
+            .copied()
+            .flatten()
+            .max();
+
+        match last_update_id {
+            Some(last_id) => Ok(last_id + 1),
+            None => Ok(0),
+        }
+    }
+
+    /// Registers the update content in the pending store and the meta
+    /// into the pending-meta store. Returns the new unique update id.
+    pub fn register_update(
+        &self,
+        meta: UpdateMeta,
+        content: &[u8]
+    ) -> heed::Result<Pending<UpdateMeta>> {
+        let mut wtxn = self.env.write_txn()?;
+
+        // We ask the update store to give us a new update id; this is safe,
+        // no other update can have the same id because we open a write txn before
+        // asking for the id and registering it, so any other update registering
+        // will be forced to wait for a new write txn.
+        let update_id = self.new_update_id(&wtxn)?;
+        let update_key = BEU64::new(update_id);
+
+        let meta = Pending::new(meta, update_id);
+        self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
+        self.pending.put(&mut wtxn, &update_key, content)?;
+
+        wtxn.commit()?;
+
+        if let Err(e) = self.notification_sender.try_send(()) {
+            assert!(!e.is_disconnected(), "update notification channel is disconnected");
+        }
+        Ok(meta)
+    }
+
+    /// Executes the user provided function on the next pending update (the one with the lowest id).
+    /// This is asynchronous as it lets the user process the update with a read-only txn and
+    /// only writes the result meta to the processed-meta store *after* it has been processed.
+    fn process_pending_update<U>(&self, handler: &mut U) -> heed::Result<Option<()>>
+    where
+        U: HandleUpdate + Send + 'static,
+    {
+        // Create a read transaction to be able to retrieve the pending updates in order.
+        let rtxn = self.env.read_txn()?;
+        let first_meta = self.pending_meta.first(&rtxn)?;
+
+        // If there is a pending update we process it, holding only
+        // a reader while doing so, not a writer.
+        match first_meta {
+            Some((first_id, pending)) => {
+                let first_content = self.pending
+                    .get(&rtxn, &first_id)?
+                    .expect("associated update content");
+
+                // We change the state of the update from pending to processing before we pass it
+                // to the update handler. The processing store is not persistent, so we are able
+                // to recover from a failure.
+                let processing = pending.processing();
+                self.processing
+                    .write()
+                    .unwrap()
+                    .replace(processing.clone());
+                // Process the pending update using the provided user function.
+                let result = handler.handle_update(first_id.get(), processing, first_content);
+                drop(rtxn);
+
+                // Once the pending update has been successfully processed
+                // we must remove the content from the pending and processing stores and
+                // write the *new* meta to the processed-meta store and commit.
+                let mut wtxn = self.env.write_txn()?;
+                self.processing
+                    .write()
+                    .unwrap()
+                    .take();
+                self.pending_meta.delete(&mut wtxn, &first_id)?;
+                self.pending.delete(&mut wtxn, &first_id)?;
+                match result {
+                    Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,
+                    Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?,
+                }
+                wtxn.commit()?;
+
+                Ok(Some(()))
+            },
+            None => Ok(None)
+        }
+    }
+
+    /// The id and metadata of the update that is currently being processed,
+    /// `None` if no update is being processed.
+    pub fn processing_update(&self) -> heed::Result<Option<(u64, Pending<UpdateMeta>)>> {
+        let rtxn = self.env.read_txn()?;
+        match self.pending_meta.first(&rtxn)? {
+            Some((key, meta)) => Ok(Some((key.get(), meta))),
+            None => Ok(None),
+        }
+    }
+
+    /// Executes the user defined function with the meta-store iterators: the currently
+    /// processing update, then the *processed*, *aborted*, *pending*, and *failed*
+    /// meta iterators, in that order.
+    pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
+    where
+        F: for<'a> FnMut(
+            Option<Processing<UpdateMeta>>,
+            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<UpdateMeta, UpdateResult>>>,
+            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<UpdateMeta>>>,
+            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<UpdateMeta>>>,
+            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<UpdateMeta, String>>>,
+        ) -> heed::Result<T>,
+    {
+        let rtxn = self.env.read_txn()?;
+
+        // We get the processed, aborted, pending and failed meta iterators.
+        let processed_iter = self.processed_meta.iter(&rtxn)?;
+        let aborted_iter = self.aborted_meta.iter(&rtxn)?;
+        let pending_iter = self.pending_meta.iter(&rtxn)?;
+        let processing = self.processing.read().unwrap().clone();
+        let failed_iter = self.failed_meta.iter(&rtxn)?;
+
+        // We execute the user defined function with those iterators.
+        (f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter)
+    }
+
+    /// Returns the update associated meta or `None` if the update doesn't exist.
+    pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
+        let rtxn = self.env.read_txn()?;
+        let key = BEU64::new(update_id);
+
+        if let Some(ref meta) = *self.processing.read().unwrap() {
+            if meta.id() == update_id {
+                return Ok(Some(UpdateStatus::Processing(meta.clone())));
+            }
+        }
+
+        if let Some(meta) = self.pending_meta.get(&rtxn, &key)? {
+            return Ok(Some(UpdateStatus::Pending(meta)));
+        }
+
+        if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
+            return Ok(Some(UpdateStatus::Processed(meta)));
+        }
+
+        if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
+            return Ok(Some(UpdateStatus::Aborted(meta)));
+        }
+
+        if let Some(meta) = self.failed_meta.get(&rtxn, &key)? {
+            return Ok(Some(UpdateStatus::Failed(meta)));
+        }
+
+        Ok(None)
+    }
+
+    /// Aborts an update: the content of an aborted update is deleted and
+    /// its meta is moved into the aborted updates database.
+    ///
+    /// Trying to abort an update that is currently being processed, an update
+    /// that has already been processed, or one which doesn't actually exist
+    /// will return `None`.
+    pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<UpdateMeta>>> {
+        let mut wtxn = self.env.write_txn()?;
+        let key = BEU64::new(update_id);
+
+        // We cannot abort an update that is currently being processed.
+        if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) {
+            return Ok(None);
+        }
+
+        let pending = match self.pending_meta.get(&wtxn, &key)? {
+            Some(meta) => meta,
+            None => return Ok(None),
+        };
+
+        let aborted = pending.abort();
+
+        self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
+        self.pending_meta.delete(&mut wtxn, &key)?;
+        self.pending.delete(&mut wtxn, &key)?;
+
+        wtxn.commit()?;
+
+        Ok(Some(aborted))
+    }
+
+    /// Aborts all the pending updates, except the one currently being processed.
+    /// Returns the ids and metas of the updates that were successfully aborted.
+    pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<UpdateMeta>)>> {
+        let mut wtxn = self.env.write_txn()?;
+        let mut aborted_updates = Vec::new();
+
+        // We skip the first pending update as it is currently being processed.
+        for result in self.pending_meta.iter(&wtxn)?.skip(1) {
+            let (key, pending) = result?;
+            let id = key.get();
+            aborted_updates.push((id, pending.abort()));
+        }
+
+        for (id, aborted) in &aborted_updates {
+            let key = BEU64::new(*id);
+            self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
+            self.pending_meta.delete(&mut wtxn, &key)?;
+            self.pending.delete(&mut wtxn, &key)?;
+        }
+
+        wtxn.commit()?;
+
+        Ok(aborted_updates)
+    }
+}
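Review note: the update pipeline above relies on a `bounded(1)` channel plus `try_send`: a full channel just means a wakeup is already scheduled, and one wakeup is enough because the worker drains the whole pending queue before blocking again, so no registration is ever lost. A self-contained sketch of that notification discipline (toy `VecDeque` in place of the LMDB pending store; assumes the `crossbeam-channel` crate):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Toy queue standing in for the LMDB-backed pending store.
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    let (notifier, wakeups) = crossbeam_channel::bounded::<()>(1);

    // Worker: one wakeup is enough to drain *all* queued updates, which is
    // why a bounded(1) channel plus try_send never loses work.
    let worker_queue = queue.clone();
    let worker = thread::spawn(move || {
        for () in wakeups {
            while let Some(id) = worker_queue.lock().unwrap().pop_front() {
                println!("processing update {}", id);
            }
        }
    });

    for id in 0..5u64 {
        queue.lock().unwrap().push_back(id);
        // A full channel only means a wakeup is already scheduled; as in
        // `register_update`, only disconnection is a real error.
        if let Err(e) = notifier.try_send(()) {
            assert!(!e.is_disconnected(), "update notification channel is disconnected");
        }
    }

    drop(notifier); // closing the channel ends the worker's for-loop
    worker.join().unwrap();
}
```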
diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs
index c927a6c5b..d59575f7d 100644
--- a/src/index_controller/mod.rs
+++ b/src/index_controller/mod.rs
@@ -1,18 +1,19 @@
-mod index_store;
-mod update_store;
+mod local_index_controller;
+mod updates;
 
-pub use index_store::IndexStore;
-pub use update_store::UpdateStore;
+pub use local_index_controller::LocalIndexController;
 
-use std::num::NonZeroUsize;
-use std::ops::Deref;
 use std::collections::HashMap;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
 
 use anyhow::Result;
-use milli::update::{IndexDocumentsMethod, UpdateFormat};
-use milli::update_store::{Processed, Processing, Failed, Pending, Aborted};
+use milli::Index;
+use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
 use serde::{Serialize, Deserialize, de::Deserializer};
 
+use updates::{Processed, Processing, Failed, Pending, Aborted};
+
 pub type UpdateStatusResponse = UpdateStatus<UpdateMeta, UpdateResult, String>;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -89,7 +90,7 @@ impl Settings {
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum UpdateResult {
-    //DocumentsAddition(DocumentAdditionResult),
+    DocumentsAddition(DocumentAdditionResult),
     Other,
 }
 
@@ -97,7 +98,7 @@ pub enum UpdateResult {
 /// for read access which is provided, and write access which must be provided. This allows the
 /// implementer to define the behaviour of write accesses to the indices, and abstract the
 /// scheduling of the updates. The implementer must be able to provide an instance of `IndexStore`
-pub trait IndexController: Deref<Target = IndexStore> {
+pub trait IndexController {
 
     /*
      * Write operations
@@ -141,5 +142,7 @@ pub trait IndexController {
     ) -> Result<Pending<UpdateMeta>, Failed<UpdateMeta, String>> {
         todo!()
     }
-}
 
+    /// Returns a handle to the requested index, if it exists.
+    fn index(&self, uid: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>>;
+}
diff --git a/src/index_controller/update_store/mod.rs b/src/index_controller/update_store/mod.rs
deleted file mode 100644
index 84db2f63d..000000000
--- a/src/index_controller/update_store/mod.rs
+++ /dev/null
@@ -1,49 +0,0 @@
-use std::ops::Deref;
-
-use super::{IndexStore, IndexController};
-
-pub struct UpdateStore {
-    index_store: IndexStore,
-}
-
-impl Deref for UpdateStore {
-    type Target = IndexStore;
-
-    fn deref(&self) -> &Self::Target {
-        &self.index_store
-    }
-}
-
-impl UpdateStore {
-    pub fn new(index_store: IndexStore) -> Self {
-        Self { index_store }
-    }
-}
-
-impl IndexController for UpdateStore {
-    fn add_documents<S: AsRef<str>>(
-        &self,
-        _index: S,
-        _method: milli::update::IndexDocumentsMethod,
-        _format: milli::update::UpdateFormat,
-        _data: &[u8],
-    ) -> anyhow::Result<super::UpdateStatusResponse> {
-        todo!()
-    }
-
-    fn update_settings<S: AsRef<str>>(&self, _index_uid: S, _settings: crate::index_controller::Settings) -> anyhow::Result<super::UpdateStatusResponse> {
-        todo!()
-    }
-
-    fn create_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
-        todo!()
-    }
-
-    fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
-        todo!()
-    }
-
-    fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
-        todo!()
-    }
-}
diff --git a/src/index_controller/updates.rs b/src/index_controller/updates.rs
new file mode 100644
index 000000000..7c67ea8c2
--- /dev/null
+++ b/src/index_controller/updates.rs
@@ -0,0 +1,167 @@
+use chrono::{Utc, DateTime};
+use serde::{Serialize, Deserialize};
+
+#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+pub struct Pending<M> {
+    update_id: u64,
+    meta: M,
+    enqueued_at: DateTime<Utc>,
+}
+
+impl<M> Pending<M> {
+    pub fn new(meta: M, update_id: u64) -> Self {
+        Self {
+            enqueued_at: Utc::now(),
+            meta,
+            update_id,
+        }
+    }
+
+    pub fn processing(self) -> Processing<M> {
+        Processing {
+            from: self,
+            started_processing_at: Utc::now(),
+        }
+    }
+
+    pub fn abort(self) -> Aborted<M> {
+        Aborted {
+            from: self,
+            aborted_at: Utc::now(),
+        }
+    }
+
+    pub fn meta(&self) -> &M {
+        &self.meta
+    }
+
+    pub fn id(&self) -> u64 {
+        self.update_id
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+pub struct Processed<M, N> {
+    success: N,
+    processed_at: DateTime<Utc>,
+    #[serde(flatten)]
+    from: Processing<M>,
+}
+
+impl<M, N> Processed<M, N> {
+    pub fn id(&self) -> u64 {
+        self.from.id()
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+pub struct Processing<M> {
+    #[serde(flatten)]
+    from: Pending<M>,
+    started_processing_at: DateTime<Utc>,
+}
+
+impl<M> Processing<M> {
+    pub fn id(&self) -> u64 {
+        self.from.id()
+    }
+
+    pub fn meta(&self) -> &M {
+        self.from.meta()
+    }
+
+    pub fn process<N>(self, meta: N) -> Processed<M, N> {
+        Processed {
+            success: meta,
+            from: self,
+            processed_at: Utc::now(),
+        }
+    }
+
+    pub fn fail<E>(self, error: E) -> Failed<M, E> {
+        Failed {
+            from: self,
+            error,
+            failed_at: Utc::now(),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+pub struct Aborted<M> {
+    #[serde(flatten)]
+    from: Pending<M>,
+    aborted_at: DateTime<Utc>,
+}
+
+impl<M> Aborted<M> {
+    pub fn id(&self) -> u64 {
+        self.from.id()
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
+pub struct Failed<M, E> {
+    #[serde(flatten)]
+    from: Processing<M>,
+    error: E,
+    failed_at: DateTime<Utc>,
+}
+
+impl<M, E> Failed<M, E> {
+    pub fn id(&self) -> u64 {
+        self.from.id()
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, Serialize)]
+#[serde(tag = "status")]
+pub enum UpdateStatus<M, N, E> {
+    Processing(Processing<M>),
+    Pending(Pending<M>),
+    Processed(Processed<M, N>),
+    Aborted(Aborted<M>),
+    Failed(Failed<M, E>),
+}
+
+impl<M, N, E> UpdateStatus<M, N, E> {
+    pub fn id(&self) -> u64 {
+        match self {
+            UpdateStatus::Processing(u) => u.id(),
+            UpdateStatus::Pending(u) => u.id(),
+            UpdateStatus::Processed(u) => u.id(),
+            UpdateStatus::Aborted(u) => u.id(),
+            UpdateStatus::Failed(u) => u.id(),
+        }
+    }
+}
+
+impl<M, N, E> From<Pending<M>> for UpdateStatus<M, N, E> {
+    fn from(other: Pending<M>) -> Self {
+        Self::Pending(other)
+    }
+}
+
+impl<M, N, E> From<Aborted<M>> for UpdateStatus<M, N, E> {
+    fn from(other: Aborted<M>) -> Self {
+        Self::Aborted(other)
+    }
+}
+
+impl<M, N, E> From<Processed<M, N>> for UpdateStatus<M, N, E> {
+    fn from(other: Processed<M, N>) -> Self {
+        Self::Processed(other)
+    }
+}
+
+impl<M, N, E> From<Processing<M>> for UpdateStatus<M, N, E> {
+    fn from(other: Processing<M>) -> Self {
+        Self::Processing(other)
+    }
+}
+
+impl<M, N, E> From<Failed<M, E>> for UpdateStatus<M, N, E> {
+    fn from(other: Failed<M, E>) -> Self {
+        Self::Failed(other)
+    }
+}
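Review note: the `#[serde(flatten)]`/`#[serde(tag = "status")]` combination in this file means each status serializes as a single flat JSON object that accumulates the fields of every state the update passed through. A reduced model showing the wire shape (assumes `chrono` with its `serde` feature, plus `serde`/`serde_json`; the real structs carry more states and fields):

```rust
use chrono::{DateTime, Utc};
use serde::Serialize;

// Reduced model of updates.rs, to show what #[serde(tag = "status")]
// combined with #[serde(flatten)] produces on the wire.
#[derive(Serialize)]
struct Pending<M> {
    update_id: u64,
    meta: M,
    enqueued_at: DateTime<Utc>,
}

#[derive(Serialize)]
struct Processing<M> {
    #[serde(flatten)]
    from: Pending<M>,
    started_processing_at: DateTime<Utc>,
}

impl<M> Pending<M> {
    // Mirrors Pending::processing: the transition consumes the pending state,
    // so an update can never be both pending and processing at once.
    fn processing(self) -> Processing<M> {
        Processing { from: self, started_processing_at: Utc::now() }
    }
}

#[derive(Serialize)]
#[serde(tag = "status")]
enum UpdateStatus<M> {
    Processing(Processing<M>),
    Pending(Pending<M>),
}

fn main() {
    let pending = Pending { update_id: 0, meta: "clear documents", enqueued_at: Utc::now() };
    // {"status":"Pending","update_id":0,"meta":"clear documents","enqueued_at":"..."}
    println!("{}", serde_json::to_string(&UpdateStatus::Pending(pending)).unwrap());

    let processing = Pending { update_id: 1, meta: "documents addition", enqueued_at: Utc::now() }.processing();
    // Flattening folds the ancestor state's fields into the same object:
    // {"status":"Processing","update_id":1,"meta":"documents addition",
    //  "enqueued_at":"...","started_processing_at":"..."}
    println!("{}", serde_json::to_string(&UpdateStatus::Processing(processing)).unwrap());
}
```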