2019-10-03 21:04:11 +08:00
|
|
|
use std::collections::HashSet;
|
|
|
|
|
|
|
|
use fst::{SetBuilder, set::OpBuilder};
|
|
|
|
use sdset::{SetOperation, duo::Union};
|
|
|
|
use serde::Serialize;
|
|
|
|
|
|
|
|
use crate::raw_indexer::RawIndexer;
|
|
|
|
use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
|
|
|
|
use crate::store;
|
2019-10-08 23:24:11 +08:00
|
|
|
use crate::update::{Update, next_update_id, apply_documents_deletion};
|
2019-10-07 23:48:26 +08:00
|
|
|
use crate::{MResult, Error, RankedMap};
|
2019-10-03 21:04:11 +08:00
|
|
|
|
|
|
|
pub struct DocumentsAddition<D> {
|
|
|
|
updates_store: store::Updates,
|
2019-10-07 22:16:04 +08:00
|
|
|
updates_results_store: store::UpdatesResults,
|
|
|
|
updates_notifier: crossbeam_channel::Sender<()>,
|
2019-10-03 21:04:11 +08:00
|
|
|
documents: Vec<D>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<D> DocumentsAddition<D> {
|
2019-10-07 22:16:04 +08:00
|
|
|
pub fn new(
|
|
|
|
updates_store: store::Updates,
|
|
|
|
updates_results_store: store::UpdatesResults,
|
|
|
|
updates_notifier: crossbeam_channel::Sender<()>,
|
|
|
|
) -> DocumentsAddition<D>
|
|
|
|
{
|
|
|
|
DocumentsAddition {
|
|
|
|
updates_store,
|
|
|
|
updates_results_store,
|
|
|
|
updates_notifier,
|
|
|
|
documents: Vec::new(),
|
|
|
|
}
|
2019-10-03 21:04:11 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn update_document(&mut self, document: D) {
|
|
|
|
self.documents.push(document);
|
|
|
|
}
|
|
|
|
|
2019-10-11 17:29:47 +08:00
|
|
|
pub fn finalize(self, writer: &mut rkv::Writer) -> MResult<u64>
|
2019-10-03 21:04:11 +08:00
|
|
|
where D: serde::Serialize
|
|
|
|
{
|
2019-10-11 17:29:47 +08:00
|
|
|
let _ = self.updates_notifier.send(());
|
2019-10-07 22:16:04 +08:00
|
|
|
let update_id = push_documents_addition(
|
2019-10-11 17:29:47 +08:00
|
|
|
writer,
|
2019-10-07 22:16:04 +08:00
|
|
|
self.updates_store,
|
|
|
|
self.updates_results_store,
|
|
|
|
self.documents,
|
|
|
|
)?;
|
|
|
|
Ok(update_id)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<D> Extend<D> for DocumentsAddition<D> {
|
|
|
|
fn extend<T: IntoIterator<Item=D>>(&mut self, iter: T) {
|
|
|
|
self.documents.extend(iter)
|
2019-10-03 21:04:11 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-08 23:24:11 +08:00
|
|
|
pub fn push_documents_addition<D: serde::Serialize>(
|
|
|
|
writer: &mut rkv::Writer,
|
|
|
|
updates_store: store::Updates,
|
|
|
|
updates_results_store: store::UpdatesResults,
|
|
|
|
addition: Vec<D>,
|
|
|
|
) -> MResult<u64>
|
|
|
|
{
|
|
|
|
let mut values = Vec::with_capacity(addition.len());
|
|
|
|
for add in addition {
|
2019-10-11 22:16:21 +08:00
|
|
|
let vec = serde_json::to_vec(&add)?;
|
|
|
|
let add = serde_json::from_slice(&vec)?;
|
2019-10-08 23:24:11 +08:00
|
|
|
values.push(add);
|
|
|
|
}
|
|
|
|
|
|
|
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
|
|
|
|
|
|
|
let update = Update::DocumentsAddition(values);
|
2019-10-08 23:31:07 +08:00
|
|
|
updates_store.put_update(writer, last_update_id, &update)?;
|
2019-10-08 23:24:11 +08:00
|
|
|
|
|
|
|
Ok(last_update_id)
|
|
|
|
}
|
|
|
|
|
2019-10-03 21:04:11 +08:00
|
|
|
pub fn apply_documents_addition(
|
|
|
|
writer: &mut rkv::Writer,
|
|
|
|
main_store: store::Main,
|
|
|
|
documents_fields_store: store::DocumentsFields,
|
|
|
|
postings_lists_store: store::PostingsLists,
|
|
|
|
docs_words_store: store::DocsWords,
|
|
|
|
mut ranked_map: RankedMap,
|
2019-10-11 22:16:21 +08:00
|
|
|
addition: Vec<serde_json::Value>,
|
2019-10-07 23:48:26 +08:00
|
|
|
) -> MResult<()>
|
2019-10-03 21:04:11 +08:00
|
|
|
{
|
|
|
|
let mut document_ids = HashSet::new();
|
|
|
|
let mut document_store = RamDocumentStore::new();
|
|
|
|
let mut indexer = RawIndexer::new();
|
|
|
|
|
2019-10-07 23:48:26 +08:00
|
|
|
let schema = match main_store.schema(writer)? {
|
|
|
|
Some(schema) => schema,
|
|
|
|
None => return Err(Error::SchemaMissing),
|
|
|
|
};
|
|
|
|
|
2019-10-03 21:04:11 +08:00
|
|
|
let identifier = schema.identifier_name();
|
|
|
|
|
|
|
|
for document in addition {
|
|
|
|
let document_id = match extract_document_id(identifier, &document)? {
|
|
|
|
Some(id) => id,
|
|
|
|
None => return Err(Error::MissingDocumentId),
|
|
|
|
};
|
|
|
|
|
|
|
|
// 1. store the document id for future deletion
|
|
|
|
document_ids.insert(document_id);
|
|
|
|
|
|
|
|
// 2. index the document fields in ram stores
|
|
|
|
let serializer = Serializer {
|
2019-10-07 23:48:26 +08:00
|
|
|
schema: &schema,
|
2019-10-03 21:04:11 +08:00
|
|
|
document_store: &mut document_store,
|
|
|
|
indexer: &mut indexer,
|
|
|
|
ranked_map: &mut ranked_map,
|
|
|
|
document_id,
|
|
|
|
};
|
|
|
|
|
|
|
|
document.serialize(serializer)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
// 1. remove the previous documents match indexes
|
|
|
|
let documents_to_insert = document_ids.iter().cloned().collect();
|
|
|
|
apply_documents_deletion(
|
|
|
|
writer,
|
|
|
|
main_store,
|
|
|
|
documents_fields_store,
|
|
|
|
postings_lists_store,
|
|
|
|
docs_words_store,
|
|
|
|
ranked_map.clone(),
|
|
|
|
documents_to_insert,
|
|
|
|
)?;
|
|
|
|
|
|
|
|
// 2. insert new document attributes in the database
|
|
|
|
for ((id, attr), value) in document_store.into_inner() {
|
|
|
|
documents_fields_store.put_document_field(writer, id, attr, &value)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
let indexed = indexer.build();
|
|
|
|
let mut delta_words_builder = SetBuilder::memory();
|
|
|
|
|
|
|
|
for (word, delta_set) in indexed.words_doc_indexes {
|
|
|
|
delta_words_builder.insert(&word).unwrap();
|
|
|
|
|
|
|
|
let set = match postings_lists_store.postings_list(writer, &word)? {
|
|
|
|
Some(set) => Union::new(&set, &delta_set).into_set_buf(),
|
|
|
|
None => delta_set,
|
|
|
|
};
|
|
|
|
|
|
|
|
postings_lists_store.put_postings_list(writer, &word, &set)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (id, words) in indexed.docs_words {
|
|
|
|
docs_words_store.put_doc_words(writer, id, &words)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
let delta_words = delta_words_builder
|
|
|
|
.into_inner()
|
|
|
|
.and_then(fst::Set::from_bytes)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let words = match main_store.words_fst(writer)? {
|
|
|
|
Some(words) => {
|
|
|
|
let op = OpBuilder::new()
|
|
|
|
.add(words.stream())
|
|
|
|
.add(delta_words.stream())
|
|
|
|
.r#union();
|
|
|
|
|
|
|
|
let mut words_builder = SetBuilder::memory();
|
|
|
|
words_builder.extend_stream(op).unwrap();
|
|
|
|
words_builder
|
|
|
|
.into_inner()
|
|
|
|
.and_then(fst::Set::from_bytes)
|
|
|
|
.unwrap()
|
|
|
|
},
|
|
|
|
None => delta_words,
|
|
|
|
};
|
|
|
|
|
|
|
|
main_store.put_words_fst(writer, &words)?;
|
|
|
|
main_store.put_ranked_map(writer, &ranked_map)?;
|
|
|
|
|
|
|
|
let inserted_documents_len = document_ids.len() as u64;
|
|
|
|
main_store.put_number_of_documents(writer, |old| old + inserted_documents_len)?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|