Introduce synonyms additions updates

This commit is contained in:
Clément Renault 2019-10-08 17:06:56 +02:00
parent 175461c13a
commit 0e224efa46
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 161 additions and 7 deletions

View File

@ -1,12 +1,15 @@
mod documents_addition; mod documents_addition;
mod documents_deletion; mod documents_deletion;
mod schema_update; mod schema_update;
mod synonyms_addition;
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
pub use self::schema_update::apply_schema_update; pub use self::schema_update::apply_schema_update;
pub use self::synonyms_addition::{SynonymsAddition, apply_synonyms_addition};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::collections::BTreeMap;
use log::debug; use log::debug;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@ -20,6 +23,7 @@ pub enum Update {
SchemaUpdate(Schema), SchemaUpdate(Schema),
DocumentsAddition(Vec<rmpv::Value>), DocumentsAddition(Vec<rmpv::Value>),
DocumentsDeletion(Vec<DocumentId>), DocumentsDeletion(Vec<DocumentId>),
SynonymsAddition(BTreeMap<String, Vec<String>>),
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -27,6 +31,7 @@ pub enum UpdateType {
SchemaUpdate { schema: Schema }, SchemaUpdate { schema: Schema },
DocumentsAddition { number: usize }, DocumentsAddition { number: usize },
DocumentsDeletion { number: usize }, DocumentsDeletion { number: usize },
SynonymsAddition { number: usize },
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
@ -68,7 +73,7 @@ pub fn update_status<T: rkv::Readable>(
} }
} }
pub fn biggest_update_id( fn biggest_update_id(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
@ -85,6 +90,21 @@ pub fn biggest_update_id(
Ok(max) Ok(max)
} }
pub fn next_update_id(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
) -> MResult<u64>
{
let last_update_id = biggest_update_id(
writer,
updates_store,
updates_results_store
)?;
Ok(last_update_id.map_or(0, |n| n + 1))
}
pub fn push_schema_update( pub fn push_schema_update(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
updates_store: store::Updates, updates_store: store::Updates,
@ -92,8 +112,7 @@ pub fn push_schema_update(
schema: Schema, schema: Schema,
) -> MResult<u64> ) -> MResult<u64>
{ {
let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let last_update_id = last_update_id.map_or(0, |n| n + 1);
let update = Update::SchemaUpdate(schema); let update = Update::SchemaUpdate(schema);
let update_id = updates_store.put_update(writer, last_update_id, &update)?; let update_id = updates_store.put_update(writer, last_update_id, &update)?;
@ -115,8 +134,7 @@ pub fn push_documents_addition<D: serde::Serialize>(
values.push(add); values.push(add);
} }
let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let last_update_id = last_update_id.map_or(0, |n| n + 1);
let update = Update::DocumentsAddition(values); let update = Update::DocumentsAddition(values);
let update_id = updates_store.put_update(writer, last_update_id, &update)?; let update_id = updates_store.put_update(writer, last_update_id, &update)?;
@ -131,8 +149,7 @@ pub fn push_documents_deletion(
deletion: Vec<DocumentId>, deletion: Vec<DocumentId>,
) -> MResult<u64> ) -> MResult<u64>
{ {
let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let last_update_id = last_update_id.map_or(0, |n| n + 1);
let update = Update::DocumentsDeletion(deletion); let update = Update::DocumentsDeletion(deletion);
let update_id = updates_store.put_update(writer, last_update_id, &update)?; let update_id = updates_store.put_update(writer, last_update_id, &update)?;
@ -140,6 +157,21 @@ pub fn push_documents_deletion(
Ok(last_update_id) Ok(last_update_id)
} }
pub fn push_synonyms_addition(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: BTreeMap<String, Vec<String>>,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::SynonymsAddition(addition);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn update_task( pub fn update_task(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
index: store::Index, index: store::Index,
@ -203,6 +235,20 @@ pub fn update_task(
documents, documents,
); );
(update_type, result, start.elapsed())
},
Update::SynonymsAddition(synonyms) => {
let start = Instant::now();
let update_type = UpdateType::SynonymsAddition { number: synonyms.len() };
let result = apply_synonyms_addition(
writer,
index.main,
index.synonyms,
synonyms,
);
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
}, },
}; };

View File

@ -0,0 +1,108 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use fst::{SetBuilder, set::OpBuilder};
use sdset::SetBuf;
use crate::automaton::normalize_str;
use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::store;
use crate::update::push_synonyms_addition;
use crate::{MResult, Error, RankedMap};
pub struct SynonymsAddition {
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>,
synonyms: BTreeMap<String, Vec<String>>,
}
impl SynonymsAddition {
pub fn new(
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>,
) -> SynonymsAddition
{
SynonymsAddition {
updates_store,
updates_results_store,
updates_notifier,
synonyms: BTreeMap::new(),
}
}
pub fn add_synonym<S, T, I>(&mut self, synonym: S, alternatives: I)
where S: AsRef<str>,
T: AsRef<str>,
I: IntoIterator<Item=T>,
{
let synonym = normalize_str(synonym.as_ref());
let alternatives = alternatives.into_iter().map(|s| s.as_ref().to_lowercase());
self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives);
}
pub fn finalize(self, mut writer: rkv::Writer) -> MResult<u64> {
let update_id = push_synonyms_addition(
&mut writer,
self.updates_store,
self.updates_results_store,
self.synonyms,
)?;
writer.commit()?;
let _ = self.updates_notifier.send(());
Ok(update_id)
}
}
pub fn apply_synonyms_addition(
writer: &mut rkv::Writer,
main_store: store::Main,
synonyms_store: store::Synonyms,
addition: BTreeMap<String, Vec<String>>,
) -> Result<(), Error>
{
let mut synonyms_builder = SetBuilder::memory();
for (word, alternatives) in addition {
synonyms_builder.insert(&word).unwrap();
let alternatives = {
let alternatives = SetBuf::from_dirty(alternatives);
let mut alternatives_builder = SetBuilder::memory();
alternatives_builder.extend_iter(alternatives).unwrap();
let bytes = alternatives_builder.into_inner().unwrap();
fst::Set::from_bytes(bytes).unwrap()
};
synonyms_store.put_synonyms(writer, word.as_bytes(), &alternatives)?;
}
let delta_synonyms = synonyms_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
let synonyms = match main_store.synonyms_fst(writer)? {
Some(synonyms) => {
let op = OpBuilder::new()
.add(synonyms.stream())
.add(delta_synonyms.stream())
.r#union();
let mut synonyms_builder = SetBuilder::memory();
synonyms_builder.extend_stream(op).unwrap();
synonyms_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap()
},
None => delta_synonyms,
};
main_store.put_synonyms_fst(writer, &synonyms)?;
Ok(())
}