From 776673ebae209f69a5ac889822860a65f71875e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 29 Oct 2019 15:24:09 +0100 Subject: [PATCH] Introduce the stop words addition update type --- meilidb-core/src/store/main.rs | 19 +++ meilidb-core/src/store/mod.rs | 8 ++ meilidb-core/src/update/mod.rs | 18 ++- .../src/update/stop_words_addition.rs | 116 ++++++++++++++++++ 4 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 meilidb-core/src/update/stop_words_addition.rs diff --git a/meilidb-core/src/store/main.rs b/meilidb-core/src/store/main.rs index 416fcfe37..dca995759 100644 --- a/meilidb-core/src/store/main.rs +++ b/meilidb-core/src/store/main.rs @@ -9,6 +9,7 @@ const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents"; const RANKED_MAP_KEY: &str = "ranked-map"; const SCHEMA_KEY: &str = "schema"; const SYNONYMS_KEY: &str = "synonyms"; +const STOP_WORDS_KEY: &str = "stop-words"; const WORDS_KEY: &str = "words"; #[derive(Copy, Clone)] @@ -71,6 +72,24 @@ impl Main { } } + pub fn put_stop_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { + let bytes = fst.as_fst().as_bytes(); + self.main + .put::(writer, STOP_WORDS_KEY, bytes) + } + + pub fn stop_words_fst(self, reader: &heed::RoTxn) -> ZResult> { + match self.main.get::(reader, STOP_WORDS_KEY)? { + Some(bytes) => { + let len = bytes.len(); + let bytes = Arc::from(bytes); + let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap(); + Ok(Some(fst::Set::from(fst))) + } + None => Ok(None), + } + } + pub fn put_number_of_documents(self, writer: &mut heed::RwTxn, f: F) -> ZResult where F: Fn(u64) -> u64, diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index 4909ce77b..d12ebf8ff 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -187,6 +187,14 @@ impl Index { ) } + pub fn stop_words_addition(&self) -> update::StopWordsAddition { + update::StopWordsAddition::new( + self.updates, + self.updates_results, + self.updates_notifier.clone(), + ) + } + pub fn current_update_id(&self, reader: &heed::RoTxn) -> MResult> { match self.updates.last_update_id(reader)? { Some((id, _)) => Ok(Some(id)), diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 82290cb4f..ecb76a079 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -3,6 +3,7 @@ mod customs_update; mod documents_addition; mod documents_deletion; mod schema_update; +mod stop_words_addition; mod synonyms_addition; mod synonyms_deletion; @@ -11,11 +12,12 @@ pub use self::customs_update::{apply_customs_update, push_customs_update}; pub use self::documents_addition::{apply_documents_addition, DocumentsAddition}; pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion}; pub use self::schema_update::{apply_schema_update, push_schema_update}; +pub use self::stop_words_addition::{apply_stop_words_addition, StopWordsAddition}; pub use self::synonyms_addition::{apply_synonyms_addition, SynonymsAddition}; pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion}; use std::cmp; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::time::{Duration, Instant}; use heed::Result as ZResult; @@ -34,6 +36,7 @@ pub enum Update { DocumentsDeletion(Vec), SynonymsAddition(BTreeMap>), SynonymsDeletion(BTreeMap>>), + StopWordsAddition(BTreeSet), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -45,6 +48,7 @@ pub enum UpdateType { DocumentsDeletion { number: usize }, SynonymsAddition { number: usize }, SynonymsDeletion { number: usize }, + StopWordsAddition { number: usize }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -210,6 +214,18 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult { + let start = Instant::now(); + + let update_type = UpdateType::StopWordsAddition { + number: stop_words.len(), + }; + + let result = + apply_stop_words_addition(writer, index.main, index.postings_lists, stop_words); + (update_type, result, start.elapsed()) } }; diff --git a/meilidb-core/src/update/stop_words_addition.rs b/meilidb-core/src/update/stop_words_addition.rs new file mode 100644 index 000000000..182394e59 --- /dev/null +++ b/meilidb-core/src/update/stop_words_addition.rs @@ -0,0 +1,116 @@ +use std::collections::BTreeSet; + +use fst::{set::OpBuilder, SetBuilder}; + +use crate::automaton::normalize_str; +use crate::update::{next_update_id, Update}; +use crate::{store, MResult}; + +pub struct StopWordsAddition { + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, + stop_words: BTreeSet, +} + +impl StopWordsAddition { + pub fn new( + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, + ) -> StopWordsAddition { + StopWordsAddition { + updates_store, + updates_results_store, + updates_notifier, + stop_words: BTreeSet::new(), + } + } + + pub fn add_stop_word>(&mut self, stop_word: S) { + let stop_word = normalize_str(stop_word.as_ref()); + self.stop_words.insert(stop_word); + } + + pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { + let _ = self.updates_notifier.send(()); + let update_id = push_stop_words_addition( + writer, + self.updates_store, + self.updates_results_store, + self.stop_words, + )?; + Ok(update_id) + } +} + +pub fn push_stop_words_addition( + writer: &mut heed::RwTxn, + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + addition: BTreeSet, +) -> MResult { + let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; + + let update = Update::StopWordsAddition(addition); + updates_store.put_update(writer, last_update_id, &update)?; + + Ok(last_update_id) +} + +pub fn apply_stop_words_addition( + writer: &mut heed::RwTxn, + main_store: store::Main, + postings_lists_store: store::PostingsLists, + addition: BTreeSet, +) -> MResult<()> { + let mut stop_words_builder = SetBuilder::memory(); + + for word in addition { + stop_words_builder.insert(&word).unwrap(); + // we remove every posting list associated to a new stop word + postings_lists_store.del_postings_list(writer, word.as_bytes())?; + } + + // create the new delta stop words fst + let delta_stop_words = stop_words_builder + .into_inner() + .and_then(fst::Set::from_bytes) + .unwrap(); + + // we also need to remove all the stop words from the main fst + if let Some(word_fst) = main_store.words_fst(writer)? { + let op = OpBuilder::new() + .add(&word_fst) + .add(&delta_stop_words) + .difference(); + + let mut word_fst_builder = SetBuilder::memory(); + word_fst_builder.extend_stream(op).unwrap(); + let word_fst = word_fst_builder + .into_inner() + .and_then(fst::Set::from_bytes) + .unwrap(); + + main_store.put_words_fst(writer, &word_fst)?; + } + + // now we add all of these stop words to the main store + let stop_words_fst = main_store.stop_words_fst(writer)?.unwrap_or_default(); + + let op = OpBuilder::new() + .add(&stop_words_fst) + .add(&delta_stop_words) + .r#union(); + + let mut stop_words_builder = SetBuilder::memory(); + stop_words_builder.extend_stream(op).unwrap(); + let stop_words_fst = stop_words_builder + .into_inner() + .and_then(fst::Set::from_bytes) + .unwrap(); + + main_store.put_stop_words_fst(writer, &stop_words_fst)?; + + Ok(()) +}