diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 0203e2e26..0e40b695d 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -4,7 +4,7 @@ use std::path::Path; use std::sync::{Arc, RwLock}; use std::{fs, thread}; -use crossbeam_channel::Receiver; +use crossbeam_channel::{Receiver, Sender}; use heed::types::{Str, Unit}; use heed::{CompactionOption, Result as ZResult}; use log::debug; @@ -33,9 +33,17 @@ macro_rules! r#break_try { }; } -fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc, index: Index) { - for () in receiver { - // consume all updates in order (oldest first) +pub enum UpdateEvent { + NewUpdate, + MustStop, +} + +pub type UpdateEvents = Receiver; +pub type UpdateEventsEmitter = Sender; + +fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc, index: Index) { + let mut receiver = receiver.into_iter(); + while let Some(UpdateEvent::NewUpdate) = receiver.next() { loop { // instantiate a main/parent transaction let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed"); @@ -80,6 +88,8 @@ fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc) -> MResult { + let name = name.as_ref(); + let mut indexes_lock = self.indexes.write().unwrap(); + + match indexes_lock.remove_entry(name) { + Some((name, (index, _fn, handle))) => { + // remove the index name from the list of indexes + // and clear all the LMDB dbi + let mut writer = self.env.write_txn()?; + self.indexes_store.delete(&mut writer, &name)?; + store::clear(&mut writer, &index)?; + writer.commit()?; + + // join the update loop thread to ensure it is stopped + handle.join().unwrap(); + + Ok(true) + } + None => Ok(false), + } + } + pub fn set_update_callback(&self, name: impl AsRef, update_fn: BoxUpdateFn) -> bool { let indexes_lock = self.indexes.read().unwrap(); match indexes_lock.get(name.as_ref()) { @@ -733,4 +765,18 @@ mod tests { }); assert_eq!(document, Some(new_doc2)); } + + #[test] + fn delete_index() { + let dir = tempfile::tempdir().unwrap(); + + let database = Database::open_or_create(dir.path()).unwrap(); + let _index = database.create_index("test").unwrap(); + + let deleted = database.delete_index("test").unwrap(); + assert!(deleted); + + let result = database.open_index("test"); + assert!(result.is_none()); + } } diff --git a/meilidb-core/src/store/main.rs b/meilidb-core/src/store/main.rs index dca995759..5755c2da1 100644 --- a/meilidb-core/src/store/main.rs +++ b/meilidb-core/src/store/main.rs @@ -18,6 +18,10 @@ pub struct Main { } impl Main { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.main.clear(writer) + } + pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { let bytes = fst.as_fst().as_bytes(); self.main.put::(writer, WORDS_KEY, bytes) diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index f53900a79..2c5bdc31e 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -26,6 +26,7 @@ use serde::de::{self, Deserialize}; use zerocopy::{AsBytes, FromBytes}; use crate::criterion::Criteria; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::serde::Deserializer; use crate::{query_builder::QueryBuilder, update, DocumentId, Error, MResult}; @@ -91,7 +92,7 @@ pub struct Index { pub updates: Updates, pub updates_results: UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, } impl Index { @@ -139,12 +140,12 @@ impl Index { } pub fn schema_update(&self, writer: &mut heed::RwTxn, schema: Schema) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_schema_update(writer, self.updates, self.updates_results, schema) } pub fn customs_update(&self, writer: &mut heed::RwTxn, customs: Vec) -> ZResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_customs_update(writer, self.updates, self.updates_results, customs) } @@ -173,7 +174,7 @@ impl Index { } pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_clear_all(writer, self.updates, self.updates_results) } @@ -276,7 +277,7 @@ impl Index { pub fn create( env: &heed::Env, name: &str, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> MResult { // create all the store names let main_name = main_name(name); @@ -316,7 +317,7 @@ pub fn create( pub fn open( env: &heed::Env, name: &str, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> MResult> { // create all the store names let main_name = main_name(name); @@ -376,3 +377,19 @@ pub fn open( updates_notifier, })) } + +pub fn clear(writer: &mut heed::RwTxn, index: &Index) -> MResult<()> { + // send a stop event to the update loop of the index + index.updates_notifier.send(UpdateEvent::MustStop).unwrap(); + + // clear all the stores + index.main.clear(writer)?; + index.postings_lists.clear(writer)?; + index.documents_fields.clear(writer)?; + index.documents_fields_counts.clear(writer)?; + index.synonyms.clear(writer)?; + index.docs_words.clear(writer)?; + index.updates.clear(writer)?; + index.updates_results.clear(writer)?; + Ok(()) +} diff --git a/meilidb-core/src/store/synonyms.rs b/meilidb-core/src/store/synonyms.rs index 80b8bba67..2c497b86a 100644 --- a/meilidb-core/src/store/synonyms.rs +++ b/meilidb-core/src/store/synonyms.rs @@ -22,6 +22,10 @@ impl Synonyms { self.synonyms.delete(writer, word) } + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.synonyms.clear(writer) + } + pub fn synonyms(self, reader: &heed::RoTxn, word: &[u8]) -> ZResult> { match self.synonyms.get(reader, word)? { Some(bytes) => { diff --git a/meilidb-core/src/store/updates.rs b/meilidb-core/src/store/updates.rs index 4b3cdc51c..984da7b58 100644 --- a/meilidb-core/src/store/updates.rs +++ b/meilidb-core/src/store/updates.rs @@ -52,4 +52,8 @@ impl Updates { None => Ok(None), } } + + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.updates.clear(writer) + } } diff --git a/meilidb-core/src/store/updates_results.rs b/meilidb-core/src/store/updates_results.rs index 0a9d370a9..cf99c2acf 100644 --- a/meilidb-core/src/store/updates_results.rs +++ b/meilidb-core/src/store/updates_results.rs @@ -38,4 +38,8 @@ impl UpdatesResults { let update_id = BEU64::new(update_id); self.updates_results.get(reader, &update_id) } + + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.updates_results.clear(writer) + } } diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 7a7f03362..91683a931 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -4,6 +4,7 @@ use fst::{set::OpBuilder, SetBuilder}; use sdset::{duo::Union, SetOperation}; use serde::{Deserialize, Serialize}; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::raw_indexer::RawIndexer; use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer}; use crate::store; @@ -13,7 +14,7 @@ use crate::{Error, MResult, RankedMap}; pub struct DocumentsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, documents: Vec, is_partial: bool, } @@ -22,7 +23,7 @@ impl DocumentsAddition { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> DocumentsAddition { DocumentsAddition { updates_store, @@ -36,7 +37,7 @@ impl DocumentsAddition { pub fn new_partial( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> DocumentsAddition { DocumentsAddition { updates_store, @@ -55,7 +56,7 @@ impl DocumentsAddition { where D: serde::Serialize, { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_documents_addition( writer, self.updates_store, diff --git a/meilidb-core/src/update/documents_deletion.rs b/meilidb-core/src/update/documents_deletion.rs index 31a9f4927..fb0f16ea1 100644 --- a/meilidb-core/src/update/documents_deletion.rs +++ b/meilidb-core/src/update/documents_deletion.rs @@ -4,6 +4,7 @@ use fst::{SetBuilder, Streamer}; use meilidb_schema::Schema; use sdset::{duo::DifferenceByKey, SetBuf, SetOperation}; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::serde::extract_document_id; use crate::store; use crate::update::{next_update_id, Update}; @@ -12,7 +13,7 @@ use crate::{DocumentId, Error, MResult, RankedMap}; pub struct DocumentsDeletion { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, documents: Vec, } @@ -20,7 +21,7 @@ impl DocumentsDeletion { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> DocumentsDeletion { DocumentsDeletion { updates_store, @@ -50,7 +51,7 @@ impl DocumentsDeletion { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_documents_deletion( writer, self.updates_store, diff --git a/meilidb-core/src/update/stop_words_addition.rs b/meilidb-core/src/update/stop_words_addition.rs index 6adba450b..12b00e373 100644 --- a/meilidb-core/src/update/stop_words_addition.rs +++ b/meilidb-core/src/update/stop_words_addition.rs @@ -3,13 +3,14 @@ use std::collections::BTreeSet; use fst::{set::OpBuilder, SetBuilder}; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; pub struct StopWordsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, stop_words: BTreeSet, } @@ -17,7 +18,7 @@ impl StopWordsAddition { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> StopWordsAddition { StopWordsAddition { updates_store, @@ -33,7 +34,7 @@ impl StopWordsAddition { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_stop_words_addition( writer, self.updates_store, diff --git a/meilidb-core/src/update/stop_words_deletion.rs b/meilidb-core/src/update/stop_words_deletion.rs index 11c72ded9..eb4d1700c 100644 --- a/meilidb-core/src/update/stop_words_deletion.rs +++ b/meilidb-core/src/update/stop_words_deletion.rs @@ -3,6 +3,7 @@ use std::collections::BTreeSet; use fst::{set::OpBuilder, SetBuilder}; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::documents_addition::reindex_all_documents; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; @@ -10,7 +11,7 @@ use crate::{store, MResult}; pub struct StopWordsDeletion { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, stop_words: BTreeSet, } @@ -18,7 +19,7 @@ impl StopWordsDeletion { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> StopWordsDeletion { StopWordsDeletion { updates_store, @@ -34,7 +35,7 @@ impl StopWordsDeletion { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_stop_words_deletion( writer, self.updates_store, diff --git a/meilidb-core/src/update/synonyms_addition.rs b/meilidb-core/src/update/synonyms_addition.rs index 656e5e727..b6a5ab2d4 100644 --- a/meilidb-core/src/update/synonyms_addition.rs +++ b/meilidb-core/src/update/synonyms_addition.rs @@ -4,13 +4,14 @@ use fst::{set::OpBuilder, SetBuilder}; use sdset::SetBuf; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; pub struct SynonymsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, synonyms: BTreeMap>, } @@ -18,7 +19,7 @@ impl SynonymsAddition { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> SynonymsAddition { SynonymsAddition { updates_store, @@ -43,7 +44,7 @@ impl SynonymsAddition { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_synonyms_addition( writer, self.updates_store, diff --git a/meilidb-core/src/update/synonyms_deletion.rs b/meilidb-core/src/update/synonyms_deletion.rs index b76f3f761..6ec875bb6 100644 --- a/meilidb-core/src/update/synonyms_deletion.rs +++ b/meilidb-core/src/update/synonyms_deletion.rs @@ -5,13 +5,14 @@ use fst::{set::OpBuilder, SetBuilder}; use sdset::SetBuf; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; pub struct SynonymsDeletion { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, synonyms: BTreeMap>>, } @@ -19,7 +20,7 @@ impl SynonymsDeletion { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> SynonymsDeletion { SynonymsDeletion { updates_store, @@ -50,7 +51,7 @@ impl SynonymsDeletion { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_synonyms_deletion( writer, self.updates_store, diff --git a/meilidb-http/src/routes/document.rs b/meilidb-http/src/routes/document.rs index a627b2195..289fb31ae 100644 --- a/meilidb-http/src/routes/document.rs +++ b/meilidb-http/src/routes/document.rs @@ -151,7 +151,7 @@ fn infered_schema(document: &IndexMap) -> Option) -> SResult { +async fn update_multiple_documents(mut ctx: Context, is_partial: bool) -> SResult { ctx.is_allowed(DocumentsWrite)?; if !ctx.state().accept_updates() { @@ -179,7 +179,11 @@ pub async fn add_or_update_multiple_documents(mut ctx: Context) -> SResult } } - let mut document_addition = index.documents_addition(); + let mut document_addition = if is_partial { + index.documents_partial_addition() + } else { + index.documents_addition() + }; for document in data { document_addition.update_document(document); @@ -197,6 +201,14 @@ pub async fn add_or_update_multiple_documents(mut ctx: Context) -> SResult .into_response()) } +pub async fn add_or_replace_multiple_documents(ctx: Context) -> SResult { + update_multiple_documents(ctx, false).await +} + +pub async fn add_or_update_multiple_documents(ctx: Context) -> SResult { + update_multiple_documents(ctx, true).await +} + pub async fn delete_multiple_documents(mut ctx: Context) -> SResult { ctx.is_allowed(DocumentsWrite)?; if !ctx.state().accept_updates() { diff --git a/meilidb-http/src/routes/index.rs b/meilidb-http/src/routes/index.rs index 85cd4b236..320c0ff48 100644 --- a/meilidb-http/src/routes/index.rs +++ b/meilidb-http/src/routes/index.rs @@ -188,15 +188,19 @@ pub async fn get_all_updates_status(ctx: Context) -> SResult { pub async fn delete_index(ctx: Context) -> SResult { ctx.is_allowed(IndexesWrite)?; - let _index_name = ctx.url_param("index")?; - let _index = ctx.index()?; + let index_name = ctx.url_param("index")?; - // ctx.state() - // .db - // .delete_index(&index_name) - // .map_err(ResponseError::internal)?; + let found = ctx + .state() + .db + .delete_index(&index_name) + .map_err(ResponseError::internal)?; - Ok(StatusCode::NOT_IMPLEMENTED) + if found { + Ok(StatusCode::OK) + } else { + Ok(StatusCode::NOT_FOUND) + } } pub fn index_update_callback(index_name: &str, data: &Data, _status: ProcessedUpdateResult) { diff --git a/meilidb-http/src/routes/mod.rs b/meilidb-http/src/routes/mod.rs index 7bb56ab21..efcf5d4f4 100644 --- a/meilidb-http/src/routes/mod.rs +++ b/meilidb-http/src/routes/mod.rs @@ -37,7 +37,8 @@ pub fn load_routes(app: &mut tide::App) { router .at("/") .get(document::browse_documents) - .post(document::add_or_update_multiple_documents) + .post(document::add_or_replace_multiple_documents) + .put(document::add_or_update_multiple_documents) .delete(document::clear_all_documents); router.at("/:identifier").nest(|router| {