diff --git a/src/data/mod.rs b/src/data/mod.rs index 7494792bc..175aedba5 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -61,7 +61,12 @@ impl Data { let path = options.db_path.clone(); let indexer_opts = options.indexer_options.clone(); create_dir_all(&path)?; - let index_controller = LocalIndexController::new(&path, indexer_opts)?; + let index_controller = LocalIndexController::new( + &path, + indexer_opts, + options.max_mdb_size.get_bytes(), + options.max_udb_size.get_bytes(), + )?; let indexes = Arc::new(index_controller); let mut api_keys = ApiKeys { diff --git a/src/data/updates.rs b/src/data/updates.rs index d05617361..880579bf6 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -6,7 +6,8 @@ use futures_util::stream::StreamExt; use tokio::io::AsyncWriteExt; use super::Data; -use crate::index_controller::{IndexController, UpdateStatusResponse, Settings}; +use crate::index_controller::{IndexController, Settings, UpdateResult, UpdateMeta}; +use crate::index_controller::updates::UpdateStatus; impl Data { pub async fn add_documents( @@ -15,7 +16,7 @@ impl Data { method: IndexDocumentsMethod, format: UpdateFormat, mut stream: impl futures::Stream> + Unpin, - ) -> anyhow::Result + ) -> anyhow::Result> where B: Deref, E: std::error::Error + Send + Sync + 'static, @@ -45,7 +46,7 @@ impl Data { &self, index: S, settings: Settings - ) -> anyhow::Result { + ) -> anyhow::Result> { let indexes = self.index_controller.clone(); let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??; Ok(update.into()) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index bfe63459a..4c9a98472 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use std::fs::create_dir_all; use std::sync::Arc; use dashmap::DashMap; @@ -8,6 +9,7 @@ use milli::Index; use rayon::ThreadPool; use uuid::Uuid; use serde::{Serialize, Deserialize}; +use log::warn; use super::update_store::UpdateStore; use super::update_handler::UpdateHandler; @@ -15,8 +17,8 @@ use crate::option::IndexerOpts; #[derive(Serialize, Deserialize, Debug)] struct IndexMeta { - update_size: usize, - index_size: usize, + update_size: u64, + index_size: u64, uid: Uuid, } @@ -30,12 +32,15 @@ impl IndexMeta { let update_path = make_update_db_path(&path, &self.uid); let index_path = make_index_db_path(&path, &self.uid); + create_dir_all(&update_path)?; + create_dir_all(&index_path)?; + let mut options = EnvOpenOptions::new(); - options.map_size(self.index_size); + options.map_size(self.index_size as usize); let index = Arc::new(Index::new(options, index_path)?); let mut options = EnvOpenOptions::new(); - options.map_size(self.update_size); + options.map_size(self.update_size as usize); let handler = UpdateHandler::new(opt, index.clone(), thread_pool)?; let update_store = UpdateStore::open(options, update_path, handler)?; Ok((index, update_store)) @@ -132,8 +137,8 @@ impl IndexStore { pub fn get_or_create_index( &self, name: impl AsRef, - update_size: usize, - index_size: usize, + update_size: u64, + index_size: u64, ) -> anyhow::Result<(Arc, Arc)> { let mut txn = self.env.write_txn()?; match self._get_index(&txn, name.as_ref())? { @@ -141,17 +146,39 @@ impl IndexStore { None => { let uid = Uuid::new_v4(); // TODO: clean in case of error - Ok(self.create_index(&mut txn, uid, name, update_size, index_size)?) + let result = self.create_index(&mut txn, uid, name, update_size, index_size); + match result { + Ok((index, update_store)) => { + match txn.commit() { + Ok(_) => Ok((index, update_store)), + Err(e) => { + self.clean_uid(&uid); + Err(anyhow::anyhow!("error creating index: {}", e)) + } + } + } + Err(e) => { + self.clean_uid(&uid); + Err(e) + } + } }, } } + /// removes all data acociated with an index Uuid. This is called when index creation failed + /// and outstanding files and data need to be cleaned. + fn clean_uid(&self, _uid: &Uuid) { + // TODO! + warn!("creating cleanup is not yet implemented"); + } + fn create_index( &self, txn: &mut RwTxn, uid: Uuid, name: impl AsRef, - update_size: usize, - index_size: usize, + update_size: u64, + index_size: u64, ) -> anyhow::Result<(Arc, Arc)> { let meta = IndexMeta { update_size, index_size, uid: uid.clone() }; diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index dc26d0da8..00a6cc363 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -11,30 +11,46 @@ use milli::Index; use crate::option::IndexerOpts; use super::IndexController; +use super::updates::UpdateStatus; +use super::{UpdateMeta, UpdateResult}; pub struct LocalIndexController { indexes: IndexStore, + update_db_size: u64, + index_db_size: u64, } impl LocalIndexController { - pub fn new(path: impl AsRef, opt: IndexerOpts) -> anyhow::Result { + pub fn new( + path: impl AsRef, + opt: IndexerOpts, + index_db_size: u64, + update_db_size: u64, + ) -> anyhow::Result { let indexes = IndexStore::new(path, opt)?; - Ok(Self { indexes }) + Ok(Self { indexes, index_db_size, update_db_size }) } } impl IndexController for LocalIndexController { fn add_documents>( &self, - _index: S, - _method: milli::update::IndexDocumentsMethod, - _format: milli::update::UpdateFormat, - _data: &[u8], - ) -> anyhow::Result { - todo!() + index: S, + method: milli::update::IndexDocumentsMethod, + format: milli::update::UpdateFormat, + data: &[u8], + ) -> anyhow::Result> { + let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; + let meta = UpdateMeta::DocumentsAddition { method, format }; + let pending = update_store.register_update(meta, data).unwrap(); + Ok(pending.into()) } - fn update_settings>(&self, _index_uid: S, _settings: super::Settings) -> anyhow::Result { + fn update_settings>( + &self, + _index_uid: S, + _settings: super::Settings + ) -> anyhow::Result> { todo!() } diff --git a/src/index_controller/local_index_controller/update_handler.rs b/src/index_controller/local_index_controller/update_handler.rs index fae3ad0ae..24b9ab405 100644 --- a/src/index_controller/local_index_controller/update_handler.rs +++ b/src/index_controller/local_index_controller/update_handler.rs @@ -188,6 +188,8 @@ impl HandleUpdate for UpdateHandler { ) -> Result, Failed> { use UpdateMeta::*; + println!("handling update {}", update_id); + let update_builder = self.update_buidler(update_id); let result = match meta.meta() { @@ -197,6 +199,8 @@ impl HandleUpdate for UpdateHandler { Facets(levels) => self.update_facets(levels, update_builder), }; + println!("{:?}", result); + match result { Ok(result) => Ok(meta.process(result)), Err(e) => Err(meta.fail(e.to_string())), diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index d59575f7d..0907ba0c8 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -1,5 +1,5 @@ mod local_index_controller; -mod updates; +pub mod updates; pub use local_index_controller::LocalIndexController; @@ -12,9 +12,8 @@ use milli::Index; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; -use updates::{Processed, Processing, Failed, Pending, Aborted}; +use updates::{Processed, Processing, Failed, UpdateStatus}; -pub type UpdateStatusResponse = UpdateStatus; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] @@ -33,15 +32,6 @@ pub struct Facets { pub min_level_size: Option, } -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type")] -pub enum UpdateStatus { - Pending { update_id: u64, meta: Pending }, - Progressing { update_id: u64, meta: P }, - Processed { update_id: u64, meta: Processed }, - Aborted { update_id: u64, meta: Aborted }, -} - fn deserialize_some<'de, T, D>(deserializer: D) -> Result, D::Error> where T: Deserialize<'de>, D: Deserializer<'de> @@ -116,11 +106,11 @@ pub trait IndexController { method: IndexDocumentsMethod, format: UpdateFormat, data: &[u8], - ) -> anyhow::Result; + ) -> anyhow::Result>; /// Updates an index settings. If the index does not exist, it will be created when the update /// is applied to the index. - fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; + fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result>; /// Create an index with the given `index_uid`. fn create_index>(&self, index_uid: S) -> Result<()>;