feat: Introduce the UpdateStatus type

This commit is contained in:
Clément Renault 2019-08-26 18:21:16 +02:00
parent cd8535d410
commit f40b373f9f
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 68 additions and 28 deletions

View File

@ -1,6 +1,8 @@
use std::collections::{HashSet, BTreeMap};
use std::convert::TryInto;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use arc_swap::{ArcSwap, Guard};
use meilidb_core::criterion::Criteria;
@ -58,41 +60,79 @@ enum Update {
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
}
#[derive(Serialize, Deserialize)]
pub enum UpdateType {
DocumentsAddition { number: usize },
DocumentsDeletion { number: usize },
SynonymsAddition { number: usize },
SynonymsDeletion { number: usize },
}
#[derive(Serialize, Deserialize)]
pub struct DetailedDuration {
main: Duration,
}
#[derive(Serialize, Deserialize)]
pub struct UpdateStatus {
pub update_id: u64,
pub update_type: UpdateType,
pub result: Result<(), String>,
pub detailed_duration: DetailedDuration,
}
fn spawn_update_system(index: Index) -> thread::JoinHandle<()> {
thread::spawn(move || {
loop {
let subscription = index.updates_index.watch_prefix(vec![]);
while let Some(result) = index.updates_index.iter().next() {
let (key, _) = result.unwrap();
let update_id = key.as_ref().try_into().map(u64::from_be_bytes).unwrap();
let updates = &index.updates_index;
let results = &index.updates_results_index;
(updates, results).transaction(|(updates, results)| {
let update = updates.remove(&key)?.unwrap();
// this is an emulation of the try block (#31436)
let result: Result<(), Error> = (|| {
match rmp_serde::from_read_ref(&update)? {
UpdateOwned::DocumentsAddition(documents) => {
let ranked_map = index.cache.load().ranked_map.clone();
apply_documents_addition(&index, ranked_map, documents)?;
},
UpdateOwned::DocumentsDeletion(documents) => {
let ranked_map = index.cache.load().ranked_map.clone();
apply_documents_deletion(&index, ranked_map, documents)?;
},
UpdateOwned::SynonymsAddition(synonyms) => {
apply_synonyms_addition(&index, synonyms)?;
},
UpdateOwned::SynonymsDeletion(synonyms) => {
apply_synonyms_deletion(&index, synonyms)?;
},
}
Ok(())
})();
let (update_type, result, duration) = match rmp_serde::from_read_ref(&update).unwrap() {
UpdateOwned::DocumentsAddition(documents) => {
let update_type = UpdateType::DocumentsAddition { number: documents.len() };
let ranked_map = index.cache.load().ranked_map.clone();
let start = Instant::now();
let result = apply_documents_addition(&index, ranked_map, documents);
(update_type, result, start.elapsed())
},
UpdateOwned::DocumentsDeletion(documents) => {
let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
let ranked_map = index.cache.load().ranked_map.clone();
let start = Instant::now();
let result = apply_documents_deletion(&index, ranked_map, documents);
(update_type, result, start.elapsed())
},
UpdateOwned::SynonymsAddition(synonyms) => {
let update_type = UpdateType::SynonymsAddition { number: synonyms.len() };
let start = Instant::now();
let result = apply_synonyms_addition(&index, synonyms);
(update_type, result, start.elapsed())
},
UpdateOwned::SynonymsDeletion(synonyms) => {
let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() };
let start = Instant::now();
let result = apply_synonyms_deletion(&index, synonyms);
(update_type, result, start.elapsed())
},
};
let result = result.map_err(|e| e.to_string());
let value = bincode::serialize(&result).unwrap();
let detailed_duration = DetailedDuration { main: duration };
let status = UpdateStatus {
update_id,
update_type,
result: result.map_err(|e| e.to_string()),
detailed_duration,
};
let value = bincode::serialize(&status).unwrap();
results.insert(&key, value)
})
.unwrap();
@ -267,7 +307,7 @@ impl Index {
pub fn update_status(
&self,
update_id: u64,
) -> Result<Option<Result<(), String>>, Error>
) -> Result<Option<UpdateStatus>, Error>
{
let update_id = update_id.to_be_bytes();
match self.updates_results_index.get(update_id)? {
@ -282,7 +322,7 @@ impl Index {
pub fn update_status_blocking(
&self,
update_id: u64,
) -> Result<Result<(), String>, Error>
) -> Result<UpdateStatus, Error>
{
let update_id_bytes = update_id.to_be_bytes().to_vec();
let mut subscription = self.updates_results_index.watch_prefix(update_id_bytes);

View File

@ -23,7 +23,7 @@ fn insert_delete_document() {
addition.update_document(&doc1);
let update_id = addition.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 1);
@ -33,7 +33,7 @@ fn insert_delete_document() {
deletion.delete_document(&doc1).unwrap();
let update_id = deletion.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 0);
@ -54,7 +54,7 @@ fn replace_document() {
addition.update_document(&doc1);
let update_id = addition.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 1);
@ -64,7 +64,7 @@ fn replace_document() {
deletion.update_document(&doc2);
let update_id = deletion.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap();
assert_eq!(status, Ok(()));
assert!(status.result.is_ok());
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 0);