diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 013dd4981..2e391d770 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -4,25 +4,23 @@ mod documents_deletion; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; -use std::time::Duration; use std::collections::BTreeMap; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + use serde::{Serialize, Deserialize}; -use crate::{store, Error, MResult, DocumentId}; +use crate::{store, Error, MResult, DocumentId, RankedMap}; #[derive(Serialize, Deserialize)] pub enum Update { DocumentsAddition(Vec), DocumentsDeletion(Vec), - SynonymsAddition(BTreeMap>), - SynonymsDeletion(BTreeMap>>), } #[derive(Clone, Serialize, Deserialize)] pub enum UpdateType { DocumentsAddition { number: usize }, DocumentsDeletion { number: usize }, - SynonymsAddition { number: usize }, - SynonymsDeletion { number: usize }, } #[derive(Clone, Serialize, Deserialize)] @@ -91,22 +89,87 @@ pub fn push_documents_deletion( Ok(updates_store.push_back(writer, &update)?) } -pub fn push_synonyms_addition( - writer: &mut rkv::Writer, - updates_store: store::Updates, - addition: BTreeMap>, -) -> Result +pub fn update_task( + rkv: Arc>, + index: store::Index, + mut callback: Option, +) -> MResult<()> { - let update = Update::SynonymsAddition(addition); - Ok(updates_store.push_back(writer, &update)?) -} + let rkv = rkv.read().unwrap(); + let mut writer = rkv.write()?; -pub fn push_synonyms_deletion( - writer: &mut rkv::Writer, - updates_store: store::Updates, - deletion: BTreeMap>>, -) -> Result -{ - let update = Update::SynonymsDeletion(deletion); - Ok(updates_store.push_back(writer, &update)?) + if let Some((update_id, update)) = index.updates.pop_back(&mut writer)? { + let (update_type, result, duration) = match update { + Update::DocumentsAddition(documents) => { + let update_type = UpdateType::DocumentsAddition { number: documents.len() }; + + let schema = match index.main.schema(&writer)? { + Some(schema) => schema, + None => return Err(Error::SchemaMissing), + }; + let ranked_map = match index.main.ranked_map(&writer)? { + Some(ranked_map) => ranked_map, + None => RankedMap::default(), + }; + + let start = Instant::now(); + let result = apply_documents_addition( + &mut writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + &schema, + ranked_map, + documents, + ); + + (update_type, result, start.elapsed()) + }, + Update::DocumentsDeletion(documents) => { + let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; + + let schema = match index.main.schema(&writer)? { + Some(schema) => schema, + None => return Err(Error::SchemaMissing), + }; + let ranked_map = match index.main.ranked_map(&writer)? { + Some(ranked_map) => ranked_map, + None => RankedMap::default(), + }; + + let start = Instant::now(); + let result = apply_documents_deletion( + &mut writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + &schema, + ranked_map, + documents, + ); + + (update_type, result, start.elapsed()) + }, + }; + + let detailed_duration = DetailedDuration { main: duration }; + let status = UpdateResult { + update_id, + update_type, + result: result.map_err(|e| e.to_string()), + detailed_duration, + }; + + index.updates_results.put_update_result(&mut writer, update_id, &status)?; + + if let Some(callback) = callback.take() { + (callback)(status); + } + } + + writer.commit()?; + + Ok(()) }