From 102c46f88b3674174e9025be439d7758013d7c65 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 28 Sep 2021 22:22:59 +0200 Subject: [PATCH] clippy + fmt --- meilisearch-http/src/analytics.rs | 2 +- meilisearch-http/src/lib.rs | 13 +- meilisearch-http/src/main.rs | 7 +- meilisearch-http/src/option.rs | 26 +- meilisearch-http/src/routes/dump.rs | 4 +- .../src/routes/indexes/documents.rs | 23 +- meilisearch-http/src/routes/indexes/mod.rs | 6 +- .../src/routes/indexes/settings.rs | 6 +- .../src/routes/indexes/updates.rs | 4 +- meilisearch-http/src/routes/mod.rs | 8 +- meilisearch-http/tests/common/server.rs | 10 +- meilisearch-http/tests/common/service.rs | 2 +- .../tests/documents/add_documents.rs | 21 +- meilisearch-lib/src/compression.rs | 12 +- meilisearch-lib/src/document_formats.rs | 17 +- meilisearch-lib/src/index/dump.rs | 4 +- meilisearch-lib/src/index/mod.rs | 33 +- meilisearch-lib/src/index/search.rs | 6 +- meilisearch-lib/src/index/updates.rs | 35 +- .../index_controller/dump_actor/loaders/v1.rs | 15 +- .../src/index_controller/dump_actor/mod.rs | 3 +- .../index_controller/index_resolver/error.rs | 3 +- .../index_resolver/index_store.rs | 13 +- .../index_controller/index_resolver/mod.rs | 57 +-- .../index_resolver/uuid_store.rs | 2 +- meilisearch-lib/src/index_controller/mod.rs | 81 +++-- .../src/index_controller/snapshot.rs | 247 +++++++------ .../src/index_controller/update_file_store.rs | 2 +- .../src/index_controller/updates/error.rs | 10 +- .../src/index_controller/updates/message.rs | 6 +- .../src/index_controller/updates/mod.rs | 18 +- .../src/index_controller/updates/status.rs | 7 +- .../index_controller/updates/store/dump.rs | 37 +- .../src/index_controller/updates/store/mod.rs | 325 +++++++++--------- meilisearch-lib/src/lib.rs | 3 +- meilisearch-lib/src/options.rs | 1 - 36 files changed, 596 insertions(+), 473 deletions(-) diff --git a/meilisearch-http/src/analytics.rs b/meilisearch-http/src/analytics.rs index 8d91c9e9c..596b69aa0 100644 --- a/meilisearch-http/src/analytics.rs +++ b/meilisearch-http/src/analytics.rs @@ -2,9 +2,9 @@ use std::hash::{Hash, Hasher}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use log::debug; +use meilisearch_lib::MeiliSearch; use serde::Serialize; use siphasher::sip::SipHasher; -use meilisearch_lib::MeiliSearch; use crate::Opt; diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index bea39ffdd..0e479b122 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -92,11 +92,7 @@ pub fn setup_temp_dir(db_path: impl AsRef) -> anyhow::Result<()> { Ok(()) } -pub fn configure_data( - config: &mut web::ServiceConfig, - data: MeiliSearch, - opt: &Opt, - ) { +pub fn configure_data(config: &mut web::ServiceConfig, data: MeiliSearch, opt: &Opt) { let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize; config .app_data(data) @@ -120,9 +116,9 @@ pub fn configure_auth(config: &mut web::ServiceConfig, opts: &Opt) { master: opts.master_key.clone(), private: None, public: None, - }; + }; - keys.generate_missing_api_keys(); + keys.generate_missing_api_keys(); let auth_config = if let Some(ref master_key) = keys.master { let private_key = keys.private.as_ref().unwrap(); @@ -139,8 +135,7 @@ pub fn configure_auth(config: &mut web::ServiceConfig, opts: &Opt) { AuthConfig::NoAuth }; - config.app_data(auth_config) - .app_data(keys); + config.app_data(auth_config).app_data(keys); } #[cfg(feature = "mini-dashboard")] diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs index 3c7a34ddf..52892c3d6 100644 --- a/meilisearch-http/src/main.rs +++ b/meilisearch-http/src/main.rs @@ -1,7 +1,7 @@ use std::env; use actix_web::HttpServer; -use meilisearch_http::{Opt, create_app, setup_meilisearch}; +use meilisearch_http::{create_app, setup_meilisearch, Opt}; use meilisearch_lib::MeiliSearch; use structopt::StructOpt; @@ -23,7 +23,6 @@ fn setup(opt: &Opt) -> anyhow::Result<()> { log_builder.init(); - Ok(()) } @@ -36,7 +35,9 @@ async fn main() -> anyhow::Result<()> { match opt.env.as_ref() { "production" => { if opt.master_key.is_none() { - anyhow::bail!("In production mode, the environment variable MEILI_MASTER_KEY is mandatory") + anyhow::bail!( + "In production mode, the environment variable MEILI_MASTER_KEY is mandatory" + ) } } "development" => (), diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs index 72fbeab44..20e3be38d 100644 --- a/meilisearch-http/src/option.rs +++ b/meilisearch-http/src/option.rs @@ -1,16 +1,16 @@ +use std::fs; use std::io::{BufReader, Read}; use std::path::PathBuf; use std::sync::Arc; -use std::fs; use byte_unit::Byte; +use meilisearch_lib::options::IndexerOpts; use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; use rustls::{ AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, NoClientAuth, RootCertStore, }; use structopt::StructOpt; -use meilisearch_lib::options::IndexerOpts; const POSSIBLE_ENV: [&str; 2] = ["development", "production"]; @@ -173,24 +173,30 @@ impl Opt { } fn load_certs(filename: PathBuf) -> anyhow::Result> { - let certfile = fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open certificate file"))?; + let certfile = + fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open certificate file"))?; let mut reader = BufReader::new(certfile); - Ok(certs(&mut reader).map_err(|_| anyhow::anyhow!("cannot read certificate file"))?) + certs(&mut reader).map_err(|_| anyhow::anyhow!("cannot read certificate file")) } fn load_private_key(filename: PathBuf) -> anyhow::Result { let rsa_keys = { - let keyfile = - fs::File::open(filename.clone()).map_err(|_| anyhow::anyhow!("cannot open private key file"))?; + let keyfile = fs::File::open(filename.clone()) + .map_err(|_| anyhow::anyhow!("cannot open private key file"))?; let mut reader = BufReader::new(keyfile); - rsa_private_keys(&mut reader).map_err(|_| anyhow::anyhow!("file contains invalid rsa private key"))? + rsa_private_keys(&mut reader) + .map_err(|_| anyhow::anyhow!("file contains invalid rsa private key"))? }; let pkcs8_keys = { - let keyfile = fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open private key file"))?; + let keyfile = fs::File::open(filename) + .map_err(|_| anyhow::anyhow!("cannot open private key file"))?; let mut reader = BufReader::new(keyfile); - pkcs8_private_keys(&mut reader) - .map_err(|_| anyhow::anyhow!("file contains invalid pkcs8 private key (encrypted keys not supported)"))? + pkcs8_private_keys(&mut reader).map_err(|_| { + anyhow::anyhow!( + "file contains invalid pkcs8 private key (encrypted keys not supported)" + ) + })? }; // prefer to load pkcs8 keys diff --git a/meilisearch-http/src/routes/dump.rs b/meilisearch-http/src/routes/dump.rs index 494e97516..cbf89ddea 100644 --- a/meilisearch-http/src/routes/dump.rs +++ b/meilisearch-http/src/routes/dump.rs @@ -11,7 +11,9 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::resource("/{dump_uid}/status").route(web::get().to(get_dump_status))); } -pub async fn create_dump(meilisearch: GuardedData) -> Result { +pub async fn create_dump( + meilisearch: GuardedData, +) -> Result { let res = meilisearch.create_dump().await?; debug!("returns: {:?}", res); diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index cf939bccd..e89b75f28 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -1,11 +1,11 @@ use actix_web::error::PayloadError; -use actix_web::{web, HttpResponse}; use actix_web::web::Bytes; +use actix_web::{web, HttpResponse}; use futures::{Stream, StreamExt}; use log::debug; -use meilisearch_lib::MeiliSearch; use meilisearch_lib::index_controller::{DocumentAdditionFormat, Update}; use meilisearch_lib::milli::update::IndexDocumentsMethod; +use meilisearch_lib::MeiliSearch; use serde::Deserialize; use serde_json::Value; use tokio::sync::mpsc; @@ -38,7 +38,7 @@ guard_content_type!(guard_json, "application/json"); */ /// This is required because Payload is not Sync nor Send -fn payload_to_stream(mut payload: Payload) -> impl Stream> { +fn payload_to_stream(mut payload: Payload) -> impl Stream> { let (snd, recv) = mpsc::channel(1); tokio::task::spawn_local(async move { while let Some(data) = payload.next().await { @@ -104,9 +104,14 @@ pub async fn delete_document( meilisearch: GuardedData, path: web::Path, ) -> Result { - let DocumentParam { document_id, index_uid } = path.into_inner(); + let DocumentParam { + document_id, + index_uid, + } = path.into_inner(); let update = Update::DeleteDocuments(vec![document_id]); - let update_status = meilisearch.register_update(index_uid, update, false).await?; + let update_status = meilisearch + .register_update(index_uid, update, false) + .await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } @@ -216,7 +221,9 @@ pub async fn delete_documents( .collect(); let update = Update::DeleteDocuments(ids); - let update_status = meilisearch.register_update(path.into_inner().index_uid, update, false).await?; + let update_status = meilisearch + .register_update(path.into_inner().index_uid, update, false) + .await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } @@ -226,7 +233,9 @@ pub async fn clear_all_documents( path: web::Path, ) -> Result { let update = Update::ClearDocuments; - let update_status = meilisearch.register_update(path.into_inner().index_uid, update, false).await?; + let update_status = meilisearch + .register_update(path.into_inner().index_uid, update, false) + .await?; debug!("returns: {:?}", update_status); Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) } diff --git a/meilisearch-http/src/routes/indexes/mod.rs b/meilisearch-http/src/routes/indexes/mod.rs index b10b5a004..0d0132d05 100644 --- a/meilisearch-http/src/routes/indexes/mod.rs +++ b/meilisearch-http/src/routes/indexes/mod.rs @@ -1,8 +1,8 @@ use actix_web::{web, HttpResponse}; use chrono::{DateTime, Utc}; use log::debug; -use meilisearch_lib::MeiliSearch; use meilisearch_lib::index_controller::IndexSettings; +use meilisearch_lib::MeiliSearch; use serde::{Deserialize, Serialize}; use crate::error::ResponseError; @@ -36,7 +36,9 @@ pub fn configure(cfg: &mut web::ServiceConfig) { ); } -pub async fn list_indexes(data: GuardedData) -> Result { +pub async fn list_indexes( + data: GuardedData, +) -> Result { let indexes = data.list_indexes().await?; debug!("returns: {:?}", indexes); Ok(HttpResponse::Ok().json(indexes)) diff --git a/meilisearch-http/src/routes/indexes/settings.rs b/meilisearch-http/src/routes/indexes/settings.rs index 24fd469d3..7e6033180 100644 --- a/meilisearch-http/src/routes/indexes/settings.rs +++ b/meilisearch-http/src/routes/indexes/settings.rs @@ -1,12 +1,12 @@ use log::debug; -use actix_web::{HttpResponse, web}; -use meilisearch_lib::MeiliSearch; +use actix_web::{web, HttpResponse}; use meilisearch_lib::index::{Settings, Unchecked}; use meilisearch_lib::index_controller::Update; +use meilisearch_lib::MeiliSearch; -use crate::extractors::authentication::{policies::*, GuardedData}; use crate::error::ResponseError; +use crate::extractors::authentication::{policies::*, GuardedData}; #[macro_export] macro_rules! make_setting_route { diff --git a/meilisearch-http/src/routes/indexes/updates.rs b/meilisearch-http/src/routes/indexes/updates.rs index cfef5ba63..2923736b7 100644 --- a/meilisearch-http/src/routes/indexes/updates.rs +++ b/meilisearch-http/src/routes/indexes/updates.rs @@ -53,7 +53,9 @@ pub async fn get_all_updates_status( meilisearch: GuardedData, path: web::Path, ) -> Result { - let metas = meilisearch.all_update_status(path.into_inner().index_uid).await?; + let metas = meilisearch + .all_update_status(path.into_inner().index_uid) + .await?; let metas = metas .into_iter() .map(UpdateStatusResponse::from) diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 3a5f84f18..382147f31 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -6,8 +6,8 @@ use log::debug; use meilisearch_lib::index_controller::updates::status::{UpdateResult, UpdateStatus}; use serde::{Deserialize, Serialize}; -use meilisearch_lib::{MeiliSearch, Update}; use meilisearch_lib::index::{Settings, Unchecked}; +use meilisearch_lib::{MeiliSearch, Update}; use crate::error::ResponseError; use crate::extractors::authentication::{policies::*, GuardedData}; @@ -52,7 +52,7 @@ impl From<&UpdateStatus> for UpdateType { fn from(other: &UpdateStatus) -> Self { use meilisearch_lib::milli::update::IndexDocumentsMethod::*; match other.meta() { - Update::DocumentAddition{ method, .. } => { + Update::DocumentAddition { method, .. } => { let number = match other { UpdateStatus::Processed(processed) => match processed.success { UpdateResult::DocumentsAddition(ref addition) => { @@ -233,7 +233,9 @@ pub async fn running() -> HttpResponse { HttpResponse::Ok().json(serde_json::json!({ "status": "MeiliSearch is running" })) } -async fn get_stats(meilisearch: GuardedData) -> Result { +async fn get_stats( + meilisearch: GuardedData, +) -> Result { let response = meilisearch.get_all_stats().await?; debug!("returns: {:?}", response); diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs index ef2e51355..82666fc57 100644 --- a/meilisearch-http/tests/common/server.rs +++ b/meilisearch-http/tests/common/server.rs @@ -35,7 +35,10 @@ impl Server { let options = default_settings(dir.path()); let meilisearch = setup_meilisearch(&options).unwrap(); - let service = Service { meilisearch, options }; + let service = Service { + meilisearch, + options, + }; Server { service, @@ -45,7 +48,10 @@ impl Server { pub async fn new_with_options(options: Opt) -> Self { let meilisearch = setup_meilisearch(&options).unwrap(); - let service = Service { meilisearch, options }; + let service = Service { + meilisearch, + options, + }; Server { service, diff --git a/meilisearch-http/tests/common/service.rs b/meilisearch-http/tests/common/service.rs index 1450a6dd9..8a3b07c1d 100644 --- a/meilisearch-http/tests/common/service.rs +++ b/meilisearch-http/tests/common/service.rs @@ -2,7 +2,7 @@ use actix_web::{http::StatusCode, test}; use meilisearch_lib::MeiliSearch; use serde_json::Value; -use meilisearch_http::{Opt, create_app}; +use meilisearch_http::{create_app, Opt}; pub struct Service { pub meilisearch: MeiliSearch, diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs index a0436c67d..42fdc7509 100644 --- a/meilisearch-http/tests/documents/add_documents.rs +++ b/meilisearch-http/tests/documents/add_documents.rs @@ -16,7 +16,12 @@ async fn add_documents_test_json_content_types() { // this is a what is expected and should work let server = Server::new().await; - let app = test::init_service(create_app!(&server.service.meilisearch, true, &server.service.options)).await; + let app = test::init_service(create_app!( + &server.service.meilisearch, + true, + &server.service.options + )) + .await; let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) @@ -41,7 +46,12 @@ async fn add_documents_test_no_content_types() { ]); let server = Server::new().await; - let app = test::init_service(create_app!(&server.service.meilisearch, true, &server.service.options)).await; + let app = test::init_service(create_app!( + &server.service.meilisearch, + true, + &server.service.options + )) + .await; let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) @@ -67,7 +77,12 @@ async fn add_documents_test_bad_content_types() { ]); let server = Server::new().await; - let app = test::init_service(create_app!(&server.service.meilisearch, true, &server.service.options)).await; + let app = test::init_service(create_app!( + &server.service.meilisearch, + true, + &server.service.options + )) + .await; let req = test::TestRequest::post() .uri("/indexes/dog/documents") .set_payload(document.to_string()) diff --git a/meilisearch-lib/src/compression.rs b/meilisearch-lib/src/compression.rs index cd60854c6..a71a02a55 100644 --- a/meilisearch-lib/src/compression.rs +++ b/meilisearch-lib/src/compression.rs @@ -17,10 +17,10 @@ pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Resul } //pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { - //let f = File::open(&src)?; - //let gz = GzDecoder::new(f); - //let mut ar = Archive::new(gz); - //create_dir_all(&dest)?; - //ar.unpack(&dest)?; - //Ok(()) +//let f = File::open(&src)?; +//let gz = GzDecoder::new(f); +//let mut ar = Archive::new(gz); +//create_dir_all(&dest)?; +//ar.unpack(&dest)?; +//Ok(()) //} diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs index a535ec686..297c89831 100644 --- a/meilisearch-lib/src/document_formats.rs +++ b/meilisearch-lib/src/document_formats.rs @@ -1,4 +1,7 @@ -use std::{fmt, io::{Read, Seek, Write}}; +use std::{ + fmt, + io::{Read, Seek, Write}, +}; use milli::documents::DocumentBatchBuilder; use serde_json::{Deserializer, Map, Value}; @@ -25,12 +28,13 @@ pub enum DocumentFormatError { #[error("Internal error: {0}")] Internal(Box), #[error("{0}. The {1} payload provided is malformed.")] - MalformedPayload(Box, PayloadType), + MalformedPayload( + Box, + PayloadType, + ), } -internal_error!( - DocumentFormatError: milli::documents::Error -); +internal_error!(DocumentFormatError: milli::documents::Error); macro_rules! malformed { ($type:path, $e:expr) => { @@ -57,7 +61,8 @@ pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> { pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> { let mut builder = DocumentBatchBuilder::new(writer).unwrap(); - let documents: Vec> = malformed!(PayloadType::Json, serde_json::from_reader(input))?; + let documents: Vec> = + malformed!(PayloadType::Json, serde_json::from_reader(input))?; builder.add_documents(documents).unwrap(); builder.finish().unwrap(); diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs index 8049df500..f6e081760 100644 --- a/meilisearch-lib/src/index/dump.rs +++ b/meilisearch-lib/src/index/dump.rs @@ -151,7 +151,9 @@ impl Index { //If the document file is empty, we don't perform the document addition, to prevent //a primary key error to be thrown. if !documents_reader.is_empty() { - let builder = update_handler.update_builder(0).index_documents(&mut txn, &index); + let builder = update_handler + .update_builder(0) + .index_documents(&mut txn, &index); builder.execute(documents_reader, |_, _| ())?; } diff --git a/meilisearch-lib/src/index/mod.rs b/meilisearch-lib/src/index/mod.rs index 0e4375517..899c830a5 100644 --- a/meilisearch-lib/src/index/mod.rs +++ b/meilisearch-lib/src/index/mod.rs @@ -8,17 +8,17 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use heed::{EnvOpenOptions, RoTxn}; use milli::update::Setting; -use milli::{FieldDistribution, FieldId, obkv_to_json}; +use milli::{obkv_to_json, FieldDistribution, FieldId}; +use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; -use serde::{Serialize, Deserialize}; use error::Result; pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; -pub use updates::{Checked, Facets, Settings, Unchecked, apply_settings_to_builder}; +pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked}; use uuid::Uuid; -use crate::EnvSizer; use crate::index_controller::update_file_store::UpdateFileStore; +use crate::EnvSizer; use self::error::IndexError; use self::update_handler::UpdateHandler; @@ -75,11 +75,11 @@ impl IndexMeta { #[derivative(Debug)] pub struct Index { pub uuid: Uuid, - #[derivative(Debug="ignore")] + #[derivative(Debug = "ignore")] pub inner: Arc, - #[derivative(Debug="ignore")] + #[derivative(Debug = "ignore")] update_file_store: Arc, - #[derivative(Debug="ignore")] + #[derivative(Debug = "ignore")] update_handler: Arc, } @@ -92,12 +92,23 @@ impl Deref for Index { } impl Index { - pub fn open(path: impl AsRef, size: usize, update_file_store: Arc, uuid: Uuid, update_handler: Arc) -> Result { + pub fn open( + path: impl AsRef, + size: usize, + update_file_store: Arc, + uuid: Uuid, + update_handler: Arc, + ) -> Result { create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); options.map_size(size); let inner = Arc::new(milli::Index::new(options, &path)?); - Ok(Index { inner, update_file_store, uuid, update_handler }) + Ok(Index { + inner, + update_file_store, + uuid, + update_handler, + }) } pub fn stats(&self) -> Result { @@ -268,7 +279,9 @@ impl Index { create_dir_all(&dst)?; dst.push("data.mdb"); let _txn = self.write_txn()?; - self.inner.env.copy_to_path(dst, heed::CompactionOption::Enabled)?; + self.inner + .env + .copy_to_path(dst, heed::CompactionOption::Enabled)?; Ok(()) } } diff --git a/meilisearch-lib/src/index/search.rs b/meilisearch-lib/src/index/search.rs index 70d0510ac..a0ea26127 100644 --- a/meilisearch-lib/src/index/search.rs +++ b/meilisearch-lib/src/index/search.rs @@ -6,9 +6,7 @@ use either::Either; use heed::RoTxn; use indexmap::IndexMap; use meilisearch_tokenizer::{Analyzer, AnalyzerConfig, Token}; -use milli::{ - AscDesc, FieldId, FieldsIdsMap, FilterCondition, MatchingWords, SortError -}; +use milli::{AscDesc, FieldId, FieldsIdsMap, FilterCondition, MatchingWords, SortError}; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -685,7 +683,7 @@ fn parse_filter_array( } } - Ok(FilterCondition::from_array(txn, &index, ands)?) + Ok(FilterCondition::from_array(txn, index, ands)?) } #[cfg(test)] diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs index b5035443a..b5de21403 100644 --- a/meilisearch-lib/src/index/updates.rs +++ b/meilisearch-lib/src/index/updates.rs @@ -8,11 +8,11 @@ use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder}; use serde::{Deserialize, Serialize, Serializer}; use uuid::Uuid; -use crate::Update; use crate::index_controller::updates::status::{Failed, Processed, Processing, UpdateResult}; +use crate::Update; -use super::{Index, IndexMeta}; use super::error::{IndexError, Result}; +use super::{Index, IndexMeta}; fn serialize_with_wildcard( field: &Setting>, @@ -170,18 +170,26 @@ impl Index { let result = (|| { let mut txn = self.write_txn()?; let result = match update.meta() { - Update::DocumentAddition { primary_key, content_uuid, method } => { - self.update_documents(&mut txn, *method, *content_uuid, update_builder, primary_key.as_deref()) - } + Update::DocumentAddition { + primary_key, + content_uuid, + method, + } => self.update_documents( + &mut txn, + *method, + *content_uuid, + update_builder, + primary_key.as_deref(), + ), Update::Settings(settings) => { let settings = settings.clone().check(); self.update_settings(&mut txn, &settings, update_builder) - }, + } Update::ClearDocuments => { let builder = update_builder.clear_documents(&mut txn, self); let _count = builder.execute()?; Ok(UpdateResult::Other) - }, + } Update::DeleteDocuments(ids) => { let mut builder = update_builder.delete_documents(&mut txn, self)?; @@ -276,7 +284,10 @@ impl Index { } } -pub fn apply_settings_to_builder(settings: &Settings, builder: &mut milli::update::Settings) { +pub fn apply_settings_to_builder( + settings: &Settings, + builder: &mut milli::update::Settings, +) { match settings.searchable_attributes { Setting::Set(ref names) => builder.set_searchable_fields(names.clone()), Setting::Reset => builder.reset_searchable_fields(), @@ -298,9 +309,7 @@ pub fn apply_settings_to_builder(settings: &Settings, builder: &mut mil } match settings.sortable_attributes { - Setting::Set(ref fields) => { - builder.set_sortable_fields(fields.iter().cloned().collect()) - } + Setting::Set(ref fields) => builder.set_sortable_fields(fields.iter().cloned().collect()), Setting::Reset => builder.reset_sortable_fields(), Setting::NotSet => (), } @@ -318,9 +327,7 @@ pub fn apply_settings_to_builder(settings: &Settings, builder: &mut mil } match settings.synonyms { - Setting::Set(ref synonyms) => { - builder.set_synonyms(synonyms.clone().into_iter().collect()) - } + Setting::Set(ref synonyms) => builder.set_synonyms(synonyms.clone().into_iter().collect()), Setting::Reset => builder.reset_synonyms(), Setting::NotSet => (), } diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs index f474935f0..840fd7ccc 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs @@ -1,5 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::fs::{File, create_dir_all}; +use std::fs::{create_dir_all, File}; use std::io::{BufReader, Seek, SeekFrom}; use std::marker::PhantomData; use std::path::Path; @@ -17,10 +17,7 @@ use crate::index::update_handler::UpdateHandler; use crate::index_controller::index_resolver::uuid_store::HeedUuidStore; use crate::index_controller::{self, IndexMetadata}; use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; -use crate::{ - index::Unchecked, - options::IndexerOpts, -}; +use crate::{index::Unchecked, options::IndexerOpts}; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] @@ -136,16 +133,16 @@ fn load_index( //If the document file is empty, we don't perform the document addition, to prevent //a primary key error to be thrown. if !documents_reader.is_empty() { - let builder = update_handler.update_builder(0).index_documents(&mut txn, &index); + let builder = update_handler + .update_builder(0) + .index_documents(&mut txn, &index); builder.execute(documents_reader, |_, _| ())?; } txn.commit()?; // Finaly, we extract the original milli::Index and close it - index - .prepare_for_closing() - .wait(); + index.prepare_for_closing().wait(); // Updates are ignored in dumps V1. diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 82b8d1355..72a83a505 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -158,8 +158,7 @@ impl DumpTask { create_dir_all(&self.path).await?; - let temp_dump_dir = - tokio::task::spawn_blocking(|| tempfile::TempDir::new()).await??; + let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; let temp_dump_path = temp_dump_dir.path().to_owned(); let meta = Metadata::new_v2(self.index_db_size, self.update_db_size); diff --git a/meilisearch-lib/src/index_controller/index_resolver/error.rs b/meilisearch-lib/src/index_controller/index_resolver/error.rs index af61a99de..661b9bde3 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/error.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/error.rs @@ -27,7 +27,8 @@ pub enum IndexResolverError { } impl From> for IndexResolverError -where T: Send + Sync + 'static + fmt::Debug +where + T: Send + Sync + 'static + fmt::Debug, { fn from(other: tokio::sync::mpsc::error::SendError) -> Self { Self::Internal(Box::new(other)) diff --git a/meilisearch-lib/src/index_controller/index_resolver/index_store.rs b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs index 5969108de..047711a96 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/index_store.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs @@ -9,8 +9,8 @@ use tokio::task::spawn_blocking; use uuid::Uuid; use super::error::{IndexResolverError, Result}; -use crate::index::Index; use crate::index::update_handler::UpdateHandler; +use crate::index::Index; use crate::index_controller::update_file_store::UpdateFileStore; use crate::options::IndexerOpts; @@ -32,7 +32,11 @@ pub struct MapIndexStore { } impl MapIndexStore { - pub fn new(path: impl AsRef, index_size: usize, indexer_opts: &IndexerOpts) -> anyhow::Result { + pub fn new( + path: impl AsRef, + index_size: usize, + indexer_opts: &IndexerOpts, + ) -> anyhow::Result { let update_handler = Arc::new(UpdateHandler::new(indexer_opts)?); let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap()); let path = path.as_ref().join("indexes/"); @@ -100,7 +104,10 @@ impl IndexStore for MapIndexStore { let index_size = self.index_size; let file_store = self.update_file_store.clone(); let update_handler = self.update_handler.clone(); - let index = spawn_blocking(move || Index::open(path, index_size, file_store, uuid, update_handler)).await??; + let index = spawn_blocking(move || { + Index::open(path, index_size, file_store, uuid, update_handler) + }) + .await??; self.index_store.write().await.insert(uuid, index.clone()); Ok(Some(index)) } diff --git a/meilisearch-lib/src/index_controller/index_resolver/mod.rs b/meilisearch-lib/src/index_controller/index_resolver/mod.rs index f04b3f42b..9f86f7b08 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/mod.rs @@ -1,19 +1,26 @@ -pub mod uuid_store; -mod index_store; pub mod error; +mod index_store; +pub mod uuid_store; use std::path::Path; -use uuid::Uuid; -use uuid_store::{UuidStore, HeedUuidStore}; +use error::{IndexResolverError, Result}; use index_store::{IndexStore, MapIndexStore}; -use error::{Result, IndexResolverError}; +use uuid::Uuid; +use uuid_store::{HeedUuidStore, UuidStore}; -use crate::{index::{Index, update_handler::UpdateHandler}, options::IndexerOpts}; +use crate::{ + index::{update_handler::UpdateHandler, Index}, + options::IndexerOpts, +}; pub type HardStateIndexResolver = IndexResolver; -pub fn create_index_resolver(path: impl AsRef, index_size: usize, indexer_opts: &IndexerOpts) -> anyhow::Result { +pub fn create_index_resolver( + path: impl AsRef, + index_size: usize, + indexer_opts: &IndexerOpts, +) -> anyhow::Result { let uuid_store = HeedUuidStore::new(&path)?; let index_store = MapIndexStore::new(&path, index_size, indexer_opts)?; Ok(IndexResolver::new(uuid_store, index_store)) @@ -30,7 +37,7 @@ impl IndexResolver { dst: impl AsRef, index_db_size: usize, indexer_opts: &IndexerOpts, - ) -> anyhow::Result<()> { + ) -> anyhow::Result<()> { HeedUuidStore::load_dump(&src, &dst)?; let indexes_path = src.as_ref().join("indexes"); @@ -46,14 +53,12 @@ impl IndexResolver { } } -impl IndexResolver -where U: UuidStore, - I: IndexStore, +impl IndexResolver +where + U: UuidStore, + I: IndexStore, { - pub fn new( - index_uuid_store: U, - index_store: I, - ) -> Self { + pub fn new(index_uuid_store: U, index_store: I) -> Self { Self { index_uuid_store, index_store, @@ -75,7 +80,10 @@ where U: UuidStore, } pub async fn snapshot(&self, path: impl AsRef) -> Result> { - let uuids = self.index_uuid_store.snapshot(path.as_ref().to_owned()).await?; + let uuids = self + .index_uuid_store + .snapshot(path.as_ref().to_owned()) + .await?; let mut indexes = Vec::new(); for uuid in uuids { indexes.push(self.get_index_by_uuid(uuid).await?); @@ -99,13 +107,11 @@ where U: UuidStore, let mut indexes = Vec::new(); for (name, uuid) in uuids { match self.index_store.get(uuid).await? { - Some(index) => { - indexes.push((name, index)) - }, + Some(index) => indexes.push((name, index)), None => { // we found an unexisting index, we remove it from the uuid store let _ = self.index_uuid_store.delete(name).await; - }, + } } } @@ -124,7 +130,10 @@ where U: UuidStore, pub async fn get_index_by_uuid(&self, uuid: Uuid) -> Result { // TODO: Handle this error better. - self.index_store.get(uuid).await?.ok_or(IndexResolverError::UnexistingIndex(String::new())) + self.index_store + .get(uuid) + .await? + .ok_or_else(|| IndexResolverError::UnexistingIndex(String::new())) } pub async fn get_index(&self, uid: String) -> Result { @@ -137,17 +146,17 @@ where U: UuidStore, // and remove the uuid from th uuid store. let _ = self.index_uuid_store.delete(name.clone()).await; Err(IndexResolverError::UnexistingIndex(name)) - }, + } } } - (name, _) => Err(IndexResolverError::UnexistingIndex(name)) + (name, _) => Err(IndexResolverError::UnexistingIndex(name)), } } pub async fn get_uuid(&self, uid: String) -> Result { match self.index_uuid_store.get_uuid(uid).await? { (_, Some(uuid)) => Ok(uuid), - (name, _) => Err(IndexResolverError::UnexistingIndex(name)) + (name, _) => Err(IndexResolverError::UnexistingIndex(name)), } } } diff --git a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs index a4bcd17d4..3e582944d 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs @@ -8,7 +8,7 @@ use heed::{CompactionOption, Database, Env, EnvOpenOptions}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::error::{Result, IndexResolverError}; +use super::error::{IndexResolverError, Result}; use crate::EnvSizer; const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 0dee6521f..f117369fd 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -18,25 +18,27 @@ use dump_actor::DumpActorHandle; pub use dump_actor::{DumpInfo, DumpStatus}; use snapshot::load_snapshot; -use crate::index::{Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked}; +use crate::index::error::Result as IndexResult; +use crate::index::{ + Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, +}; use crate::index_controller::index_resolver::create_index_resolver; use crate::index_controller::snapshot::SnapshotService; use crate::options::IndexerOpts; use error::Result; -use crate::index::error::{Result as IndexResult}; use self::dump_actor::load_dump; -use self::index_resolver::HardStateIndexResolver; use self::index_resolver::error::IndexResolverError; +use self::index_resolver::HardStateIndexResolver; use self::updates::status::UpdateStatus; use self::updates::UpdateMsg; mod dump_actor; pub mod error; +mod index_resolver; mod snapshot; pub mod update_file_store; pub mod updates; -mod index_resolver; pub type Payload = Box< dyn Stream> + Send + Sync + 'static + Unpin, @@ -79,6 +81,7 @@ pub struct Stats { pub indexes: BTreeMap, } +#[allow(clippy::large_enum_variant)] #[derive(derivative::Derivative)] #[derivative(Debug)] pub enum Update { @@ -86,7 +89,7 @@ pub enum Update { ClearDocuments, Settings(Settings), DocumentAddition { - #[derivative(Debug="ignore")] + #[derivative(Debug = "ignore")] payload: Payload, primary_key: Option, method: IndexDocumentsMethod, @@ -141,12 +144,19 @@ impl IndexControllerBuilder { std::fs::create_dir_all(db_path.as_ref())?; - let index_resolver = Arc::new(create_index_resolver(&db_path, index_size, &indexer_options)?); + let index_resolver = Arc::new(create_index_resolver( + &db_path, + index_size, + &indexer_options, + )?); #[allow(unreachable_code)] - let update_sender = updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?; + let update_sender = + updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?; - let dump_path = self.dump_dst.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; + let dump_path = self + .dump_dst + .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; let dump_handle = dump_actor::DumpActorHandleImpl::new( dump_path, index_resolver.clone(), @@ -159,13 +169,15 @@ impl IndexControllerBuilder { let snapshot_service = SnapshotService::new( index_resolver.clone(), update_sender.clone(), - self.snapshot_interval.ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?, - self.snapshot_dir.ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?, + self.snapshot_interval + .ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?, + self.snapshot_dir + .ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?, db_path - .as_ref() - .file_name() - .map(|n| n.to_owned().into_string().expect("invalid path")) - .unwrap_or_else(|| String::from("data.ms")), + .as_ref() + .file_name() + .map(|n| n.to_owned().into_string().expect("invalid path")) + .unwrap_or_else(|| String::from("data.ms")), ); tokio::task::spawn(snapshot_service.run()); @@ -246,7 +258,12 @@ impl IndexController { IndexControllerBuilder::default() } - pub async fn register_update(&self, uid: String, update: Update, create_index: bool) -> Result { + pub async fn register_update( + &self, + uid: String, + update: Update, + create_index: bool, + ) -> Result { match self.index_resolver.get_uuid(uid).await { Ok(uuid) => { let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?; @@ -255,12 +272,13 @@ impl IndexController { Err(IndexResolverError::UnexistingIndex(name)) => { if create_index { let index = self.index_resolver.create_index(name, None).await?; - let update_result = UpdateMsg::update(&self.update_sender, index.uuid, update).await?; + let update_result = + UpdateMsg::update(&self.update_sender, index.uuid, update).await?; // ignore if index creation fails now, since it may already have been created Ok(update_result) } else { - Err(IndexResolverError::UnexistingIndex(name).into()) + Err(IndexResolverError::UnexistingIndex(name).into()) } } Err(e) => Err(e.into()), @@ -310,7 +328,9 @@ impl IndexController { attributes_to_retrieve: Option>, ) -> Result> { let index = self.index_resolver.get_index(uid).await?; - let documents = spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)).await??; + let documents = + spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)) + .await??; Ok(documents) } @@ -321,7 +341,9 @@ impl IndexController { attributes_to_retrieve: Option>, ) -> Result { let index = self.index_resolver.get_index(uid).await?; - let document = spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)).await??; + let document = + spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) + .await??; Ok(document) } @@ -330,12 +352,12 @@ impl IndexController { uid: String, mut index_settings: IndexSettings, ) -> Result { - index_settings.uid.take(); let index = self.index_resolver.get_index(uid.clone()).await?; let uuid = index.uuid; - let meta = spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??; + let meta = + spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??; let meta = IndexMetadata { uuid, name: uid.clone(), @@ -386,7 +408,8 @@ impl IndexController { let stats = index.stats()?; let meta = index.meta()?; Ok((stats, meta)) - }).await??; + }) + .await??; database_size += stats.size; @@ -415,8 +438,15 @@ impl IndexController { Ok(self.dump_handle.dump_info(uid).await?) } - pub async fn create_index(&self, uid: String, primary_key: Option) -> Result { - let index = self.index_resolver.create_index(uid.clone(), primary_key).await?; + pub async fn create_index( + &self, + uid: String, + primary_key: Option, + ) -> Result { + let index = self + .index_resolver + .create_index(uid.clone(), primary_key) + .await?; let meta = spawn_blocking(move || -> IndexResult<_> { let meta = index.meta()?; let meta = IndexMetadata { @@ -426,7 +456,8 @@ impl IndexController { meta, }; Ok(meta) - }).await??; + }) + .await??; Ok(meta) } diff --git a/meilisearch-lib/src/index_controller/snapshot.rs b/meilisearch-lib/src/index_controller/snapshot.rs index 66bdfe60e..2d83a491c 100644 --- a/meilisearch-lib/src/index_controller/snapshot.rs +++ b/meilisearch-lib/src/index_controller/snapshot.rs @@ -4,14 +4,14 @@ use std::time::Duration; use anyhow::bail; use log::{error, info, trace}; +use tokio::fs; use tokio::task::spawn_blocking; use tokio::time::sleep; -use tokio::fs; use crate::index_controller::updates::UpdateMsg; -use super::updates::UpdateSender; use super::index_resolver::HardStateIndexResolver; +use super::updates::UpdateSender; pub struct SnapshotService { index_resolver: Arc, @@ -56,8 +56,7 @@ impl SnapshotService { let snapshot_dir = self.snapshot_path.clone(); fs::create_dir_all(&snapshot_dir).await?; - let temp_snapshot_dir = - spawn_blocking(move || tempfile::tempdir()).await??; + let temp_snapshot_dir = spawn_blocking(tempfile::tempdir).await??; let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); let indexes = self @@ -99,7 +98,7 @@ pub fn load_snapshot( match crate::from_tar_gz(snapshot_path, &db_path) { Ok(()) => Ok(()), Err(e) => { - //clean created db folder + //clean created db folder std::fs::remove_dir_all(&db_path)?; Err(e) } @@ -127,131 +126,131 @@ pub fn load_snapshot( //#[cfg(test)] //mod test { - //use std::iter::FromIterator; - //use std::{collections::HashSet, sync::Arc}; +//use std::iter::FromIterator; +//use std::{collections::HashSet, sync::Arc}; - //use futures::future::{err, ok}; - //use rand::Rng; - //use tokio::time::timeout; - //use uuid::Uuid; +//use futures::future::{err, ok}; +//use rand::Rng; +//use tokio::time::timeout; +//use uuid::Uuid; - //use super::*; +//use super::*; - //#[actix_rt::test] - //async fn test_normal() { - //let mut rng = rand::thread_rng(); - //let uuids_num: usize = rng.gen_range(5..10); - //let uuids = (0..uuids_num) - //.map(|_| Uuid::new_v4()) - //.collect::>(); +//#[actix_rt::test] +//async fn test_normal() { +//let mut rng = rand::thread_rng(); +//let uuids_num: usize = rng.gen_range(5..10); +//let uuids = (0..uuids_num) +//.map(|_| Uuid::new_v4()) +//.collect::>(); - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //let uuids_clone = uuids.clone(); - //uuid_resolver - //.expect_snapshot() - //.times(1) - //.returning(move |_| Box::pin(ok(uuids_clone.clone()))); +//let mut uuid_resolver = MockUuidResolverHandle::new(); +//let uuids_clone = uuids.clone(); +//uuid_resolver +//.expect_snapshot() +//.times(1) +//.returning(move |_| Box::pin(ok(uuids_clone.clone()))); - //let uuids_clone = uuids.clone(); - //let mut index_handle = MockIndexActorHandle::new(); - //index_handle - //.expect_snapshot() - //.withf(move |uuid, _path| uuids_clone.contains(uuid)) - //.times(uuids_num) - //.returning(move |_, _| Box::pin(ok(()))); +//let uuids_clone = uuids.clone(); +//let mut index_handle = MockIndexActorHandle::new(); +//index_handle +//.expect_snapshot() +//.withf(move |uuid, _path| uuids_clone.contains(uuid)) +//.times(uuids_num) +//.returning(move |_, _| Box::pin(ok(()))); - //let dir = tempfile::tempdir_in(".").unwrap(); - //let handle = Arc::new(index_handle); - //let update_handle = - //UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); +//let dir = tempfile::tempdir_in(".").unwrap(); +//let handle = Arc::new(index_handle); +//let update_handle = +//UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); +//let snapshot_path = tempfile::tempdir_in(".").unwrap(); +//let snapshot_service = SnapshotService::new( +//uuid_resolver, +//update_handle, +//Duration::from_millis(100), +//snapshot_path.path().to_owned(), +//"data.ms".to_string(), +//); - //snapshot_service.perform_snapshot().await.unwrap(); - //} - - //#[actix_rt::test] - //async fn error_performing_uuid_snapshot() { - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //uuid_resolver - //.expect_snapshot() - //.times(1) - ////abitrary error - //.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); - - //let update_handle = MockUpdateActorHandle::new(); - - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); - - //assert!(snapshot_service.perform_snapshot().await.is_err()); - ////Nothing was written to the file - //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); - //} - - //#[actix_rt::test] - //async fn error_performing_index_snapshot() { - //let uuid = Uuid::new_v4(); - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //uuid_resolver - //.expect_snapshot() - //.times(1) - //.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); - - //let mut update_handle = MockUpdateActorHandle::new(); - //update_handle - //.expect_snapshot() - ////abitrary error - //.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); - - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); - - //assert!(snapshot_service.perform_snapshot().await.is_err()); - ////Nothing was written to the file - //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); - //} - - //#[actix_rt::test] - //async fn test_loop() { - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //uuid_resolver - //.expect_snapshot() - ////we expect the funtion to be called between 2 and 3 time in the given interval. - //.times(2..4) - ////abitrary error, to short-circuit the function - //.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); - - //let update_handle = MockUpdateActorHandle::new(); - - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); - - //let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; - //} +//snapshot_service.perform_snapshot().await.unwrap(); +//} + +//#[actix_rt::test] +//async fn error_performing_uuid_snapshot() { +//let mut uuid_resolver = MockUuidResolverHandle::new(); +//uuid_resolver +//.expect_snapshot() +//.times(1) +////abitrary error +//.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + +//let update_handle = MockUpdateActorHandle::new(); + +//let snapshot_path = tempfile::tempdir_in(".").unwrap(); +//let snapshot_service = SnapshotService::new( +//uuid_resolver, +//update_handle, +//Duration::from_millis(100), +//snapshot_path.path().to_owned(), +//"data.ms".to_string(), +//); + +//assert!(snapshot_service.perform_snapshot().await.is_err()); +////Nothing was written to the file +//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); +//} + +//#[actix_rt::test] +//async fn error_performing_index_snapshot() { +//let uuid = Uuid::new_v4(); +//let mut uuid_resolver = MockUuidResolverHandle::new(); +//uuid_resolver +//.expect_snapshot() +//.times(1) +//.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); + +//let mut update_handle = MockUpdateActorHandle::new(); +//update_handle +//.expect_snapshot() +////abitrary error +//.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); + +//let snapshot_path = tempfile::tempdir_in(".").unwrap(); +//let snapshot_service = SnapshotService::new( +//uuid_resolver, +//update_handle, +//Duration::from_millis(100), +//snapshot_path.path().to_owned(), +//"data.ms".to_string(), +//); + +//assert!(snapshot_service.perform_snapshot().await.is_err()); +////Nothing was written to the file +//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); +//} + +//#[actix_rt::test] +//async fn test_loop() { +//let mut uuid_resolver = MockUuidResolverHandle::new(); +//uuid_resolver +//.expect_snapshot() +////we expect the funtion to be called between 2 and 3 time in the given interval. +//.times(2..4) +////abitrary error, to short-circuit the function +//.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + +//let update_handle = MockUpdateActorHandle::new(); + +//let snapshot_path = tempfile::tempdir_in(".").unwrap(); +//let snapshot_service = SnapshotService::new( +//uuid_resolver, +//update_handle, +//Duration::from_millis(100), +//snapshot_path.path().to_owned(), +//"data.ms".to_string(), +//); + +//let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; +//} //} diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs index fed5fe200..f7a7e3a1a 100644 --- a/meilisearch-lib/src/index_controller/update_file_store.rs +++ b/meilisearch-lib/src/index_controller/update_file_store.rs @@ -151,7 +151,7 @@ impl UpdateFileStore { } serde_json::to_writer(&mut dst_file, &document_buffer)?; - dst_file.write(b"\n")?; + dst_file.write_all(b"\n")?; document_buffer.clear(); } diff --git a/meilisearch-lib/src/index_controller/updates/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs index d6c3bcba4..8cbcf211a 100644 --- a/meilisearch-lib/src/index_controller/updates/error.rs +++ b/meilisearch-lib/src/index_controller/updates/error.rs @@ -1,9 +1,12 @@ -use std::fmt; use std::error::Error; +use std::fmt; use meilisearch_error::{Code, ErrorCode}; -use crate::{document_formats::DocumentFormatError, index_controller::update_file_store::UpdateFileStoreError}; +use crate::{ + document_formats::DocumentFormatError, + index_controller::update_file_store::UpdateFileStoreError, +}; pub type Result = std::result::Result; @@ -28,7 +31,8 @@ pub enum UpdateLoopError { } impl From> for UpdateLoopError -where T: Sync + Send + 'static + fmt::Debug +where + T: Sync + Send + 'static + fmt::Debug, { fn from(other: tokio::sync::mpsc::error::SendError) -> Self { Self::Internal(Box::new(other)) diff --git a/meilisearch-lib/src/index_controller/updates/message.rs b/meilisearch-lib/src/index_controller/updates/message.rs index 3b157e568..4249e36f2 100644 --- a/meilisearch-lib/src/index_controller/updates/message.rs +++ b/meilisearch-lib/src/index_controller/updates/message.rs @@ -44,7 +44,11 @@ pub enum UpdateMsg { } impl UpdateMsg { - pub async fn snapshot(sender: &mpsc::Sender, path: PathBuf, indexes: Vec) -> Result<()> { + pub async fn snapshot( + sender: &mpsc::Sender, + path: PathBuf, + indexes: Vec, + ) -> Result<()> { let (ret, rcv) = oneshot::channel(); let msg = Self::Snapshot { path, indexes, ret }; sender.send(msg).await?; diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 3823f6b70..fad337553 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -80,7 +80,7 @@ impl> + Unpin> io::Rea self.read(buf) } Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)), - None => return Ok(0), + None => Ok(0), }, } } @@ -109,7 +109,13 @@ impl UpdateLoop { let must_exit = Arc::new(AtomicBool::new(false)); let update_file_store = UpdateFileStore::new(&path).unwrap(); - let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone(), update_file_store.clone())?; + let store = UpdateStore::open( + options, + &path, + index_resolver, + must_exit.clone(), + update_file_store.clone(), + )?; let inbox = Some(inbox); @@ -194,8 +200,8 @@ impl UpdateLoop { update_file.persist()?; Ok(()) - }).await??; - + }) + .await??; store::Update::DocumentAddition { primary_key, @@ -216,7 +222,6 @@ impl UpdateLoop { Ok(status.into()) } - async fn handle_list_updates(&self, uuid: Uuid) -> Result> { let update_store = self.store.clone(); tokio::task::spawn_blocking(move || { @@ -248,8 +253,7 @@ impl UpdateLoop { async fn handle_snapshot(&self, indexes: Vec, path: PathBuf) -> Result<()> { let update_store = self.store.clone(); - tokio::task::spawn_blocking(move || update_store.snapshot(indexes, path)) - .await??; + tokio::task::spawn_blocking(move || update_store.snapshot(indexes, path)).await??; Ok(()) } diff --git a/meilisearch-lib/src/index_controller/updates/status.rs b/meilisearch-lib/src/index_controller/updates/status.rs index 3108fe638..e7f82b343 100644 --- a/meilisearch-lib/src/index_controller/updates/status.rs +++ b/meilisearch-lib/src/index_controller/updates/status.rs @@ -6,7 +6,10 @@ use meilisearch_error::{Code, ErrorCode}; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; use serde::{Deserialize, Serialize}; -use crate::{Update, index::{Settings, Unchecked}}; +use crate::{ + index::{Settings, Unchecked}, + Update, +}; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UpdateResult { @@ -160,7 +163,7 @@ impl Display for Failed { } } -impl Error for Failed { } +impl Error for Failed {} impl ErrorCode for Failed { fn error_code(&self) -> Code { diff --git a/meilisearch-lib/src/index_controller/updates/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs index 3356a54b9..298217885 100644 --- a/meilisearch-lib/src/index_controller/updates/store/dump.rs +++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; -use std::path::{Path, PathBuf}; +use std::fs::{create_dir_all, File}; use std::io::{BufReader, Write}; -use std::fs::{File, create_dir_all}; +use std::path::{Path, PathBuf}; use heed::{EnvOpenOptions, RoTxn}; use rayon::prelude::*; @@ -11,7 +11,14 @@ use tempfile::{NamedTempFile, TempDir}; use uuid::Uuid; use super::{Result, State, UpdateStore}; -use crate::{Update, index::Index, index_controller::{update_file_store::UpdateFileStore, updates::status::{Enqueued, UpdateStatus}}}; +use crate::{ + index::Index, + index_controller::{ + update_file_store::UpdateFileStore, + updates::status::{Enqueued, UpdateStatus}, + }, + Update, +}; #[derive(Serialize, Deserialize)] struct UpdateEntry { @@ -20,11 +27,7 @@ struct UpdateEntry { } impl UpdateStore { - pub fn dump( - &self, - indexes: &[Index], - path: PathBuf, - ) -> Result<()> { + pub fn dump(&self, indexes: &[Index], path: PathBuf) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Dumping); @@ -35,7 +38,10 @@ impl UpdateStore { self.dump_updates(&txn, &uuids, &path)?; - indexes.par_iter().try_for_each(|index| index.dump(&path)).unwrap(); + indexes + .par_iter() + .try_for_each(|index| index.dump(&path)) + .unwrap(); Ok(()) } @@ -74,11 +80,13 @@ impl UpdateStore { let update = data.decode()?; if let Enqueued { - meta: Update::DocumentAddition { - content_uuid, .. - }, .. - } = update { - self.update_file_store.dump(content_uuid, &dst_path).unwrap(); + meta: Update::DocumentAddition { content_uuid, .. }, + .. + } = update + { + self.update_file_store + .dump(content_uuid, &dst_path) + .unwrap(); } let update_json = UpdateEntry { @@ -122,7 +130,6 @@ impl UpdateStore { dst: impl AsRef, db_size: usize, ) -> anyhow::Result<()> { - println!("target path: {}", dst.as_ref().display()); let mut options = EnvOpenOptions::new(); diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index 46786f1ac..bb77250b5 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -17,25 +17,26 @@ use heed::zerocopy::U64; use heed::{CompactionOption, Database, Env, EnvOpenOptions}; use log::error; use parking_lot::{Mutex, MutexGuard}; +use rayon::prelude::*; use tokio::runtime::Handle; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tokio::time::timeout; use uuid::Uuid; -use rayon::prelude::*; use codec::*; use super::error::Result; use super::status::{Enqueued, Processing}; -use crate::EnvSizer; +use crate::index::Index; use crate::index_controller::update_files_path; use crate::index_controller::updates::*; -use crate::index::Index; +use crate::EnvSizer; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Update { DeleteDocuments(Vec), @@ -164,7 +165,8 @@ impl UpdateStore { must_exit: Arc, update_file_store: UpdateFileStore, ) -> anyhow::Result> { - let (update_store, mut notification_receiver) = Self::new(options, path, update_file_store)?; + let (update_store, mut notification_receiver) = + Self::new(options, path, update_file_store)?; let update_store = Arc::new(update_store); // Send a first notification to trigger the process. @@ -250,11 +252,7 @@ impl UpdateStore { /// Registers the update content in the pending store and the meta /// into the pending-meta store. Returns the new unique update id. - pub fn register_update( - &self, - index_uuid: Uuid, - update: Update, - ) -> heed::Result { + pub fn register_update(&self, index_uuid: Uuid, update: Update) -> heed::Result { let mut txn = self.env.write_txn()?; let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; let meta = Enqueued::new(update, update_id); @@ -299,7 +297,10 @@ impl UpdateStore { /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, index_resolver: Arc) -> Result> { + fn process_pending_update( + &self, + index_resolver: Arc, + ) -> Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_queue.first(&rtxn)?; @@ -315,8 +316,7 @@ impl UpdateStore { let state = self.state.write(); state.swap(State::Processing(index_uuid, processing.clone())); - let result = - self.perform_update(processing, index_resolver, index_uuid, global_id); + let result = self.perform_update(processing, index_resolver, index_uuid, global_id); state.swap(State::Idle); @@ -444,7 +444,7 @@ impl UpdateStore { if uuid == index_uuid { let mut _pending = pending.decode()?; //if let Some(update_uuid) = pending.content.take() { - //uuids_to_remove.push(update_uuid); + //uuids_to_remove.push(update_uuid); //} // Invariant check: we can only delete the current entry when we don't hold @@ -495,15 +495,10 @@ impl UpdateStore { Ok(()) } - pub fn snapshot( - &self, - indexes: Vec, - path: impl AsRef, - ) -> Result<()> { + pub fn snapshot(&self, indexes: Vec, path: impl AsRef) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); - let txn = self.env.write_txn()?; let update_path = path.as_ref().join("updates"); @@ -523,19 +518,22 @@ impl UpdateStore { let ((_, uuid, _), pending) = entry?; if uuids.contains(&uuid) { if let Enqueued { - meta: Update::DocumentAddition { - content_uuid, .. - }, + meta: Update::DocumentAddition { content_uuid, .. }, .. } = pending.decode()? { - self.update_file_store.snapshot(content_uuid, &path).unwrap(); + self.update_file_store + .snapshot(content_uuid, &path) + .unwrap(); } } } let path = path.as_ref().to_owned(); - indexes.par_iter().try_for_each(|index| index.snapshot(path.clone())).unwrap(); + indexes + .par_iter() + .try_for_each(|index| index.snapshot(path.clone())) + .unwrap(); Ok(()) } @@ -546,10 +544,7 @@ impl UpdateStore { for entry in self.pending_queue.iter(&txn)? { let (_, pending) = entry?; if let Enqueued { - meta: store::Update::DocumentAddition { - content_uuid, - .. - }, + meta: store::Update::DocumentAddition { content_uuid, .. }, .. } = pending { @@ -568,147 +563,147 @@ impl UpdateStore { //#[cfg(test)] //mod test { - //use super::*; - //use crate::index_controller::{ - //index_actor::{error::IndexActorError, MockIndexActorHandle}, - //UpdateResult, - //}; +//use super::*; +//use crate::index_controller::{ +//index_actor::{error::IndexActorError, MockIndexActorHandle}, +//UpdateResult, +//}; - //use futures::future::ok; +//use futures::future::ok; - //#[actix_rt::test] - //async fn test_next_id() { - //let dir = tempfile::tempdir_in(".").unwrap(); - //let mut options = EnvOpenOptions::new(); - //let handle = Arc::new(MockIndexActorHandle::new()); - //options.map_size(4096 * 100); - //let update_store = UpdateStore::open( - //options, - //dir.path(), - //handle, - //Arc::new(AtomicBool::new(false)), - //) - //.unwrap(); +//#[actix_rt::test] +//async fn test_next_id() { +//let dir = tempfile::tempdir_in(".").unwrap(); +//let mut options = EnvOpenOptions::new(); +//let handle = Arc::new(MockIndexActorHandle::new()); +//options.map_size(4096 * 100); +//let update_store = UpdateStore::open( +//options, +//dir.path(), +//handle, +//Arc::new(AtomicBool::new(false)), +//) +//.unwrap(); - //let index1_uuid = Uuid::new_v4(); - //let index2_uuid = Uuid::new_v4(); +//let index1_uuid = Uuid::new_v4(); +//let index2_uuid = Uuid::new_v4(); - //let mut txn = update_store.env.write_txn().unwrap(); - //let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); - //txn.commit().unwrap(); - //assert_eq!((0, 0), ids); +//let mut txn = update_store.env.write_txn().unwrap(); +//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); +//txn.commit().unwrap(); +//assert_eq!((0, 0), ids); - //let mut txn = update_store.env.write_txn().unwrap(); - //let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); - //txn.commit().unwrap(); - //assert_eq!((1, 0), ids); +//let mut txn = update_store.env.write_txn().unwrap(); +//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); +//txn.commit().unwrap(); +//assert_eq!((1, 0), ids); - //let mut txn = update_store.env.write_txn().unwrap(); - //let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); - //txn.commit().unwrap(); - //assert_eq!((2, 1), ids); - //} - - //#[actix_rt::test] - //async fn test_register_update() { - //let dir = tempfile::tempdir_in(".").unwrap(); - //let mut options = EnvOpenOptions::new(); - //let handle = Arc::new(MockIndexActorHandle::new()); - //options.map_size(4096 * 100); - //let update_store = UpdateStore::open( - //options, - //dir.path(), - //handle, - //Arc::new(AtomicBool::new(false)), - //) - //.unwrap(); - //let meta = UpdateMeta::ClearDocuments; - //let uuid = Uuid::new_v4(); - //let store_clone = update_store.clone(); - //tokio::task::spawn_blocking(move || { - //store_clone.register_update(meta, None, uuid).unwrap(); - //}) - //.await - //.unwrap(); - - //let txn = update_store.env.read_txn().unwrap(); - //assert!(update_store - //.pending_queue - //.get(&txn, &(0, uuid, 0)) - //.unwrap() - //.is_some()); - //} - - //#[actix_rt::test] - //async fn test_process_update() { - //let dir = tempfile::tempdir_in(".").unwrap(); - //let mut handle = MockIndexActorHandle::new(); - - //handle - //.expect_update() - //.times(2) - //.returning(|_index_uuid, processing, _file| { - //if processing.id() == 0 { - //Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) - //} else { - //Box::pin(ok(Err( - //processing.fail(IndexActorError::ExistingPrimaryKey.into()) - //))) - //} - //}); - - //let handle = Arc::new(handle); - - //let mut options = EnvOpenOptions::new(); - //options.map_size(4096 * 100); - //let store = UpdateStore::open( - //options, - //dir.path(), - //handle.clone(), - //Arc::new(AtomicBool::new(false)), - //) - //.unwrap(); - - //// wait a bit for the event loop exit. - //tokio::time::sleep(std::time::Duration::from_millis(50)).await; - - //let mut txn = store.env.write_txn().unwrap(); - - //let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); - //let uuid = Uuid::new_v4(); - - //store - //.pending_queue - //.put(&mut txn, &(0, uuid, 0), &update) - //.unwrap(); - - //let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); - - //store - //.pending_queue - //.put(&mut txn, &(1, uuid, 1), &update) - //.unwrap(); - - //txn.commit().unwrap(); - - //// Process the pending, and check that it has been moved to the update databases, and - //// removed from the pending database. - //let store_clone = store.clone(); - //tokio::task::spawn_blocking(move || { - //store_clone.process_pending_update(handle.clone()).unwrap(); - //store_clone.process_pending_update(handle).unwrap(); - //}) - //.await - //.unwrap(); - - //let txn = store.env.read_txn().unwrap(); - - //assert!(store.pending_queue.first(&txn).unwrap().is_none()); - //let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); - - //assert!(matches!(update, UpdateStatus::Processed(_))); - //let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); - - //assert!(matches!(update, UpdateStatus::Failed(_))); - //} +//let mut txn = update_store.env.write_txn().unwrap(); +//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); +//txn.commit().unwrap(); +//assert_eq!((2, 1), ids); +//} + +//#[actix_rt::test] +//async fn test_register_update() { +//let dir = tempfile::tempdir_in(".").unwrap(); +//let mut options = EnvOpenOptions::new(); +//let handle = Arc::new(MockIndexActorHandle::new()); +//options.map_size(4096 * 100); +//let update_store = UpdateStore::open( +//options, +//dir.path(), +//handle, +//Arc::new(AtomicBool::new(false)), +//) +//.unwrap(); +//let meta = UpdateMeta::ClearDocuments; +//let uuid = Uuid::new_v4(); +//let store_clone = update_store.clone(); +//tokio::task::spawn_blocking(move || { +//store_clone.register_update(meta, None, uuid).unwrap(); +//}) +//.await +//.unwrap(); + +//let txn = update_store.env.read_txn().unwrap(); +//assert!(update_store +//.pending_queue +//.get(&txn, &(0, uuid, 0)) +//.unwrap() +//.is_some()); +//} + +//#[actix_rt::test] +//async fn test_process_update() { +//let dir = tempfile::tempdir_in(".").unwrap(); +//let mut handle = MockIndexActorHandle::new(); + +//handle +//.expect_update() +//.times(2) +//.returning(|_index_uuid, processing, _file| { +//if processing.id() == 0 { +//Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) +//} else { +//Box::pin(ok(Err( +//processing.fail(IndexActorError::ExistingPrimaryKey.into()) +//))) +//} +//}); + +//let handle = Arc::new(handle); + +//let mut options = EnvOpenOptions::new(); +//options.map_size(4096 * 100); +//let store = UpdateStore::open( +//options, +//dir.path(), +//handle.clone(), +//Arc::new(AtomicBool::new(false)), +//) +//.unwrap(); + +//// wait a bit for the event loop exit. +//tokio::time::sleep(std::time::Duration::from_millis(50)).await; + +//let mut txn = store.env.write_txn().unwrap(); + +//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); +//let uuid = Uuid::new_v4(); + +//store +//.pending_queue +//.put(&mut txn, &(0, uuid, 0), &update) +//.unwrap(); + +//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); + +//store +//.pending_queue +//.put(&mut txn, &(1, uuid, 1), &update) +//.unwrap(); + +//txn.commit().unwrap(); + +//// Process the pending, and check that it has been moved to the update databases, and +//// removed from the pending database. +//let store_clone = store.clone(); +//tokio::task::spawn_blocking(move || { +//store_clone.process_pending_update(handle.clone()).unwrap(); +//store_clone.process_pending_update(handle).unwrap(); +//}) +//.await +//.unwrap(); + +//let txn = store.env.read_txn().unwrap(); + +//assert!(store.pending_queue.first(&txn).unwrap().is_none()); +//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); + +//assert!(matches!(update, UpdateStatus::Processed(_))); +//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); + +//assert!(matches!(update, UpdateStatus::Failed(_))); +//} //} diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index 3b7b5e7fa..6eaaf431c 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -5,7 +5,7 @@ pub mod options; pub mod index; pub mod index_controller; -pub use index_controller::{IndexController as MeiliSearch, updates::store::Update}; +pub use index_controller::{updates::store::Update, IndexController as MeiliSearch}; pub use milli; @@ -55,4 +55,3 @@ pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Res ar.unpack(&dest)?; Ok(()) } - diff --git a/meilisearch-lib/src/options.rs b/meilisearch-lib/src/options.rs index f4b992f2e..0e59392d6 100644 --- a/meilisearch-lib/src/options.rs +++ b/meilisearch-lib/src/options.rs @@ -112,4 +112,3 @@ fn total_memory_bytes() -> Option { None } } -