Merge pull request #261 from meilisearch/partial-updates

Introduce the support of partial updates
This commit is contained in:
Clément Renault 2019-11-05 15:39:02 +01:00 committed by GitHub
commit d8392f2f18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 397 additions and 28 deletions

View File

@ -226,7 +226,10 @@ impl Database {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::update::{ProcessedUpdateResult, UpdateStatus}; use crate::update::{ProcessedUpdateResult, UpdateStatus};
use crate::DocumentId;
use serde::de::IgnoredAny;
use std::sync::mpsc; use std::sync::mpsc;
#[test] #[test]
@ -260,8 +263,6 @@ mod tests {
let mut writer = env.write_txn().unwrap(); let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap();
// don't forget to commit...
writer.commit().unwrap(); writer.commit().unwrap();
let mut additions = index.documents_addition(); let mut additions = index.documents_addition();
@ -283,8 +284,6 @@ mod tests {
let mut writer = env.write_txn().unwrap(); let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap(); let update_id = additions.finalize(&mut writer).unwrap();
// don't forget to commit...
writer.commit().unwrap(); writer.commit().unwrap();
// block until the transaction is processed // block until the transaction is processed
@ -326,8 +325,6 @@ mod tests {
let mut writer = env.write_txn().unwrap(); let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap(); let _update_id = index.schema_update(&mut writer, schema).unwrap();
// don't forget to commit...
writer.commit().unwrap(); writer.commit().unwrap();
let mut additions = index.documents_addition(); let mut additions = index.documents_addition();
@ -348,8 +345,6 @@ mod tests {
let mut writer = env.write_txn().unwrap(); let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap(); let update_id = additions.finalize(&mut writer).unwrap();
// don't forget to commit...
writer.commit().unwrap(); writer.commit().unwrap();
// block until the transaction is processed // block until the transaction is processed
@ -529,4 +524,213 @@ mod tests {
let result = index.update_status(&reader, update_id).unwrap(); let result = index.update_status(&reader, update_id).unwrap();
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_err()); assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_err());
} }
#[test]
fn deserialize_documents() {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
"#;
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut additions = index.documents_addition();
// DocumentId(7900334843754999545)
let doc1 = serde_json::json!({
"id": 123,
"name": "Marvin",
"description": "My name is Marvin",
});
// DocumentId(8367468610878465872)
let doc2 = serde_json::json!({
"id": 234,
"name": "Kevin",
"description": "My name is Kevin",
});
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
assert!(document.is_none());
let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(7900334843754999545))
.unwrap();
assert!(document.is_some());
let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(8367468610878465872))
.unwrap();
assert!(document.is_some());
}
#[test]
fn partial_document_update() {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = {
let data = r#"
identifier = "id"
[attributes."id"]
displayed = true
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
"#;
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut additions = index.documents_addition();
// DocumentId(7900334843754999545)
let doc1 = serde_json::json!({
"id": 123,
"name": "Marvin",
"description": "My name is Marvin",
});
// DocumentId(8367468610878465872)
let doc2 = serde_json::json!({
"id": 234,
"name": "Kevin",
"description": "My name is Kevin",
});
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
assert!(document.is_none());
let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(7900334843754999545))
.unwrap();
assert!(document.is_some());
let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(8367468610878465872))
.unwrap();
assert!(document.is_some());
reader.abort();
let mut partial_additions = index.documents_partial_addition();
// DocumentId(7900334843754999545)
let partial_doc1 = serde_json::json!({
"id": 123,
"description": "I am the new Marvin",
});
// DocumentId(8367468610878465872)
let partial_doc2 = serde_json::json!({
"id": 234,
"description": "I am the new Kevin",
});
partial_additions.update_document(partial_doc1);
partial_additions.update_document(partial_doc2);
let mut writer = env.write_txn().unwrap();
let update_id = partial_additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
let document: Option<serde_json::Value> = index
.document(&reader, None, DocumentId(7900334843754999545))
.unwrap();
let new_doc1 = serde_json::json!({
"id": 123,
"name": "Marvin",
"description": "I am the new Marvin",
});
assert_eq!(document, Some(new_doc1));
let document: Option<serde_json::Value> = index
.document(&reader, None, DocumentId(8367468610878465872))
.unwrap();
let new_doc2 = serde_json::json!({
"id": 234,
"name": "Kevin",
"description": "I am the new Kevin",
});
assert_eq!(document, Some(new_doc2));
}
} }

