From d08b76a323b9b941e31a923d6224ef74c732a37a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 26 Nov 2019 16:12:06 +0100 Subject: [PATCH] Separate the update and main databases We used the heed typed transaction to make it safe (https://github.com/Kerollmops/heed/pull/27). --- Cargo.lock | 8 +- meilisearch-core/Cargo.toml | 2 +- meilisearch-core/examples/from_file.rs | 29 +-- meilisearch-core/src/automaton/mod.rs | 7 +- meilisearch-core/src/database.rs | 233 +++++++++++------- meilisearch-core/src/lib.rs | 2 +- meilisearch-core/src/query_builder.rs | 89 +++---- meilisearch-core/src/serde/deserializer.rs | 3 +- meilisearch-core/src/serde/serializer.rs | 9 +- meilisearch-core/src/store/docs_words.rs | 9 +- .../src/store/documents_fields.rs | 11 +- .../src/store/documents_fields_counts.rs | 15 +- meilisearch-core/src/store/main.rs | 93 +++---- meilisearch-core/src/store/mod.rs | 43 ++-- meilisearch-core/src/store/postings_lists.rs | 9 +- meilisearch-core/src/store/synonyms.rs | 9 +- meilisearch-core/src/store/updates.rs | 20 +- meilisearch-core/src/store/updates_results.rs | 11 +- meilisearch-core/src/update/clear_all.rs | 5 +- meilisearch-core/src/update/customs_update.rs | 10 +- .../src/update/documents_addition.rs | 13 +- .../src/update/documents_deletion.rs | 7 +- meilisearch-core/src/update/mod.rs | 21 +- meilisearch-core/src/update/schema_update.rs | 5 +- .../src/update/stop_words_addition.rs | 7 +- .../src/update/stop_words_deletion.rs | 7 +- .../src/update/synonyms_addition.rs | 7 +- .../src/update/synonyms_deletion.rs | 7 +- meilisearch-http/Cargo.toml | 2 +- meilisearch-http/src/data.rs | 14 +- meilisearch-http/src/helpers/meilisearch.rs | 5 +- meilisearch-http/src/helpers/tide.rs | 5 +- meilisearch-http/src/routes/document.rs | 38 +-- meilisearch-http/src/routes/health.rs | 15 +- meilisearch-http/src/routes/index.rs | 40 ++- meilisearch-http/src/routes/key.rs | 27 +- meilisearch-http/src/routes/search.rs | 8 +- meilisearch-http/src/routes/setting.rs | 11 +- meilisearch-http/src/routes/stats.rs | 13 +- meilisearch-http/src/routes/stop_words.rs | 12 +- meilisearch-http/src/routes/synonym.rs | 31 +-- 41 files changed, 498 insertions(+), 414 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5b49b050..21a95d47f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -636,7 +636,7 @@ dependencies = [ [[package]] name = "heed" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -944,7 +944,7 @@ dependencies = [ "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "fst 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", - "heed 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "heed 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "levenshtein_automata 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -975,7 +975,7 @@ dependencies = [ "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", - "heed 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "heed 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "isahc 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2582,7 +2582,7 @@ dependencies = [ "checksum h2 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462" "checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" -"checksum heed 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b021df76de18f82f716fa6c858fd6bf39aec2c651852055563b5aba51debca81" +"checksum heed 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df6c88807a125a2722484f62fa9c9615d85b0779a06467626db1279c32e287ba" "checksum hermit-abi 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "307c3c9f937f38e3534b1d6447ecf090cafcc9744e4a6360e8b037b2cf5af120" "checksum http 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "d7e06e336150b178206af098a055e3621e8336027e2b4d126bda0bc64824baaf" "checksum http-body 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d" diff --git a/meilisearch-core/Cargo.toml b/meilisearch-core/Cargo.toml index e99a4a143..a3ac89504 100644 --- a/meilisearch-core/Cargo.toml +++ b/meilisearch-core/Cargo.toml @@ -14,7 +14,7 @@ deunicode = "1.0.0" env_logger = "0.7.0" fst = { version = "0.3.5", default-features = false } hashbrown = { version = "0.6.0", features = ["serde"] } -heed = "0.5.0" +heed = "0.6.0" levenshtein_automata = { version = "0.1.1", features = ["fst_automaton"] } log = "0.4.8" meilisearch-schema = { path = "../meilisearch-schema", version = "0.8.0" } diff --git a/meilisearch-core/examples/from_file.rs b/meilisearch-core/examples/from_file.rs index ce9f7e358..854ab3f5d 100644 --- a/meilisearch-core/examples/from_file.rs +++ b/meilisearch-core/examples/from_file.rs @@ -113,24 +113,25 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box { if current_schema != schema { return Err(meilisearch_core::Error::SchemaDiffer.into()); } - writer.abort(); + update_writer.abort(); } None => { - index.schema_update(&mut writer, schema)?; - writer.commit().unwrap(); + index.schema_update(&mut update_writer, schema)?; + update_writer.commit().unwrap(); } } @@ -173,10 +174,10 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box Result<(), Box> { - let env = &database.env; + let db = &database; let index = database .open_index(&command.index_uid) .expect("Could not find index"); - let reader = env.read_txn().unwrap(); + let reader = db.main_read_txn().unwrap(); let schema = index.main.schema(&reader)?; reader.abort(); @@ -339,7 +340,7 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box< Ok(query) => { let start_total = Instant::now(); - let reader = env.read_txn().unwrap(); + let reader = db.main_read_txn().unwrap(); let ref_index = &index; let ref_reader = &reader; @@ -444,12 +445,12 @@ fn show_updates_command( command: ShowUpdatesCommand, database: Database, ) -> Result<(), Box> { - let env = &database.env; + let db = &database; let index = database .open_index(&command.index_uid) .expect("Could not find index"); - let reader = env.read_txn().unwrap(); + let reader = db.update_read_txn().unwrap(); let updates = index.all_updates_status(&reader)?; println!("{:#?}", updates); reader.abort(); diff --git a/meilisearch-core/src/automaton/mod.rs b/meilisearch-core/src/automaton/mod.rs index f383f0c1b..a803eee8e 100644 --- a/meilisearch-core/src/automaton/mod.rs +++ b/meilisearch-core/src/automaton/mod.rs @@ -8,6 +8,7 @@ use fst::{IntoStreamer, Streamer}; use levenshtein_automata::DFA; use meilisearch_tokenizer::{is_cjk, split_query_string}; +use crate::database::MainT; use crate::error::MResult; use crate::store; @@ -23,7 +24,7 @@ pub struct AutomatonProducer { impl AutomatonProducer { pub fn new( - reader: &heed::RoTxn, + reader: &heed::RoTxn, query: &str, main_store: store::Main, postings_list_store: store::PostingsLists, @@ -131,7 +132,7 @@ pub fn normalize_str(string: &str) -> String { } fn split_best_frequency<'a>( - reader: &heed::RoTxn, + reader: &heed::RoTxn, word: &'a str, postings_lists_store: store::PostingsLists, ) -> MResult> { @@ -159,7 +160,7 @@ fn split_best_frequency<'a>( } fn generate_automatons( - reader: &heed::RoTxn, + reader: &heed::RoTxn, query: &str, main_store: store::Main, postings_lists_store: store::PostingsLists, diff --git a/meilisearch-core/src/database.rs b/meilisearch-core/src/database.rs index d63c79a71..d4c8e8c36 100644 --- a/meilisearch-core/src/database.rs +++ b/meilisearch-core/src/database.rs @@ -14,8 +14,12 @@ use crate::{store, update, Index, MResult}; pub type BoxUpdateFn = Box; type ArcSwapFn = arc_swap::ArcSwapOption; +pub struct MainT; +pub struct UpdateT; + pub struct Database { - pub env: heed::Env, + env: heed::Env, + update_env: heed::Env, common_store: heed::PolyDatabase, indexes_store: heed::Database, indexes: RwLock>)>>, @@ -45,6 +49,7 @@ pub type UpdateEventsEmitter = Sender; fn update_awaiter( receiver: UpdateEvents, env: heed::Env, + update_env: heed::Env, index_uid: &str, update_fn: Arc, index: Index, @@ -52,42 +57,54 @@ fn update_awaiter( let mut receiver = receiver.into_iter(); while let Some(UpdateEvent::NewUpdate) = receiver.next() { loop { - // instantiate a main/parent transaction - let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed"); + // We instantiate a *write* transaction to *block* the thread + // until the *other*, notifiying, thread commits + let result = update_env.typed_write_txn::(); + let update_reader = break_try!(result, "LMDB read transaction (update) begin failed"); // retrieve the update that needs to be processed - let result = index.updates.pop_front(&mut writer); + let result = index.updates.first_update(&update_reader); let (update_id, update) = match break_try!(result, "pop front update failed") { Some(value) => value, None => { debug!("no more updates"); - writer.abort(); break; } }; - // instantiate a nested transaction - let result = env.nested_write_txn(&mut writer); - let mut nested_writer = break_try!(result, "LMDB nested write transaction failed"); + // do not keep the reader for too long + update_reader.abort(); - // try to apply the update to the database using the nested transaction - let result = update::update_task(&mut nested_writer, index.clone(), update_id, update); + // instantiate a transaction to touch to the main env + let result = env.typed_write_txn::(); + let mut main_writer = break_try!(result, "LMDB nested write transaction failed"); + + // try to apply the update to the database using the main transaction + let result = update::update_task(&mut main_writer, &index, update_id, update); let status = break_try!(result, "update task failed"); - // commit the nested transaction if the update was successful, abort it otherwise + // commit the main transaction if the update was successful, abort it otherwise if status.error.is_none() { - break_try!(nested_writer.commit(), "commit nested transaction failed"); + break_try!(main_writer.commit(), "commit nested transaction failed"); } else { - nested_writer.abort() + main_writer.abort() } - // write the result of the update in the updates-results store - let updates_results = index.updates_results; - let result = updates_results.put_update_result(&mut writer, update_id, &status); + // now that the update has been processed we can instantiate + // a transaction to move the result to the updates-results store + let result = update_env.typed_write_txn::(); + let mut update_writer = break_try!(result, "LMDB write transaction begin failed"); - // always commit the main/parent transaction, even if the update was unsuccessful + // definitely remove the update from the updates store + index.updates.del_update(&mut update_writer, update_id)?; + + // write the result of the updates-results store + let updates_results = index.updates_results; + let result = updates_results.put_update_result(&mut update_writer, update_id, &status); + + // always commit the main transaction, even if the update was unsuccessful break_try!(result, "update result store commit failed"); - break_try!(writer.commit(), "update parent transaction failed"); + break_try!(update_writer.commit(), "update transaction commit failed"); // call the user callback when the update and the result are written consistently if let Some(ref callback) = *update_fn.load() { @@ -98,9 +115,11 @@ fn update_awaiter( debug!("update loop system stopped"); - let mut writer = env.write_txn()?; - store::clear(&mut writer, &index)?; + let mut writer = env.typed_write_txn::()?; + let mut update_writer = update_env.typed_write_txn::()?; + store::clear(&mut writer, &mut update_writer, &index)?; writer.commit()?; + update_writer.commit()?; debug!("store {} cleared", index_uid); @@ -109,12 +128,20 @@ fn update_awaiter( impl Database { pub fn open_or_create(path: impl AsRef) -> MResult { - fs::create_dir_all(path.as_ref())?; + let main_path = path.as_ref().join("main"); + let update_path = path.as_ref().join("update"); + fs::create_dir_all(&main_path)?; let env = heed::EnvOpenOptions::new() .map_size(10 * 1024 * 1024 * 1024) // 10GB .max_dbs(3000) - .open(path)?; + .open(main_path)?; + + fs::create_dir_all(&update_path)?; + let update_env = heed::EnvOpenOptions::new() + .map_size(10 * 1024 * 1024 * 1024) // 10GB + .max_dbs(3000) + .open(update_path)?; let common_store = env.create_poly_database(Some("common"))?; let indexes_store = env.create_database::(Some("indexes"))?; @@ -134,7 +161,7 @@ impl Database { let mut indexes = HashMap::new(); for index_uid in must_open { let (sender, receiver) = crossbeam_channel::bounded(100); - let index = match store::open(&env, &index_uid, sender.clone())? { + let index = match store::open(&env, &update_env, &index_uid, sender.clone())? { Some(index) => index, None => { log::warn!( @@ -146,6 +173,7 @@ impl Database { }; let env_clone = env.clone(); + let update_env_clone = update_env.clone(); let index_clone = index.clone(); let name_clone = index_uid.clone(); let update_fn_clone = update_fn.clone(); @@ -154,6 +182,7 @@ impl Database { update_awaiter( receiver, env_clone, + update_env_clone, &name_clone, update_fn_clone, index_clone, @@ -173,6 +202,7 @@ impl Database { Ok(Database { env, + update_env, common_store, indexes_store, indexes: RwLock::new(indexes), @@ -196,12 +226,13 @@ impl Database { Entry::Occupied(_) => Err(crate::Error::IndexAlreadyExists), Entry::Vacant(entry) => { let (sender, receiver) = crossbeam_channel::bounded(100); - let index = store::create(&self.env, name, sender)?; + let index = store::create(&self.env, &self.update_env, name, sender)?; let mut writer = self.env.write_txn()?; self.indexes_store.put(&mut writer, name, &())?; let env_clone = self.env.clone(); + let update_env_clone = self.update_env.clone(); let index_clone = index.clone(); let name_clone = name.to_owned(); let update_fn_clone = self.update_fn.clone(); @@ -210,6 +241,7 @@ impl Database { update_awaiter( receiver, env_clone, + update_env_clone, &name_clone, update_fn_clone, index_clone, @@ -259,6 +291,22 @@ impl Database { self.update_fn.swap(None); } + pub fn main_read_txn(&self) -> heed::Result> { + self.env.typed_read_txn::() + } + + pub fn main_write_txn(&self) -> heed::Result> { + self.env.typed_write_txn::() + } + + pub fn update_read_txn(&self) -> heed::Result> { + self.update_env.typed_read_txn::() + } + + pub fn update_write_txn(&self) -> heed::Result> { + self.update_env.typed_write_txn::() + } + pub fn copy_and_compact_to_path>(&self, path: P) -> ZResult { self.env.copy_to_path(path, CompactionOption::Enabled) } @@ -288,7 +336,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Database::open_or_create(dir.path()).unwrap(); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let update_fn = move |_name: &str, update: ProcessedUpdateResult| { @@ -313,9 +361,9 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); - let _update_id = index.schema_update(&mut writer, schema).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let _update_id = index.schema_update(&mut update_writer, schema).unwrap(); + update_writer.commit().unwrap(); let mut additions = index.documents_addition(); @@ -334,15 +382,15 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); - let update_id = additions.finalize(&mut writer).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let update_id = additions.finalize(&mut update_writer).unwrap(); + update_writer.commit().unwrap(); // block until the transaction is processed let _ = receiver.into_iter().find(|id| *id == update_id); - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); } @@ -351,7 +399,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Database::open_or_create(dir.path()).unwrap(); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let update_fn = move |_name: &str, update: ProcessedUpdateResult| { @@ -376,9 +424,9 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); - let _update_id = index.schema_update(&mut writer, schema).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let _update_id = index.schema_update(&mut update_writer, schema).unwrap(); + update_writer.commit().unwrap(); let mut additions = index.documents_addition(); @@ -396,15 +444,15 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); - let update_id = additions.finalize(&mut writer).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let update_id = additions.finalize(&mut update_writer).unwrap(); + update_writer.commit().unwrap(); // block until the transaction is processed let _ = receiver.into_iter().find(|id| *id == update_id); - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some()); } @@ -413,7 +461,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Database::open_or_create(dir.path()).unwrap(); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let update_fn = move |_name: &str, update: ProcessedUpdateResult| { @@ -434,9 +482,9 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); - let _update_id = index.schema_update(&mut writer, schema).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let _update_id = index.schema_update(&mut update_writer, schema).unwrap(); + update_writer.commit().unwrap(); let mut additions = index.documents_addition(); @@ -447,15 +495,15 @@ mod tests { additions.update_document(doc1); - let mut writer = env.write_txn().unwrap(); - let update_id = additions.finalize(&mut writer).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let update_id = additions.finalize(&mut update_writer).unwrap(); + update_writer.commit().unwrap(); // block until the transaction is processed let _ = receiver.into_iter().find(|id| *id == update_id); - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); } @@ -464,7 +512,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Database::open_or_create(dir.path()).unwrap(); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let update_fn = move |_name: &str, update: ProcessedUpdateResult| { @@ -489,9 +537,9 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); - let _update_id = index.schema_update(&mut writer, schema).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let _update_id = index.schema_update(&mut update_writer, schema).unwrap(); + update_writer.commit().unwrap(); let mut additions = index.documents_addition(); @@ -510,9 +558,9 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); - let _update_id = additions.finalize(&mut writer).unwrap(); - writer.commit().unwrap(); + let mut update_writer = db.update_write_txn().unwrap(); + let _update_id = additions.finalize(&mut update_writer).unwrap(); + update_writer.commit().unwrap(); let schema = { let data = r#" @@ -537,7 +585,7 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = index.schema_update(&mut writer, schema).unwrap(); writer.commit().unwrap(); @@ -545,10 +593,10 @@ mod tests { let _ = receiver.iter().find(|id| *id == update_id); // check if it has been accepted - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); - reader.abort(); + update_reader.abort(); let mut additions = index.documents_addition(); @@ -571,7 +619,7 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); writer.commit().unwrap(); @@ -579,11 +627,13 @@ mod tests { let _ = receiver.iter().find(|id| *id == update_id); // check if it has been accepted - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); + update_reader.abort(); // even try to search for a document + let reader = db.main_read_txn().unwrap(); let results = index.query_builder().query(&reader, "21 ", 0..20).unwrap(); assert_matches!(results.len(), 1); @@ -617,7 +667,7 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = index.schema_update(&mut writer, schema).unwrap(); writer.commit().unwrap(); @@ -625,8 +675,8 @@ mod tests { let _ = receiver.iter().find(|id| *id == update_id); // check if it has been accepted - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some()); } @@ -635,7 +685,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Database::open_or_create(dir.path()).unwrap(); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let update_fn = move |_name: &str, update: ProcessedUpdateResult| { @@ -660,7 +710,7 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap(); writer.commit().unwrap(); @@ -683,17 +733,19 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); writer.commit().unwrap(); // block until the transaction is processed let _ = receiver.into_iter().find(|id| *id == update_id); - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); + update_reader.abort(); + let reader = db.main_read_txn().unwrap(); let document: Option = index.document(&reader, None, DocumentId(25)).unwrap(); assert!(document.is_none()); @@ -713,7 +765,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Database::open_or_create(dir.path()).unwrap(); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let update_fn = move |_name: &str, update: ProcessedUpdateResult| { @@ -741,7 +793,7 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap(); writer.commit().unwrap(); @@ -764,17 +816,19 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); writer.commit().unwrap(); // block until the transaction is processed let _ = receiver.iter().find(|id| *id == update_id); - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); + update_reader.abort(); + let reader = db.main_read_txn().unwrap(); let document: Option = index.document(&reader, None, DocumentId(25)).unwrap(); assert!(document.is_none()); @@ -807,17 +861,19 @@ mod tests { partial_additions.update_document(partial_doc1); partial_additions.update_document(partial_doc2); - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = partial_additions.finalize(&mut writer).unwrap(); writer.commit().unwrap(); // block until the transaction is processed let _ = receiver.iter().find(|id| *id == update_id); - let reader = env.read_txn().unwrap(); - let result = index.update_status(&reader, update_id).unwrap(); + let update_reader = db.update_read_txn().unwrap(); + let result = index.update_status(&update_reader, update_id).unwrap(); assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none()); + update_reader.abort(); + let reader = db.main_read_txn().unwrap(); let document: Option = index .document(&reader, None, DocumentId(7900334843754999545)) .unwrap(); @@ -846,7 +902,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Arc::new(Database::open_or_create(dir.path()).unwrap()); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let db_cloned = database.clone(); @@ -877,7 +933,7 @@ mod tests { }; // add a schema to the index - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap(); writer.commit().unwrap(); @@ -899,7 +955,7 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); writer.commit().unwrap(); @@ -919,7 +975,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let database = Database::open_or_create(dir.path()).unwrap(); - let env = &database.env; + let db = &database; let (sender, receiver) = mpsc::sync_channel(100); let update_fn = move |_name: &str, update: ProcessedUpdateResult| { @@ -944,7 +1000,7 @@ mod tests { toml::from_str(data).unwrap() }; - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap(); writer.commit().unwrap(); @@ -967,15 +1023,14 @@ mod tests { additions.update_document(doc1); additions.update_document(doc2); - let mut writer = env.write_txn().unwrap(); + let mut writer = db.update_write_txn().unwrap(); let update_id = additions.finalize(&mut writer).unwrap(); writer.commit().unwrap(); // block until the transaction is processed let _ = receiver.into_iter().find(|id| *id == update_id); - let reader = env.read_txn().unwrap(); - + let reader = db.main_read_txn().unwrap(); let schema = index.main.schema(&reader).unwrap().unwrap(); let ranked_map = index.main.ranked_map(&reader).unwrap().unwrap(); diff --git a/meilisearch-core/src/lib.rs b/meilisearch-core/src/lib.rs index 3955c56b4..e9ba84a41 100644 --- a/meilisearch-core/src/lib.rs +++ b/meilisearch-core/src/lib.rs @@ -18,7 +18,7 @@ pub mod serde; pub mod store; mod update; -pub use self::database::{BoxUpdateFn, Database}; +pub use self::database::{BoxUpdateFn, Database, MainT, UpdateT}; pub use self::error::{Error, MResult}; pub use self::number::{Number, ParseNumberError}; pub use self::ranked_map::RankedMap; diff --git a/meilisearch-core/src/query_builder.rs b/meilisearch-core/src/query_builder.rs index 705f13452..132dda557 100644 --- a/meilisearch-core/src/query_builder.rs +++ b/meilisearch-core/src/query_builder.rs @@ -9,6 +9,7 @@ use fst::{IntoStreamer, Streamer}; use sdset::SetBuf; use slice_group_by::{GroupBy, GroupByMut}; +use crate::database::MainT; use crate::automaton::{Automaton, AutomatonGroup, AutomatonProducer, QueryEnhancer}; use crate::distinct_map::{BufferedDistinctMap, DistinctMap}; use crate::levenshtein::prefix_damerau_levenshtein; @@ -139,7 +140,7 @@ fn multiword_rewrite_matches( } fn fetch_raw_documents( - reader: &heed::RoTxn, + reader: &heed::RoTxn, automatons_groups: &[AutomatonGroup], query_enhancer: &QueryEnhancer, searchables: Option<&ReorderedAttrs>, @@ -336,7 +337,7 @@ impl<'c, 'f, 'd> QueryBuilder<'c, 'f, 'd> { pub fn query( self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, query: &str, range: Range, ) -> MResult> { @@ -374,7 +375,7 @@ impl<'c, 'f, 'd> QueryBuilder<'c, 'f, 'd> { } fn raw_query<'c, FI>( - reader: &heed::RoTxn, + reader: &heed::RoTxn, query: &str, range: Range, @@ -510,7 +511,7 @@ where } fn raw_query_with_distinct<'c, FI, FD>( - reader: &heed::RoTxn, + reader: &heed::RoTxn, query: &str, range: Range, @@ -765,8 +766,8 @@ mod tests { } pub fn add_synonym(&mut self, word: &str, new: SetBuf<&str>) { - let env = &self.database.env; - let mut writer = env.write_txn().unwrap(); + let db = &self.database; + let mut writer = db.main_write_txn().unwrap(); let word = word.to_lowercase(); @@ -809,8 +810,8 @@ mod tests { let database = Database::open_or_create(&tempdir).unwrap(); let index = database.create_index("default").unwrap(); - let env = &database.env; - let mut writer = env.write_txn().unwrap(); + let db = &database; + let mut writer = db.main_write_txn().unwrap(); let mut words_fst = BTreeSet::new(); let mut postings_lists = HashMap::new(); @@ -872,8 +873,8 @@ mod tests { ("apple", &[doc_char_index(0, 2, 2)][..]), ]); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "iphone from apple", 0..20).unwrap(); @@ -895,8 +896,8 @@ mod tests { store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "hello", 0..20).unwrap(); @@ -928,8 +929,8 @@ mod tests { store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"])); store.add_synonym("salut", SetBuf::from_dirty(vec!["hello"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "sal", 0..20).unwrap(); @@ -972,8 +973,8 @@ mod tests { store.add_synonym("salutation", SetBuf::from_dirty(vec!["hello"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "salutution", 0..20).unwrap(); @@ -1010,8 +1011,8 @@ mod tests { store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello", "salut"])); store.add_synonym("salut", SetBuf::from_dirty(vec!["hello", "bonjour"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "hello", 0..20).unwrap(); @@ -1098,8 +1099,8 @@ mod tests { SetBuf::from_dirty(vec!["NY", "new york", "new york city"]), ); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway", 0..20).unwrap(); @@ -1168,8 +1169,8 @@ mod tests { store.add_synonym("NY", SetBuf::from_dirty(vec!["york new"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY", 0..20).unwrap(); @@ -1226,8 +1227,8 @@ mod tests { store.add_synonym("new york", SetBuf::from_dirty(vec!["NY"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway", 0..20).unwrap(); @@ -1291,8 +1292,8 @@ mod tests { SetBuf::from_dirty(vec!["NY", "new york", "new york city"]), ); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway", 0..20).unwrap(); @@ -1372,8 +1373,8 @@ mod tests { ); store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway broken", 0..20).unwrap(); @@ -1459,8 +1460,8 @@ mod tests { ); store.add_synonym("underground train", SetBuf::from_dirty(vec!["subway"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder @@ -1559,8 +1560,8 @@ mod tests { store.add_synonym("new york", SetBuf::from_dirty(vec!["new york city"])); store.add_synonym("new york city", SetBuf::from_dirty(vec!["new york"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "new york big ", 0..20).unwrap(); @@ -1596,8 +1597,8 @@ mod tests { store.add_synonym("NY", SetBuf::from_dirty(vec!["new york city story"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway ", 0..20).unwrap(); @@ -1646,8 +1647,8 @@ mod tests { store.add_synonym("new york city", SetBuf::from_dirty(vec!["NYC"])); store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder @@ -1679,8 +1680,8 @@ mod tests { store.add_synonym("téléphone", SetBuf::from_dirty(vec!["iphone"])); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "telephone", 0..20).unwrap(); @@ -1741,8 +1742,8 @@ mod tests { ("case", &[doc_index(0, 1)][..]), ]); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "i phone case", 0..20).unwrap(); @@ -1769,8 +1770,8 @@ mod tests { ("engine", &[doc_index(1, 2)][..]), ]); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "searchengine", 0..20).unwrap(); @@ -1801,8 +1802,8 @@ mod tests { ("engine", &[doc_index(1, 3)][..]), ]); - let env = &store.database.env; - let reader = env.read_txn().unwrap(); + let db = &store.database; + let reader = db.main_read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "searchengine", 0..20).unwrap(); diff --git a/meilisearch-core/src/serde/deserializer.rs b/meilisearch-core/src/serde/deserializer.rs index 1ccf1e095..0b345692b 100644 --- a/meilisearch-core/src/serde/deserializer.rs +++ b/meilisearch-core/src/serde/deserializer.rs @@ -8,6 +8,7 @@ use serde_json::de::IoRead as SerdeJsonIoRead; use serde_json::Deserializer as SerdeJsonDeserializer; use serde_json::Error as SerdeJsonError; +use crate::database::MainT; use crate::store::DocumentsFields; use crate::DocumentId; @@ -50,7 +51,7 @@ impl From for DeserializerError { pub struct Deserializer<'a> { pub document_id: DocumentId, - pub reader: &'a heed::RoTxn, + pub reader: &'a heed::RoTxn, pub documents_fields: DocumentsFields, pub schema: &'a Schema, pub attributes: Option<&'a HashSet>, diff --git a/meilisearch-core/src/serde/serializer.rs b/meilisearch-core/src/serde/serializer.rs index b9f86b077..c083991f5 100644 --- a/meilisearch-core/src/serde/serializer.rs +++ b/meilisearch-core/src/serde/serializer.rs @@ -1,6 +1,7 @@ use meilisearch_schema::{Schema, SchemaAttr, SchemaProps}; use serde::ser; +use crate::database::MainT; use crate::raw_indexer::RawIndexer; use crate::store::{DocumentsFields, DocumentsFieldsCounts}; use crate::{DocumentId, RankedMap}; @@ -8,7 +9,7 @@ use crate::{DocumentId, RankedMap}; use super::{ConvertToNumber, ConvertToString, Indexer, SerializerError}; pub struct Serializer<'a, 'b> { - pub txn: &'a mut heed::RwTxn<'b>, + pub txn: &'a mut heed::RwTxn<'b, MainT>, pub schema: &'a Schema, pub document_store: DocumentsFields, pub document_fields_counts: DocumentsFieldsCounts, @@ -191,7 +192,7 @@ impl<'a, 'b> ser::Serializer for Serializer<'a, 'b> { } pub struct MapSerializer<'a, 'b> { - txn: &'a mut heed::RwTxn<'b>, + txn: &'a mut heed::RwTxn<'b, MainT>, schema: &'a Schema, document_id: DocumentId, document_store: DocumentsFields, @@ -254,7 +255,7 @@ impl<'a, 'b> ser::SerializeMap for MapSerializer<'a, 'b> { } pub struct StructSerializer<'a, 'b> { - txn: &'a mut heed::RwTxn<'b>, + txn: &'a mut heed::RwTxn<'b, MainT>, schema: &'a Schema, document_id: DocumentId, document_store: DocumentsFields, @@ -297,7 +298,7 @@ impl<'a, 'b> ser::SerializeStruct for StructSerializer<'a, 'b> { } pub fn serialize_value( - txn: &mut heed::RwTxn, + txn: &mut heed::RwTxn, attribute: SchemaAttr, props: SchemaProps, document_id: DocumentId, diff --git a/meilisearch-core/src/store/docs_words.rs b/meilisearch-core/src/store/docs_words.rs index e39aeb41c..0ae153b3f 100644 --- a/meilisearch-core/src/store/docs_words.rs +++ b/meilisearch-core/src/store/docs_words.rs @@ -1,4 +1,5 @@ use super::BEU64; +use crate::database::MainT; use crate::DocumentId; use heed::types::{ByteSlice, OwnedType}; use heed::Result as ZResult; @@ -12,7 +13,7 @@ pub struct DocsWords { impl DocsWords { pub fn put_doc_words( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, document_id: DocumentId, words: &fst::Set, ) -> ZResult<()> { @@ -21,18 +22,18 @@ impl DocsWords { self.docs_words.put(writer, &document_id, bytes) } - pub fn del_doc_words(self, writer: &mut heed::RwTxn, document_id: DocumentId) -> ZResult { + pub fn del_doc_words(self, writer: &mut heed::RwTxn, document_id: DocumentId) -> ZResult { let document_id = BEU64::new(document_id.0); self.docs_words.delete(writer, &document_id) } - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.docs_words.clear(writer) } pub fn doc_words( self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, document_id: DocumentId, ) -> ZResult> { let document_id = BEU64::new(document_id.0); diff --git a/meilisearch-core/src/store/documents_fields.rs b/meilisearch-core/src/store/documents_fields.rs index b3be882de..b217ecd31 100644 --- a/meilisearch-core/src/store/documents_fields.rs +++ b/meilisearch-core/src/store/documents_fields.rs @@ -1,4 +1,5 @@ use heed::types::{ByteSlice, OwnedType}; +use crate::database::MainT; use heed::Result as ZResult; use meilisearch_schema::SchemaAttr; @@ -13,7 +14,7 @@ pub struct DocumentsFields { impl DocumentsFields { pub fn put_document_field( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, document_id: DocumentId, attribute: SchemaAttr, value: &[u8], @@ -24,7 +25,7 @@ impl DocumentsFields { pub fn del_all_document_fields( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, document_id: DocumentId, ) -> ZResult { let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); @@ -32,13 +33,13 @@ impl DocumentsFields { self.documents_fields.delete_range(writer, &(start..=end)) } - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.documents_fields.clear(writer) } pub fn document_attribute<'txn>( self, - reader: &'txn heed::RoTxn, + reader: &'txn heed::RoTxn, document_id: DocumentId, attribute: SchemaAttr, ) -> ZResult> { @@ -48,7 +49,7 @@ impl DocumentsFields { pub fn document_fields<'txn>( self, - reader: &'txn heed::RoTxn, + reader: &'txn heed::RoTxn, document_id: DocumentId, ) -> ZResult> { let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); diff --git a/meilisearch-core/src/store/documents_fields_counts.rs b/meilisearch-core/src/store/documents_fields_counts.rs index 604e5ecf6..72ac7a2f8 100644 --- a/meilisearch-core/src/store/documents_fields_counts.rs +++ b/meilisearch-core/src/store/documents_fields_counts.rs @@ -1,4 +1,5 @@ use super::DocumentAttrKey; +use crate::database::MainT; use crate::DocumentId; use heed::types::OwnedType; use heed::Result as ZResult; @@ -12,7 +13,7 @@ pub struct DocumentsFieldsCounts { impl DocumentsFieldsCounts { pub fn put_document_field_count( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, document_id: DocumentId, attribute: SchemaAttr, value: u64, @@ -23,7 +24,7 @@ impl DocumentsFieldsCounts { pub fn del_all_document_fields_counts( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, document_id: DocumentId, ) -> ZResult { let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); @@ -32,13 +33,13 @@ impl DocumentsFieldsCounts { .delete_range(writer, &(start..=end)) } - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.documents_fields_counts.clear(writer) } pub fn document_field_count( self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, document_id: DocumentId, attribute: SchemaAttr, ) -> ZResult> { @@ -51,7 +52,7 @@ impl DocumentsFieldsCounts { pub fn document_fields_counts<'txn>( self, - reader: &'txn heed::RoTxn, + reader: &'txn heed::RoTxn, document_id: DocumentId, ) -> ZResult> { let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); @@ -60,7 +61,7 @@ impl DocumentsFieldsCounts { Ok(DocumentFieldsCountsIter { iter }) } - pub fn documents_ids<'txn>(self, reader: &'txn heed::RoTxn) -> ZResult> { + pub fn documents_ids<'txn>(self, reader: &'txn heed::RoTxn) -> ZResult> { let iter = self.documents_fields_counts.iter(reader)?; Ok(DocumentsIdsIter { last_seen_id: None, @@ -70,7 +71,7 @@ impl DocumentsFieldsCounts { pub fn all_documents_fields_counts<'txn>( self, - reader: &'txn heed::RoTxn, + reader: &'txn heed::RoTxn, ) -> ZResult> { let iter = self.documents_fields_counts.iter(reader)?; Ok(AllDocumentsFieldsCountsIter { iter }) diff --git a/meilisearch-core/src/store/main.rs b/meilisearch-core/src/store/main.rs index b6e3d9744..0efdd140e 100644 --- a/meilisearch-core/src/store/main.rs +++ b/meilisearch-core/src/store/main.rs @@ -1,3 +1,4 @@ +use crate::database::MainT; use crate::RankedMap; use chrono::{DateTime, Utc}; use heed::types::{ByteSlice, OwnedType, SerdeBincode, Str}; @@ -28,46 +29,46 @@ pub struct Main { } impl Main { - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.main.clear(writer) } - pub fn put_name(self, writer: &mut heed::RwTxn, name: &str) -> ZResult<()> { - self.main.put::(writer, NAME_KEY, name) + pub fn put_name(self, writer: &mut heed::RwTxn, name: &str) -> ZResult<()> { + self.main.put::<_, Str, Str>(writer, NAME_KEY, name) } - pub fn name(self, reader: &heed::RoTxn) -> ZResult> { + pub fn name(self, reader: &heed::RoTxn) -> ZResult> { Ok(self .main - .get::(reader, NAME_KEY)? + .get::<_, Str, Str>(reader, NAME_KEY)? .map(|name| name.to_owned())) } - pub fn put_created_at(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn put_created_at(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.main - .put::(writer, CREATED_AT_KEY, &Utc::now()) + .put::<_, Str, SerdeDatetime>(writer, CREATED_AT_KEY, &Utc::now()) } - pub fn created_at(self, reader: &heed::RoTxn) -> ZResult>> { - self.main.get::(reader, CREATED_AT_KEY) + pub fn created_at(self, reader: &heed::RoTxn) -> ZResult>> { + self.main.get::<_, Str, SerdeDatetime>(reader, CREATED_AT_KEY) } - pub fn put_updated_at(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn put_updated_at(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.main - .put::(writer, UPDATED_AT_KEY, &Utc::now()) + .put::<_, Str, SerdeDatetime>(writer, UPDATED_AT_KEY, &Utc::now()) } - pub fn updated_at(self, reader: &heed::RoTxn) -> ZResult>> { - self.main.get::(reader, UPDATED_AT_KEY) + pub fn updated_at(self, reader: &heed::RoTxn) -> ZResult>> { + self.main.get::<_, Str, SerdeDatetime>(reader, UPDATED_AT_KEY) } - pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { + pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { let bytes = fst.as_fst().as_bytes(); - self.main.put::(writer, WORDS_KEY, bytes) + self.main.put::<_, Str, ByteSlice>(writer, WORDS_KEY, bytes) } - pub fn words_fst(self, reader: &heed::RoTxn) -> ZResult> { - match self.main.get::(reader, WORDS_KEY)? { + pub fn words_fst(self, reader: &heed::RoTxn) -> ZResult> { + match self.main.get::<_, Str, ByteSlice>(reader, WORDS_KEY)? { Some(bytes) => { let len = bytes.len(); let bytes = Arc::new(bytes.to_owned()); @@ -78,33 +79,33 @@ impl Main { } } - pub fn put_schema(self, writer: &mut heed::RwTxn, schema: &Schema) -> ZResult<()> { + pub fn put_schema(self, writer: &mut heed::RwTxn, schema: &Schema) -> ZResult<()> { self.main - .put::>(writer, SCHEMA_KEY, schema) + .put::<_, Str, SerdeBincode>(writer, SCHEMA_KEY, schema) } - pub fn schema(self, reader: &heed::RoTxn) -> ZResult> { + pub fn schema(self, reader: &heed::RoTxn) -> ZResult> { self.main - .get::>(reader, SCHEMA_KEY) + .get::<_, Str, SerdeBincode>(reader, SCHEMA_KEY) } - pub fn put_ranked_map(self, writer: &mut heed::RwTxn, ranked_map: &RankedMap) -> ZResult<()> { + pub fn put_ranked_map(self, writer: &mut heed::RwTxn, ranked_map: &RankedMap) -> ZResult<()> { self.main - .put::>(writer, RANKED_MAP_KEY, &ranked_map) + .put::<_, Str, SerdeBincode>(writer, RANKED_MAP_KEY, &ranked_map) } - pub fn ranked_map(self, reader: &heed::RoTxn) -> ZResult> { + pub fn ranked_map(self, reader: &heed::RoTxn) -> ZResult> { self.main - .get::>(reader, RANKED_MAP_KEY) + .get::<_, Str, SerdeBincode>(reader, RANKED_MAP_KEY) } - pub fn put_synonyms_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { + pub fn put_synonyms_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { let bytes = fst.as_fst().as_bytes(); - self.main.put::(writer, SYNONYMS_KEY, bytes) + self.main.put::<_, Str, ByteSlice>(writer, SYNONYMS_KEY, bytes) } - pub fn synonyms_fst(self, reader: &heed::RoTxn) -> ZResult> { - match self.main.get::(reader, SYNONYMS_KEY)? { + pub fn synonyms_fst(self, reader: &heed::RoTxn) -> ZResult> { + match self.main.get::<_, Str, ByteSlice>(reader, SYNONYMS_KEY)? { Some(bytes) => { let len = bytes.len(); let bytes = Arc::new(bytes.to_owned()); @@ -115,14 +116,14 @@ impl Main { } } - pub fn put_stop_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { + pub fn put_stop_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { let bytes = fst.as_fst().as_bytes(); self.main - .put::(writer, STOP_WORDS_KEY, bytes) + .put::<_, Str, ByteSlice>(writer, STOP_WORDS_KEY, bytes) } - pub fn stop_words_fst(self, reader: &heed::RoTxn) -> ZResult> { - match self.main.get::(reader, STOP_WORDS_KEY)? { + pub fn stop_words_fst(self, reader: &heed::RoTxn) -> ZResult> { + match self.main.get::<_, Str, ByteSlice>(reader, STOP_WORDS_KEY)? { Some(bytes) => { let len = bytes.len(); let bytes = Arc::new(bytes.to_owned()); @@ -133,20 +134,20 @@ impl Main { } } - pub fn put_number_of_documents(self, writer: &mut heed::RwTxn, f: F) -> ZResult + pub fn put_number_of_documents(self, writer: &mut heed::RwTxn, f: F) -> ZResult where F: Fn(u64) -> u64, { - let new = self.number_of_documents(writer).map(f)?; + let new = self.number_of_documents(&*writer).map(f)?; self.main - .put::>(writer, NUMBER_OF_DOCUMENTS_KEY, &new)?; + .put::<_, Str, OwnedType>(writer, NUMBER_OF_DOCUMENTS_KEY, &new)?; Ok(new) } - pub fn number_of_documents(self, reader: &heed::RoTxn) -> ZResult { + pub fn number_of_documents(self, reader: &heed::RoTxn) -> ZResult { match self .main - .get::>(reader, NUMBER_OF_DOCUMENTS_KEY)? + .get::<_, Str, OwnedType>(reader, NUMBER_OF_DOCUMENTS_KEY)? { Some(value) => Ok(value), None => Ok(0), @@ -155,29 +156,29 @@ impl Main { pub fn put_fields_frequency( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, fields_frequency: &FreqsMap, ) -> ZResult<()> { self.main - .put::(writer, FIELDS_FREQUENCY_KEY, fields_frequency) + .put::<_, Str, SerdeFreqsMap>(writer, FIELDS_FREQUENCY_KEY, fields_frequency) } - pub fn fields_frequency(&self, reader: &heed::RoTxn) -> ZResult> { + pub fn fields_frequency(&self, reader: &heed::RoTxn) -> ZResult> { match self .main - .get::(reader, FIELDS_FREQUENCY_KEY)? + .get::<_, Str, SerdeFreqsMap>(reader, FIELDS_FREQUENCY_KEY)? { Some(freqs) => Ok(Some(freqs)), None => Ok(None), } } - pub fn put_customs(self, writer: &mut heed::RwTxn, customs: &[u8]) -> ZResult<()> { + pub fn put_customs(self, writer: &mut heed::RwTxn, customs: &[u8]) -> ZResult<()> { self.main - .put::(writer, CUSTOMS_KEY, customs) + .put::<_, Str, ByteSlice>(writer, CUSTOMS_KEY, customs) } - pub fn customs<'txn>(self, reader: &'txn heed::RoTxn) -> ZResult> { - self.main.get::(reader, CUSTOMS_KEY) + pub fn customs<'txn>(self, reader: &'txn heed::RoTxn) -> ZResult> { + self.main.get::<_, Str, ByteSlice>(reader, CUSTOMS_KEY) } } diff --git a/meilisearch-core/src/store/mod.rs b/meilisearch-core/src/store/mod.rs index 8b9ca2add..aee29ce00 100644 --- a/meilisearch-core/src/store/mod.rs +++ b/meilisearch-core/src/store/mod.rs @@ -27,6 +27,7 @@ use zerocopy::{AsBytes, FromBytes}; use crate::criterion::Criteria; use crate::database::{UpdateEvent, UpdateEventsEmitter}; +use crate::database::{MainT, UpdateT}; use crate::serde::Deserializer; use crate::{query_builder::QueryBuilder, update, DocumentId, Error, MResult}; @@ -98,7 +99,7 @@ pub struct Index { impl Index { pub fn document( &self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, attributes: Option<&HashSet<&str>>, document_id: DocumentId, ) -> MResult> { @@ -126,7 +127,7 @@ impl Index { pub fn document_attribute( &self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, document_id: DocumentId, attribute: SchemaAttr, ) -> MResult> { @@ -139,12 +140,12 @@ impl Index { } } - pub fn schema_update(&self, writer: &mut heed::RwTxn, schema: Schema) -> MResult { + pub fn schema_update(&self, writer: &mut heed::RwTxn, schema: Schema) -> MResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_schema_update(writer, self.updates, self.updates_results, schema) } - pub fn customs_update(&self, writer: &mut heed::RwTxn, customs: Vec) -> ZResult { + pub fn customs_update(&self, writer: &mut heed::RwTxn, customs: Vec) -> ZResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_customs_update(writer, self.updates, self.updates_results, customs) } @@ -173,7 +174,7 @@ impl Index { ) } - pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult { + pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_clear_all(writer, self.updates, self.updates_results) } @@ -210,8 +211,8 @@ impl Index { ) } - pub fn current_update_id(&self, reader: &heed::RoTxn) -> MResult> { - match self.updates.last_update_id(reader)? { + pub fn current_update_id(&self, reader: &heed::RoTxn) -> MResult> { + match self.updates.last_update(reader)? { Some((id, _)) => Ok(Some(id)), None => Ok(None), } @@ -219,18 +220,18 @@ impl Index { pub fn update_status( &self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, update_id: u64, ) -> MResult> { update::update_status(reader, self.updates, self.updates_results, update_id) } - pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult> { + pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult> { let mut updates = Vec::new(); let mut last_update_result_id = 0; // retrieve all updates results - if let Some((last_id, _)) = self.updates_results.last_update_id(reader)? { + if let Some((last_id, _)) = self.updates_results.last_update(reader)? { updates.reserve(last_id as usize); for id in 0..=last_id { @@ -242,7 +243,7 @@ impl Index { } // retrieve all enqueued updates - if let Some((last_id, _)) = self.updates.last_update_id(reader)? { + if let Some((last_id, _)) = self.updates.last_update(reader)? { for id in last_update_result_id + 1..=last_id { if let Some(update) = self.update_status(reader, id)? { updates.push(update); @@ -278,6 +279,7 @@ impl Index { pub fn create( env: &heed::Env, + update_env: &heed::Env, name: &str, updates_notifier: UpdateEventsEmitter, ) -> MResult { @@ -298,8 +300,8 @@ pub fn create( let documents_fields_counts = env.create_database(Some(&documents_fields_counts_name))?; let synonyms = env.create_database(Some(&synonyms_name))?; let docs_words = env.create_database(Some(&docs_words_name))?; - let updates = env.create_database(Some(&updates_name))?; - let updates_results = env.create_database(Some(&updates_results_name))?; + let updates = update_env.create_database(Some(&updates_name))?; + let updates_results = update_env.create_database(Some(&updates_results_name))?; Ok(Index { main: Main { main }, @@ -318,6 +320,7 @@ pub fn create( pub fn open( env: &heed::Env, + update_env: &heed::Env, name: &str, updates_notifier: UpdateEventsEmitter, ) -> MResult> { @@ -356,11 +359,11 @@ pub fn open( Some(docs_words) => docs_words, None => return Ok(None), }; - let updates = match env.open_database(Some(&updates_name))? { + let updates = match update_env.open_database(Some(&updates_name))? { Some(updates) => updates, None => return Ok(None), }; - let updates_results = match env.open_database(Some(&updates_results_name))? { + let updates_results = match update_env.open_database(Some(&updates_results_name))? { Some(updates_results) => updates_results, None => return Ok(None), }; @@ -380,7 +383,11 @@ pub fn open( })) } -pub fn clear(writer: &mut heed::RwTxn, index: &Index) -> MResult<()> { +pub fn clear( + writer: &mut heed::RwTxn, + update_writer: &mut heed::RwTxn, + index: &Index, +) -> MResult<()> { // clear all the stores index.main.clear(writer)?; index.postings_lists.clear(writer)?; @@ -388,7 +395,7 @@ pub fn clear(writer: &mut heed::RwTxn, index: &Index) -> MResult<()> { index.documents_fields_counts.clear(writer)?; index.synonyms.clear(writer)?; index.docs_words.clear(writer)?; - index.updates.clear(writer)?; - index.updates_results.clear(writer)?; + index.updates.clear(update_writer)?; + index.updates_results.clear(update_writer)?; Ok(()) } diff --git a/meilisearch-core/src/store/postings_lists.rs b/meilisearch-core/src/store/postings_lists.rs index 7f886b491..7e6c3ed71 100644 --- a/meilisearch-core/src/store/postings_lists.rs +++ b/meilisearch-core/src/store/postings_lists.rs @@ -1,4 +1,5 @@ use crate::DocIndex; +use crate::database::MainT; use heed::types::{ByteSlice, CowSlice}; use heed::Result as ZResult; use sdset::{Set, SetBuf}; @@ -12,24 +13,24 @@ pub struct PostingsLists { impl PostingsLists { pub fn put_postings_list( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, word: &[u8], words_indexes: &Set, ) -> ZResult<()> { self.postings_lists.put(writer, word, words_indexes) } - pub fn del_postings_list(self, writer: &mut heed::RwTxn, word: &[u8]) -> ZResult { + pub fn del_postings_list(self, writer: &mut heed::RwTxn, word: &[u8]) -> ZResult { self.postings_lists.delete(writer, word) } - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.postings_lists.clear(writer) } pub fn postings_list<'txn>( self, - reader: &'txn heed::RoTxn, + reader: &'txn heed::RoTxn, word: &[u8], ) -> ZResult>>> { match self.postings_lists.get(reader, word)? { diff --git a/meilisearch-core/src/store/synonyms.rs b/meilisearch-core/src/store/synonyms.rs index 9f4052170..75f7610eb 100644 --- a/meilisearch-core/src/store/synonyms.rs +++ b/meilisearch-core/src/store/synonyms.rs @@ -1,4 +1,5 @@ use heed::types::ByteSlice; +use crate::database::MainT; use heed::Result as ZResult; use std::sync::Arc; @@ -10,7 +11,7 @@ pub struct Synonyms { impl Synonyms { pub fn put_synonyms( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, word: &[u8], synonyms: &fst::Set, ) -> ZResult<()> { @@ -18,15 +19,15 @@ impl Synonyms { self.synonyms.put(writer, word, bytes) } - pub fn del_synonyms(self, writer: &mut heed::RwTxn, word: &[u8]) -> ZResult { + pub fn del_synonyms(self, writer: &mut heed::RwTxn, word: &[u8]) -> ZResult { self.synonyms.delete(writer, word) } - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.synonyms.clear(writer) } - pub fn synonyms(self, reader: &heed::RoTxn, word: &[u8]) -> ZResult> { + pub fn synonyms(self, reader: &heed::RoTxn, word: &[u8]) -> ZResult> { match self.synonyms.get(reader, word)? { Some(bytes) => { let len = bytes.len(); diff --git a/meilisearch-core/src/store/updates.rs b/meilisearch-core/src/store/updates.rs index 984da7b58..a614303a3 100644 --- a/meilisearch-core/src/store/updates.rs +++ b/meilisearch-core/src/store/updates.rs @@ -1,4 +1,5 @@ use super::BEU64; +use crate::database::UpdateT; use crate::update::Update; use heed::types::{OwnedType, SerdeJson}; use heed::Result as ZResult; @@ -10,7 +11,7 @@ pub struct Updates { impl Updates { // TODO do not trigger deserialize if possible - pub fn last_update_id(self, reader: &heed::RoTxn) -> ZResult> { + pub fn last_update(self, reader: &heed::RoTxn) -> ZResult> { match self.updates.last(reader)? { Some((key, data)) => Ok(Some((key.get(), data))), None => Ok(None), @@ -18,7 +19,7 @@ impl Updates { } // TODO do not trigger deserialize if possible - fn first_update_id(self, reader: &heed::RoTxn) -> ZResult> { + pub fn first_update(self, reader: &heed::RoTxn) -> ZResult> { match self.updates.first(reader)? { Some((key, data)) => Ok(Some((key.get(), data))), None => Ok(None), @@ -26,14 +27,14 @@ impl Updates { } // TODO do not trigger deserialize if possible - pub fn get(self, reader: &heed::RoTxn, update_id: u64) -> ZResult> { + pub fn get(self, reader: &heed::RoTxn, update_id: u64) -> ZResult> { let update_id = BEU64::new(update_id); self.updates.get(reader, &update_id) } pub fn put_update( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, update_id: u64, update: &Update, ) -> ZResult<()> { @@ -42,8 +43,13 @@ impl Updates { self.updates.put(writer, &update_id, update) } - pub fn pop_front(self, writer: &mut heed::RwTxn) -> ZResult> { - match self.first_update_id(writer)? { + pub fn del_update(self, writer: &mut heed::RwTxn, update_id: u64) -> ZResult { + let update_id = BEU64::new(update_id); + self.updates.delete(writer, &update_id) + } + + pub fn pop_front(self, writer: &mut heed::RwTxn) -> ZResult> { + match self.first_update(writer)? { Some((update_id, update)) => { let key = BEU64::new(update_id); self.updates.delete(writer, &key)?; @@ -53,7 +59,7 @@ impl Updates { } } - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.updates.clear(writer) } } diff --git a/meilisearch-core/src/store/updates_results.rs b/meilisearch-core/src/store/updates_results.rs index 1db58fc42..ca631e316 100644 --- a/meilisearch-core/src/store/updates_results.rs +++ b/meilisearch-core/src/store/updates_results.rs @@ -1,4 +1,5 @@ use super::BEU64; +use crate::database::UpdateT; use crate::update::ProcessedUpdateResult; use heed::types::{OwnedType, SerdeJson}; use heed::Result as ZResult; @@ -9,9 +10,9 @@ pub struct UpdatesResults { } impl UpdatesResults { - pub fn last_update_id( + pub fn last_update( self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, ) -> ZResult> { match self.updates_results.last(reader)? { Some((key, data)) => Ok(Some((key.get(), data))), @@ -21,7 +22,7 @@ impl UpdatesResults { pub fn put_update_result( self, - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, update_id: u64, update_result: &ProcessedUpdateResult, ) -> ZResult<()> { @@ -31,14 +32,14 @@ impl UpdatesResults { pub fn update_result( self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, update_id: u64, ) -> ZResult> { let update_id = BEU64::new(update_id); self.updates_results.get(reader, &update_id) } - pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { self.updates_results.clear(writer) } } diff --git a/meilisearch-core/src/update/clear_all.rs b/meilisearch-core/src/update/clear_all.rs index d0910a26d..754a1f4da 100644 --- a/meilisearch-core/src/update/clear_all.rs +++ b/meilisearch-core/src/update/clear_all.rs @@ -1,8 +1,9 @@ +use crate::database::{MainT, UpdateT}; use crate::update::{next_update_id, Update}; use crate::{store, MResult, RankedMap}; pub fn apply_clear_all( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, @@ -21,7 +22,7 @@ pub fn apply_clear_all( } pub fn push_clear_all( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, ) -> MResult { diff --git a/meilisearch-core/src/update/customs_update.rs b/meilisearch-core/src/update/customs_update.rs index 09e15cc80..a413d13a6 100644 --- a/meilisearch-core/src/update/customs_update.rs +++ b/meilisearch-core/src/update/customs_update.rs @@ -1,9 +1,11 @@ -use crate::store; -use crate::update::{next_update_id, Update}; use heed::Result as ZResult; +use crate::database::{MainT, UpdateT}; +use crate::store; +use crate::update::{next_update_id, Update}; + pub fn apply_customs_update( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, customs: &[u8], ) -> ZResult<()> { @@ -11,7 +13,7 @@ pub fn apply_customs_update( } pub fn push_customs_update( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, customs: Vec, diff --git a/meilisearch-core/src/update/documents_addition.rs b/meilisearch-core/src/update/documents_addition.rs index 834e06341..04f9942f1 100644 --- a/meilisearch-core/src/update/documents_addition.rs +++ b/meilisearch-core/src/update/documents_addition.rs @@ -4,6 +4,7 @@ use fst::{set::OpBuilder, SetBuilder}; use sdset::{duo::Union, SetOperation}; use serde::{Deserialize, Serialize}; +use crate::database::{MainT, UpdateT}; use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::raw_indexer::RawIndexer; use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer}; @@ -52,7 +53,7 @@ impl DocumentsAddition { self.documents.push(document); } - pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult + pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult where D: serde::Serialize, { @@ -75,7 +76,7 @@ impl Extend for DocumentsAddition { } pub fn push_documents_addition( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, addition: Vec, @@ -102,7 +103,7 @@ pub fn push_documents_addition( } pub fn apply_documents_addition<'a, 'b>( - writer: &'a mut heed::RwTxn<'b>, + writer: &'a mut heed::RwTxn<'b, MainT>, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, @@ -181,7 +182,7 @@ pub fn apply_documents_addition<'a, 'b>( } pub fn apply_documents_partial_addition<'a, 'b>( - writer: &'a mut heed::RwTxn<'b>, + writer: &'a mut heed::RwTxn<'b, MainT>, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, @@ -277,7 +278,7 @@ pub fn apply_documents_partial_addition<'a, 'b>( } pub fn reindex_all_documents( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, @@ -354,7 +355,7 @@ pub fn reindex_all_documents( } pub fn write_documents_addition_index( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, postings_lists_store: store::PostingsLists, docs_words_store: store::DocsWords, diff --git a/meilisearch-core/src/update/documents_deletion.rs b/meilisearch-core/src/update/documents_deletion.rs index a8444ae93..fec6d3ae7 100644 --- a/meilisearch-core/src/update/documents_deletion.rs +++ b/meilisearch-core/src/update/documents_deletion.rs @@ -4,6 +4,7 @@ use fst::{SetBuilder, Streamer}; use meilisearch_schema::Schema; use sdset::{duo::DifferenceByKey, SetBuf, SetOperation}; +use crate::database::{MainT, UpdateT}; use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::serde::extract_document_id; use crate::store; @@ -50,7 +51,7 @@ impl DocumentsDeletion { Ok(()) } - pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { + pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_documents_deletion( writer, @@ -69,7 +70,7 @@ impl Extend for DocumentsDeletion { } pub fn push_documents_deletion( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, deletion: Vec, @@ -83,7 +84,7 @@ pub fn push_documents_deletion( } pub fn apply_documents_deletion( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, diff --git a/meilisearch-core/src/update/mod.rs b/meilisearch-core/src/update/mod.rs index 15cfedb23..3ac39bf0b 100644 --- a/meilisearch-core/src/update/mod.rs +++ b/meilisearch-core/src/update/mod.rs @@ -30,6 +30,7 @@ use log::debug; use serde::{Deserialize, Serialize}; use crate::{store, DocumentId, MResult}; +use crate::database::{MainT, UpdateT}; use meilisearch_schema::Schema; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -203,14 +204,14 @@ pub enum UpdateStatus { } pub fn update_status( - reader: &heed::RoTxn, + update_reader: &heed::RoTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, update_id: u64, ) -> MResult> { - match updates_results_store.update_result(reader, update_id)? { + match updates_results_store.update_result(update_reader, update_id)? { Some(result) => Ok(Some(UpdateStatus::Processed { content: result })), - None => match updates_store.get(reader, update_id)? { + None => match updates_store.get(update_reader, update_id)? { Some(update) => Ok(Some(UpdateStatus::Enqueued { content: EnqueuedUpdateResult { update_id, @@ -224,25 +225,25 @@ pub fn update_status( } pub fn next_update_id( - writer: &mut heed::RwTxn, + update_writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, ) -> ZResult { - let last_update_id = updates_store.last_update_id(writer)?; - let last_update_id = last_update_id.map(|(n, _)| n); + let last_update = updates_store.last_update(update_writer)?; + let last_update = last_update.map(|(n, _)| n); - let last_update_results_id = updates_results_store.last_update_id(writer)?; + let last_update_results_id = updates_results_store.last_update(update_writer)?; let last_update_results_id = last_update_results_id.map(|(n, _)| n); - let max_update_id = cmp::max(last_update_id, last_update_results_id); + let max_update_id = cmp::max(last_update, last_update_results_id); let new_update_id = max_update_id.map_or(0, |n| n + 1); Ok(new_update_id) } pub fn update_task<'a, 'b>( - writer: &'a mut heed::RwTxn<'b>, - index: store::Index, + writer: &'a mut heed::RwTxn<'b, MainT>, + index: &store::Index, update_id: u64, update: Update, ) -> MResult { diff --git a/meilisearch-core/src/update/schema_update.rs b/meilisearch-core/src/update/schema_update.rs index a8ab0f210..f946175ad 100644 --- a/meilisearch-core/src/update/schema_update.rs +++ b/meilisearch-core/src/update/schema_update.rs @@ -1,11 +1,12 @@ use meilisearch_schema::{Diff, Schema}; +use crate::database::{MainT, UpdateT}; use crate::update::documents_addition::reindex_all_documents; use crate::update::{next_update_id, Update}; use crate::{error::UnsupportedOperation, store, MResult}; pub fn apply_schema_update( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, new_schema: &Schema, main_store: store::Main, documents_fields_store: store::DocumentsFields, @@ -61,7 +62,7 @@ pub fn apply_schema_update( } pub fn push_schema_update( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, schema: Schema, diff --git a/meilisearch-core/src/update/stop_words_addition.rs b/meilisearch-core/src/update/stop_words_addition.rs index 53f890a7f..536854586 100644 --- a/meilisearch-core/src/update/stop_words_addition.rs +++ b/meilisearch-core/src/update/stop_words_addition.rs @@ -2,6 +2,7 @@ use std::collections::BTreeSet; use fst::{set::OpBuilder, SetBuilder}; +use crate::database::{MainT, UpdateT}; use crate::automaton::normalize_str; use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; @@ -33,7 +34,7 @@ impl StopWordsAddition { self.stop_words.insert(stop_word); } - pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { + pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_stop_words_addition( writer, @@ -46,7 +47,7 @@ impl StopWordsAddition { } pub fn push_stop_words_addition( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, addition: BTreeSet, @@ -60,7 +61,7 @@ pub fn push_stop_words_addition( } pub fn apply_stop_words_addition( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, postings_lists_store: store::PostingsLists, addition: BTreeSet, diff --git a/meilisearch-core/src/update/stop_words_deletion.rs b/meilisearch-core/src/update/stop_words_deletion.rs index e502959e4..2ef438454 100644 --- a/meilisearch-core/src/update/stop_words_deletion.rs +++ b/meilisearch-core/src/update/stop_words_deletion.rs @@ -2,6 +2,7 @@ use std::collections::BTreeSet; use fst::{set::OpBuilder, SetBuilder}; +use crate::database::{MainT, UpdateT}; use crate::automaton::normalize_str; use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::documents_addition::reindex_all_documents; @@ -34,7 +35,7 @@ impl StopWordsDeletion { self.stop_words.insert(stop_word); } - pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { + pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_stop_words_deletion( writer, @@ -47,7 +48,7 @@ impl StopWordsDeletion { } pub fn push_stop_words_deletion( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, deletion: BTreeSet, @@ -61,7 +62,7 @@ pub fn push_stop_words_deletion( } pub fn apply_stop_words_deletion( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, diff --git a/meilisearch-core/src/update/synonyms_addition.rs b/meilisearch-core/src/update/synonyms_addition.rs index 4d9968c52..21aa8ef9b 100644 --- a/meilisearch-core/src/update/synonyms_addition.rs +++ b/meilisearch-core/src/update/synonyms_addition.rs @@ -3,6 +3,7 @@ use std::collections::BTreeMap; use fst::{set::OpBuilder, SetBuilder}; use sdset::SetBuf; +use crate::database::{MainT, UpdateT}; use crate::automaton::normalize_str; use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; @@ -43,7 +44,7 @@ impl SynonymsAddition { .extend(alternatives); } - pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { + pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_synonyms_addition( writer, @@ -56,7 +57,7 @@ impl SynonymsAddition { } pub fn push_synonyms_addition( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, addition: BTreeMap>, @@ -70,7 +71,7 @@ pub fn push_synonyms_addition( } pub fn apply_synonyms_addition( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, synonyms_store: store::Synonyms, addition: BTreeMap>, diff --git a/meilisearch-core/src/update/synonyms_deletion.rs b/meilisearch-core/src/update/synonyms_deletion.rs index a2ded59a1..eeff7b6cc 100644 --- a/meilisearch-core/src/update/synonyms_deletion.rs +++ b/meilisearch-core/src/update/synonyms_deletion.rs @@ -4,6 +4,7 @@ use std::iter::FromIterator; use fst::{set::OpBuilder, SetBuilder}; use sdset::SetBuf; +use crate::database::{MainT, UpdateT}; use crate::automaton::normalize_str; use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; @@ -50,7 +51,7 @@ impl SynonymsDeletion { } } - pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { + pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_synonyms_deletion( writer, @@ -63,7 +64,7 @@ impl SynonymsDeletion { } pub fn push_synonyms_deletion( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, deletion: BTreeMap>>, @@ -77,7 +78,7 @@ pub fn push_synonyms_deletion( } pub fn apply_synonyms_deletion( - writer: &mut heed::RwTxn, + writer: &mut heed::RwTxn, main_store: store::Main, synonyms_store: store::Synonyms, deletion: BTreeMap>>, diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index 061d44845..bb0ab6296 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -16,7 +16,7 @@ bincode = "1.2.0" chrono = { version = "0.4.9", features = ["serde"] } crossbeam-channel = "0.4.0" env_logger = "0.7.1" -heed = "0.5.0" +heed = "0.6.0" http = "0.1.19" indexmap = { version = "1.3.0", features = ["serde-1"] } isahc = "0.7.6" diff --git a/meilisearch-http/src/data.rs b/meilisearch-http/src/data.rs index d6de9aea2..bc9f9effd 100644 --- a/meilisearch-http/src/data.rs +++ b/meilisearch-http/src/data.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use heed::types::{SerdeBincode, Str}; use log::error; -use meilisearch_core::{Database, Error as MError, MResult}; +use meilisearch_core::{Database, MainT, UpdateT, Error as MError, MResult}; use sysinfo::Pid; use crate::option::Opt; @@ -37,32 +37,32 @@ pub struct DataInner { } impl DataInner { - pub fn is_indexing(&self, reader: &heed::RoTxn, index: &str) -> MResult> { + pub fn is_indexing(&self, reader: &heed::RoTxn, index: &str) -> MResult> { match self.db.open_index(&index) { Some(index) => index.current_update_id(&reader).map(|u| Some(u.is_some())), None => Ok(None), } } - pub fn last_update(&self, reader: &heed::RoTxn) -> MResult>> { + pub fn last_update(&self, reader: &heed::RoTxn) -> MResult>> { match self .db .common_store() - .get::(reader, LAST_UPDATE_KEY)? + .get::<_, Str, SerdeDatetime>(reader, LAST_UPDATE_KEY)? { Some(datetime) => Ok(Some(datetime)), None => Ok(None), } } - pub fn set_last_update(&self, writer: &mut heed::RwTxn) -> MResult<()> { + pub fn set_last_update(&self, writer: &mut heed::RwTxn) -> MResult<()> { self.db .common_store() - .put::(writer, LAST_UPDATE_KEY, &Utc::now()) + .put::<_, Str, SerdeDatetime>(writer, LAST_UPDATE_KEY, &Utc::now()) .map_err(Into::into) } - pub fn compute_stats(&self, writer: &mut heed::RwTxn, index_uid: &str) -> MResult<()> { + pub fn compute_stats(&self, writer: &mut heed::RwTxn, index_uid: &str) -> MResult<()> { let index = match self.db.open_index(&index_uid) { Some(index) => index, None => { diff --git a/meilisearch-http/src/helpers/meilisearch.rs b/meilisearch-http/src/helpers/meilisearch.rs index c33ea205f..fc7958c49 100644 --- a/meilisearch-http/src/helpers/meilisearch.rs +++ b/meilisearch-http/src/helpers/meilisearch.rs @@ -4,6 +4,7 @@ use log::error; use meilisearch_core::criterion::*; use meilisearch_core::Highlight; use meilisearch_core::{Index, RankedMap}; +use meilisearch_core::MainT; use meilisearch_schema::{Schema, SchemaAttr}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -157,7 +158,7 @@ impl<'a> SearchBuilder<'a> { self } - pub fn search(&self, reader: &heed::RoTxn) -> Result { + pub fn search(&self, reader: &heed::RoTxn) -> Result { let schema = self.index.main.schema(reader); let schema = schema.map_err(|e| Error::Internal(e.to_string()))?; let schema = match schema { @@ -285,7 +286,7 @@ impl<'a> SearchBuilder<'a> { pub fn get_criteria( &self, - reader: &heed::RoTxn, + reader: &heed::RoTxn, ranked_map: &'a RankedMap, schema: &Schema, ) -> Result>, Error> { diff --git a/meilisearch-http/src/helpers/tide.rs b/meilisearch-http/src/helpers/tide.rs index 1a26d7b4a..cd05a393d 100644 --- a/meilisearch-http/src/helpers/tide.rs +++ b/meilisearch-http/src/helpers/tide.rs @@ -29,14 +29,13 @@ impl ContextExt for Context { let request_index: Option = None; //self.param::("index").ok(); let db = &self.state().db; - let env = &db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let token_key = format!("{}{}", TOKEN_PREFIX_KEY, user_api_key); let token_config = db .common_store() - .get::>(&reader, &token_key) + .get::<_, Str, SerdeBincode>(&reader, &token_key) .map_err(ResponseError::internal)? .ok_or(ResponseError::invalid_token(format!( "Api key does not exist: {}", diff --git a/meilisearch-http/src/routes/document.rs b/meilisearch-http/src/routes/document.rs index 71ba03d46..7c91219b8 100644 --- a/meilisearch-http/src/routes/document.rs +++ b/meilisearch-http/src/routes/document.rs @@ -21,8 +21,8 @@ pub async fn get_document(ctx: Context) -> SResult { let identifier = ctx.identifier()?; let document_id = meilisearch_core::serde::compute_document_id(identifier.clone()); - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let response = index .document::>(&reader, None, document_id) @@ -49,16 +49,16 @@ pub async fn delete_document(ctx: Context) -> SResult { let identifier = ctx.identifier()?; let document_id = meilisearch_core::serde::compute_document_id(identifier.clone()); - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut update_writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut documents_deletion = index.documents_deletion(); documents_deletion.delete_document_by_id(document_id); let update_id = documents_deletion - .finalize(&mut writer) + .finalize(&mut update_writer) .map_err(ResponseError::internal)?; - writer.commit().map_err(ResponseError::internal)?; + update_writer.commit().map_err(ResponseError::internal)?; let response_body = IndexUpdateResponse { update_id }; Ok(tide::response::json(response_body) @@ -83,8 +83,8 @@ pub async fn get_all_documents(ctx: Context) -> SResult { let offset = query.offset.unwrap_or(0); let limit = query.limit.unwrap_or(20); - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let documents_ids: Result, _> = match index.documents_fields_counts.documents_ids(&reader) { @@ -146,18 +146,19 @@ async fn update_multiple_documents(mut ctx: Context, is_partial: bool) -> ctx.body_json().await.map_err(ResponseError::bad_request)?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; + let mut update_writer = db.update_write_txn().map_err(ResponseError::internal)?; let current_schema = index .main - .schema(&writer) + .schema(&reader) .map_err(ResponseError::internal)?; if current_schema.is_none() { match data.first().and_then(infered_schema) { Some(schema) => { index - .schema_update(&mut writer, schema) + .schema_update(&mut update_writer, schema) .map_err(ResponseError::internal)?; } None => return Err(ResponseError::bad_request("Could not infer a schema")), @@ -175,10 +176,10 @@ async fn update_multiple_documents(mut ctx: Context, is_partial: bool) -> } let update_id = document_addition - .finalize(&mut writer) + .finalize(&mut update_writer) .map_err(ResponseError::internal)?; - writer.commit().map_err(ResponseError::internal)?; + update_writer.commit().map_err(ResponseError::internal)?; let response_body = IndexUpdateResponse { update_id }; Ok(tide::response::json(response_body) @@ -200,8 +201,8 @@ pub async fn delete_multiple_documents(mut ctx: Context) -> SResult = ctx.body_json().await.map_err(ResponseError::bad_request)?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut documents_deletion = index.documents_deletion(); @@ -229,8 +230,9 @@ pub async fn clear_all_documents(ctx: Context) -> SResult { let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; + let update_id = index .clear_all(&mut writer) .map_err(ResponseError::internal)?; diff --git a/meilisearch-http/src/routes/health.rs b/meilisearch-http/src/routes/health.rs index d919487e6..4582ebc90 100644 --- a/meilisearch-http/src/routes/health.rs +++ b/meilisearch-http/src/routes/health.rs @@ -11,12 +11,11 @@ const UNHEALTHY_KEY: &str = "_is_unhealthy"; pub async fn get_health(ctx: Context) -> SResult<()> { let db = &ctx.state().db; - let env = &db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let common_store = ctx.state().db.common_store(); - if let Ok(Some(_)) = common_store.get::(&reader, UNHEALTHY_KEY) { + if let Ok(Some(_)) = common_store.get::<_, Str, Unit>(&reader, UNHEALTHY_KEY) { return Err(ResponseError::Maintenance); } @@ -27,11 +26,10 @@ pub async fn set_healthy(ctx: Context) -> SResult<()> { ctx.is_allowed(Admin)?; let db = &ctx.state().db; - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.main_write_txn().map_err(ResponseError::internal)?; let common_store = ctx.state().db.common_store(); - match common_store.delete::(&mut writer, UNHEALTHY_KEY) { + match common_store.delete::<_, Str>(&mut writer, UNHEALTHY_KEY) { Ok(_) => (), Err(e) => return Err(ResponseError::internal(e)), } @@ -47,12 +45,11 @@ pub async fn set_unhealthy(ctx: Context) -> SResult<()> { ctx.is_allowed(Admin)?; let db = &ctx.state().db; - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.main_write_txn().map_err(ResponseError::internal)?; let common_store = ctx.state().db.common_store(); - if let Err(e) = common_store.put::(&mut writer, UNHEALTHY_KEY, &()) { + if let Err(e) = common_store.put::<_, Str, Unit>(&mut writer, UNHEALTHY_KEY, &()) { return Err(ResponseError::internal(e)); } diff --git a/meilisearch-http/src/routes/index.rs b/meilisearch-http/src/routes/index.rs index 77e79bd7a..6655a9c6d 100644 --- a/meilisearch-http/src/routes/index.rs +++ b/meilisearch-http/src/routes/index.rs @@ -31,8 +31,8 @@ pub async fn list_indexes(ctx: Context) -> SResult { let indexes_uids = ctx.state().db.indexes_uids(); - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let mut response_body = Vec::new(); @@ -89,8 +89,8 @@ pub async fn get_index(ctx: Context) -> SResult { let index = ctx.index()?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let uid = ctx.url_param("index")?; let name = index @@ -164,8 +164,8 @@ pub async fn create_index(mut ctx: Context) -> SResult { Err(e) => return Err(ResponseError::create_index(e)), }; - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.main_write_txn().map_err(ResponseError::internal)?; + let mut update_writer = db.update_write_txn().map_err(ResponseError::internal)?; created_index .main @@ -184,12 +184,13 @@ pub async fn create_index(mut ctx: Context) -> SResult { let mut response_update_id = None; if let Some(schema) = schema { let update_id = created_index - .schema_update(&mut writer, schema) + .schema_update(&mut update_writer, schema) .map_err(ResponseError::internal)?; response_update_id = Some(update_id) } writer.commit().map_err(ResponseError::internal)?; + update_writer.commit().map_err(ResponseError::internal)?; let response_body = IndexCreateResponse { name: body.name, @@ -232,9 +233,7 @@ pub async fn update_index(mut ctx: Context) -> SResult { let index = ctx.index()?; let db = &ctx.state().db; - - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.main_write_txn().map_err(ResponseError::internal)?; index .main @@ -247,7 +246,7 @@ pub async fn update_index(mut ctx: Context) -> SResult { .map_err(ResponseError::internal)?; writer.commit().map_err(ResponseError::internal)?; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let created_at = index .main @@ -286,8 +285,8 @@ pub async fn get_index_schema(ctx: Context) -> SResult { // Tide doesn't support "no query param" let params: SchemaParams = ctx.url_query().unwrap_or_default(); - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let schema = index .main @@ -326,8 +325,7 @@ pub async fn update_schema(mut ctx: Context) -> SResult { }; let db = &ctx.state().db; - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let index = db .open_index(&index_uid) @@ -348,8 +346,8 @@ pub async fn update_schema(mut ctx: Context) -> SResult { pub async fn get_update_status(ctx: Context) -> SResult { ctx.is_allowed(IndexesRead)?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.update_read_txn().map_err(ResponseError::internal)?; let update_id = ctx .param::("update_id") @@ -375,8 +373,8 @@ pub async fn get_update_status(ctx: Context) -> SResult { pub async fn get_all_updates_status(ctx: Context) -> SResult { ctx.is_allowed(IndexesRead)?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.update_read_txn().map_err(ResponseError::internal)?; let index = ctx.index()?; let all_status = index @@ -413,8 +411,8 @@ pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpda } if let Some(index) = data.db.open_index(&index_uid) { - let env = &data.db.env; - let mut writer = match env.write_txn() { + let db = &data.db; + let mut writer = match db.main_write_txn() { Ok(writer) => writer, Err(e) => { error!("Impossible to get write_txn; {}", e); diff --git a/meilisearch-http/src/routes/key.rs b/meilisearch-http/src/routes/key.rs index 94b6ba2fa..ed9143410 100644 --- a/meilisearch-http/src/routes/key.rs +++ b/meilisearch-http/src/routes/key.rs @@ -26,15 +26,14 @@ pub async fn list(ctx: Context) -> SResult { ctx.is_allowed(Admin)?; let db = &ctx.state().db; - let env = &db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let common_store = db.common_store(); let mut response: Vec = Vec::new(); let iter = common_store - .prefix_iter::>(&reader, TOKEN_PREFIX_KEY) + .prefix_iter::<_, Str, SerdeBincode>(&reader, TOKEN_PREFIX_KEY) .map_err(ResponseError::internal)?; for result in iter { @@ -50,14 +49,13 @@ pub async fn get(ctx: Context) -> SResult { let request_key = ctx.url_param("key")?; let db = &ctx.state().db; - let env = &db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key); let token_config = db .common_store() - .get::>(&reader, &token_key) + .get::<_, Str, SerdeBincode>(&reader, &token_key) .map_err(ResponseError::internal)? .ok_or(ResponseError::not_found(format!( "token key: {}", @@ -97,11 +95,10 @@ pub async fn create(mut ctx: Context) -> SResult { }; let db = &ctx.state().db; - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.main_write_txn().map_err(ResponseError::internal)?; db.common_store() - .put::>(&mut writer, &token_key, &token_definition) + .put::<_, Str, SerdeBincode>(&mut writer, &token_key, &token_definition) .map_err(ResponseError::internal)?; writer.commit().map_err(ResponseError::internal)?; @@ -128,15 +125,14 @@ pub async fn update(mut ctx: Context) -> SResult { let data: UpdatedRequest = ctx.body_json().await.map_err(ResponseError::bad_request)?; let db = &ctx.state().db; - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.main_write_txn().map_err(ResponseError::internal)?; let common_store = db.common_store(); let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key); let mut token_config = common_store - .get::>(&writer, &token_key) + .get::<_, Str, SerdeBincode>(&writer, &token_key) .map_err(ResponseError::internal)? .ok_or(ResponseError::not_found(format!( "token key: {}", @@ -167,7 +163,7 @@ pub async fn update(mut ctx: Context) -> SResult { token_config.updated_at = Utc::now(); common_store - .put::>(&mut writer, &token_key, &token_config) + .put::<_, Str, SerdeBincode>(&mut writer, &token_key, &token_config) .map_err(ResponseError::internal)?; writer.commit().map_err(ResponseError::internal)?; @@ -182,15 +178,14 @@ pub async fn delete(ctx: Context) -> SResult { let request_key = ctx.url_param("key")?; let db = &ctx.state().db; - let env = &db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let mut writer = db.main_write_txn().map_err(ResponseError::internal)?; let common_store = db.common_store(); let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key); common_store - .delete::(&mut writer, &token_key) + .delete::<_, Str>(&mut writer, &token_key) .map_err(ResponseError::internal)?; writer.commit().map_err(ResponseError::internal)?; diff --git a/meilisearch-http/src/routes/search.rs b/meilisearch-http/src/routes/search.rs index 3f83be98d..69dd01f09 100644 --- a/meilisearch-http/src/routes/search.rs +++ b/meilisearch-http/src/routes/search.rs @@ -33,8 +33,8 @@ pub async fn search_with_url_query(ctx: Context) -> SResult { // ctx.is_allowed(DocumentsRead)?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let schema = index .main @@ -210,9 +210,7 @@ pub async fn search_multi_index(mut ctx: Context) -> SResult { } } - let env = &db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; - + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let response = search_builder .search(&reader) .map_err(ResponseError::internal)?; diff --git a/meilisearch-http/src/routes/setting.rs b/meilisearch-http/src/routes/setting.rs index 5e472e780..5793fcf47 100644 --- a/meilisearch-http/src/routes/setting.rs +++ b/meilisearch-http/src/routes/setting.rs @@ -34,8 +34,8 @@ pub async fn get(ctx: Context) -> SResult { ctx.is_allowed(SettingsRead)?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let settings = match index.main.customs(&reader).unwrap() { Some(bytes) => bincode::deserialize(bytes).unwrap(), @@ -52,10 +52,11 @@ pub async fn update(mut ctx: Context) -> SResult { let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_write_txn().map_err(ResponseError::internal)?; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; - let mut current_settings = match index.main.customs(&writer).unwrap() { + let mut current_settings = match index.main.customs(&reader).unwrap() { Some(bytes) => bincode::deserialize(bytes).unwrap(), None => SettingBody::default(), }; diff --git a/meilisearch-http/src/routes/stats.rs b/meilisearch-http/src/routes/stats.rs index 0a5604cb0..1428e5662 100644 --- a/meilisearch-http/src/routes/stats.rs +++ b/meilisearch-http/src/routes/stats.rs @@ -26,8 +26,9 @@ pub async fn index_stat(ctx: Context) -> SResult { let index_uid = ctx.url_param("index")?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; + let update_reader = db.update_read_txn().map_err(ResponseError::internal)?; let number_of_documents = index .main @@ -42,7 +43,7 @@ pub async fn index_stat(ctx: Context) -> SResult { let is_indexing = ctx .state() - .is_indexing(&reader, &index_uid) + .is_indexing(&update_reader, &index_uid) .map_err(ResponseError::internal)? .ok_or(ResponseError::internal("'is_indexing' date not found"))?; @@ -68,8 +69,8 @@ pub async fn get_stats(ctx: Context) -> SResult { let mut index_list = HashMap::new(); let db = &ctx.state().db; - let env = &db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; + let update_reader = db.update_read_txn().map_err(ResponseError::internal)?; let indexes_set = ctx.state().db.indexes_uids(); for index_uid in indexes_set { @@ -90,7 +91,7 @@ pub async fn get_stats(ctx: Context) -> SResult { let is_indexing = ctx .state() - .is_indexing(&reader, &index_uid) + .is_indexing(&update_reader, &index_uid) .map_err(ResponseError::internal)? .ok_or(ResponseError::internal("'is_indexing' date not found"))?; diff --git a/meilisearch-http/src/routes/stop_words.rs b/meilisearch-http/src/routes/stop_words.rs index 4628ad8fe..45169f1e3 100644 --- a/meilisearch-http/src/routes/stop_words.rs +++ b/meilisearch-http/src/routes/stop_words.rs @@ -12,8 +12,8 @@ pub async fn list(ctx: Context) -> SResult { ctx.is_allowed(SettingsRead)?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let stop_words_fst = index .main @@ -35,8 +35,8 @@ pub async fn add(mut ctx: Context) -> SResult { let data: Vec = ctx.body_json().await.map_err(ResponseError::bad_request)?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut stop_words_addition = index.stop_words_addition(); for stop_word in data { @@ -61,8 +61,8 @@ pub async fn delete(mut ctx: Context) -> SResult { let data: Vec = ctx.body_json().await.map_err(ResponseError::bad_request)?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut stop_words_deletion = index.stop_words_deletion(); for stop_word in data { diff --git a/meilisearch-http/src/routes/synonym.rs b/meilisearch-http/src/routes/synonym.rs index b7d52a5a1..31222389a 100644 --- a/meilisearch-http/src/routes/synonym.rs +++ b/meilisearch-http/src/routes/synonym.rs @@ -31,8 +31,8 @@ pub async fn list(ctx: Context) -> SResult { ctx.is_allowed(SettingsRead)?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let synonyms_fst = index .main @@ -65,8 +65,8 @@ pub async fn get(ctx: Context) -> SResult { let synonym = ctx.url_param("synonym")?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let reader = env.read_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; let synonym_list = index .synonyms @@ -87,8 +87,8 @@ pub async fn create(mut ctx: Context) -> SResult { let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut synonyms_addition = index.synonyms_addition(); @@ -125,8 +125,8 @@ pub async fn update(mut ctx: Context) -> SResult { let index = ctx.index()?; let data: Vec = ctx.body_json().await.map_err(ResponseError::bad_request)?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut synonyms_addition = index.synonyms_addition(); synonyms_addition.add_synonym(synonym.clone(), data.clone().into_iter()); @@ -147,8 +147,8 @@ pub async fn delete(ctx: Context) -> SResult { let synonym = ctx.url_param("synonym")?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut synonyms_deletion = index.synonyms_deletion(); synonyms_deletion.delete_all_alternatives_of(synonym); @@ -171,8 +171,8 @@ pub async fn batch_write(mut ctx: Context) -> SResult { let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let mut synonyms_addition = index.synonyms_addition(); for raw in data { @@ -207,12 +207,13 @@ pub async fn clear(ctx: Context) -> SResult { ctx.is_allowed(SettingsWrite)?; let index = ctx.index()?; - let env = &ctx.state().db.env; - let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let db = &ctx.state().db; + let reader = db.main_read_txn().map_err(ResponseError::internal)?; + let mut writer = db.update_write_txn().map_err(ResponseError::internal)?; let synonyms_fst = index .main - .synonyms_fst(&writer) + .synonyms_fst(&reader) .map_err(ResponseError::internal)?; let synonyms_fst = synonyms_fst.unwrap_or_default();