From 53cf500e36f0e477a18133f5335d3870ff2b2799 Mon Sep 17 00:00:00 2001
From: mpostma
Date: Wed, 10 Mar 2021 18:04:20 +0100
Subject: [PATCH] uuid resolver hard state

---
Review note (ignored by `git am`): `HeedUuidStore::delete` now returns the
removed uuid (`Ok(Some(uuid))`). It previously returned `Ok(None)` after a
successful delete, which `UuidResolverActor::handle_delete` turned into
`UuidError::UnexistingIndex` for every index that did exist.

 meilisearch-http/src/data/updates.rs          |   3 -
 .../local_index_controller/mod.rs             |   2 +-
 meilisearch-http/src/index_controller/mod.rs  |   2 +-
 .../src/index_controller/uuid_resolver.rs     | 193 ++++++++++++++----
 meilisearch-http/src/routes/index.rs          |   2 +-
 meilisearch-http/tests/common/index.rs        |   6 +-
 meilisearch-http/tests/updates/mod.rs         |   2 +-
 7 files changed, 161 insertions(+), 49 deletions(-)

diff --git a/meilisearch-http/src/data/updates.rs b/meilisearch-http/src/data/updates.rs
index e9d1b51b7..c6e30ea02 100644
--- a/meilisearch-http/src/data/updates.rs
+++ b/meilisearch-http/src/data/updates.rs
@@ -1,7 +1,4 @@
-//use async_compression::tokio_02::write::GzipEncoder;
-//use futures_util::stream::StreamExt;
 use milli::update::{IndexDocumentsMethod, UpdateFormat};
-//use tokio::io::AsyncWriteExt;
 use actix_web::web::Payload;
 
 use crate::index_controller::{UpdateStatus, IndexMetadata};
