diff --git a/meilidb-data/Cargo.toml b/meilidb-data/Cargo.toml index f523792f0..d73e04da1 100644 --- a/meilidb-data/Cargo.toml +++ b/meilidb-data/Cargo.toml @@ -18,7 +18,7 @@ sdset = "0.3.2" serde = { version = "1.0.99", features = ["derive"] } serde_json = "1.0.40" siphasher = "0.3.0" -sled = "0.25.0" +sled = "0.26.0" zerocopy = "0.2.8" [dependencies.rmp-serde] @@ -30,4 +30,4 @@ git = "https://github.com/Kerollmops/fst.git" branch = "arc-byte-slice" [dev-dependencies] -tempfile = "3.0.7" +tempfile = "3.1.0" diff --git a/meilidb-data/src/database/documents_addition.rs b/meilidb-data/src/database/documents_addition.rs index 2ca5af7d3..e2739e9d6 100644 --- a/meilidb-data/src/database/documents_addition.rs +++ b/meilidb-data/src/database/documents_addition.rs @@ -12,7 +12,28 @@ use crate::RankedMap; use super::{Error, Index, DocumentsDeletion}; use super::index::Cache; -pub struct DocumentsAddition<'a> { +pub struct DocumentsAddition<'a, D> { + index: &'a Index, + documents: Vec, +} + +impl<'a, D> DocumentsAddition<'a, D> { + pub fn new(index: &'a Index) -> DocumentsAddition<'a, D> { + DocumentsAddition { index, documents: Vec::new() } + } + + pub fn update_document(&mut self, document: D) { + self.documents.push(document); + } + + pub fn finalize(self) -> Result + where D: serde::Serialize + { + self.index.push_documents_addition(self.documents) + } +} + +pub struct FinalDocumentsAddition<'a> { inner: &'a Index, document_ids: HashSet, document_store: RamDocumentStore, @@ -20,9 +41,9 @@ pub struct DocumentsAddition<'a> { ranked_map: RankedMap, } -impl<'a> DocumentsAddition<'a> { - pub fn new(inner: &'a Index, ranked_map: RankedMap) -> DocumentsAddition<'a> { - DocumentsAddition { +impl<'a> FinalDocumentsAddition<'a> { + pub fn new(inner: &'a Index, ranked_map: RankedMap) -> FinalDocumentsAddition<'a> { + FinalDocumentsAddition { inner, document_ids: HashSet::new(), document_store: RamDocumentStore::new(), diff --git a/meilidb-data/src/database/index/mod.rs b/meilidb-data/src/database/index/mod.rs index 2fa6c20e2..edb548e0a 100644 --- a/meilidb-data/src/database/index/mod.rs +++ b/meilidb-data/src/database/index/mod.rs @@ -1,12 +1,15 @@ use std::collections::HashSet; +use std::convert::TryInto; use std::sync::Arc; +use std::thread; use arc_swap::{ArcSwap, Guard}; use meilidb_core::criterion::Criteria; use meilidb_core::{DocIndex, Store, DocumentId, QueryBuilder}; use meilidb_schema::Schema; use sdset::SetBuf; -use serde::de; +use serde::{de, Serialize, Deserialize}; +use sled::Transactional; use crate::ranked_map::RankedMap; use crate::serde::{Deserializer, DeserializerError}; @@ -18,11 +21,11 @@ use self::docs_words_index::DocsWordsIndex; use self::documents_index::DocumentsIndex; use self::main_index::MainIndex; use self::synonyms_index::SynonymsIndex; -use self::updates_index::UpdatesIndex; use self::words_index::WordsIndex; use super::{ - DocumentsAddition, DocumentsDeletion, + DocumentsAddition, FinalDocumentsAddition, + DocumentsDeletion, SynonymsAddition, SynonymsDeletion, }; @@ -31,9 +34,83 @@ mod docs_words_index; mod documents_index; mod main_index; mod synonyms_index; -mod updates_index; mod words_index; +fn event_is_set(event: &sled::Event) -> bool { + match event { + sled::Event::Set(_, _) => true, + _ => false, + } +} + +#[derive(Deserialize)] +enum UpdateOwned { + DocumentsAddition(Vec), + DocumentsDeletion( () /*DocumentsDeletion*/), + SynonymsAddition( () /*SynonymsAddition*/), + SynonymsDeletion( () /*SynonymsDeletion*/), +} + +#[derive(Serialize)] +enum Update { + DocumentsAddition(Vec), + DocumentsDeletion( () /*DocumentsDeletion*/), + SynonymsAddition( () /*SynonymsAddition*/), + SynonymsDeletion( () /*SynonymsDeletion*/), +} + +fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { + thread::spawn(move || { + loop { + let subscription = index.updates_index.watch_prefix(vec![]); + while let Some(result) = index.updates_index.iter().next() { + let (key, _) = result.unwrap(); + + let updates = &index.updates_index; + let results = &index.updates_results_index; + (updates, results).transaction(|(updates, results)| { + let update = updates.remove(&key)?.unwrap(); + let array_id = key.as_ref().try_into().unwrap(); + let id = u64::from_be_bytes(array_id); + + // this is an emulation of the try block (#31436) + let result: Result<(), Error> = (|| { + match bincode::deserialize(&update)? { + UpdateOwned::DocumentsAddition(documents) => { + let ranked_map = index.cache.load().ranked_map.clone(); + let mut addition = FinalDocumentsAddition::new(&index, ranked_map); + for document in documents { + addition.update_document(document)?; + } + addition.finalize()?; + }, + UpdateOwned::DocumentsDeletion(_) => { + // ... + }, + UpdateOwned::SynonymsAddition(_) => { + // ... + }, + UpdateOwned::SynonymsDeletion(_) => { + // ... + }, + } + Ok(()) + })(); + + let result = result.map_err(|e| e.to_string()); + let value = bincode::serialize(&result).unwrap(); + results.insert(&array_id, value) + }) + .unwrap(); + } + + // this subscription is just used to block + // the loop until a new update is inserted + subscription.filter(event_is_set).next(); + } + }) +} + #[derive(Copy, Clone)] pub struct IndexStats { pub number_of_words: usize, @@ -52,7 +129,11 @@ pub struct Index { docs_words_index: DocsWordsIndex, documents_index: DocumentsIndex, custom_settings_index: CustomSettingsIndex, - updates_index: UpdatesIndex, + + // used by the update system + db: sled::Db, + updates_index: Arc, + updates_results_index: Arc, } pub(crate) struct Cache { @@ -63,64 +144,23 @@ pub(crate) struct Cache { } impl Index { - pub fn new(db: &sled::Db, name: &str) -> Result { - let main_index = db.open_tree(name).map(MainIndex)?; - let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?; - let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?; - let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?; - let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?; - let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?; - - let updates = db.open_tree(format!("{}-updates", name))?; - let updates_results = db.open_tree(format!("{}-updates-results", name))?; - let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results); - - let words = match main_index.words_set()? { - Some(words) => Arc::new(words), - None => Arc::new(fst::Set::default()), - }; - - let synonyms = match main_index.synonyms_set()? { - Some(synonyms) => Arc::new(synonyms), - None => Arc::new(fst::Set::default()), - }; - - let schema = match main_index.schema()? { - Some(schema) => schema, - None => return Err(Error::SchemaMissing), - }; - - let ranked_map = match main_index.ranked_map()? { - Some(map) => map, - None => RankedMap::default(), - }; - - let cache = Cache { words, synonyms, schema, ranked_map }; - let cache = ArcSwap::from_pointee(cache); - - Ok(Index { - cache, - main_index, - synonyms_index, - words_index, - docs_words_index, - documents_index, - custom_settings_index, - updates_index, - }) + pub fn new(db: sled::Db, name: &str) -> Result { + Index::new_raw(db, name, None) } - pub fn with_schema(db: &sled::Db, name: &str, schema: Schema) -> Result { + pub fn with_schema(db: sled::Db, name: &str, schema: Schema) -> Result { + Index::new_raw(db, name, Some(schema)) + } + + fn new_raw(db: sled::Db, name: &str, schema: Option) -> Result { let main_index = db.open_tree(name).map(MainIndex)?; let synonyms_index = db.open_tree(format!("{}-synonyms", name)).map(SynonymsIndex)?; let words_index = db.open_tree(format!("{}-words", name)).map(WordsIndex)?; let docs_words_index = db.open_tree(format!("{}-docs-words", name)).map(DocsWordsIndex)?; let documents_index = db.open_tree(format!("{}-documents", name)).map(DocumentsIndex)?; let custom_settings_index = db.open_tree(format!("{}-custom", name)).map(CustomSettingsIndex)?; - - let updates = db.open_tree(format!("{}-updates", name))?; - let updates_results = db.open_tree(format!("{}-updates-results", name))?; - let updates_index = UpdatesIndex::new(db.clone(), updates, updates_results); + let updates_index = db.open_tree(format!("{}-updates", name))?; + let updates_results_index = db.open_tree(format!("{}-updates-results", name))?; let words = match main_index.words_set()? { Some(words) => Arc::new(words), @@ -132,12 +172,18 @@ impl Index { None => Arc::new(fst::Set::default()), }; - match main_index.schema()? { - Some(current) => if current != schema { + let schema = match (schema, main_index.schema()?) { + (Some(ref expected), Some(ref current)) if current != expected => { return Err(Error::SchemaDiffer) }, - None => main_index.set_schema(&schema)?, - } + (Some(expected), Some(_)) => expected, + (Some(expected), None) => { + main_index.set_schema(&expected)?; + expected + }, + (None, Some(current)) => current, + (None, None) => return Err(Error::SchemaMissing), + }; let ranked_map = match main_index.ranked_map()? { Some(map) => map, @@ -147,7 +193,7 @@ impl Index { let cache = Cache { words, synonyms, schema, ranked_map }; let cache = ArcSwap::from_pointee(cache); - Ok(Index { + let index = Index { cache, main_index, synonyms_index, @@ -155,8 +201,14 @@ impl Index { docs_words_index, documents_index, custom_settings_index, + db, updates_index, - }) + updates_results_index, + }; + + let _handle = spawn_update_system(index.clone()); + + Ok(index) } pub fn stats(&self) -> sled::Result { @@ -202,9 +254,8 @@ impl Index { self.custom_settings_index.clone() } - pub fn documents_addition(&self) -> DocumentsAddition { - let ranked_map = self.cache.load().ranked_map.clone(); - DocumentsAddition::new(self, ranked_map) + pub fn documents_addition(&self) -> DocumentsAddition { + DocumentsAddition::new(self) } pub fn documents_deletion(&self) -> DocumentsDeletion { @@ -245,6 +296,40 @@ impl Index { } } +impl Index { + pub(crate) fn push_documents_addition(&self, addition: Vec) -> Result + where D: serde::Serialize + { + let addition = Update::DocumentsAddition(addition); + let update = bincode::serialize(&addition)?; + self.raw_push_update(update) + } + + pub(crate) fn push_documents_deletion(&self, deletion: DocumentsDeletion) -> Result { + let update = bincode::serialize(&())?; + self.raw_push_update(update) + } + + pub(crate) fn push_synonyms_addition(&self, addition: SynonymsAddition) -> Result { + let update = bincode::serialize(&())?; + self.raw_push_update(update) + } + + pub(crate) fn push_synonyms_deletion(&self, deletion: SynonymsDeletion) -> Result { + let update = bincode::serialize(&())?; + self.raw_push_update(update) + } + + fn raw_push_update(&self, raw_update: Vec) -> Result { + let update_id = self.db.generate_id()?; + let update_id_array = update_id.to_be_bytes(); + + self.updates_index.insert(update_id_array, raw_update)?; + + Ok(update_id) + } +} + pub struct RefIndex<'a> { pub(crate) cache: Guard<'static, Arc>, pub main_index: &'a MainIndex, diff --git a/meilidb-data/src/database/index/updates_index.rs b/meilidb-data/src/database/index/updates_index.rs deleted file mode 100644 index 7b91eaaa2..000000000 --- a/meilidb-data/src/database/index/updates_index.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::convert::TryInto; -use std::sync::Arc; -use std::thread; - -use log::info; -use sled::Event; -use serde::{Serialize, Deserialize}; - -use super::Error; -use crate::database::{ - DocumentsAddition, DocumentsDeletion, SynonymsAddition, SynonymsDeletion -}; - -fn event_is_set(event: &Event) -> bool { - match event { - Event::Set(_, _) => true, - _ => false, - } -} - -#[derive(Serialize, Deserialize)] -enum Update { - DocumentsAddition( () /*DocumentsAddition*/), - DocumentsDeletion( () /*DocumentsDeletion*/), - SynonymsAddition( () /*SynonymsAddition*/), - SynonymsDeletion( () /*SynonymsDeletion*/), -} - -#[derive(Clone)] -pub struct UpdatesIndex { - db: sled::Db, - updates: Arc, - results: Arc, -} - -impl UpdatesIndex { - pub fn new( - db: sled::Db, - updates: Arc, - results: Arc, - ) -> UpdatesIndex - { - let updates_clone = updates.clone(); - let results_clone = results.clone(); - let _handle = thread::spawn(move || { - loop { - let mut subscription = updates_clone.watch_prefix(vec![]); - - while let Some((key, update)) = updates_clone.pop_min().unwrap() { - let array = key.as_ref().try_into().unwrap(); - let id = u64::from_be_bytes(array); - - match bincode::deserialize(&update).unwrap() { - Update::DocumentsAddition(_) => { - info!("processing the document addition (update number {})", id); - // ... - }, - Update::DocumentsDeletion(_) => { - info!("processing the document deletion (update number {})", id); - // ... - }, - Update::SynonymsAddition(_) => { - info!("processing the synonyms addition (update number {})", id); - // ... - }, - Update::SynonymsDeletion(_) => { - info!("processing the synonyms deletion (update number {})", id); - // ... - }, - } - } - - // this subscription is just used to block - // the loop until a new update is inserted - subscription.filter(event_is_set).next(); - } - }); - - UpdatesIndex { db, updates, results } - } - - pub fn push_documents_addition(&self, addition: DocumentsAddition) -> Result { - let update = bincode::serialize(&())?; - self.raw_push_update(update) - } - - pub fn push_documents_deletion(&self, deletion: DocumentsDeletion) -> Result { - let update = bincode::serialize(&())?; - self.raw_push_update(update) - } - - pub fn push_synonyms_addition(&self, addition: SynonymsAddition) -> Result { - let update = bincode::serialize(&())?; - self.raw_push_update(update) - } - - pub fn push_synonyms_deletion(&self, deletion: SynonymsDeletion) -> Result { - let update = bincode::serialize(&())?; - self.raw_push_update(update) - } - - fn raw_push_update(&self, raw_update: Vec) -> Result { - let update_id = self.db.generate_id()?; - let update_id_array = update_id.to_be_bytes(); - - self.updates.insert(update_id_array, raw_update)?; - - Ok(update_id) - } -} diff --git a/meilidb-data/src/database/mod.rs b/meilidb-data/src/database/mod.rs index 4a1bb7a0b..b23856ca3 100644 --- a/meilidb-data/src/database/mod.rs +++ b/meilidb-data/src/database/mod.rs @@ -15,7 +15,7 @@ mod synonyms_deletion; pub use self::error::Error; pub use self::index::{Index, CustomSettingsIndex}; -use self::documents_addition::DocumentsAddition; +use self::documents_addition::{DocumentsAddition, FinalDocumentsAddition}; use self::documents_deletion::DocumentsDeletion; use self::synonyms_addition::SynonymsAddition; use self::synonyms_deletion::SynonymsDeletion; @@ -75,7 +75,7 @@ impl Database { return Ok(None) } - let index = Index::new(&self.inner, name)?; + let index = Index::new(self.inner.clone(), name)?; vacant.insert(index).clone() }, }; @@ -91,7 +91,7 @@ impl Database { occupied.get().clone() }, Entry::Vacant(vacant) => { - let index = Index::with_schema(&self.inner, name, schema)?; + let index = Index::with_schema(self.inner.clone(), name, schema)?; let mut indexes = self.indexes()?; indexes.insert(name.to_string()); diff --git a/meilidb-data/tests/updates.rs b/meilidb-data/tests/updates.rs index 2b832236e..3c630ebb4 100644 --- a/meilidb-data/tests/updates.rs +++ b/meilidb-data/tests/updates.rs @@ -20,7 +20,7 @@ fn insert_delete_document() { let doc1 = json!({ "objectId": 123, "title": "hello" }); let mut addition = index.documents_addition(); - addition.update_document(&doc1).unwrap(); + addition.update_document(&doc1); addition.finalize().unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap(); @@ -28,7 +28,7 @@ fn insert_delete_document() { assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1)); let mut deletion = index.documents_deletion(); - deletion.delete_document(&doc1).unwrap(); + deletion.delete_document(&doc1); deletion.finalize().unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap(); @@ -47,7 +47,7 @@ fn replace_document() { let doc2 = json!({ "objectId": 123, "title": "coucou" }); let mut addition = index.documents_addition(); - addition.update_document(&doc1).unwrap(); + addition.update_document(&doc1); addition.finalize().unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap(); @@ -55,7 +55,7 @@ fn replace_document() { assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1)); let mut deletion = index.documents_addition(); - deletion.update_document(&doc2).unwrap(); + deletion.update_document(&doc2); deletion.finalize().unwrap(); let docs = index.query_builder().query("hello", 0..10).unwrap(); diff --git a/meilidb/examples/create-database.rs b/meilidb/examples/create-database.rs index a663bc35c..d49979f28 100644 --- a/meilidb/examples/create-database.rs +++ b/meilidb/examples/create-database.rs @@ -6,7 +6,6 @@ use std::io::{self, BufRead, BufReader}; use std::path::{Path, PathBuf}; use std::time::Instant; use std::error::Error; -use std::borrow::Cow; use std::fs::File; use diskus::Walk; @@ -44,9 +43,8 @@ pub struct Opt { } #[derive(Serialize, Deserialize)] -struct Document<'a> ( - #[serde(borrow)] - HashMap, Cow<'a, str>> +struct Document ( + HashMap ); #[derive(Debug, Clone, Serialize, Deserialize)] @@ -138,7 +136,7 @@ fn index( } }; - update.update_document(&document)?; + update.update_document(document); print!("\rindexing document {}", i); i += 1;