From 40b3451a4e46cae9300c0f3e279bc459ed9a54a0 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 11 Mar 2021 22:11:58 +0100 Subject: [PATCH] fix unexisting update store + race conditions --- meilisearch-http/src/index_controller/mod.rs | 1 + .../src/index_controller/update_actor.rs | 60 ++++++++++++++----- meilisearch-http/tests/updates/mod.rs | 1 + 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 2c181817b..318b7270c 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -144,6 +144,7 @@ impl IndexController { let name = name.unwrap(); let uuid = self.uuid_resolver.create(name.clone()).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?; + let _ = self.update_handle.create(uuid).await?; let meta = IndexMetadata { name, meta }; Ok(meta) diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs index 8614e297b..2c16eb60b 100644 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ b/meilisearch-http/src/index_controller/update_actor.rs @@ -47,6 +47,10 @@ enum UpdateMsg { uuid: Uuid, ret: oneshot::Sender>, }, + Create { + uuid: Uuid, + ret: oneshot::Sender>, + } } struct UpdateActor { @@ -102,7 +106,10 @@ where Some(Delete { uuid, ret }) => { let _ = ret.send(self.handle_delete(uuid).await); } - None => {} + Some(Create { uuid, ret }) => { + let _ = ret.send(self.handle_create(uuid).await); + } + None => break, } } } @@ -190,6 +197,11 @@ where Ok(()) } + + async fn handle_create(&self, uuid: Uuid) -> Result<()> { + let _ = self.store.get_or_create(uuid).await?; + Ok(()) + } } #[derive(Clone)] @@ -249,6 +261,13 @@ where let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } + + pub async fn create(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Create { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } } struct MapUpdateStoreStore { @@ -282,7 +301,7 @@ impl UpdateStoreStore for MapUpdateStoreStore { let store = UpdateStore::open(options, &path, move |meta, file| { futures::executor::block_on(index_handle.update(meta, file)) }) - .unwrap(); + .map_err(|e| UpdateError::Error(e.into()))?; let store = e.insert(store); Ok(store.clone()) } @@ -291,22 +310,35 @@ impl UpdateStoreStore for MapUpdateStoreStore { } async fn get(&self, uuid: &Uuid) -> Result>> { - // attemps to get pre-loaded ref to the index - match self.db.read().await.get(uuid) { + let guard = self.db.read().await; + match guard.get(uuid) { Some(uuid) => Ok(Some(uuid.clone())), None => { - // otherwise we try to check if it exists, and load it. + // The index is not found in the found in the loaded indexes, so we attempt to load + // it from disk. We need to acquire a write lock **before** attempting to open the + // index, because someone could be trying to open it at the same time as us. + drop(guard); let path = self.path.clone().join(format!("updates-{}", uuid)); if path.exists() { - let index_handle = self.index_handle.clone(); - let mut options = heed::EnvOpenOptions::new(); - options.map_size(4096 * 100_000); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .unwrap(); - self.db.write().await.insert(uuid.clone(), store.clone()); - Ok(Some(store)) + let mut guard = self.db.write().await; + match guard.entry(uuid.clone()) { + Entry::Vacant(entry) => { + // We can safely load the index + let index_handle = self.index_handle.clone(); + let mut options = heed::EnvOpenOptions::new(); + options.map_size(4096 * 100_000); + let store = UpdateStore::open(options, &path, move |meta, file| { + futures::executor::block_on(index_handle.update(meta, file)) + }) + .map_err(|e| UpdateError::Error(e.into()))?; + let store = entry.insert(store.clone()); + Ok(Some(store.clone())) + } + Entry::Occupied(entry) => { + // The index was loaded while we attempted to to iter + Ok(Some(entry.get().clone())) + } + } } else { Ok(None) } diff --git a/meilisearch-http/tests/updates/mod.rs b/meilisearch-http/tests/updates/mod.rs index 64b5b560e..713936b8c 100644 --- a/meilisearch-http/tests/updates/mod.rs +++ b/meilisearch-http/tests/updates/mod.rs @@ -46,6 +46,7 @@ async fn list_no_updates() { let index = server.index("test"); index.create(None).await; let (response, code) = index.list_updates().await; + println!("response: {}", response); assert_eq!(code, 200); assert!(response.as_array().unwrap().is_empty()); }