diff --git a/Cargo.lock b/Cargo.lock index 0a97f899e..e763ecaa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1830,7 +1830,6 @@ dependencies = [ [[package]] name = "milli" version = "0.16.0" -source = "git+https://github.com/meilisearch/milli.git?branch=main#b2a332599ebdbf492360ddd3e98c3a14fb84608e" dependencies = [ "bimap", "bincode", diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index d9f030122..085a40b09 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -49,7 +49,7 @@ meilisearch-lib = { path = "../meilisearch-lib" } meilisearch-error = { path = "../meilisearch-error" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } memmap = "0.7.0" -milli = { git = "https://github.com/meilisearch/milli.git", branch = "main" } +milli = { path = "../../milli/milli" } mime = "0.3.16" num_cpus = "1.13.0" once_cell = "1.8.0" diff --git a/meilisearch-http/src/helpers/mod.rs b/meilisearch-http/src/helpers/mod.rs index 0b72c3694..3908c440c 100644 --- a/meilisearch-http/src/helpers/mod.rs +++ b/meilisearch-http/src/helpers/mod.rs @@ -1,4 +1,3 @@ -//pub mod compression; mod env; pub use env::EnvSizer; diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index 307bbcefa..481d38e1c 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -47,6 +47,9 @@ pub mod analytics; pub mod helpers; pub mod option; pub mod routes; +use std::path::Path; +use std::time::Duration; + use crate::extractors::authentication::AuthConfig; pub use option::Opt; @@ -81,6 +84,53 @@ impl ApiKeys { } } +pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result { + let mut meilisearch = MeiliSearch::builder(); + meilisearch + .set_max_index_size(opt.max_index_size.get_bytes() as usize) + .set_max_update_store_size(opt.max_udb_size.get_bytes() as usize) + .set_ignore_missing_snapshot(opt.ignore_missing_snapshot) + .set_ignore_snapshot_if_db_exists(opt.ignore_snapshot_if_db_exists) + .set_dump_dst(opt.dumps_dir.clone()) + .set_snapshot_interval(Duration::from_secs(opt.snapshot_interval_sec)) + .set_snapshot_dir(opt.snapshot_dir.clone()); + + if let Some(ref path) = opt.import_snapshot { + meilisearch.set_import_snapshot(path.clone()); + } + + if let Some(ref path) = opt.import_dump { + meilisearch.set_dump_src(path.clone()); + } + + if opt.schedule_snapshot { + meilisearch.set_schedule_snapshot(); + } + + meilisearch.build(opt.db_path.clone(), opt.indexer_options.clone()) +} + +/// Cleans and setup the temporary file folder in the database directory. This must be done after +/// the meilisearch instance has been created, to not interfere with the snapshot and dump loading. +pub fn setup_temp_dir(db_path: impl AsRef) -> anyhow::Result<()> { + // Set the tempfile directory in the current db path, to avoid cross device references. Also + // remove the previous outstanding files found there + // + // TODO: if two processes open the same db, one might delete the other tmpdir. Need to make + // sure that no one is using it before deleting it. + let temp_path = db_path.as_ref().join("tmp"); + // Ignore error if tempdir doesn't exist + let _ = std::fs::remove_dir_all(&temp_path); + std::fs::create_dir_all(&temp_path)?; + if cfg!(windows) { + std::env::set_var("TMP", temp_path); + } else { + std::env::set_var("TMPDIR", temp_path); + } + + Ok(()) +} + pub fn configure_data( config: &mut web::ServiceConfig, data: MeiliSearch, diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs index dfa4bcc2d..3c7a34ddf 100644 --- a/meilisearch-http/src/main.rs +++ b/meilisearch-http/src/main.rs @@ -1,7 +1,7 @@ -use std::{env, path::Path, time::Duration}; +use std::env; use actix_web::HttpServer; -use meilisearch_http::{create_app, Opt}; +use meilisearch_http::{Opt, create_app, setup_meilisearch}; use meilisearch_lib::MeiliSearch; use structopt::StructOpt; @@ -27,53 +27,6 @@ fn setup(opt: &Opt) -> anyhow::Result<()> { Ok(()) } -/// Cleans and setup the temporary file folder in the database directory. This must be done after -/// the meilisearch instance has been created, to not interfere with the snapshot and dump loading. -fn setup_temp_dir(db_path: impl AsRef) -> anyhow::Result<()> { - // Set the tempfile directory in the current db path, to avoid cross device references. Also - // remove the previous outstanding files found there - // - // TODO: if two processes open the same db, one might delete the other tmpdir. Need to make - // sure that no one is using it before deleting it. - let temp_path = db_path.as_ref().join("tmp"); - // Ignore error if tempdir doesn't exist - let _ = std::fs::remove_dir_all(&temp_path); - std::fs::create_dir_all(&temp_path)?; - if cfg!(windows) { - std::env::set_var("TMP", temp_path); - } else { - std::env::set_var("TMPDIR", temp_path); - } - - Ok(()) -} - -fn setup_meilisearch(opt: &Opt) -> anyhow::Result { - let mut meilisearch = MeiliSearch::builder(); - meilisearch - .set_max_index_size(opt.max_index_size.get_bytes() as usize) - .set_max_update_store_size(opt.max_udb_size.get_bytes() as usize) - .set_ignore_missing_snapshot(opt.ignore_missing_snapshot) - .set_ignore_snapshot_if_db_exists(opt.ignore_snapshot_if_db_exists) - .set_dump_dst(opt.dumps_dir.clone()) - .set_snapshot_interval(Duration::from_secs(opt.snapshot_interval_sec)) - .set_snapshot_dir(opt.snapshot_dir.clone()); - - if let Some(ref path) = opt.import_snapshot { - meilisearch.set_import_snapshot(path.clone()); - } - - if let Some(ref path) = opt.import_dump { - meilisearch.set_dump_src(path.clone()); - } - - if opt.schedule_snapshot { - meilisearch.set_schedule_snapshot(); - } - - meilisearch.build(opt.db_path.clone(), opt.indexer_options.clone()) -} - #[actix_web::main] async fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); @@ -92,7 +45,9 @@ async fn main() -> anyhow::Result<()> { let meilisearch = setup_meilisearch(&opt)?; - setup_temp_dir(&opt.db_path)?; + // Setup the temp directory to be in the db folder. This is important, since temporary file + // don't support to be persisted accross filesystem boundaries. + meilisearch_http::setup_temp_dir(&opt.db_path)?; #[cfg(all(not(debug_assertions), feature = "analytics"))] if !opt.no_analytics { diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs index 2a4d425e9..72fbeab44 100644 --- a/meilisearch-http/src/option.rs +++ b/meilisearch-http/src/option.rs @@ -1,9 +1,5 @@ -use byte_unit::ByteError; -use std::fmt; use std::io::{BufReader, Read}; -use std::ops::Deref; use std::path::PathBuf; -use std::str::FromStr; use std::sync::Arc; use std::fs; @@ -14,7 +10,6 @@ use rustls::{ RootCertStore, }; use structopt::StructOpt; -use sysinfo::{RefreshKind, System, SystemExt}; use meilisearch_lib::options::IndexerOpts; const POSSIBLE_ENV: [&str; 2] = ["development", "production"]; @@ -177,63 +172,6 @@ impl Opt { } } -/// A type used to detect the max memory available and use 2/3 of it. -#[derive(Debug, Clone, Copy)] -pub struct MaxMemory(Option); - -impl FromStr for MaxMemory { - type Err = ByteError; - - fn from_str(s: &str) -> Result { - Byte::from_str(s).map(Some).map(MaxMemory) - } -} - -impl Default for MaxMemory { - fn default() -> MaxMemory { - MaxMemory( - total_memory_bytes() - .map(|bytes| bytes * 2 / 3) - .map(Byte::from_bytes), - ) - } -} - -impl fmt::Display for MaxMemory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self.0 { - Some(memory) => write!(f, "{}", memory.get_appropriate_unit(true)), - None => f.write_str("unknown"), - } - } -} - -impl Deref for MaxMemory { - type Target = Option; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl MaxMemory { - pub fn unlimited() -> Self { - Self(None) - } -} - -/// Returns the total amount of bytes available or `None` if this system isn't supported. -fn total_memory_bytes() -> Option { - if System::IS_SUPPORTED { - let memory_kind = RefreshKind::new().with_memory(); - let mut system = System::new_with_specifics(memory_kind); - system.refresh_memory(); - Some(system.total_memory() * 1024) // KiB into bytes - } else { - None - } -} - fn load_certs(filename: PathBuf) -> anyhow::Result> { let certfile = fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open certificate file"))?; let mut reader = BufReader::new(certfile); diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index ee86e12ad..f7aa4f485 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -106,7 +106,7 @@ pub async fn delete_document( ) -> Result { let DocumentParam { document_id, index_uid } = path.into_inner(); let update = Update::DeleteDocuments(vec![document_id]); - let update_status = meilisearch.register_update(index_uid, update).await?; + let update_status = meilisearch.register_update(index_uid, update, false).await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } @@ -170,7 +170,7 @@ pub async fn add_documents( format: DocumentAdditionFormat::Json, }; let update_status = meilisearch - .register_update(path.into_inner().index_uid, update) + .register_update(path.into_inner().index_uid, update, true) .await?; debug!("returns: {:?}", update_status); @@ -193,7 +193,7 @@ pub async fn update_documents( format: DocumentAdditionFormat::Json, }; let update_status = meilisearch - .register_update(path.into_inner().index_uid, update) + .register_update(path.into_inner().index_uid, update, true) .await?; debug!("returns: {:?}", update_status); @@ -216,7 +216,7 @@ pub async fn delete_documents( .collect(); let update = Update::DeleteDocuments(ids); - let update_status = meilisearch.register_update(path.into_inner().index_uid, update).await?; + let update_status = meilisearch.register_update(path.into_inner().index_uid, update, false).await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } @@ -226,7 +226,7 @@ pub async fn clear_all_documents( path: web::Path, ) -> Result { let update = Update::ClearDocuments; - let update_status = meilisearch.register_update(path.into_inner().index_uid, update).await?; + let update_status = meilisearch.register_update(path.into_inner().index_uid, update, false).await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } diff --git a/meilisearch-http/src/routes/indexes/mod.rs b/meilisearch-http/src/routes/indexes/mod.rs index 59ad6fa0f..b10b5a004 100644 --- a/meilisearch-http/src/routes/indexes/mod.rs +++ b/meilisearch-http/src/routes/indexes/mod.rs @@ -18,7 +18,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::resource("") .route(web::get().to(list_indexes)) - //.route(web::post().to(create_index)), + .route(web::post().to(create_index)), ) .service( web::scope("/{index_uid}") @@ -26,7 +26,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { web::resource("") .route(web::get().to(get_index)) .route(web::put().to(update_index)) - //.route(web::delete().to(delete_index)), + .route(web::delete().to(delete_index)), ) .service(web::resource("/stats").route(web::get().to(get_index_stats))) .service(web::scope("/documents").configure(documents::configure)) @@ -49,14 +49,14 @@ pub struct IndexCreateRequest { primary_key: Option, } -//pub async fn create_index( - //data: GuardedData, - //body: web::Json, -//) -> Result { - //let body = body.into_inner(); - //let meta = data.create_index(body.uid, body.primary_key).await?; - //Ok(HttpResponse::Created().json(meta)) -//} +pub async fn create_index( + meilisearch: GuardedData, + body: web::Json, +) -> Result { + let body = body.into_inner(); + let meta = meilisearch.create_index(body.uid, body.primary_key).await?; + Ok(HttpResponse::Created().json(meta)) +} #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] @@ -102,13 +102,13 @@ pub async fn update_index( Ok(HttpResponse::Ok().json(meta)) } -//pub async fn delete_index( - //data: GuardedData, - //path: web::Path, -//) -> Result { - //data.delete_index(path.index_uid.clone()).await?; - //Ok(HttpResponse::NoContent().finish()) -//} +pub async fn delete_index( + meilisearch: GuardedData, + path: web::Path, +) -> Result { + meilisearch.delete_index(path.index_uid.clone()).await?; + Ok(HttpResponse::NoContent().finish()) +} pub async fn get_index_stats( meilisearch: GuardedData, diff --git a/meilisearch-http/src/routes/indexes/settings.rs b/meilisearch-http/src/routes/indexes/settings.rs index 180be4108..4a1e26426 100644 --- a/meilisearch-http/src/routes/indexes/settings.rs +++ b/meilisearch-http/src/routes/indexes/settings.rs @@ -30,7 +30,7 @@ macro_rules! make_setting_route { ..Default::default() }; let update = Update::Settings(settings); - let update_status = meilisearch.register_update(index_uid.into_inner(), update).await?; + let update_status = meilisearch.register_update(index_uid.into_inner(), update, false).await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } @@ -49,7 +49,7 @@ macro_rules! make_setting_route { }; let update = Update::Settings(settings); - let update_status = meilisearch.register_update(index_uid.into_inner(), update).await?; + let update_status = meilisearch.register_update(index_uid.into_inner(), update, true).await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } @@ -159,7 +159,7 @@ pub async fn update_all( let update = Update::Settings(settings); let update_result = meilisearch - .register_update(index_uid.into_inner(), update) + .register_update(index_uid.into_inner(), update, true) .await?; let json = serde_json::json!({ "updateId": update_result.id() }); debug!("returns: {:?}", json); @@ -183,7 +183,7 @@ pub async fn delete_all( let update = Update::Settings(settings.into_unchecked()); let update_result = data - .register_update(index_uid.into_inner(), update) + .register_update(index_uid.into_inner(), update, false) .await?; let json = serde_json::json!({ "updateId": update_result.id() }); debug!("returns: {:?}", json); diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 9d99a7d0c..3066b3492 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -280,18 +280,17 @@ pub async fn get_health() -> Result { #[cfg(test)] mod test { use super::*; - use crate::data::Data; use crate::extractors::authentication::GuardedData; /// A type implemented for a route that uses a authentication policy `Policy`. /// /// This trait is used for regression testing of route authenticaton policies. - trait Is {} + trait Is {} macro_rules! impl_is_policy { ($($param:ident)*) => { - impl Is for Func - where Func: Fn(GuardedData, $($param,)*) -> Res {} + impl Is for Func + where Func: Fn(GuardedData, $($param,)*) -> Res {} }; } @@ -310,7 +309,7 @@ mod test { ($($policy:ident => { $($route:expr,)*})*) => { #[test] fn test_auth() { - $($(let _: &dyn Is<$policy, _> = &$route;)*)* + $($(let _: &dyn Is<$policy, _, _> = &$route;)*)* } }; } diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs index 5fb423ffd..ef2e51355 100644 --- a/meilisearch-http/tests/common/server.rs +++ b/meilisearch-http/tests/common/server.rs @@ -2,12 +2,14 @@ use std::path::Path; use actix_web::http::StatusCode; use byte_unit::{Byte, ByteUnit}; +use meilisearch_http::setup_meilisearch; +use meilisearch_lib::options::{IndexerOpts, MaxMemory}; +use once_cell::sync::Lazy; use serde_json::Value; -use tempdir::TempDir; +use tempfile::TempDir; use urlencoding::encode; -use meilisearch_http::data::Data; -use meilisearch_http::option::{IndexerOpts, MaxMemory, Opt}; +use meilisearch_http::option::Opt; use super::index::Index; use super::service::Service; @@ -15,17 +17,25 @@ use super::service::Service; pub struct Server { pub service: Service, // hold ownership to the tempdir while we use the server instance. - _dir: Option, + _dir: Option, } +static TEST_TEMP_DIR: Lazy = Lazy::new(|| TempDir::new().unwrap()); + impl Server { pub async fn new() -> Self { - let dir = TempDir::new("meilisearch").unwrap(); + let dir = TempDir::new().unwrap(); - let opt = default_settings(dir.path()); + if cfg!(windows) { + std::env::set_var("TMP", TEST_TEMP_DIR.path()); + } else { + std::env::set_var("TMPDIR", TEST_TEMP_DIR.path()); + } - let data = Data::new(opt).unwrap(); - let service = Service(data); + let options = default_settings(dir.path()); + + let meilisearch = setup_meilisearch(&options).unwrap(); + let service = Service { meilisearch, options }; Server { service, @@ -33,9 +43,9 @@ impl Server { } } - pub async fn new_with_options(opt: Opt) -> Self { - let data = Data::new(opt).unwrap(); - let service = Service(data); + pub async fn new_with_options(options: Opt) -> Self { + let meilisearch = setup_meilisearch(&options).unwrap(); + let service = Service { meilisearch, options }; Server { service, diff --git a/meilisearch-http/tests/common/service.rs b/meilisearch-http/tests/common/service.rs index 08db5b9dc..1450a6dd9 100644 --- a/meilisearch-http/tests/common/service.rs +++ b/meilisearch-http/tests/common/service.rs @@ -1,14 +1,17 @@ use actix_web::{http::StatusCode, test}; +use meilisearch_lib::MeiliSearch; use serde_json::Value; -use meilisearch_http::create_app; -use meilisearch_http::data::Data; +use meilisearch_http::{Opt, create_app}; -pub struct Service(pub Data); +pub struct Service { + pub meilisearch: MeiliSearch, + pub options: Opt, +} impl Service { pub async fn post(&self, url: impl AsRef, body: Value) -> (Value, StatusCode) { - let app = test::init_service(create_app!(&self.0, true)).await; + let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await; let req = test::TestRequest::post() .uri(url.as_ref()) @@ -28,7 +31,7 @@ impl Service { url: impl AsRef, body: impl AsRef, ) -> (Value, StatusCode) { - let app = test::init_service(create_app!(&self.0, true)).await; + let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await; let req = test::TestRequest::post() .uri(url.as_ref()) @@ -44,7 +47,7 @@ impl Service { } pub async fn get(&self, url: impl AsRef) -> (Value, StatusCode) { - let app = test::init_service(create_app!(&self.0, true)).await; + let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await; let req = test::TestRequest::get().uri(url.as_ref()).to_request(); let res = test::call_service(&app, req).await; @@ -56,7 +59,7 @@ impl Service { } pub async fn put(&self, url: impl AsRef, body: Value) -> (Value, StatusCode) { - let app = test::init_service(create_app!(&self.0, true)).await; + let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await; let req = test::TestRequest::put() .uri(url.as_ref()) @@ -71,7 +74,7 @@ impl Service { } pub async fn delete(&self, url: impl AsRef) -> (Value, StatusCode) { - let app = test::init_service(create_app!(&self.0, true)).await; + let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await; let req = test::TestRequest::delete().uri(url.as_ref()).to_request(); let res = test::call_service(&app, req).await; diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs index 4c94cf194..a0436c67d 100644 --- a/meilisearch-http/tests/documents/add_documents.rs +++ b/meilisearch-http/tests/documents/add_documents.rs @@ -16,7 +16,7 @@ async fn add_documents_test_json_content_types() { // this is a what is expected and should work let server = Server::new().await; - let app = test::init_service(create_app!(&server.service.0, true)).await; + let app = test::init_service(create_app!(&server.service.meilisearch, true, &server.service.options)).await; let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) @@ -41,7 +41,7 @@ async fn add_documents_test_no_content_types() { ]); let server = Server::new().await; - let app = test::init_service(create_app!(&server.service.0, true)).await; + let app = test::init_service(create_app!(&server.service.meilisearch, true, &server.service.options)).await; let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) @@ -67,7 +67,7 @@ async fn add_documents_test_bad_content_types() { ]); let server = Server::new().await; - let app = test::init_service(create_app!(&server.service.0, true)).await; + let app = test::init_service(create_app!(&server.service.meilisearch, true, &server.service.options)).await; let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs index c7bcd4376..37ea6bc82 100644 --- a/meilisearch-http/tests/settings/get_settings.rs +++ b/meilisearch-http/tests/settings/get_settings.rs @@ -30,8 +30,8 @@ static DEFAULT_SETTINGS_VALUES: Lazy> = Lazy::new(| #[actix_rt::test] async fn get_settings_unexisting_index() { let server = Server::new().await; - let (_response, code) = server.index("test").settings().await; - assert_eq!(code, 404) + let (response, code) = server.index("test").settings().await; + assert_eq!(code, 404, "{}", response) } #[actix_rt::test] @@ -167,8 +167,8 @@ async fn update_setting_unexisting_index() { async fn update_setting_unexisting_index_invalid_uid() { let server = Server::new().await; let index = server.index("test##! "); - let (_response, code) = index.update_settings(json!({})).await; - assert_eq!(code, 400); + let (response, code) = index.update_settings(json!({})).await; + assert_eq!(code, 400, "{}", response); } macro_rules! test_setting_routes { diff --git a/meilisearch-http/tests/snapshot/mod.rs b/meilisearch-http/tests/snapshot/mod.rs index b5602c508..fb6713779 100644 --- a/meilisearch-http/tests/snapshot/mod.rs +++ b/meilisearch-http/tests/snapshot/mod.rs @@ -9,8 +9,8 @@ use meilisearch_http::Opt; #[actix_rt::test] async fn perform_snapshot() { - let temp = tempfile::tempdir_in(".").unwrap(); - let snapshot_dir = tempfile::tempdir_in(".").unwrap(); + let temp = tempfile::tempdir().unwrap(); + let snapshot_dir = tempfile::tempdir().unwrap(); let options = Opt { snapshot_dir: snapshot_dir.path().to_owned(), @@ -29,7 +29,7 @@ async fn perform_snapshot() { sleep(Duration::from_secs(2)).await; - let temp = tempfile::tempdir_in(".").unwrap(); + let temp = tempfile::tempdir().unwrap(); let snapshot_path = snapshot_dir .path() diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index 54cedd6d5..446a20fb4 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -33,7 +33,8 @@ main_error = "0.1.1" meilisearch-error = { path = "../meilisearch-error" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } memmap = "0.7.0" -milli = { git = "https://github.com/meilisearch/milli.git", branch = "main" } +#milli = { git = "https://github.com/meilisearch/milli.git", branch = "main" } +milli = { path = "../../milli/milli" } mime = "0.3.16" num_cpus = "1.13.0" once_cell = "1.8.0" diff --git a/meilisearch-lib/src/index/search.rs b/meilisearch-lib/src/index/search.rs index edc672897..70d0510ac 100644 --- a/meilisearch-lib/src/index/search.rs +++ b/meilisearch-lib/src/index/search.rs @@ -7,8 +7,7 @@ use heed::RoTxn; use indexmap::IndexMap; use meilisearch_tokenizer::{Analyzer, AnalyzerConfig, Token}; use milli::{ - AscDesc, AscDescError, FieldId, FieldsIdsMap, FilterCondition, MatchingWords, SortError, - UserError, + AscDesc, FieldId, FieldsIdsMap, FilterCondition, MatchingWords, SortError }; use regex::Regex; use serde::{Deserialize, Serialize}; diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index c2410107d..802f872cd 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -2,7 +2,6 @@ use std::fs::File; use std::path::{Path, PathBuf}; use std::sync::Arc; -use anyhow::Context; use chrono::{DateTime, Utc}; use log::{info, trace, warn}; #[cfg(test)] @@ -112,22 +111,16 @@ pub fn load_dump( update_db_size: usize, indexer_opts: &IndexerOpts, ) -> anyhow::Result<()> { - let tmp_src = tempfile::tempdir_in(".")?; + let tmp_src = tempfile::tempdir()?; let tmp_src_path = tmp_src.path(); - println!("importing to {}", dst_path.as_ref().display()); crate::from_tar_gz(&src_path, tmp_src_path)?; let meta_path = tmp_src_path.join(META_FILE_NAME); let mut meta_file = File::open(&meta_path)?; let meta: Metadata = serde_json::from_reader(&mut meta_file)?; - let dst_dir = dst_path - .as_ref() - .parent() - .with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?; - - let tmp_dst = tempfile::tempdir_in(dst_dir)?; + let tmp_dst = tempfile::tempdir()?; match meta { Metadata::V1(meta) => { @@ -168,9 +161,8 @@ impl DumpTask { create_dir_all(&self.path).await?; - let path_clone = self.path.clone(); let temp_dump_dir = - tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??; + tokio::task::spawn_blocking(|| tempfile::TempDir::new()).await??; let temp_dump_path = temp_dump_dir.path().to_owned(); let meta = Metadata::new_v2(self.index_db_size, self.update_db_size); @@ -183,7 +175,7 @@ impl DumpTask { UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?; let dump_path = tokio::task::spawn_blocking(move || -> Result { - let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; + let temp_dump_file = tempfile::NamedTempFile::new()?; crate::to_tar_gz(temp_dump_path, temp_dump_file.path()) .map_err(|e| DumpActorError::Internal(e.into()))?; diff --git a/meilisearch-lib/src/index_controller/index_resolver/mod.rs b/meilisearch-lib/src/index_controller/index_resolver/mod.rs index eea8e7a2c..d41c37ac6 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/mod.rs @@ -84,11 +84,14 @@ where U: UuidStore, Ok(indexes) } - pub async fn create_index(&self, uid: String, primary_key: Option) -> Result<(Uuid, Index)> { + pub async fn create_index(&self, uid: String, primary_key: Option) -> Result { + if !is_index_uid_valid(&uid) { + return Err(IndexResolverError::BadlyFormatted(uid)); + } let uuid = Uuid::new_v4(); let index = self.index_store.create(uuid, primary_key).await?; self.index_uuid_store.insert(uid, uuid).await?; - Ok((uuid, index)) + Ok(index) } pub async fn list(&self) -> Result> { @@ -109,11 +112,11 @@ where U: UuidStore, Ok(indexes) } - pub async fn delete_index(&self, uid: String) -> Result<()> { + pub async fn delete_index(&self, uid: String) -> Result { match self.index_uuid_store.delete(uid.clone()).await? { Some(uuid) => { let _ = self.index_store.delete(uuid).await; - Ok(()) + Ok(uuid) } None => Err(IndexResolverError::UnexistingIndex(uid)), } @@ -148,3 +151,8 @@ where U: UuidStore, } } } + +fn is_index_uid_valid(uid: &str) -> bool { + uid.chars() + .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') +} diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 325082074..0dee6521f 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -23,7 +23,7 @@ use crate::index_controller::index_resolver::create_index_resolver; use crate::index_controller::snapshot::SnapshotService; use crate::options::IndexerOpts; use error::Result; -use crate::index::error::Result as IndexResult; +use crate::index::error::{Result as IndexResult}; use self::dump_actor::load_dump; use self::index_resolver::HardStateIndexResolver; @@ -33,29 +33,15 @@ use self::updates::UpdateMsg; mod dump_actor; pub mod error; -//pub mod indexes; mod snapshot; pub mod update_file_store; pub mod updates; -//mod uuid_resolver; mod index_resolver; pub type Payload = Box< dyn Stream> + Send + Sync + 'static + Unpin, >; -macro_rules! time { - ($e:expr) => { - { - let now = std::time::Instant::now(); - let result = $e; - let elapsed = now.elapsed(); - println!("elapsed at line {}: {}ms ({}ns)", line!(), elapsed.as_millis(), elapsed.as_nanos()); - result - } - }; -} - #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct IndexMetadata { @@ -260,148 +246,27 @@ impl IndexController { IndexControllerBuilder::default() } - pub async fn register_update(&self, uid: String, update: Update) -> Result { + pub async fn register_update(&self, uid: String, update: Update, create_index: bool) -> Result { match self.index_resolver.get_uuid(uid).await { Ok(uuid) => { let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?; Ok(update_result) } Err(IndexResolverError::UnexistingIndex(name)) => { - let (uuid, _) = self.index_resolver.create_index(name, None).await?; - let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?; - // ignore if index creation fails now, since it may already have been created + if create_index { + let index = self.index_resolver.create_index(name, None).await?; + let update_result = UpdateMsg::update(&self.update_sender, index.uuid, update).await?; + // ignore if index creation fails now, since it may already have been created - Ok(update_result) + Ok(update_result) + } else { + Err(IndexResolverError::UnexistingIndex(name).into()) + } } Err(e) => Err(e.into()), } } - //pub async fn add_documents( - //&self, - //uid: String, - //method: milli::update::IndexDocumentsMethod, - //payload: Payload, - //primary_key: Option, - //) -> Result { - //let perform_update = |uuid| async move { - //let meta = UpdateMeta::DocumentsAddition { - //method, - //primary_key, - //}; - //let (sender, receiver) = mpsc::channel(10); - - //// It is necessary to spawn a local task to send the payload to the update handle to - //// prevent dead_locking between the update_handle::update that waits for the update to be - //// registered and the update_actor that waits for the the payload to be sent to it. - //tokio::task::spawn_local(async move { - //payload - //.for_each(|r| async { - //let _ = sender.send(r).await; - //}) - //.await - //}); - - //// This must be done *AFTER* spawning the task. - //self.update_handle.update(meta, receiver, uuid).await - //}; - - //match self.uuid_resolver.get(uid).await { - //Ok(uuid) => Ok(perform_update(uuid).await?), - //Err(UuidResolverError::UnexistingIndex(name)) => { - //let uuid = Uuid::new_v4(); - //let status = perform_update(uuid).await?; - //// ignore if index creation fails now, since it may already have been created - //let _ = self.index_handle.create_index(uuid, None).await; - //self.uuid_resolver.insert(name, uuid).await?; - //Ok(status) - //} - //Err(e) => Err(e.into()), - //} - //} - - //pub async fn clear_documents(&self, uid: String) -> Result { - //let uuid = self.uuid_resolver.get(uid).await?; - //let meta = UpdateMeta::ClearDocuments; - //let (_, receiver) = mpsc::channel(1); - //let status = self.update_handle.update(meta, receiver, uuid).await?; - //Ok(status) - //} - - //pub async fn delete_documents( - //&self, - //uid: String, - //documents: Vec, - //) -> Result { - //let uuid = self.uuid_resolver.get(uid).await?; - //let meta = UpdateMeta::DeleteDocuments { ids: documents }; - //let (_, receiver) = mpsc::channel(1); - //let status = self.update_handle.update(meta, receiver, uuid).await?; - //Ok(status) - //} - - //pub async fn update_settings( - //&self, - //uid: String, - //settings: Settings, - //create: bool, - //) -> Result { - //let perform_udpate = |uuid| async move { - //let meta = UpdateMeta::Settings(settings.into_unchecked()); - //// Nothing so send, drop the sender right away, as not to block the update actor. - //let (_, receiver) = mpsc::channel(1); - //self.update_handle.update(meta, receiver, uuid).await - //}; - - //match self.uuid_resolver.get(uid).await { - //Ok(uuid) => Ok(perform_udpate(uuid).await?), - //Err(UuidResolverError::UnexistingIndex(name)) if create => { - //let uuid = Uuid::new_v4(); - //let status = perform_udpate(uuid).await?; - //// ignore if index creation fails now, since it may already have been created - //let _ = self.index_handle.create_index(uuid, None).await; - //self.uuid_resolver.insert(name, uuid).await?; - //Ok(status) - //} - //Err(e) => Err(e.into()), - //} - //} - - //pub async fn create_index(&self, index_settings: IndexSettings) -> Result { - //let IndexSettings { uid, primary_key } = index_settings; - //let uid = uid.ok_or(IndexControllerError::MissingUid)?; - //let uuid = Uuid::new_v4(); - //let meta = self.index_handle.create_index(uuid, primary_key).await?; - //self.uuid_resolver.insert(uid.clone(), uuid).await?; - //let meta = IndexMetadata { - //uuid, - //name: uid.clone(), - //uid, - //meta, - //}; - - //Ok(meta) - //} - - //pub async fn delete_index(&self, uid: String) -> Result<()> { - //let uuid = self.uuid_resolver.delete(uid).await?; - - //// We remove the index from the resolver synchronously, and effectively perform the index - //// deletion as a background task. - //let update_handle = self.update_handle.clone(); - //let index_handle = self.index_handle.clone(); - //tokio::spawn(async move { - //if let Err(e) = update_handle.delete(uuid).await { - //error!("Error while deleting index: {}", e); - //} - //if let Err(e) = index_handle.delete(uuid).await { - //error!("Error while deleting index: {}", e); - //} - //}); - - //Ok(()) - //} - pub async fn update_status(&self, uid: String, id: u64) -> Result { let uuid = self.index_resolver.get_uuid(uid).await?; let result = UpdateMsg::get_update(&self.update_sender, uuid, id).await?; @@ -481,8 +346,8 @@ impl IndexController { } pub async fn search(&self, uid: String, query: SearchQuery) -> Result { - let index = time!(self.index_resolver.get_index(uid.clone()).await?); - let result = time!(spawn_blocking(move || time!(index.perform_search(query))).await??); + let index = self.index_resolver.get_index(uid.clone()).await?; + let result = spawn_blocking(move || index.perform_search(query)).await??; Ok(result) } @@ -549,6 +414,33 @@ impl IndexController { pub async fn dump_info(&self, uid: String) -> Result { Ok(self.dump_handle.dump_info(uid).await?) } + + pub async fn create_index(&self, uid: String, primary_key: Option) -> Result { + let index = self.index_resolver.create_index(uid.clone(), primary_key).await?; + let meta = spawn_blocking(move || -> IndexResult<_> { + let meta = index.meta()?; + let meta = IndexMetadata { + uuid: index.uuid, + uid: uid.clone(), + name: uid, + meta, + }; + Ok(meta) + }).await??; + + Ok(meta) + } + + pub async fn delete_index(&self, uid: String) -> Result<()> { + let uuid = self.index_resolver.delete_index(uid).await?; + + let update_sender = self.update_sender.clone(); + tokio::spawn(async move { + let _ = UpdateMsg::delete(&update_sender, uuid).await; + }); + + Ok(()) + } } pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { diff --git a/meilisearch-lib/src/index_controller/snapshot.rs b/meilisearch-lib/src/index_controller/snapshot.rs index 132745c96..66bdfe60e 100644 --- a/meilisearch-lib/src/index_controller/snapshot.rs +++ b/meilisearch-lib/src/index_controller/snapshot.rs @@ -57,7 +57,7 @@ impl SnapshotService { let snapshot_dir = self.snapshot_path.clone(); fs::create_dir_all(&snapshot_dir).await?; let temp_snapshot_dir = - spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; + spawn_blocking(move || tempfile::tempdir()).await??; let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); let indexes = self @@ -71,12 +71,11 @@ impl SnapshotService { UpdateMsg::snapshot(&self.update_sender, temp_snapshot_path.clone(), indexes).await?; - let snapshot_dir = self.snapshot_path.clone(); let snapshot_path = self .snapshot_path .join(format!("{}.snapshot", self.db_name)); let snapshot_path = spawn_blocking(move || -> anyhow::Result { - let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; + let temp_snapshot_file = tempfile::NamedTempFile::new()?; let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; temp_snapshot_file.persist(&snapshot_path)?; @@ -137,13 +136,6 @@ pub fn load_snapshot( //use uuid::Uuid; //use super::*; - //use crate::index_controller::index_actor::MockIndexActorHandle; - //use crate::index_controller::updates::{ - //error::UpdateActorError, MockUpdateActorHandle, UpdateActorHandleImpl, - //}; - //use crate::index_controller::uuid_resolver::{ - //error::UuidResolverError, MockUuidResolverHandle, - //}; //#[actix_rt::test] //async fn test_normal() { @@ -191,7 +183,7 @@ pub fn load_snapshot( //uuid_resolver //.expect_snapshot() //.times(1) - //abitrary error + ////abitrary error //.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); //let update_handle = MockUpdateActorHandle::new(); @@ -206,7 +198,7 @@ pub fn load_snapshot( //); //assert!(snapshot_service.perform_snapshot().await.is_err()); - //Nothing was written to the file + ////Nothing was written to the file //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); //} @@ -222,7 +214,7 @@ pub fn load_snapshot( //let mut update_handle = MockUpdateActorHandle::new(); //update_handle //.expect_snapshot() - //abitrary error + ////abitrary error //.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); //let snapshot_path = tempfile::tempdir_in(".").unwrap(); @@ -235,7 +227,7 @@ pub fn load_snapshot( //); //assert!(snapshot_service.perform_snapshot().await.is_err()); - //Nothing was written to the file + ////Nothing was written to the file //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); //} @@ -244,9 +236,9 @@ pub fn load_snapshot( //let mut uuid_resolver = MockUuidResolverHandle::new(); //uuid_resolver //.expect_snapshot() - //we expect the funtion to be called between 2 and 3 time in the given interval. + ////we expect the funtion to be called between 2 and 3 time in the given interval. //.times(2..4) - //abitrary error, to short-circuit the function + ////abitrary error, to short-circuit the function //.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); //let update_handle = MockUpdateActorHandle::new(); diff --git a/meilisearch-lib/src/index_controller/updates/message.rs b/meilisearch-lib/src/index_controller/updates/message.rs index 22a920e12..3b157e568 100644 --- a/meilisearch-lib/src/index_controller/updates/message.rs +++ b/meilisearch-lib/src/index_controller/updates/message.rs @@ -24,7 +24,7 @@ pub enum UpdateMsg { ret: oneshot::Sender>, id: u64, }, - Delete { + DeleteIndex { uuid: Uuid, ret: oneshot::Sender>, }, @@ -99,4 +99,11 @@ impl UpdateMsg { sender.send(msg).await?; rcv.await? } + + pub async fn delete(sender: &mpsc::Sender, uuid: Uuid) -> Result<()> { + let (ret, rcv) = oneshot::channel(); + let msg = Self::DeleteIndex { ret, uuid }; + sender.send(msg).await?; + rcv.await? + } } diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 733bda8e6..dc707c24b 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -172,7 +172,7 @@ impl UpdateLoop { GetUpdate { uuid, ret, id } => { let _ = ret.send(self.handle_get_update(uuid, id).await); } - Delete { uuid, ret } => { + DeleteIndex { uuid, ret } => { let _ = ret.send(self.handle_delete(uuid).await); } Snapshot { indexes, path, ret } => { diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index 01e7fd989..21bfad61e 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -552,149 +552,149 @@ impl UpdateStore { } } -#[cfg(test)] -mod test { - use super::*; - use crate::index_controller::{ - index_actor::{error::IndexActorError, MockIndexActorHandle}, - UpdateResult, - }; +//#[cfg(test)] +//mod test { + //use super::*; + //use crate::index_controller::{ + //index_actor::{error::IndexActorError, MockIndexActorHandle}, + //UpdateResult, + //}; - use futures::future::ok; + //use futures::future::ok; - #[actix_rt::test] - async fn test_next_id() { - let dir = tempfile::tempdir_in(".").unwrap(); - let mut options = EnvOpenOptions::new(); - let handle = Arc::new(MockIndexActorHandle::new()); - options.map_size(4096 * 100); - let update_store = UpdateStore::open( - options, - dir.path(), - handle, - Arc::new(AtomicBool::new(false)), - ) - .unwrap(); + //#[actix_rt::test] + //async fn test_next_id() { + //let dir = tempfile::tempdir_in(".").unwrap(); + //let mut options = EnvOpenOptions::new(); + //let handle = Arc::new(MockIndexActorHandle::new()); + //options.map_size(4096 * 100); + //let update_store = UpdateStore::open( + //options, + //dir.path(), + //handle, + //Arc::new(AtomicBool::new(false)), + //) + //.unwrap(); - let index1_uuid = Uuid::new_v4(); - let index2_uuid = Uuid::new_v4(); + //let index1_uuid = Uuid::new_v4(); + //let index2_uuid = Uuid::new_v4(); - let mut txn = update_store.env.write_txn().unwrap(); - let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); - txn.commit().unwrap(); - assert_eq!((0, 0), ids); + //let mut txn = update_store.env.write_txn().unwrap(); + //let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + //txn.commit().unwrap(); + //assert_eq!((0, 0), ids); - let mut txn = update_store.env.write_txn().unwrap(); - let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); - txn.commit().unwrap(); - assert_eq!((1, 0), ids); + //let mut txn = update_store.env.write_txn().unwrap(); + //let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); + //txn.commit().unwrap(); + //assert_eq!((1, 0), ids); - let mut txn = update_store.env.write_txn().unwrap(); - let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); - txn.commit().unwrap(); - assert_eq!((2, 1), ids); - } + //let mut txn = update_store.env.write_txn().unwrap(); + //let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); + //txn.commit().unwrap(); + //assert_eq!((2, 1), ids); + //} - #[actix_rt::test] - async fn test_register_update() { - let dir = tempfile::tempdir_in(".").unwrap(); - let mut options = EnvOpenOptions::new(); - let handle = Arc::new(MockIndexActorHandle::new()); - options.map_size(4096 * 100); - let update_store = UpdateStore::open( - options, - dir.path(), - handle, - Arc::new(AtomicBool::new(false)), - ) - .unwrap(); - let meta = UpdateMeta::ClearDocuments; - let uuid = Uuid::new_v4(); - let store_clone = update_store.clone(); - tokio::task::spawn_blocking(move || { - store_clone.register_update(meta, None, uuid).unwrap(); - }) - .await - .unwrap(); + //#[actix_rt::test] + //async fn test_register_update() { + //let dir = tempfile::tempdir_in(".").unwrap(); + //let mut options = EnvOpenOptions::new(); + //let handle = Arc::new(MockIndexActorHandle::new()); + //options.map_size(4096 * 100); + //let update_store = UpdateStore::open( + //options, + //dir.path(), + //handle, + //Arc::new(AtomicBool::new(false)), + //) + //.unwrap(); + //let meta = UpdateMeta::ClearDocuments; + //let uuid = Uuid::new_v4(); + //let store_clone = update_store.clone(); + //tokio::task::spawn_blocking(move || { + //store_clone.register_update(meta, None, uuid).unwrap(); + //}) + //.await + //.unwrap(); - let txn = update_store.env.read_txn().unwrap(); - assert!(update_store - .pending_queue - .get(&txn, &(0, uuid, 0)) - .unwrap() - .is_some()); - } + //let txn = update_store.env.read_txn().unwrap(); + //assert!(update_store + //.pending_queue + //.get(&txn, &(0, uuid, 0)) + //.unwrap() + //.is_some()); + //} - #[actix_rt::test] - async fn test_process_update() { - let dir = tempfile::tempdir_in(".").unwrap(); - let mut handle = MockIndexActorHandle::new(); + //#[actix_rt::test] + //async fn test_process_update() { + //let dir = tempfile::tempdir_in(".").unwrap(); + //let mut handle = MockIndexActorHandle::new(); - handle - .expect_update() - .times(2) - .returning(|_index_uuid, processing, _file| { - if processing.id() == 0 { - Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) - } else { - Box::pin(ok(Err( - processing.fail(IndexActorError::ExistingPrimaryKey.into()) - ))) - } - }); + //handle + //.expect_update() + //.times(2) + //.returning(|_index_uuid, processing, _file| { + //if processing.id() == 0 { + //Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) + //} else { + //Box::pin(ok(Err( + //processing.fail(IndexActorError::ExistingPrimaryKey.into()) + //))) + //} + //}); - let handle = Arc::new(handle); + //let handle = Arc::new(handle); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100); - let store = UpdateStore::open( - options, - dir.path(), - handle.clone(), - Arc::new(AtomicBool::new(false)), - ) - .unwrap(); + //let mut options = EnvOpenOptions::new(); + //options.map_size(4096 * 100); + //let store = UpdateStore::open( + //options, + //dir.path(), + //handle.clone(), + //Arc::new(AtomicBool::new(false)), + //) + //.unwrap(); - // wait a bit for the event loop exit. - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + //// wait a bit for the event loop exit. + //tokio::time::sleep(std::time::Duration::from_millis(50)).await; - let mut txn = store.env.write_txn().unwrap(); + //let mut txn = store.env.write_txn().unwrap(); - let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); - let uuid = Uuid::new_v4(); + //let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); + //let uuid = Uuid::new_v4(); - store - .pending_queue - .put(&mut txn, &(0, uuid, 0), &update) - .unwrap(); + //store + //.pending_queue + //.put(&mut txn, &(0, uuid, 0), &update) + //.unwrap(); - let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); + //let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); - store - .pending_queue - .put(&mut txn, &(1, uuid, 1), &update) - .unwrap(); + //store + //.pending_queue + //.put(&mut txn, &(1, uuid, 1), &update) + //.unwrap(); - txn.commit().unwrap(); + //txn.commit().unwrap(); - // Process the pending, and check that it has been moved to the update databases, and - // removed from the pending database. - let store_clone = store.clone(); - tokio::task::spawn_blocking(move || { - store_clone.process_pending_update(handle.clone()).unwrap(); - store_clone.process_pending_update(handle).unwrap(); - }) - .await - .unwrap(); + //// Process the pending, and check that it has been moved to the update databases, and + //// removed from the pending database. + //let store_clone = store.clone(); + //tokio::task::spawn_blocking(move || { + //store_clone.process_pending_update(handle.clone()).unwrap(); + //store_clone.process_pending_update(handle).unwrap(); + //}) + //.await + //.unwrap(); - let txn = store.env.read_txn().unwrap(); + //let txn = store.env.read_txn().unwrap(); - assert!(store.pending_queue.first(&txn).unwrap().is_none()); - let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); + //assert!(store.pending_queue.first(&txn).unwrap().is_none()); + //let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); - assert!(matches!(update, UpdateStatus::Processed(_))); - let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); + //assert!(matches!(update, UpdateStatus::Processed(_))); + //let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); - assert!(matches!(update, UpdateStatus::Failed(_))); - } -} + //assert!(matches!(update, UpdateStatus::Failed(_))); + //} +//}