meilisearch/meilisearch-core/src/update/documents_deletion.rs

181 lines
5.6 KiB
Rust
Raw Normal View History

2019-10-18 19:05:28 +08:00
use std::collections::{BTreeSet, HashMap, HashSet};
2019-10-03 21:04:11 +08:00
use fst::{SetBuilder, Streamer};
2019-11-26 18:06:55 +08:00
use meilisearch_schema::Schema;
2019-10-18 19:05:28 +08:00
use sdset::{duo::DifferenceByKey, SetBuf, SetOperation};
2019-10-03 21:04:11 +08:00
use crate::database::{MainT, UpdateT};
2019-11-06 17:49:13 +08:00
use crate::database::{UpdateEvent, UpdateEventsEmitter};
2019-10-03 21:04:11 +08:00
use crate::serde::extract_document_id;
use crate::store;
use crate::update::{next_update_id, compute_short_prefixes, Update};
2019-10-18 19:05:28 +08:00
use crate::{DocumentId, Error, MResult, RankedMap};
2019-10-03 21:04:11 +08:00
pub struct DocumentsDeletion {
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
2019-11-06 17:49:13 +08:00
updates_notifier: UpdateEventsEmitter,
2019-10-03 21:04:11 +08:00
documents: Vec<DocumentId>,
}
impl DocumentsDeletion {
pub fn new(
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
2019-11-06 17:49:13 +08:00
updates_notifier: UpdateEventsEmitter,
2019-10-18 19:05:28 +08:00
) -> DocumentsDeletion {
DocumentsDeletion {
updates_store,
updates_results_store,
updates_notifier,
documents: Vec::new(),
}
2019-10-03 21:04:11 +08:00
}
pub fn delete_document_by_id(&mut self, document_id: DocumentId) {
self.documents.push(document_id);
}
pub fn delete_document<D>(&mut self, schema: &Schema, document: D) -> MResult<()>
2019-10-18 19:05:28 +08:00
where
D: serde::Serialize,
2019-10-03 21:04:11 +08:00
{
2020-01-14 02:10:58 +08:00
let identifier = schema.identifier();
let document_id = match extract_document_id(&identifier, &document)? {
2019-10-03 21:04:11 +08:00
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
self.delete_document_by_id(document_id);
Ok(())
}
pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
2019-11-06 17:49:13 +08:00
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_documents_deletion(
2019-10-11 17:29:47 +08:00
writer,
self.updates_store,
self.updates_results_store,
self.documents,
)?;
Ok(update_id)
2019-10-03 21:04:11 +08:00
}
}
/// Lets several document ids be queued for deletion in a single call.
impl Extend<DocumentId> for DocumentsDeletion {
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = DocumentId>,
    {
        self.documents.extend(iter);
    }
}
pub fn push_documents_deletion(
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: Vec<DocumentId>,
2019-10-18 19:05:28 +08:00
) -> MResult<u64> {
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
2019-11-13 01:00:47 +08:00
let update = Update::documents_deletion(deletion);
2019-10-08 23:31:07 +08:00
updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
2019-10-03 21:04:11 +08:00
pub fn apply_documents_deletion(
writer: &mut heed::RwTxn<MainT>,
index: &store::Index,
2019-10-03 21:04:11 +08:00
deletion: Vec<DocumentId>,
2019-10-18 19:05:28 +08:00
) -> MResult<()> {
2019-10-03 21:04:11 +08:00
let idset = SetBuf::from_dirty(deletion);
let schema = match index.main.schema(writer)? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let mut ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
2019-10-03 21:04:11 +08:00
// collect the ranked attributes according to the schema
2020-01-30 01:30:21 +08:00
let ranked_fields = schema.ranked();
2019-10-03 21:04:11 +08:00
let mut words_document_ids = HashMap::new();
for id in idset {
// remove all the ranked attributes from the ranked_map
2020-01-30 01:30:21 +08:00
for ranked_attr in &ranked_fields {
2019-10-03 21:04:11 +08:00
ranked_map.remove(id, *ranked_attr);
}
if let Some(words) = index.docs_words.doc_words(writer, id)? {
2019-10-03 21:04:11 +08:00
let mut stream = words.stream();
while let Some(word) = stream.next() {
let word = word.to_vec();
2019-10-18 19:05:28 +08:00
words_document_ids
.entry(word)
.or_insert_with(Vec::new)
.push(id);
2019-10-03 21:04:11 +08:00
}
}
}
let mut deleted_documents = HashSet::new();
let mut removed_words = BTreeSet::new();
for (word, document_ids) in words_document_ids {
let document_ids = SetBuf::from_dirty(document_ids);
if let Some(postings) = index.postings_lists.postings_list(writer, &word)? {
let op = DifferenceByKey::new(&postings.matches, &document_ids, |d| d.document_id, |id| *id);
2019-10-03 21:04:11 +08:00
let doc_indexes = op.into_set_buf();
if !doc_indexes.is_empty() {
index.postings_lists.put_postings_list(writer, &word, &doc_indexes)?;
2019-10-03 21:04:11 +08:00
} else {
index.postings_lists.del_postings_list(writer, &word)?;
2019-10-03 21:04:11 +08:00
removed_words.insert(word);
}
}
for id in document_ids {
index.documents_fields_counts.del_all_document_fields_counts(writer, id)?;
if index.documents_fields.del_all_document_fields(writer, id)? != 0 {
2019-10-03 21:04:11 +08:00
deleted_documents.insert(id);
}
}
}
let deleted_documents_len = deleted_documents.len() as u64;
for id in deleted_documents {
index.docs_words.del_doc_words(writer, id)?;
}
2019-10-03 21:04:11 +08:00
let removed_words = fst::Set::from_iter(removed_words).unwrap();
let words = match index.main.words_fst(writer)? {
2019-10-03 21:04:11 +08:00
Some(words_set) => {
let op = fst::set::OpBuilder::new()
.add(words_set.stream())
.add(removed_words.stream())
.difference();
let mut words_builder = SetBuilder::memory();
words_builder.extend_stream(op).unwrap();
words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap()
2019-10-18 19:05:28 +08:00
}
2019-10-03 21:04:11 +08:00
None => fst::Set::default(),
};
index.main.put_words_fst(writer, &words)?;
index.main.put_ranked_map(writer, &ranked_map)?;
index.main.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
2019-10-03 21:04:11 +08:00
compute_short_prefixes(writer, index)?;
2019-10-03 21:04:11 +08:00
Ok(())
}