diff --git a/meilisearch-http/src/index_controller/local_index_controller/mod.rs b/meilisearch-http/src/index_controller/local_index_controller/mod.rs
index 14efe42c7..8ac600f5f 100644
--- a/meilisearch-http/src/index_controller/local_index_controller/mod.rs
+++ b/meilisearch-http/src/index_controller/local_index_controller/mod.rs
@@ -148,7 +148,7 @@ impl IndexController for LocalIndexController {
 
     fn update_index(&self, uid: impl AsRef<str>, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
         if index_settings.name.is_some() {
-            bail!("can't udpate an index name.")
+            bail!("can't update an index name.")
         }
 
         let (primary_key, meta) = match index_settings.primary_key {
diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs
index a824852bf..85469728b 100644
--- a/meilisearch-http/src/index_controller/mod.rs
+++ b/meilisearch-http/src/index_controller/mod.rs
@@ -69,7 +69,7 @@ enum IndexControllerMsg {
 
 impl IndexController {
     pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
-        let uuid_resolver = uuid_resolver::UuidResolverHandle::new();
+        let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?;
         let index_actor = index_actor::IndexActorHandle::new(&path)?;
         let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path);
         Ok(Self { uuid_resolver, index_handle: index_actor, update_handle })
diff --git a/meilisearch-http/src/index_controller/uuid_resolver.rs b/meilisearch-http/src/index_controller/uuid_resolver.rs
index 8d33edef4..a369790f3 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver.rs
@@ -1,9 +1,9 @@
+use std::{fs::create_dir_all, path::Path};
+
+use heed::{Database, Env, EnvOpenOptions, types::{ByteSlice, Str}};
 use log::{info, warn};
-use std::collections::hash_map::Entry;
-use std::collections::HashMap;
-use std::sync::Arc;
 use thiserror::Error;
-use tokio::sync::{mpsc, oneshot, RwLock};
+use tokio::sync::{mpsc, oneshot};
 use uuid::Uuid;
 
 pub type Result<T> = std::result::Result<T, UuidError>;
@@ -81,14 +81,14 @@ impl UuidResolverActor {
 
     async fn handle_resolve(&self, name: String) -> Result<Uuid> {
         self.store
-            .get_uuid(&name)
+            .get_uuid(name.clone())
             .await?
             .ok_or(UuidError::UnexistingIndex(name))
     }
 
     async fn handle_delete(&self, name: String) -> Result<Uuid> {
         self.store
-            .delete(&name)
+            .delete(name.clone())
             .await?
             .ok_or(UuidError::UnexistingIndex(name))
     }
@@ -105,12 +105,12 @@ pub struct UuidResolverHandle {
 }
 
 impl UuidResolverHandle {
-    pub fn new() -> Self {
+    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
         let (sender, reveiver) = mpsc::channel(100);
-        let store = MapUuidStore(Arc::new(RwLock::new(HashMap::new())));
+        let store = HeedUuidStore::new(path)?;
         let actor = UuidResolverActor::new(reveiver, store);
         tokio::spawn(actor.run());
-        Self { sender }
+        Ok(Self { sender })
     }
 
     pub async fn resolve(&self, name: String) -> anyhow::Result<Uuid> {
@@ -159,12 +159,18 @@ impl UuidResolverHandle {
     }
 }
 
-#[derive(Clone, Debug, Error)]
+#[derive(Debug, Error)]
 pub enum UuidError {
     #[error("Name already exist.")]
     NameAlreadyExist,
     #[error("Index \"{0}\" doesn't exist.")]
     UnexistingIndex(String),
+    #[error("Error performing task: {0}")]
+    TokioTask(#[from] tokio::task::JoinError),
+    #[error("Database error: {0}")]
+    Heed(#[from] heed::Error),
+    #[error("Uuid error: {0}")]
+    Uuid(#[from] uuid::Error),
 }
 
 #[async_trait::async_trait]
@@ -172,48 +178,157 @@ trait UuidStore {
     // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
     // the uuid otherwise.
     async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid>;
-    async fn get_uuid(&self, name: &str) -> Result<Option<Uuid>>;
-    async fn delete(&self, name: &str) -> Result<Option<Uuid>>;
+    async fn get_uuid(&self, name: String) -> Result<Option<Uuid>>;
+    async fn delete(&self, name: String) -> Result<Option<Uuid>>;
     async fn list(&self) -> Result<Vec<(String, Uuid)>>;
 }
 
-struct MapUuidStore(Arc<RwLock<HashMap<String, Uuid>>>);
+struct HeedUuidStore {
+    env: Env,
+    db: Database<Str, ByteSlice>,
+}
+
+fn open_or_create_database<K: 'static, V: 'static>(env: &Env, name: Option<&str>) -> heed::Result<Database<K, V>> {
+    match env.open_database(name)? {
+        Some(db) => Ok(db),
+        None => env.create_database(name),
+    }
+}
+
+impl HeedUuidStore {
+    fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
+        let path = path.as_ref().join("index_uuids");
+        create_dir_all(&path)?;
+        let mut options = EnvOpenOptions::new();
+        options.map_size(1_073_741_824); // 1GB
+        let env = options.open(path)?;
+        let db = open_or_create_database(&env, None)?;
+        Ok(Self { env, db })
+    }
+}
 
 #[async_trait::async_trait]
-impl UuidStore for MapUuidStore {
+impl UuidStore for HeedUuidStore {
     async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
-        match self.0.write().await.entry(name) {
-            Entry::Occupied(entry) => {
-                if err {
-                    Err(UuidError::NameAlreadyExist)
-                } else {
-                    Ok(entry.get().clone())
+        let env = self.env.clone();
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || {
+            let mut txn = env.write_txn()?;
+            match db.get(&txn, &name)? {
+                Some(uuid) => {
+                    if err {
+                        Err(UuidError::NameAlreadyExist)
+                    } else {
+                        let uuid = Uuid::from_slice(uuid)?;
+                        Ok(uuid)
+                    }
+                }
+                None => {
+                    let uuid = Uuid::new_v4();
+                    db.put(&mut txn, &name, uuid.as_bytes())?;
+                    txn.commit()?;
+                    Ok(uuid)
                 }
             }
-            Entry::Vacant(entry) => {
-                let uuid = Uuid::new_v4();
-                let uuid = entry.insert(uuid);
-                Ok(uuid.clone())
+        }).await?
+    }
+
+    async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
+        let env = self.env.clone();
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || {
+            let txn = env.read_txn()?;
+            match db.get(&txn, &name)? {
+                Some(uuid) => {
+                    let uuid = Uuid::from_slice(uuid)?;
+                    Ok(Some(uuid))
+                }
+                None => Ok(None),
             }
-        }
+        }).await?
     }
 
-    async fn get_uuid(&self, name: &str) -> Result<Option<Uuid>> {
-        Ok(self.0.read().await.get(name).cloned())
-    }
-
-    async fn delete(&self, name: &str) -> Result<Option<Uuid>> {
-        Ok(self.0.write().await.remove(name))
+    async fn delete(&self, name: String) -> Result<Option<Uuid>> {
+        let env = self.env.clone();
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || {
+            let mut txn = env.write_txn()?;
+            match db.get(&txn, &name)? {
+                Some(uuid) => {
+                    let uuid = Uuid::from_slice(uuid)?;
+                    db.delete(&mut txn, &name)?;
+                    txn.commit()?;
+                    Ok(Some(uuid))
+                }
+                None => Ok(None)
+            }
+        }).await?
     }
 
     async fn list(&self) -> Result<Vec<(String, Uuid)>> {
-        let list = self
-            .0
-            .read()
-            .await
-            .iter()
-            .map(|(name, uuid)| (name.to_owned(), uuid.clone()))
-            .collect();
-        Ok(list)
+        let env = self.env.clone();
+        let db = self.db.clone();
+        tokio::task::spawn_blocking(move || {
+            let txn = env.read_txn()?;
+            let mut entries = Vec::new();
+            for entry in db.iter(&txn)? {
+                let (name, uuid) = entry?;
+                let uuid = Uuid::from_slice(uuid)?;
+                entries.push((name.to_owned(), uuid))
+            }
+            Ok(entries)
+        }).await?
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::HashMap;
+    use std::collections::hash_map::Entry;
+    use std::sync::Arc;
+
+    use tokio::sync::RwLock;
+
+    use super::*;
+
+    struct MapUuidStore(Arc<RwLock<HashMap<String, Uuid>>>);
+
+    #[async_trait::async_trait]
+    impl UuidStore for MapUuidStore {
+        async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
+            match self.0.write().await.entry(name) {
+                Entry::Occupied(entry) => {
+                    if err {
+                        Err(UuidError::NameAlreadyExist)
+                    } else {
+                        Ok(entry.get().clone())
+                    }
+                }
+                Entry::Vacant(entry) => {
+                    let uuid = Uuid::new_v4();
+                    let uuid = entry.insert(uuid);
+                    Ok(uuid.clone())
+                }
+            }
+        }
+
+        async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
+            Ok(self.0.read().await.get(&name).cloned())
+        }
+
+        async fn delete(&self, name: String) -> Result<Option<Uuid>> {
+            Ok(self.0.write().await.remove(&name))
+        }
+
+        async fn list(&self) -> Result<Vec<(String, Uuid)>> {
+            let list = self
+                .0
+                .read()
+                .await
+                .iter()
+                .map(|(name, uuid)| (name.to_owned(), uuid.clone()))
+                .collect();
+            Ok(list)
+        }
     }
 }
diff --git a/meilisearch-http/src/routes/index.rs b/meilisearch-http/src/routes/index.rs
index 818c87303..5c6e3f5a9 100644
--- a/meilisearch-http/src/routes/index.rs
+++ b/meilisearch-http/src/routes/index.rs
@@ -140,7 +140,7 @@ async fn get_update_status(
             Ok(HttpResponse::Ok().body(json))
         }
         Ok(None) => {
-            let e = format!("udpate {} for index {:?} doesn't exists.", path.update_id, path.index_uid);
+            let e = format!("update {} for index {:?} doesn't exists.", path.update_id, path.index_uid);
             Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
         }
         Err(e) => {
diff --git a/meilisearch-http/tests/common/index.rs b/meilisearch-http/tests/common/index.rs
index 58a8de200..a14769ee1 100644
--- a/meilisearch-http/tests/common/index.rs
+++ b/meilisearch-http/tests/common/index.rs
@@ -73,7 +73,7 @@ impl Index<'_> {
         let url = format!("/indexes/{}/updates/{}", self.uid, update_id);
         for _ in 0..10 {
             let (response, status_code) = self.service.get(&url).await;
-            assert_eq!(status_code, 200);
+            assert_eq!(status_code, 200, "response: {}", response);
 
             if response["status"] == "processed" || response["status"] == "failed" {
                 return response;
@@ -84,8 +84,8 @@ impl Index<'_> {
         panic!("Timeout waiting for update id");
     }
 
-    pub async fn get_update(&self, udpate_id: u64) -> (Value, StatusCode) {
-        let url = format!("/indexes/{}/updates/{}", self.uid, udpate_id);
+    pub async fn get_update(&self, update_id: u64) -> (Value, StatusCode) {
+        let url = format!("/indexes/{}/updates/{}", self.uid, update_id);
         self.service.get(url).await
     }
 
diff --git a/meilisearch-http/tests/updates/mod.rs b/meilisearch-http/tests/updates/mod.rs
index 03b307daf..64b5b560e 100644
--- a/meilisearch-http/tests/updates/mod.rs
+++ b/meilisearch-http/tests/updates/mod.rs
@@ -8,7 +8,7 @@ async fn get_update_unexisting_index() {
 }
 
 #[actix_rt::test]
-async fn get_unexisting_udpate_status() {
+async fn get_unexisting_update_status() {
     let server = Server::new().await;
     let index = server.index("test");
     index.create(None).await;