diff --git a/src/data/mod.rs b/src/data/mod.rs index 3d6858763..ecc8a7e2e 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,4 +1,4 @@ -mod search; +pub mod search; mod updates; pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; diff --git a/src/data/search.rs b/src/data/search.rs index 23c6b463e..6ab792073 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -123,27 +123,27 @@ impl SearchQuery { #[derive(Serialize)] #[serde(rename_all = "camelCase")] pub struct SearchResult { - hits: Vec>, - nb_hits: u64, - query: String, - limit: usize, - offset: usize, - processing_time_ms: u128, + pub hits: Vec>, + pub nb_hits: u64, + pub query: String, + pub limit: usize, + pub offset: usize, + pub processing_time_ms: u128, #[serde(skip_serializing_if = "Option::is_none")] - facet_distributions: Option>>, + pub facet_distributions: Option>>, } -struct Highlighter<'a, A> { +pub struct Highlighter<'a, A> { analyzer: Analyzer<'a, A>, } impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> { - fn new(stop_words: &'a fst::Set) -> Self { + pub fn new(stop_words: &'a fst::Set) -> Self { let analyzer = Analyzer::new(AnalyzerConfig::default_with_stopwords(stop_words)); Self { analyzer } } - fn highlight_value(&self, value: Value, words_to_highlight: &HashSet) -> Value { + pub fn highlight_value(&self, value: Value, words_to_highlight: &HashSet) -> Value { match value { Value::Null => Value::Null, Value::Bool(boolean) => Value::Bool(boolean), @@ -182,7 +182,7 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> { } } - fn highlight_record( + pub fn highlight_record( &self, object: &mut Map, words_to_highlight: &HashSet, @@ -199,16 +199,12 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> { } impl Data { - pub fn search>( + pub async fn search>( &self, - _index: S, - _search_query: SearchQuery, + index: S, + search_query: SearchQuery, ) -> anyhow::Result { - todo!() - //match self.index_controller.index(&index)? { - //Some(index) => Ok(search_query.perform(index)?), - //None => bail!("index {:?} doesn't exists", index.as_ref()), - //} + self.index_controller.search(index.as_ref().to_string(), search_query).await } pub async fn retrieve_documents( diff --git a/src/index_controller/actor_index_controller/index_actor.rs b/src/index_controller/actor_index_controller/index_actor.rs index 6123ca774..0cb057a9b 100644 --- a/src/index_controller/actor_index_controller/index_actor.rs +++ b/src/index_controller/actor_index_controller/index_actor.rs @@ -1,6 +1,7 @@ use std::fs::{File, create_dir_all}; use std::path::{PathBuf, Path}; use std::sync::Arc; +use std::time::Instant; use chrono::Utc; use heed::EnvOpenOptions; @@ -11,8 +12,12 @@ use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; use log::info; +use crate::data::SearchQuery; +use futures::stream::{StreamExt, Stream}; use super::update_handler::UpdateHandler; +use async_stream::stream; +use crate::data::SearchResult; use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}, UpdateResult as UResult}; use crate::option::IndexerOpts; @@ -22,7 +27,8 @@ type UpdateResult = std::result::Result, Failed, ret: oneshot::Sender> }, - Update { meta: Processing, data: std::fs::File, ret: oneshot::Sender}, + Update { meta: Processing, data: std::fs::File, ret: oneshot::Sender}, + Search { uuid: Uuid, query: SearchQuery, ret: oneshot::Sender> }, } struct IndexActor { @@ -43,6 +49,7 @@ pub enum IndexError { trait IndexStore { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; async fn get_or_create(&self, uuid: Uuid) -> Result>; + async fn get(&self, uuid: Uuid) -> Result>>; } impl IndexActor { @@ -54,13 +61,109 @@ impl IndexActor { } async fn run(mut self) { - loop { - match self.inbox.recv().await { - Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await, - Some(IndexMsg::Update { ret, meta, data }) => self.handle_update(meta, data, ret).await, - None => break, + let stream = stream! { + loop { + match self.inbox.recv().await { + Some(msg) => yield msg, + None => break, + } } - } + }; + + stream.for_each_concurent(Some(10), |msg| { + match msg { + IndexMsg::CreateIndex { uuid, primary_key, ret } => self.handle_create_index(uuid, primary_key, ret), + IndexMsg::Update { ret, meta, data } => self.handle_update(meta, data, ret), + IndexMsg::Search { ret, query, uuid } => self.handle_search(uuid, query, ret), + } + }) + } + + async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender>) { + let index = self.store.get(uuid).await.unwrap().unwrap(); + tokio::task::spawn_blocking(move || { + + let before_search = Instant::now(); + let rtxn = index.read_txn().unwrap(); + + let mut search = index.search(&rtxn); + + if let Some(ref query) = query.q { + search.query(query); + } + + search.limit(query.limit); + search.offset(query.offset.unwrap_or_default()); + + //if let Some(ref facets) = query.facet_filters { + //if let Some(facets) = parse_facets(facets, index, &rtxn)? { + //search.facet_condition(facets); + //} + //} + let milli::SearchResult { + documents_ids, + found_words, + candidates, + .. + } = search.execute().unwrap(); + let mut documents = Vec::new(); + let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + + let displayed_fields_ids = index.displayed_fields_ids(&rtxn).unwrap(); + + let attributes_to_retrieve_ids = match query.attributes_to_retrieve { + Some(ref attrs) if attrs.iter().any(|f| f == "*") => None, + Some(ref attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f)) + .collect::>() + .into(), + None => None, + }; + + let displayed_fields_ids = match (displayed_fields_ids, attributes_to_retrieve_ids) { + (_, Some(ids)) => ids, + (Some(ids), None) => ids, + (None, None) => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + let stop_words = fst::Set::default(); + let highlighter = crate::data::search::Highlighter::new(&stop_words); + + for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() { + let mut object = milli::obkv_to_json(&displayed_fields_ids, &fields_ids_map, obkv).unwrap(); + if let Some(ref attributes_to_highlight) = query.attributes_to_highlight { + highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight); + } + documents.push(object); + } + + let nb_hits = candidates.len(); + + let facet_distributions = match query.facet_distributions { + Some(ref fields) => { + let mut facet_distribution = index.facets_distribution(&rtxn); + if fields.iter().all(|f| f != "*") { + facet_distribution.facets(fields); + } + Some(facet_distribution.candidates(candidates).execute().unwrap()) + } + None => None, + }; + + let result = Ok(SearchResult { + hits: documents, + nb_hits, + query: query.q.clone().unwrap_or_default(), + limit: query.limit, + offset: query.offset.unwrap_or_default(), + processing_time_ms: before_search.elapsed().as_millis(), + facet_distributions, + }); + + ret.send(result) + }); + } async fn handle_create_index(&self, uuid: Uuid, primary_key: Option, ret: oneshot::Sender>) { @@ -107,6 +210,13 @@ impl IndexActorHandle { let _ = self.sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } + + pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Search { uuid, query, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } } struct MapIndexStore { @@ -162,6 +272,10 @@ impl IndexStore for MapIndexStore { Entry::Occupied(entry) => Ok(entry.get().clone()), } } + + async fn get(&self, uuid: Uuid) -> Result>> { + Ok(self.index_store.read().await.get(&uuid).cloned()) + } } impl MapIndexStore { diff --git a/src/index_controller/actor_index_controller/mod.rs b/src/index_controller/actor_index_controller/mod.rs index 2936c59ea..646d9cf45 100644 --- a/src/index_controller/actor_index_controller/mod.rs +++ b/src/index_controller/actor_index_controller/mod.rs @@ -10,6 +10,7 @@ use uuid::Uuid; use super::IndexMetadata; use tokio::fs::File; use super::UpdateMeta; +use crate::data::{SearchResult, SearchQuery}; pub struct ActorIndexController { uuid_resolver: uuid_resolver::UuidResolverHandle, @@ -97,4 +98,10 @@ impl IndexController for ActorIndexController { fn update_index(&self, name: String, index_settings: super::IndexSettings) -> anyhow::Result { todo!() } + + async fn search(&self, name: String, query: SearchQuery) -> anyhow::Result { + let uuid = self.uuid_resolver.resolve(name).await.unwrap().unwrap(); + let result = self.index_handle.search(uuid, query).await?; + Ok(result) + } } diff --git a/src/index_controller/actor_index_controller/update_handler.rs b/src/index_controller/actor_index_controller/update_handler.rs new file mode 100644 index 000000000..d9ac2f866 --- /dev/null +++ b/src/index_controller/actor_index_controller/update_handler.rs @@ -0,0 +1,260 @@ +use std::collections::HashMap; +use std::io; +use std::fs::File; + +use anyhow::Result; +use flate2::read::GzDecoder; +use grenad::CompressionType; +use log::info; +use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; +use milli::Index; +use rayon::ThreadPool; + +use crate::index_controller::updates::{Failed, Processed, Processing}; +use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult}; +use crate::option::IndexerOpts; + +pub struct UpdateHandler { + max_nb_chunks: Option, + chunk_compression_level: Option, + thread_pool: ThreadPool, + log_frequency: usize, + max_memory: usize, + linked_hash_map_size: usize, + chunk_compression_type: CompressionType, + chunk_fusing_shrink_size: u64, +} + +impl UpdateHandler { + pub fn new( + opt: &IndexerOpts, + ) -> anyhow::Result { + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(opt.indexing_jobs.unwrap_or(0)) + .build()?; + Ok(Self { + max_nb_chunks: opt.max_nb_chunks, + chunk_compression_level: opt.chunk_compression_level, + thread_pool, + log_frequency: opt.log_every_n, + max_memory: opt.max_memory.get_bytes() as usize, + linked_hash_map_size: opt.linked_hash_map_size, + chunk_compression_type: opt.chunk_compression_type, + chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(), + }) + } + + fn update_buidler(&self, update_id: u64) -> UpdateBuilder { + // We prepare the update by using the update builder. + let mut update_builder = UpdateBuilder::new(update_id); + if let Some(max_nb_chunks) = self.max_nb_chunks { + update_builder.max_nb_chunks(max_nb_chunks); + } + if let Some(chunk_compression_level) = self.chunk_compression_level { + update_builder.chunk_compression_level(chunk_compression_level); + } + update_builder.thread_pool(&self.thread_pool); + update_builder.log_every_n(self.log_frequency); + update_builder.max_memory(self.max_memory); + update_builder.linked_hash_map_size(self.linked_hash_map_size); + update_builder.chunk_compression_type(self.chunk_compression_type); + update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size); + update_builder + } + + fn update_documents( + &self, + format: UpdateFormat, + method: IndexDocumentsMethod, + content: File, + update_builder: UpdateBuilder, + primary_key: Option<&str>, + index: &Index, + ) -> anyhow::Result { + info!("performing document addition"); + // We must use the write transaction of the update here. + let mut wtxn = index.write_txn()?; + + // Set the primary key if not set already, ignore if already set. + match (index.primary_key(&wtxn)?, primary_key) { + (None, Some(ref primary_key)) => { + index.put_primary_key(&mut wtxn, primary_key)?; + } + _ => (), + } + + let mut builder = update_builder.index_documents(&mut wtxn, index); + builder.update_format(format); + builder.index_documents_method(method); + + let gzipped = false; + let reader = if gzipped { + Box::new(GzDecoder::new(content)) + } else { + Box::new(content) as Box + }; + + let result = builder.execute(reader, |indexing_step, update_id| { + info!("update {}: {:?}", update_id, indexing_step) + }); + + info!("document addition done: {:?}", result); + + match result { + Ok(addition_result) => wtxn + .commit() + .and(Ok(UpdateResult::DocumentsAddition(addition_result))) + .map_err(Into::into), + Err(e) => Err(e.into()), + } + } + + fn clear_documents(&self, update_builder: UpdateBuilder, index: &Index) -> anyhow::Result { + // We must use the write transaction of the update here. + let mut wtxn = index.write_txn()?; + let builder = update_builder.clear_documents(&mut wtxn, index); + + match builder.execute() { + Ok(_count) => wtxn + .commit() + .and(Ok(UpdateResult::Other)) + .map_err(Into::into), + Err(e) => Err(e.into()), + } + } + + fn update_settings( + &self, + settings: &Settings, + update_builder: UpdateBuilder, + index: &Index, + ) -> anyhow::Result { + // We must use the write transaction of the update here. + let mut wtxn = index.write_txn()?; + let mut builder = update_builder.settings(&mut wtxn, index); + + // We transpose the settings JSON struct into a real setting update. + if let Some(ref names) = settings.searchable_attributes { + match names { + Some(names) => builder.set_searchable_fields(names.clone()), + None => builder.reset_searchable_fields(), + } + } + + // We transpose the settings JSON struct into a real setting update. + if let Some(ref names) = settings.displayed_attributes { + match names { + Some(names) => builder.set_displayed_fields(names.clone()), + None => builder.reset_displayed_fields(), + } + } + + // We transpose the settings JSON struct into a real setting update. + if let Some(ref facet_types) = settings.faceted_attributes { + let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new()); + builder.set_faceted_fields(facet_types); + } + + // We transpose the settings JSON struct into a real setting update. + if let Some(ref criteria) = settings.criteria { + match criteria { + Some(criteria) => builder.set_criteria(criteria.clone()), + None => builder.reset_criteria(), + } + } + + let result = builder + .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); + + match result { + Ok(()) => wtxn + .commit() + .and(Ok(UpdateResult::Other)) + .map_err(Into::into), + Err(e) => Err(e.into()), + } + } + + fn update_facets( + &self, + levels: &Facets, + update_builder: UpdateBuilder, + index: &Index, + ) -> anyhow::Result { + // We must use the write transaction of the update here. + let mut wtxn = index.write_txn()?; + let mut builder = update_builder.facets(&mut wtxn, index); + if let Some(value) = levels.level_group_size { + builder.level_group_size(value); + } + if let Some(value) = levels.min_level_size { + builder.min_level_size(value); + } + match builder.execute() { + Ok(()) => wtxn + .commit() + .and(Ok(UpdateResult::Other)) + .map_err(Into::into), + Err(e) => Err(e.into()), + } + } + + fn delete_documents( + &self, + document_ids: File, + update_builder: UpdateBuilder, + index: &Index, + ) -> anyhow::Result { + let ids: Vec = serde_json::from_reader(document_ids)?; + let mut txn = index.write_txn()?; + let mut builder = update_builder.delete_documents(&mut txn, index)?; + + // We ignore unexisting document ids + ids.iter().for_each(|id| { builder.delete_external_id(id); }); + + match builder.execute() { + Ok(deleted) => txn + .commit() + .and(Ok(UpdateResult::DocumentDeletion { deleted })) + .map_err(Into::into), + Err(e) => Err(e.into()) + } + } + + pub fn handle_update( + &self, + meta: Processing, + content: File, + index: &Index, + ) -> Result, Failed> { + use UpdateMeta::*; + + let update_id = meta.id(); + + let update_builder = self.update_buidler(update_id); + + let result = match meta.meta() { + DocumentsAddition { + method, + format, + primary_key, + } => self.update_documents( + *format, + *method, + content, + update_builder, + primary_key.as_deref(), + index, + ), + ClearDocuments => self.clear_documents(update_builder, index), + DeleteDocuments => self.delete_documents(content, update_builder, index), + Settings(settings) => self.update_settings(settings, update_builder, index), + Facets(levels) => self.update_facets(levels, update_builder, index), + }; + + match result { + Ok(result) => Ok(meta.process(result)), + Err(e) => Err(meta.fail(e.to_string())), + } + } +} diff --git a/src/index_controller/actor_index_controller/update_store.rs b/src/index_controller/actor_index_controller/update_store.rs new file mode 100644 index 000000000..371ac7bd9 --- /dev/null +++ b/src/index_controller/actor_index_controller/update_store.rs @@ -0,0 +1,423 @@ +use std::path::Path; +use std::sync::{Arc, RwLock}; +use std::io::{Cursor, SeekFrom, Seek}; + +use crossbeam_channel::Sender; +use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; +use heed::{EnvOpenOptions, Env, Database}; +use serde::{Serialize, Deserialize}; +use std::fs::File; +use uuid::Uuid; + +use crate::index_controller::updates::*; + +type BEU64 = heed::zerocopy::U64; + +#[derive(Clone)] +pub struct UpdateStore { + env: Env, + pending_meta: Database, SerdeJson>>, + pending: Database, ByteSlice>, + processed_meta: Database, SerdeJson>>, + failed_meta: Database, SerdeJson>>, + aborted_meta: Database, SerdeJson>>, + processing: Arc>>>, + notification_sender: Sender<()>, +} + +pub trait HandleUpdate { + fn handle_update(&mut self, meta: Processing, content: File) -> Result, Failed>; +} + +impl HandleUpdate for F +where F: FnMut(Processing, File) -> Result, Failed> +{ + fn handle_update(&mut self, meta: Processing, content: File) -> Result, Failed> { + self(meta, content) + } +} + +impl UpdateStore +where + M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone, + N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, + E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, +{ + pub fn open( + mut options: EnvOpenOptions, + path: P, + mut update_handler: U, + ) -> heed::Result> + where + P: AsRef, + U: HandleUpdate + Send + 'static, + { + options.max_dbs(5); + + let env = options.open(path)?; + let pending_meta = env.create_database(Some("pending-meta"))?; + let pending = env.create_database(Some("pending"))?; + let processed_meta = env.create_database(Some("processed-meta"))?; + let aborted_meta = env.create_database(Some("aborted-meta"))?; + let failed_meta = env.create_database(Some("failed-meta"))?; + let processing = Arc::new(RwLock::new(None)); + + let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); + // Send a first notification to trigger the process. + let _ = notification_sender.send(()); + + let update_store = Arc::new(UpdateStore { + env, + pending, + pending_meta, + processed_meta, + aborted_meta, + notification_sender, + failed_meta, + processing, + }); + + // We need a weak reference so we can take ownership on the arc later when we + // want to close the index. + let update_store_weak = Arc::downgrade(&update_store); + std::thread::spawn(move || { + // Block and wait for something to process. + 'outer: for _ in notification_receiver { + loop { + match update_store_weak.upgrade() { + Some(update_store) => { + match update_store.process_pending_update(&mut update_handler) { + Ok(Some(_)) => (), + Ok(None) => break, + Err(e) => eprintln!("error while processing update: {}", e), + } + } + // the ownership on the arc has been taken, we need to exit. + None => break 'outer, + } + } + } + }); + + Ok(update_store) + } + + pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { + self.env.prepare_for_closing() + } + + /// Returns the new biggest id to use to store the new update. + fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { + let last_pending = self.pending_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_processed = self.processed_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_aborted = self.aborted_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_update_id = [last_pending, last_processed, last_aborted] + .iter() + .copied() + .flatten() + .max(); + + match last_update_id { + Some(last_id) => Ok(last_id + 1), + None => Ok(0), + } + } + + /// Registers the update content in the pending store and the meta + /// into the pending-meta store. Returns the new unique update id. + pub fn register_update( + &self, + meta: M, + content: &[u8], + index_uuid: Uuid, + ) -> heed::Result> { + let mut wtxn = self.env.write_txn()?; + + // We ask the update store to give us a new update id, this is safe, + // no other update can have the same id because we use a write txn before + // asking for the id and registering it so other update registering + // will be forced to wait for a new write txn. + let update_id = self.new_update_id(&wtxn)?; + let update_key = BEU64::new(update_id); + + let meta = Pending::new(meta, update_id, index_uuid); + self.pending_meta.put(&mut wtxn, &update_key, &meta)?; + self.pending.put(&mut wtxn, &update_key, content)?; + + wtxn.commit()?; + + if let Err(e) = self.notification_sender.try_send(()) { + assert!(!e.is_disconnected(), "update notification channel is disconnected"); + } + Ok(meta) + } + /// Executes the user provided function on the next pending update (the one with the lowest id). + /// This is asynchronous as it let the user process the update with a read-only txn and + /// only writing the result meta to the processed-meta store *after* it has been processed. + fn process_pending_update(&self, handler: &mut U) -> heed::Result> + where + U: HandleUpdate + Send + 'static, + { + // Create a read transaction to be able to retrieve the pending update in order. + let rtxn = self.env.read_txn()?; + let first_meta = self.pending_meta.first(&rtxn)?; + + // If there is a pending update we process and only keep + // a reader while processing it, not a writer. + match first_meta { + Some((first_id, pending)) => { + let first_content = self.pending + .get(&rtxn, &first_id)? + .expect("associated update content"); + + // we change the state of the update from pending to processing before we pass it + // to the update handler. Processing store is non persistent to be able recover + // from a failure + let processing = pending.processing(); + self.processing + .write() + .unwrap() + .replace(processing.clone()); + let mut cursor = Cursor::new(first_content); + let mut file = tempfile::tempfile()?; + std::io::copy(&mut cursor, &mut file)?; + file.seek(SeekFrom::Start(0))?; + // Process the pending update using the provided user function. + let result = handler.handle_update(processing, file); + drop(rtxn); + + // Once the pending update have been successfully processed + // we must remove the content from the pending and processing stores and + // write the *new* meta to the processed-meta store and commit. + let mut wtxn = self.env.write_txn()?; + self.processing + .write() + .unwrap() + .take(); + self.pending_meta.delete(&mut wtxn, &first_id)?; + self.pending.delete(&mut wtxn, &first_id)?; + match result { + Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?, + Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?, + } + wtxn.commit()?; + + Ok(Some(())) + }, + None => Ok(None) + } + } + + /// Execute the user defined function with the meta-store iterators, the first + /// iterator is the *processed* meta one, the second the *aborted* meta one + /// and, the last is the *pending* meta one. + pub fn iter_metas(&self, mut f: F) -> heed::Result + where + F: for<'a> FnMut( + Option>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + heed::RoIter<'a, OwnedType, SerdeJson>>, + ) -> heed::Result, + { + let rtxn = self.env.read_txn()?; + + // We get the pending, processed and aborted meta iterators. + let processed_iter = self.processed_meta.iter(&rtxn)?; + let aborted_iter = self.aborted_meta.iter(&rtxn)?; + let pending_iter = self.pending_meta.iter(&rtxn)?; + let processing = self.processing.read().unwrap().clone(); + let failed_iter = self.failed_meta.iter(&rtxn)?; + + // We execute the user defined function with both iterators. + (f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter) + } + + /// Returns the update associated meta or `None` if the update doesn't exist. + pub fn meta(&self, update_id: u64) -> heed::Result>> { + let rtxn = self.env.read_txn()?; + let key = BEU64::new(update_id); + + if let Some(ref meta) = *self.processing.read().unwrap() { + if meta.id() == update_id { + return Ok(Some(UpdateStatus::Processing(meta.clone()))); + } + } + + if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Pending(meta))); + } + + if let Some(meta) = self.processed_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Processed(meta))); + } + + if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Aborted(meta))); + } + + if let Some(meta) = self.failed_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatus::Failed(meta))); + } + + Ok(None) + } + + /// Aborts an update, an aborted update content is deleted and + /// the meta of it is moved into the aborted updates database. + /// + /// Trying to abort an update that is currently being processed, an update + /// that as already been processed or which doesn't actually exist, will + /// return `None`. + #[allow(dead_code)] + pub fn abort_update(&self, update_id: u64) -> heed::Result>> { + let mut wtxn = self.env.write_txn()?; + let key = BEU64::new(update_id); + + // We cannot abort an update that is currently being processed. + if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) { + return Ok(None); + } + + let pending = match self.pending_meta.get(&wtxn, &key)? { + Some(meta) => meta, + None => return Ok(None), + }; + + let aborted = pending.abort(); + + self.aborted_meta.put(&mut wtxn, &key, &aborted)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + + wtxn.commit()?; + + Ok(Some(aborted)) + } + + /// Aborts all the pending updates, and not the one being currently processed. + /// Returns the update metas and ids that were successfully aborted. + #[allow(dead_code)] + pub fn abort_pendings(&self) -> heed::Result)>> { + let mut wtxn = self.env.write_txn()?; + let mut aborted_updates = Vec::new(); + + // We skip the first pending update as it is currently being processed. + for result in self.pending_meta.iter(&wtxn)?.skip(1) { + let (key, pending) = result?; + let id = key.get(); + aborted_updates.push((id, pending.abort())); + } + + for (id, aborted) in &aborted_updates { + let key = BEU64::new(*id); + self.aborted_meta.put(&mut wtxn, &key, &aborted)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + } + + wtxn.commit()?; + + Ok(aborted_updates) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::{Duration, Instant}; + + impl HandleUpdate for F + where F: FnMut(Processing, &[u8]) -> Result, Failed> + Send + 'static { + fn handle_update(&mut self, meta: Processing, content: &[u8]) -> Result, Failed> { + self(meta, content) + } + } + + #[test] + fn simple() { + let dir = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let update_store = UpdateStore::open(options, dir, |meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { + let new_meta = meta.meta().to_string() + " processed"; + let processed = meta.process(new_meta); + Ok(processed) + }).unwrap(); + + let meta = String::from("kiki"); + let update = update_store.register_update(meta, &[]).unwrap(); + thread::sleep(Duration::from_millis(100)); + let meta = update_store.meta(update.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "kiki processed"); + } else { + panic!() + } + } + + #[test] + #[ignore] + fn long_running_update() { + let dir = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(4096 * 100); + let update_store = UpdateStore::open(options, dir, |meta: Processing, _content:&_| -> Result<_, Failed<_, ()>> { + thread::sleep(Duration::from_millis(400)); + let new_meta = meta.meta().to_string() + "processed"; + let processed = meta.process(new_meta); + Ok(processed) + }).unwrap(); + + let before_register = Instant::now(); + + let meta = String::from("kiki"); + let update_kiki = update_store.register_update(meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + let meta = String::from("coco"); + let update_coco = update_store.register_update(meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + let meta = String::from("cucu"); + let update_cucu = update_store.register_update(meta, &[]).unwrap(); + assert!(before_register.elapsed() < Duration::from_millis(200)); + + thread::sleep(Duration::from_millis(400 * 3 + 100)); + + let meta = update_store.meta(update_kiki.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "kiki processed"); + } else { + panic!() + } + + let meta = update_store.meta(update_coco.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "coco processed"); + } else { + panic!() + } + + let meta = update_store.meta(update_cucu.id()).unwrap().unwrap(); + if let UpdateStatus::Processed(Processed { success, .. }) = meta { + assert_eq!(success, "cucu processed"); + } else { + panic!() + } + } +} diff --git a/src/index_controller/actor_index_controller/uuid_resolver.rs b/src/index_controller/actor_index_controller/uuid_resolver.rs index b75c0402c..3143ae8fc 100644 --- a/src/index_controller/actor_index_controller/uuid_resolver.rs +++ b/src/index_controller/actor_index_controller/uuid_resolver.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use std::collections::HashMap; use std::sync::Arc; use std::collections::hash_map::Entry; -use log::info; +use log::{info, warn}; pub type Result = std::result::Result; @@ -22,8 +22,6 @@ enum UuidResolveMsg { name: String, ret: oneshot::Sender>, }, - Shutdown, - } struct UuidResolverActor { @@ -46,11 +44,13 @@ impl UuidResolverActor { match self.inbox.recv().await { Some(Create { name, ret }) => self.handle_create(name, ret).await, Some(GetOrCreate { name, ret }) => self.handle_get_or_create(name, ret).await, - Some(_) => {} + Some(Resolve { name, ret }) => self.handle_resolve(name, ret).await, // all senders have been dropped, need to quit. None => break, } } + + warn!("exiting uuid resolver loop"); } async fn handle_create(&self, name: String, ret: oneshot::Sender>) { @@ -62,6 +62,11 @@ impl UuidResolverActor { let result = self.store.create_uuid(name, false).await; let _ = ret.send(result); } + + async fn handle_resolve(&self, name: String, ret: oneshot::Sender>>) { + let result = self.store.get_uuid(name).await; + let _ = ret.send(result); + } } #[derive(Clone)] diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index dc6cc3863..cee276ea0 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -13,6 +13,8 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; use uuid::Uuid; use tokio::fs::File; +use crate::data::SearchResult; +use crate::data::SearchQuery; pub use updates::{Processed, Processing, Failed}; @@ -135,6 +137,8 @@ pub trait IndexController { primary_key: Option, ) -> anyhow::Result; + async fn search(&self, name: String, query: SearchQuery) -> Result; + /// Clear all documents in the given index. fn clear_documents(&self, index: String) -> anyhow::Result; diff --git a/src/routes/search.rs b/src/routes/search.rs index 7919c5412..1f4218555 100644 --- a/src/routes/search.rs +++ b/src/routes/search.rs @@ -83,7 +83,7 @@ async fn search_with_url_query( return Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) } }; - let search_result = data.search(&path.index_uid, query); + let search_result = data.search(&path.index_uid, query).await; match search_result { Ok(docs) => { let docs = serde_json::to_string(&docs).unwrap(); @@ -101,7 +101,7 @@ async fn search_with_post( path: web::Path, params: web::Json, ) -> Result { - let search_result = data.search(&path.index_uid, params.into_inner()); + let search_result = data.search(&path.index_uid, params.into_inner()).await; match search_result { Ok(docs) => { let docs = serde_json::to_string(&docs).unwrap();