From ba32ce21d08aab356821a738994418a7712883e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 8 Oct 2019 17:16:48 +0200 Subject: [PATCH] Introduce synonyms deletions updates --- meilidb-core/src/store/synonyms.rs | 14 ++ meilidb-core/src/update/mod.rs | 33 ++++ meilidb-core/src/update/synonyms_addition.rs | 2 +- meilidb-core/src/update/synonyms_deletion.rs | 151 +++++++++++++++++++ 4 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 meilidb-core/src/update/synonyms_deletion.rs diff --git a/meilidb-core/src/store/synonyms.rs b/meilidb-core/src/store/synonyms.rs index ec0ada6e8..c00f891ce 100644 --- a/meilidb-core/src/store/synonyms.rs +++ b/meilidb-core/src/store/synonyms.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use rkv::StoreError; use crate::error::MResult; #[derive(Copy, Clone)] @@ -18,6 +19,19 @@ impl Synonyms { self.synonyms.put(writer, word, &blob) } + pub fn del_synonyms( + &self, + writer: &mut rkv::Writer, + word: &[u8], + ) -> Result + { + match self.synonyms.delete(writer, word) { + Ok(()) => Ok(true), + Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false), + Err(e) => Err(e), + } + } + pub fn synonyms( &self, reader: &impl rkv::Readable, diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 24c06bd0c..0e9b6b512 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -2,11 +2,13 @@ mod documents_addition; mod documents_deletion; mod schema_update; mod synonyms_addition; +mod synonyms_deletion; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; pub use self::schema_update::apply_schema_update; pub use self::synonyms_addition::{SynonymsAddition, apply_synonyms_addition}; +pub use self::synonyms_deletion::{SynonymsDeletion, apply_synonyms_deletion}; use std::time::{Duration, Instant}; use std::collections::BTreeMap; @@ -24,6 +26,7 @@ pub enum Update { DocumentsAddition(Vec), DocumentsDeletion(Vec), SynonymsAddition(BTreeMap>), + SynonymsDeletion(BTreeMap>>), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -32,6 +35,7 @@ pub enum UpdateType { DocumentsAddition { number: usize }, DocumentsDeletion { number: usize }, SynonymsAddition { number: usize }, + SynonymsDeletion { number: usize }, } #[derive(Clone, Serialize, Deserialize)] @@ -172,6 +176,21 @@ pub fn push_synonyms_addition( Ok(last_update_id) } +pub fn push_synonyms_deletion( + writer: &mut rkv::Writer, + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + deletion: BTreeMap>>, +) -> MResult +{ + let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; + + let update = Update::SynonymsDeletion(deletion); + let update_id = updates_store.put_update(writer, last_update_id, &update)?; + + Ok(last_update_id) +} + pub fn update_task( writer: &mut rkv::Writer, index: store::Index, @@ -249,6 +268,20 @@ pub fn update_task( synonyms, ); + (update_type, result, start.elapsed()) + }, + Update::SynonymsDeletion(synonyms) => { + let start = Instant::now(); + + let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() }; + + let result = apply_synonyms_deletion( + writer, + index.main, + index.synonyms, + synonyms, + ); + (update_type, result, start.elapsed()) }, }; diff --git a/meilidb-core/src/update/synonyms_addition.rs b/meilidb-core/src/update/synonyms_addition.rs index a65883f89..7d83ae59e 100644 --- a/meilidb-core/src/update/synonyms_addition.rs +++ b/meilidb-core/src/update/synonyms_addition.rs @@ -62,7 +62,7 @@ pub fn apply_synonyms_addition( main_store: store::Main, synonyms_store: store::Synonyms, addition: BTreeMap>, -) -> Result<(), Error> +) -> MResult<()> { let mut synonyms_builder = SetBuilder::memory(); diff --git a/meilidb-core/src/update/synonyms_deletion.rs b/meilidb-core/src/update/synonyms_deletion.rs new file mode 100644 index 000000000..0672f4ca6 --- /dev/null +++ b/meilidb-core/src/update/synonyms_deletion.rs @@ -0,0 +1,151 @@ +use std::collections::BTreeMap; +use std::iter::FromIterator; +use std::sync::Arc; + +use fst::{SetBuilder, set::OpBuilder}; +use sdset::SetBuf; + +use crate::automaton::normalize_str; +use crate::raw_indexer::RawIndexer; +use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; +use crate::store; +use crate::update::push_synonyms_deletion; +use crate::{MResult, Error, RankedMap}; + +pub struct SynonymsDeletion { + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, + synonyms: BTreeMap>>, +} + +impl SynonymsDeletion { + pub fn new( + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, + ) -> SynonymsDeletion + { + SynonymsDeletion { + updates_store, + updates_results_store, + updates_notifier, + synonyms: BTreeMap::new(), + } + } + + pub fn delete_all_alternatives_of>(&mut self, synonym: S) { + let synonym = normalize_str(synonym.as_ref()); + self.synonyms.insert(synonym, None); + } + + pub fn delete_specific_alternatives_of(&mut self, synonym: S, alternatives: I) + where S: AsRef, + T: AsRef, + I: Iterator, + { + let synonym = normalize_str(synonym.as_ref()); + let value = self.synonyms.entry(synonym).or_insert(None); + let alternatives = alternatives.map(|s| s.as_ref().to_lowercase()); + match value { + Some(v) => v.extend(alternatives), + None => *value = Some(Vec::from_iter(alternatives)), + } + } + + pub fn finalize(self, mut writer: rkv::Writer) -> MResult { + let update_id = push_synonyms_deletion( + &mut writer, + self.updates_store, + self.updates_results_store, + self.synonyms, + )?; + writer.commit()?; + let _ = self.updates_notifier.send(()); + + Ok(update_id) + } +} + +pub fn apply_synonyms_deletion( + writer: &mut rkv::Writer, + main_store: store::Main, + synonyms_store: store::Synonyms, + deletion: BTreeMap>>, +) -> MResult<()> +{ + let mut delete_whole_synonym_builder = SetBuilder::memory(); + + for (synonym, alternatives) in deletion { + match alternatives { + Some(alternatives) => { + let prev_alternatives = synonyms_store.synonyms(writer, synonym.as_bytes())?; + let prev_alternatives = match prev_alternatives { + Some(alternatives) => alternatives, + None => continue, + }; + + let delta_alternatives = { + let alternatives = SetBuf::from_dirty(alternatives); + let mut builder = SetBuilder::memory(); + builder.extend_iter(alternatives).unwrap(); + builder.into_inner() + .and_then(fst::Set::from_bytes) + .unwrap() + }; + + let op = OpBuilder::new() + .add(prev_alternatives.stream()) + .add(delta_alternatives.stream()) + .difference(); + + let (alternatives, empty_alternatives) = { + let mut builder = SetBuilder::memory(); + let len = builder.get_ref().len(); + builder.extend_stream(op).unwrap(); + let is_empty = len == builder.get_ref().len(); + let bytes = builder.into_inner().unwrap(); + let alternatives = fst::Set::from_bytes(bytes).unwrap(); + + (alternatives, is_empty) + }; + + if empty_alternatives { + delete_whole_synonym_builder.insert(synonym.as_bytes())?; + } else { + synonyms_store.put_synonyms(writer, synonym.as_bytes(), &alternatives)?; + } + }, + None => { + delete_whole_synonym_builder.insert(&synonym).unwrap(); + synonyms_store.del_synonyms(writer, synonym.as_bytes())?; + } + } + } + + let delta_synonyms = delete_whole_synonym_builder + .into_inner() + .and_then(fst::Set::from_bytes) + .unwrap(); + + let synonyms = match main_store.synonyms_fst(writer)? { + Some(synonyms) => { + let op = OpBuilder::new() + .add(synonyms.stream()) + .add(delta_synonyms.stream()) + .difference(); + + let mut synonyms_builder = SetBuilder::memory(); + synonyms_builder.extend_stream(op).unwrap(); + synonyms_builder + .into_inner() + .and_then(fst::Set::from_bytes) + .unwrap() + }, + None => fst::Set::default(), + }; + + main_store.put_synonyms_fst(writer, &synonyms)?; + + Ok(()) +}