View File

@ -63,13 +63,14 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a> {
where where
V: de::Visitor<'de>, V: de::Visitor<'de>,
{ {
self.deserialize_map(visitor) self.deserialize_option(visitor)
} }
forward_to_deserialize_any! { fn deserialize_option<V>(self, visitor: V) -> Result<V::Value, Self::Error>
bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string where
bytes byte_buf option unit unit_struct newtype_struct seq tuple V: de::Visitor<'de>,
tuple_struct struct enum identifier ignored_any {
self.deserialize_map(visitor)
} }
fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error> fn deserialize_map<V>(self, visitor: V) -> Result<V::Value, Self::Error>
@ -104,16 +105,29 @@ impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a> {
} }
}); });
let map_deserializer = de::value::MapDeserializer::new(iter); let mut iter = iter.peekable();
let result = visitor
.visit_map(map_deserializer) let result = match iter.peek() {
.map_err(DeserializerError::from); Some(_) => {
let map_deserializer = de::value::MapDeserializer::new(iter);
visitor
.visit_some(map_deserializer)
.map_err(DeserializerError::from)
}
None => visitor.visit_none(),
};
match error.take() { match error.take() {
Some(error) => Err(error.into()), Some(error) => Err(error.into()),
None => result, None => result,
} }
} }
forward_to_deserialize_any! {
bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string
bytes byte_buf unit unit_struct newtype_struct seq tuple
tuple_struct struct enum identifier ignored_any
}
} }
struct Value(SerdeJsonDeserializer<SerdeJsonIoRead<Cursor<Vec<u8>>>>); struct Value(SerdeJsonDeserializer<SerdeJsonIoRead<Cursor<Vec<u8>>>>);

View File

