From 90b930ed7f9c2e8e7344ab685f83fc6e68de12c4 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 9 Feb 2021 11:26:52 +0100 Subject: [PATCH 1/3] implement update index implement update index --- src/data/updates.rs | 16 ++++- .../local_index_controller/index_store.rs | 68 ++++++++++++++++++- .../local_index_controller/mod.rs | 38 +++++++++++ src/index_controller/mod.rs | 10 ++- src/routes/index.rs | 14 ++-- 5 files changed, 135 insertions(+), 11 deletions(-) diff --git a/src/data/updates.rs b/src/data/updates.rs index 06aed8faa..57e9d1864 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -6,7 +6,7 @@ use futures_util::stream::StreamExt; use tokio::io::AsyncWriteExt; use super::Data; -use crate::index_controller::{IndexController, Settings}; +use crate::index_controller::{IndexController, Settings, IndexSettings, IndexMetadata}; use crate::index_controller::UpdateStatus; impl Data { @@ -70,4 +70,18 @@ impl Data { pub fn get_updates_status(&self, index: impl AsRef) -> anyhow::Result> { self.index_controller.all_update_status(index) } + + pub fn update_index( + &self, + name: impl AsRef, + primary_key: Option>, + new_name: Option> + ) -> anyhow::Result { + let settings = IndexSettings { + name: new_name.map(|s| s.as_ref().to_string()), + primary_key: primary_key.map(|s| s.as_ref().to_string()), + }; + + self.index_controller.update_index(name, settings) + } } diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index d28228efd..36c864772 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -24,6 +24,7 @@ pub struct IndexMeta { index_store_size: u64, pub uuid: Uuid, pub created_at: DateTime, + pub updated_at: DateTime, } impl IndexMeta { @@ -131,6 +132,52 @@ impl IndexStore { self.get_index_txn(&txn, name) } + /// Use this function to perform an update on an index. + /// This function also puts a lock on what index in allowed to perform an update. + pub fn update_index(&self, name: impl AsRef, f: F) -> anyhow::Result<(T, IndexMeta)> + where + F: FnOnce(&Index) -> anyhow::Result, + { + let mut txn = self.env.write_txn()?; + let (index, _) = self.get_index_txn(&txn, &name)? + .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; + let result = f(index.as_ref()); + match result { + Ok(ret) => { + let meta = self.update_meta(&mut txn, name, |meta| meta.updated_at = Utc::now())?; + txn.commit()?; + Ok((ret, meta)) + } + Err(e) => Err(e) + } + } + + pub fn index_with_meta(&self, name: impl AsRef) -> anyhow::Result, IndexMeta)>> { + let txn = self.env.read_txn()?; + let uuid = self.index_uuid(&txn, &name)?; + match uuid { + Some(uuid) => { + let meta = self.uuid_to_index_meta.get(&txn, uuid.as_bytes())? + .with_context(|| format!("unable to retrieve metadata for index {:?}", name.as_ref()))?; + let (index, _) = self.retrieve_index(&txn, uuid)? + .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?; + Ok(Some((index, meta))) + } + None => Ok(None), + } + } + + fn update_meta(&self, txn: &mut RwTxn, name: impl AsRef, f: impl FnOnce(&mut IndexMeta)) -> anyhow::Result { + let uuid = self.index_uuid(txn, &name)? + .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; + let mut meta = self.uuid_to_index_meta + .get(txn, uuid.as_bytes())? + .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?; + f(&mut meta); + self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; + Ok(meta) + } + pub fn get_or_create_index( &self, name: impl AsRef, @@ -173,7 +220,14 @@ impl IndexStore { index_store_size: u64, ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { let created_at = Utc::now(); - let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at }; + let updated_at = created_at; + let meta = IndexMeta { + update_store_size, + index_store_size, + uuid: uuid.clone(), + created_at, + updated_at, + }; self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?; self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; @@ -318,11 +372,15 @@ mod test { let txn = store.env.read_txn().unwrap(); assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); + let created_at = Utc::now(); + let updated_at = created_at; + let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone(), - created_at: Utc::now(), + created_at, + updated_at, }; let mut txn = store.env.write_txn().unwrap(); store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); @@ -344,12 +402,16 @@ mod test { assert!(store.index(&name).unwrap().is_none()); + let created_at = Utc::now(); + let updated_at = created_at; + let uuid = Uuid::new_v4(); let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone(), - created_at: Utc::now(), + created_at, + updated_at, }; let mut txn = store.env.write_txn().unwrap(); store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 60a02573f..62055f1a8 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -144,6 +144,44 @@ impl IndexController for LocalIndexController { } Ok(output_meta) } + + fn update_index(&self, name: impl AsRef, index_settings: IndexSettings) -> anyhow::Result { + if index_settings.name.is_some() { + bail!("can't udpate an index name.") + } + + let (primary_key, meta) = match index_settings.primary_key { + Some(ref primary_key) => { + self.indexes + .update_index(&name, |index| { + let mut txn = index.write_txn()?; + if index.primary_key(&txn)?.is_some() { + bail!("primary key already exists.") + } + index.put_primary_key(&mut txn, primary_key)?; + txn.commit()?; + Ok(Some(primary_key.clone())) + })? + }, + None => { + let (index, meta) = self.indexes + .index_with_meta(&name)? + .with_context(|| format!("index {:?} doesn't exist.", name.as_ref()))?; + let primary_key = index + .primary_key(&index.read_txn()?)? + .map(String::from); + (primary_key, meta) + }, + }; + + Ok(IndexMetadata { + name: name.as_ref().to_string(), + uuid: meta.uuid.clone(), + created_at: meta.created_at, + updated_at: meta.updated_at, + primary_key, + }) + } } fn update_primary_key(index: impl AsRef, primary_key: impl AsRef) -> anyhow::Result<()> { diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 2c8dd1226..a0f8476ff 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -10,9 +10,7 @@ use std::sync::Arc; use anyhow::Result; use chrono::{DateTime, Utc}; use milli::Index; -use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; -use serde::{Serialize, Deserialize, de::Deserializer}; -use uuid::Uuid; +use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; use uuid::Uuid; pub use updates::{Processed, Processing, Failed}; @@ -155,10 +153,16 @@ pub trait IndexController { /// Returns, if it exists, the `Index` with the povided name. fn index(&self, name: impl AsRef) -> anyhow::Result>>; + /// Returns the udpate status an update fn update_status(&self, index: impl AsRef, id: u64) -> anyhow::Result>; + + /// Returns all the udpate status for an index fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>; + /// List all the indexes fn list_indexes(&self) -> anyhow::Result>; + + fn update_index(&self, name: impl AsRef, index_settings: IndexSettings) -> anyhow::Result; } diff --git a/src/routes/index.rs b/src/routes/index.rs index d682376e3..395811774 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -94,11 +94,17 @@ struct UpdateIndexResponse { #[put("/indexes/{index_uid}", wrap = "Authentication::Private")] async fn update_index( - _data: web::Data, - _path: web::Path, - _body: web::Json, + data: web::Data, + path: web::Path, + body: web::Json, ) -> Result { - todo!() + match data.update_index(&path.index_uid, body.primary_key.as_ref(), body.name.as_ref()) { + Ok(meta) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(_) => { todo!() } + } } #[delete("/indexes/{index_uid}", wrap = "Authentication::Private")] From 4ca46b9e5f3480f234ca9c0ce2934ea38d3f72be Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 9 Feb 2021 13:25:20 +0100 Subject: [PATCH 2/3] fix bug in error message --- src/index_controller/local_index_controller/index_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 36c864772..96ef3e7cd 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -257,7 +257,7 @@ impl IndexStore { let mut txn = self.env.write_txn()?; if self.name_to_uuid.get(&txn, name.as_ref())?.is_some() { - bail!("cannot create index {:?}: an index with this name already exists.") + bail!("index {:?} already exists", name.as_ref()) } let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; From 9fb0d94fc3ec742e82e107ffb8b9b9f294e1b1ab Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 9 Feb 2021 16:08:13 +0100 Subject: [PATCH 3/3] add tests --- .../local_index_controller/index_store.rs | 7 ++- src/index_controller/mod.rs | 57 ++++++++++++++++++- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 96ef3e7cd..226915016 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -133,7 +133,7 @@ impl IndexStore { } /// Use this function to perform an update on an index. - /// This function also puts a lock on what index in allowed to perform an update. + /// This function also puts a lock on what index is allowed to perform an update. pub fn update_index(&self, name: impl AsRef, f: F) -> anyhow::Result<(T, IndexMeta)> where F: FnOnce(&Index) -> anyhow::Result, @@ -167,7 +167,10 @@ impl IndexStore { } } - fn update_meta(&self, txn: &mut RwTxn, name: impl AsRef, f: impl FnOnce(&mut IndexMeta)) -> anyhow::Result { + fn update_meta(&self, txn: &mut RwTxn, name: impl AsRef, f: F) -> anyhow::Result + where + F: FnOnce(&mut IndexMeta) + { let uuid = self.index_uuid(txn, &name)? .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; let mut meta = self.uuid_to_index_meta diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index a0f8476ff..77d91575a 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -10,7 +10,9 @@ use std::sync::Arc; use anyhow::Result; use chrono::{DateTime, Utc}; use milli::Index; -use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; use uuid::Uuid; +use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; +use serde::{Serialize, Deserialize, de::Deserializer}; +use uuid::Uuid; pub use updates::{Processed, Processing, Failed}; @@ -95,6 +97,7 @@ pub enum UpdateResult { Other, } +#[derive(Clone, Debug)] pub struct IndexSettings { pub name: Option, pub primary_key: Option, @@ -183,10 +186,15 @@ pub(crate) mod test { fn test_create_index_with_no_name_is_error() { crate::index_controller::test::create_index_with_no_name_is_error($controller_buider); } + + #[test] + fn test_update_index() { + crate::index_controller::test::update_index($controller_buider); + } }; } - pub(crate) fn create_and_list_indexes(controller: S) { + pub(crate) fn create_and_list_indexes(controller: impl IndexController) { let settings1 = IndexSettings { name: Some(String::from("test_index")), primary_key: None, @@ -207,11 +215,54 @@ pub(crate) mod test { assert_eq!(indexes[1].primary_key.clone().unwrap(), "foo"); } - pub(crate) fn create_index_with_no_name_is_error(controller: S) { + pub(crate) fn create_index_with_no_name_is_error(controller: impl IndexController) { let settings = IndexSettings { name: None, primary_key: None, }; assert!(controller.create_index(settings).is_err()); } + + pub(crate) fn update_index(controller: impl IndexController) { + + let settings = IndexSettings { + name: Some(String::from("test")), + primary_key: None, + }; + + assert!(controller.create_index(settings).is_ok()); + + // perform empty update returns index meta unchanged + let settings = IndexSettings { + name: None, + primary_key: None, + }; + + let result = controller.update_index("test", settings).unwrap(); + assert_eq!(result.name, "test"); + assert_eq!(result.created_at, result.updated_at); + assert!(result.primary_key.is_none()); + + // Changing the name trigger an error + let settings = IndexSettings { + name: Some(String::from("bar")), + primary_key: None, + }; + + assert!(controller.update_index("test", settings).is_err()); + + // Update primary key + let settings = IndexSettings { + name: None, + primary_key: Some(String::from("foo")), + }; + + let result = controller.update_index("test", settings.clone()).unwrap(); + assert_eq!(result.name, "test"); + assert!(result.created_at < result.updated_at); + assert_eq!(result.primary_key.unwrap(), "foo"); + + // setting the primary key again is an error + assert!(controller.update_index("test", settings).is_err()); + } }