@ -22,7 +22,7 @@ use std::collections::HashSet;
use heed::Result as ZResult; use heed::Result as ZResult;
use meilidb_schema::{Schema, SchemaAttr}; use meilidb_schema::{Schema, SchemaAttr};
use serde::de; use serde::de::{self, Deserialize};
use zerocopy::{AsBytes, FromBytes}; use zerocopy::{AsBytes, FromBytes};
use crate::criterion::Criteria; use crate::criterion::Criteria;
@ -120,9 +120,7 @@ impl Index {
attributes: attributes.as_ref(), attributes: attributes.as_ref(),
}; };
// TODO: currently we return an error if all document fields are missing, Ok(Option::<T>::deserialize(&mut deserializer)?)
// returning None would have been better
Ok(T::deserialize(&mut deserializer).map(Some)?)
} }
pub fn document_attribute<T: de::DeserializeOwned>( pub fn document_attribute<T: de::DeserializeOwned>(
@ -158,6 +156,14 @@ impl Index {
) )
} }
pub fn documents_partial_addition<D>(&self) -> update::DocumentsAddition<D> {
update::DocumentsAddition::new_partial(
self.updates,
self.updates_results,
self.updates_notifier.clone(),
)
}
pub fn documents_deletion(&self) -> update::DocumentsDeletion { pub fn documents_deletion(&self) -> update::DocumentsDeletion {
update::DocumentsDeletion::new( update::DocumentsDeletion::new(
self.updates, self.updates,

View File

@ -2,10 +2,10 @@ use std::collections::HashMap;
use fst::{set::OpBuilder, SetBuilder}; use fst::{set::OpBuilder, SetBuilder};
use sdset::{duo::Union, SetOperation}; use sdset::{duo::Union, SetOperation};
use serde::Serialize; use serde::{Deserialize, Serialize};
use crate::raw_indexer::RawIndexer; use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, serialize_value, Serializer}; use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer};
use crate::store; use crate::store;
use crate::update::{apply_documents_deletion, next_update_id, Update}; use crate::update::{apply_documents_deletion, next_update_id, Update};
use crate::{Error, MResult, RankedMap}; use crate::{Error, MResult, RankedMap};
@ -15,6 +15,7 @@ pub struct DocumentsAddition<D> {
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: crossbeam_channel::Sender<()>,
documents: Vec<D>, documents: Vec<D>,
is_partial: bool,
} }
impl<D> DocumentsAddition<D> { impl<D> DocumentsAddition<D> {
@ -28,6 +29,21 @@ impl<D> DocumentsAddition<D> {
updates_results_store, updates_results_store,
updates_notifier, updates_notifier,
documents: Vec::new(), documents: Vec::new(),
is_partial: false,
}
}
pub fn new_partial(
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>,
) -> DocumentsAddition<D> {
DocumentsAddition {
updates_store,
updates_results_store,
updates_notifier,
documents: Vec::new(),
is_partial: true,
} }
} }
@ -45,6 +61,7 @@ impl<D> DocumentsAddition<D> {
self.updates_store, self.updates_store,
self.updates_results_store, self.updates_results_store,
self.documents, self.documents,
self.is_partial,
)?; )?;
Ok(update_id) Ok(update_id)
} }
@ -61,6 +78,7 @@ pub fn push_documents_addition<D: serde::Serialize>(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
addition: Vec<D>, addition: Vec<D>,
is_partial: bool,
) -> MResult<u64> { ) -> MResult<u64> {
let mut values = Vec::with_capacity(addition.len()); let mut values = Vec::with_capacity(addition.len());
for add in addition { for add in addition {
@ -71,7 +89,12 @@ pub fn push_documents_addition<D: serde::Serialize>(
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::DocumentsAddition(values); let update = if is_partial {
Update::DocumentsPartial(values)
} else {
Update::DocumentsAddition(values)
};
updates_store.put_update(writer, last_update_id, &update)?; updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id) Ok(last_update_id)
@ -84,7 +107,7 @@ pub fn apply_documents_addition<'a, 'b>(
documents_fields_counts_store: store::DocumentsFieldsCounts, documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists, postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords, docs_words_store: store::DocsWords,
addition: Vec<serde_json::Value>, addition: Vec<HashMap<String, serde_json::Value>>,
) -> MResult<()> { ) -> MResult<()> {
let mut documents_additions = HashMap::new(); let mut documents_additions = HashMap::new();
@ -156,6 +179,102 @@ pub fn apply_documents_addition<'a, 'b>(
) )
} }
pub fn apply_documents_partial_addition<'a, 'b>(
writer: &'a mut heed::RwTxn<'b>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
addition: Vec<HashMap<String, serde_json::Value>>,
) -> MResult<()> {
let mut documents_additions = HashMap::new();
let schema = match main_store.schema(writer)? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let identifier = schema.identifier_name();
// 1. store documents ids for future deletion
for mut document in addition {
let document_id = match extract_document_id(identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
let mut deserializer = Deserializer {
document_id,
reader: writer,
documents_fields: documents_fields_store,
schema: &schema,
attributes: None,
};
// retrieve the old document and
// update the new one with missing keys found in the old one
let result = Option::<HashMap<String, serde_json::Value>>::deserialize(&mut deserializer)?;
if let Some(old_document) = result {
for (key, value) in old_document {
document.entry(key).or_insert(value);
}
}
documents_additions.insert(document_id, document);
}
// 2. remove the documents posting lists
let number_of_inserted_documents = documents_additions.len();
let documents_ids = documents_additions.iter().map(|(id, _)| *id).collect();
apply_documents_deletion(
writer,
main_store,
documents_fields_store,
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
documents_ids,
)?;
let mut ranked_map = match main_store.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
let stop_words = match main_store.stop_words_fst(writer)? {
Some(stop_words) => stop_words,
None => fst::Set::default(),
};
// 3. index the documents fields in the stores
let mut indexer = RawIndexer::new(stop_words);
for (document_id, document) in documents_additions {
let serializer = Serializer {
txn: writer,
schema: &schema,
document_store: documents_fields_store,
document_fields_counts: documents_fields_counts_store,
indexer: &mut indexer,
ranked_map: &mut ranked_map,
document_id,
};
document.serialize(serializer)?;
}
write_documents_addition_index(
writer,
main_store,
postings_lists_store,
docs_words_store,
&ranked_map,
number_of_inserted_documents,
indexer,
)
}
pub fn reindex_all_documents( pub fn reindex_all_documents(
writer: &mut heed::RwTxn, writer: &mut heed::RwTxn,
main_store: store::Main, main_store: store::Main,

View File

@ -10,7 +10,9 @@ mod synonyms_deletion;
pub use self::clear_all::{apply_clear_all, push_clear_all}; pub use self::clear_all::{apply_clear_all, push_clear_all};
pub use self::customs_update::{apply_customs_update, push_customs_update}; pub use self::customs_update::{apply_customs_update, push_customs_update};
pub use self::documents_addition::{apply_documents_addition, DocumentsAddition}; pub use self::documents_addition::{
apply_documents_addition, apply_documents_partial_addition, DocumentsAddition,
};
pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion}; pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion};
pub use self::schema_update::{apply_schema_update, push_schema_update}; pub use self::schema_update::{apply_schema_update, push_schema_update};
pub use self::stop_words_addition::{apply_stop_words_addition, StopWordsAddition}; pub use self::stop_words_addition::{apply_stop_words_addition, StopWordsAddition};
@ -19,7 +21,7 @@ pub use self::synonyms_addition::{apply_synonyms_addition, SynonymsAddition};
pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion}; pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion};
use std::cmp; use std::cmp;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use heed::Result as ZResult; use heed::Result as ZResult;
@ -34,7 +36,8 @@ pub enum Update {
ClearAll, ClearAll,
Schema(Schema), Schema(Schema),
Customs(Vec<u8>), Customs(Vec<u8>),
DocumentsAddition(Vec<serde_json::Value>), DocumentsAddition(Vec<HashMap<String, serde_json::Value>>),
DocumentsPartial(Vec<HashMap<String, serde_json::Value>>),
DocumentsDeletion(Vec<DocumentId>), DocumentsDeletion(Vec<DocumentId>),
SynonymsAddition(BTreeMap<String, Vec<String>>), SynonymsAddition(BTreeMap<String, Vec<String>>),
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>), SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
@ -53,6 +56,9 @@ impl Update {
Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition { Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
number: addition.len(), number: addition.len(),
}, },
Update::DocumentsPartial(addition) => UpdateType::DocumentsPartial {
number: addition.len(),
},
Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion { Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
number: deletion.len(), number: deletion.len(),
}, },
@ -78,6 +84,7 @@ pub enum UpdateType {
Schema { schema: Schema }, Schema { schema: Schema },
Customs, Customs,
DocumentsAddition { number: usize }, DocumentsAddition { number: usize },
DocumentsPartial { number: usize },
DocumentsDeletion { number: usize }, DocumentsDeletion { number: usize },
SynonymsAddition { number: usize }, SynonymsAddition { number: usize },
SynonymsDeletion { number: usize }, SynonymsDeletion { number: usize },
@ -218,6 +225,25 @@ pub fn update_task<'a, 'b>(
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
} }
Update::DocumentsPartial(documents) => {
let start = Instant::now();
let update_type = UpdateType::DocumentsPartial {
number: documents.len(),
};
let result = apply_documents_partial_addition(
writer,
index.main,
index.documents_fields,
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
documents,
);
(update_type, result, start.elapsed())
}
Update::DocumentsDeletion(documents) => { Update::DocumentsDeletion(documents) => {
let start = Instant::now(); let start = Instant::now();