From d76634a36c65c51c0d364a75c3c3b8229f5a5ff3 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Mon, 3 Oct 2022 16:53:07 +0200
Subject: [PATCH] Remove the Index wrapper and use milli::Index directly

---
 index-scheduler/src/index_mapper.rs |  38 ++-
 index-scheduler/src/lib.rs          |   3 +-
 index/src/index.rs                  | 311 ------------------------
 index/src/lib.rs                    | 260 +--------------------
 index/src/search.rs                 | 351 ++++++++++++++--------------
 index/src/updates.rs                | 132 +----------
 6 files changed, 196 insertions(+), 899 deletions(-)
 delete mode 100644 index/src/index.rs

diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs
index e8720821f..f39af072b 100644
--- a/index-scheduler/src/index_mapper.rs
+++ b/index-scheduler/src/index_mapper.rs
@@ -1,18 +1,14 @@
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use std::fs;
 use std::path::PathBuf;
-use std::sync::Arc;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
 
-use index::Index;
+use milli::Index;
 use uuid::Uuid;
 
-use milli::heed::types::SerdeBincode;
-use milli::heed::types::Str;
-use milli::heed::Database;
-use milli::heed::Env;
-use milli::heed::RoTxn;
-use milli::heed::RwTxn;
+use milli::heed::types::{SerdeBincode, Str};
+use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
 use milli::update::IndexerConfig;
 
 use crate::{Error, Result};
@@ -56,12 +52,12 @@ impl IndexMapper {
             Err(Error::IndexNotFound(_)) => {
                 let uuid = Uuid::new_v4();
                 self.index_mapping.put(wtxn, name, &uuid)?;
-                Index::open(
-                    self.base_path.join(uuid.to_string()),
-                    name.to_string(),
-                    self.index_size,
-                    self.indexer_config.clone(),
-                )?
+
+                let index_path = self.base_path.join(uuid.to_string());
+                fs::create_dir_all(&index_path)?;
+                let mut options = EnvOpenOptions::new();
+                options.map_size(self.index_size);
+                milli::Index::new(options, &index_path)?
             }
             error => return error,
         };
@@ -91,12 +87,12 @@ impl IndexMapper {
         // the entry method.
         match index_map.entry(uuid) {
             Entry::Vacant(entry) => {
-                let index = Index::open(
-                    self.base_path.join(uuid.to_string()),
-                    name.to_string(),
-                    self.index_size,
-                    self.indexer_config.clone(),
-                )?;
+                let index_path = self.base_path.join(uuid.to_string());
+                fs::create_dir_all(&index_path)?;
+                let mut options = EnvOpenOptions::new();
+                options.map_size(self.index_size);
+                let index = milli::Index::new(options, &index_path)?;
+
                 entry.insert(index.clone());
                 index
             }
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 84c76198f..921980ac7 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -17,7 +17,6 @@ use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
 
 use file_store::{File, FileStore};
-use index::Index;
 use roaring::RoaringBitmap;
 use serde::Deserialize;
 use synchronoise::SignalEvent;
@@ -27,7 +26,7 @@ use uuid::Uuid;
 use milli::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
 use milli::heed::{self, Database, Env};
 use milli::update::IndexerConfig;
-use milli::{RoaringBitmapCodec, BEU32};
+use milli::{Index, RoaringBitmapCodec, BEU32};
 
 use crate::index_mapper::IndexMapper;
 use crate::task::Task;
diff --git a/index/src/index.rs b/index/src/index.rs
deleted file mode 100644
index 2a1d67e46..000000000
--- a/index/src/index.rs
+++ /dev/null
@@ -1,311 +0,0 @@
-use std::collections::BTreeSet;
-use std::fs::create_dir_all;
-use std::marker::PhantomData;
-use std::ops::Deref;
-use std::path::Path;
-use std::sync::Arc;
-
-use fst::IntoStreamer;
-use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
-use milli::update::{IndexerConfig, Setting};
-use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET};
-use serde_json::{Map, Value};
-use time::OffsetDateTime;
-
-use crate::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
-
-use super::error::IndexError;
-use super::error::Result;
-use super::updates::{FacetingSettings, MinWordSizeTyposSetting, PaginationSettings, TypoSettings};
-use super::{Checked, Settings};
-
-pub type Document = Map<String, Value>;
-
-#[derive(Clone, derivative::Derivative)]
-#[derivative(Debug)]
-pub struct Index {
-    pub name: String,
-    #[derivative(Debug = "ignore")]
-    pub inner: Arc<milli::Index>,
-    #[derivative(Debug = "ignore")]
-    pub indexer_config: Arc<IndexerConfig>,
-}
-
-impl Deref for Index {
-    type Target = milli::Index;
-
-    fn deref(&self) -> &Self::Target {
-        self.inner.as_ref()
-    }
-}
-
-impl Index {
-    pub fn open(
-        path: impl AsRef<Path>,
-        name: String,
-        size: usize,
-        update_handler: Arc<IndexerConfig>,
-    ) -> Result<Self> {
-        log::debug!("opening index in {}", path.as_ref().display());
-        create_dir_all(&path)?;
-        let mut options = EnvOpenOptions::new();
-        options.map_size(size);
-        let inner = Arc::new(milli::Index::new(options, &path)?);
-        Ok(Index {
-            name,
-            inner,
-            indexer_config: update_handler,
-        })
-    }
-
-    /// Asynchronously close the underlying index
-    pub fn close(self) {
-        self.inner.as_ref().clone().prepare_for_closing();
-    }
-
-    pub fn delete(self) -> Result<()> {
-        let path = self.path().to_path_buf();
-        self.inner.as_ref().clone().prepare_for_closing().wait();
-        std::fs::remove_file(path)?;
-
-        Ok(())
-    }
-
-    pub fn number_of_documents(&self) -> Result<u64> {
-        let rtxn = self.read_txn()?;
-        Ok(self.inner.number_of_documents(&rtxn)?)
-    }
-
-    pub fn field_distribution(&self) -> Result<FieldDistribution> {
-        let rtxn = self.read_txn()?;
-        Ok(self.inner.field_distribution(&rtxn)?)
-    }
-
-    pub fn settings(&self) -> Result<Settings<Checked>> {
-        let txn = self.read_txn()?;
-        self.settings_txn(&txn)
-    }
-
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
-        let displayed_attributes = self
-            .displayed_fields(txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect());
-
-        let searchable_attributes = self
-            .user_defined_searchable_fields(txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect());
-
-        let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
-
-        let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
-
-        let criteria = self
-            .criteria(txn)?
-            .into_iter()
-            .map(|c| c.to_string())
-            .collect();
-
-        let stop_words = self
-            .stop_words(txn)?
-            .map(|stop_words| -> Result<BTreeSet<String>> {
-                Ok(stop_words.stream().into_strs()?.into_iter().collect())
-            })
-            .transpose()?
-            .unwrap_or_default();
-        let distinct_field = self.distinct_field(txn)?.map(String::from);
-
-        // in milli each word in the synonyms map were split on their separator. Since we lost
-        // this information we are going to put space between words.
-        let synonyms = self
-            .synonyms(txn)?
-            .iter()
-            .map(|(key, values)| {
-                (
-                    key.join(" "),
-                    values.iter().map(|value| value.join(" ")).collect(),
-                )
-            })
-            .collect();
-
-        let min_typo_word_len = MinWordSizeTyposSetting {
-            one_typo: Setting::Set(self.min_word_len_one_typo(txn)?),
-            two_typos: Setting::Set(self.min_word_len_two_typos(txn)?),
-        };
-
-        let disabled_words = match self.exact_words(txn)? {
-            Some(fst) => fst.into_stream().into_strs()?.into_iter().collect(),
-            None => BTreeSet::new(),
-        };
-
-        let disabled_attributes = self
-            .exact_attributes(txn)?
-            .into_iter()
-            .map(String::from)
-            .collect();
-
-        let typo_tolerance = TypoSettings {
-            enabled: Setting::Set(self.authorize_typos(txn)?),
-            min_word_size_for_typos: Setting::Set(min_typo_word_len),
-            disable_on_words: Setting::Set(disabled_words),
-            disable_on_attributes: Setting::Set(disabled_attributes),
-        };
-
-        let faceting = FacetingSettings {
-            max_values_per_facet: Setting::Set(
-                self.max_values_per_facet(txn)?
-                    .unwrap_or(DEFAULT_VALUES_PER_FACET),
-            ),
-        };
-
-        let pagination = PaginationSettings {
-            max_total_hits: Setting::Set(
-                self.pagination_max_total_hits(txn)?
-                    .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS),
-            ),
-        };
-
-        Ok(Settings {
-            displayed_attributes: match displayed_attributes {
-                Some(attrs) => Setting::Set(attrs),
-                None => Setting::Reset,
-            },
-            searchable_attributes: match searchable_attributes {
-                Some(attrs) => Setting::Set(attrs),
-                None => Setting::Reset,
-            },
-            filterable_attributes: Setting::Set(filterable_attributes),
-            sortable_attributes: Setting::Set(sortable_attributes),
-            ranking_rules: Setting::Set(criteria),
-            stop_words: Setting::Set(stop_words),
-            distinct_attribute: match distinct_field {
-                Some(field) => Setting::Set(field),
-                None => Setting::Reset,
-            },
-            synonyms: Setting::Set(synonyms),
-            typo_tolerance: Setting::Set(typo_tolerance),
-            faceting: Setting::Set(faceting),
-            pagination: Setting::Set(pagination),
-            _kind: PhantomData,
-        })
-    }
-
-    /// Return the total number of documents contained in the index + the selected documents.
-    pub fn retrieve_documents<S: AsRef<str>>(
-        &self,
-        offset: usize,
-        limit: usize,
-        attributes_to_retrieve: Option<Vec<S>>,
-    ) -> Result<(u64, Vec<Document>)> {
-        let rtxn = self.read_txn()?;
-
-        let mut documents = Vec::new();
-        for document in self.all_documents(&rtxn)?.skip(offset).take(limit) {
-            let document = match &attributes_to_retrieve {
-                Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
-                    &document?,
-                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
-                ),
-                None => document?,
-            };
-            documents.push(document);
-        }
-        let number_of_documents = self.inner.number_of_documents(&rtxn)?;
-
-        Ok((number_of_documents, documents))
-    }
-
-    pub fn retrieve_document<S: AsRef<str>>(
-        &self,
-        doc_id: &str,
-        attributes_to_retrieve: Option<Vec<S>>,
-    ) -> Result<Document> {
-        let txn = self.read_txn()?;
-
-        let fields_ids_map = self.fields_ids_map(&txn)?;
-        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
-
-        let internal_id = self
-            .external_documents_ids(&txn)?
-            .get(doc_id.as_bytes())
-            .ok_or_else(|| IndexError::DocumentNotFound(doc_id.to_string()))?;
-
-        let document = self
-            .documents(&txn, std::iter::once(internal_id))?
-            .into_iter()
-            .next()
-            .map(|(_, d)| d)
-            .ok_or_else(|| IndexError::DocumentNotFound(doc_id.to_string()))?;
-
-        let document = obkv_to_json(&all_fields, &fields_ids_map, document)?;
-        let document = match &attributes_to_retrieve {
-            Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
-                &document,
-                attributes_to_retrieve.iter().map(|s| s.as_ref()),
-            ),
-            None => document,
-        };
-
-        Ok(document)
-    }
-
-    pub fn all_documents<'a>(
-        &self,
-        rtxn: &'a RoTxn,
-    ) -> Result<impl Iterator<Item = Result<Document>> + 'a> {
-        let fields_ids_map = self.fields_ids_map(rtxn)?;
-        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
-
-        Ok(self.inner.all_documents(rtxn)?.map(move |ret| {
-            ret.map_err(IndexError::from)
-                .and_then(|(_key, document)| -> Result<_> {
-                    Ok(obkv_to_json(&all_fields, &fields_ids_map, document)?)
-                })
-        }))
-    }
-
-    pub fn created_at(&self) -> Result<OffsetDateTime> {
-        let rtxn = self.read_txn()?;
-        Ok(self.inner.created_at(&rtxn)?)
-    }
-
-    pub fn updated_at(&self) -> Result<OffsetDateTime> {
-        let rtxn = self.read_txn()?;
-        Ok(self.inner.updated_at(&rtxn)?)
-    }
-
-    pub fn primary_key(&self) -> Result<Option<String>> {
-        let rtxn = self.read_txn()?;
-        Ok(self.inner.primary_key(&rtxn)?.map(str::to_string))
-    }
-
-    pub fn size(&self) -> Result<u64> {
-        Ok(self.inner.on_disk_size()?)
-    }
-
-    pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
-        let mut dst = path.as_ref().join(format!("indexes/{}/", self.name));
-        create_dir_all(&dst)?;
-        dst.push("data.mdb");
-        let _txn = self.write_txn()?;
-        self.inner.copy_to_path(dst, CompactionOption::Enabled)?;
-        Ok(())
-    }
-}
-
-/// When running tests, when a server instance is dropped, the environment is not actually closed,
-/// leaving a lot of open file descriptors.
-impl Drop for Index {
-    fn drop(&mut self) {
-        // When dropping the last instance of an index, we want to close the index
-        // Note that the close is actually performed only if all the instances a effectively
-        // dropped
-
-        if Arc::strong_count(&self.inner) == 1 {
-            self.inner.as_ref().clone().prepare_for_closing();
-        }
-    }
-}
diff --git a/index/src/lib.rs b/index/src/lib.rs
index 2662b7d05..401e77286 100644
--- a/index/src/lib.rs
+++ b/index/src/lib.rs
@@ -4,265 +4,11 @@ pub use search::{
 };
 pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked};
 
+use serde_json::{Map, Value};
+
 // mod dump;
 pub mod error;
 mod search;
 pub mod updates;
 
-#[allow(clippy::module_inception)]
-mod index;
-
-pub use self::index::Document;
-
-#[cfg(not(test))]
-pub use self::index::Index;
-
-#[cfg(test)]
-pub use test::MockIndex as Index;
-
-/// The index::test module provides means of mocking an index instance. I can be used throughout the
-/// code for unit testing, in places where an index would normally be used.
-#[cfg(test)]
-pub mod test {
-    use std::path::{Path, PathBuf};
-    use std::sync::Arc;
-
-    use milli::update::{
-        DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
-    };
-    use milli::FieldDistribution;
-    use nelson::Mocker;
-    use time::OffsetDateTime;
-    use uuid::Uuid;
-
-    use super::error::Result;
-    use super::index::Index;
-    use super::Document;
-    use super::{Checked, SearchQuery, SearchResult, Settings};
-    use file_store::FileStore;
-
-    #[derive(Clone)]
-    pub enum MockIndex {
-        Real(Index),
-        Mock(Arc<Mocker>),
-    }
-
-    impl MockIndex {
-        pub fn mock(mocker: Mocker) -> Self {
-            Self::Mock(Arc::new(mocker))
-        }
-
-        pub fn open(
-            path: impl AsRef<Path>,
-            name: String,
-            size: usize,
-            update_handler: Arc<IndexerConfig>,
-        ) -> Result<Self> {
-            let index = Index::open(path, name, size, update_handler)?;
-            Ok(Self::Real(index))
-        }
-
-        /*
-        pub fn load_dump(
-            src: impl AsRef<Path>,
-            dst: impl AsRef<Path>,
-            size: usize,
-            update_handler: &IndexerConfig,
-        ) -> anyhow::Result<()> {
-            Index::load_dump(src, dst, size, update_handler)
-        }
-        */
-
-        pub fn settings(&self) -> Result<Settings<Checked>> {
-            match self {
-                MockIndex::Real(index) => index.settings(),
-                MockIndex::Mock(_) => todo!(),
-            }
-        }
-
-        pub fn retrieve_documents<S: AsRef<str>>(
-            &self,
-            offset: usize,
-            limit: usize,
-            attributes_to_retrieve: Option<Vec<S>>,
-        ) -> Result<(u64, Vec<Document>)> {
-            match self {
-                MockIndex::Real(index) => {
-                    index.retrieve_documents(offset, limit, attributes_to_retrieve)
-                }
-                MockIndex::Mock(_) => todo!(),
-            }
-        }
-
-        pub fn retrieve_document<S: AsRef<str>>(
-            &self,
-            doc_id: &str,
-            attributes_to_retrieve: Option<Vec<S>>,
-        ) -> Result<Document> {
-            match self {
-                MockIndex::Real(index) => index.retrieve_document(doc_id, attributes_to_retrieve),
-                MockIndex::Mock(_) => todo!(),
-            }
-        }
-
-        pub fn size(&self) -> Result<u64> {
-            match self {
-                MockIndex::Real(index) => index.size(),
-                MockIndex::Mock(_) => todo!(),
-            }
-        }
-
-        pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
-            match self {
-                MockIndex::Real(index) => index.snapshot(path),
-                MockIndex::Mock(m) => unsafe { m.get("snapshot").call(path.as_ref()) },
-            }
-        }
-
-        pub fn close(self) {
-            match self {
-                MockIndex::Real(index) => index.close(),
-                MockIndex::Mock(m) => unsafe { m.get("close").call(()) },
-            }
-        }
-
-        pub fn delete(self) -> Result<()> {
-            match self {
-                MockIndex::Real(index) => index.delete(),
-                MockIndex::Mock(m) => unsafe { m.get("delete").call(()) },
-            }
-        }
-
-        pub fn number_of_documents(&self) -> Result<u64> {
-            match self {
-                MockIndex::Real(index) => index.number_of_documents(),
-                MockIndex::Mock(m) => unsafe { m.get("number_of_documents").call(()) },
-            }
-        }
-
-        pub fn field_distribution(&self) -> Result<FieldDistribution> {
-            match self {
-                MockIndex::Real(index) => index.field_distribution(),
-                MockIndex::Mock(m) => unsafe { m.get("field_distribution").call(()) },
-            }
-        }
-
-        pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
-            match self {
-                MockIndex::Real(index) => index.perform_search(query),
-                MockIndex::Mock(m) => unsafe { m.get("perform_search").call(query) },
-            }
-        }
-
-        pub fn update_documents(
-            &self,
-            method: IndexDocumentsMethod,
-            primary_key: Option<String>,
-            file_store: FileStore,
-            contents: impl Iterator<Item = Uuid>,
-        ) -> Result<Vec<Result<DocumentAdditionResult>>> {
-            match self {
-                MockIndex::Real(index) => {
-                    index.update_documents(method, primary_key, file_store, contents)
-                }
-                MockIndex::Mock(mocker) => unsafe {
-                    mocker
-                        .get("update_documents")
-                        .call((method, primary_key, file_store, contents))
-                },
-            }
-        }
-
-        pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
-            match self {
-                MockIndex::Real(index) => index.update_settings(settings),
-                MockIndex::Mock(m) => unsafe { m.get("update_settings").call(settings) },
-            }
-        }
-
-        pub fn update_primary_key(&self, primary_key: String) -> Result<()> {
-            match self {
-                MockIndex::Real(index) => index.update_primary_key(primary_key),
-                MockIndex::Mock(m) => unsafe { m.get("update_primary_key").call(primary_key) },
-            }
-        }
-
-        pub fn delete_documents(&self, ids: &[String]) -> Result<DocumentDeletionResult> {
-            match self {
-                MockIndex::Real(index) => index.delete_documents(ids),
-                MockIndex::Mock(m) => unsafe { m.get("delete_documents").call(ids) },
-            }
-        }
-
-        pub fn clear_documents(&self) -> Result<()> {
-            match self {
-                MockIndex::Real(index) => index.clear_documents(),
-                MockIndex::Mock(m) => unsafe { m.get("clear_documents").call(()) },
-            }
-        }
-
-        pub fn created_at(&self) -> Result<OffsetDateTime> {
-            match self {
-                MockIndex::Real(index) => index.created_at(),
-                MockIndex::Mock(m) => unsafe { m.get("created_ad").call(()) },
-            }
-        }
-
-        pub fn updated_at(&self) -> Result<OffsetDateTime> {
-            match self {
-                MockIndex::Real(index) => index.updated_at(),
-                MockIndex::Mock(m) => unsafe { m.get("updated_ad").call(()) },
-            }
-        }
-
-        pub fn primary_key(&self) -> Result<Option<String>> {
-            match self {
-                MockIndex::Real(index) => index.primary_key(),
-                MockIndex::Mock(m) => unsafe { m.get("primary_key").call(()) },
-            }
-        }
-    }
-
-    #[test]
-    fn test_faux_index() {
-        let faux = Mocker::default();
-        faux.when("snapshot")
-            .times(2)
-            .then(|_: &Path| -> Result<()> { Ok(()) });
-
-        let index = MockIndex::mock(faux);
-
-        let path = PathBuf::from("hello");
-        index.snapshot(&path).unwrap();
-        index.snapshot(&path).unwrap();
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_faux_unexisting_method_stub() {
-        let faux = Mocker::default();
-
-        let index = MockIndex::mock(faux);
-
-        let path = PathBuf::from("hello");
-        index.snapshot(&path).unwrap();
-        index.snapshot(&path).unwrap();
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_faux_panic() {
-        let faux = Mocker::default();
-        faux.when("snapshot")
-            .times(2)
-            .then(|_: &Path| -> Result<()> {
-                panic!();
-            });
-
-        let index = MockIndex::mock(faux);
-
-        let path = PathBuf::from("hello");
-        index.snapshot(&path).unwrap();
-        index.snapshot(&path).unwrap();
-    }
-}
+pub type Document = Map<String, Value>;
diff --git a/index/src/search.rs b/index/src/search.rs
index e53bb6476..fdd785c73 100644
--- a/index/src/search.rs
+++ b/index/src/search.rs
@@ -6,8 +6,8 @@ use std::time::Instant;
 use either::Either;
 use milli::tokenizer::TokenizerBuilder;
 use milli::{
-    AscDesc, FieldId, FieldsIdsMap, Filter, FormatOptions, MatchBounds, MatcherBuilder, SortError,
-    TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET,
+    AscDesc, FieldId, FieldsIdsMap, Filter, FormatOptions, Index, MatchBounds, MatcherBuilder,
+    SortError, TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET,
 };
 use regex::Regex;
 use serde::{Deserialize, Serialize};
@@ -16,7 +16,6 @@ use serde_json::{json, Value};
 use crate::error::FacetError;
 
 use super::error::{IndexError, Result};
-use super::index::Index;
 
 pub type Document = serde_json::Map<String, Value>;
 type MatchesPosition = BTreeMap<String, Vec<MatchBounds>>;
@@ -106,183 +105,181 @@ pub struct SearchResult {
     pub facet_distribution: Option<BTreeMap<String, BTreeMap<String, u64>>>,
 }
 
-impl Index {
-    pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
-        let before_search = Instant::now();
-        let rtxn = self.read_txn()?;
+pub fn perform_search(index: &Index, query: SearchQuery) -> Result<SearchResult> {
+    let before_search = Instant::now();
+    let rtxn = index.read_txn()?;
 
-        let mut search = self.search(&rtxn);
+    let mut search = index.search(&rtxn);
 
-        if let Some(ref query) = query.q {
-            search.query(query);
-        }
-
-        search.terms_matching_strategy(query.matching_strategy.into());
-
-        let max_total_hits = self
-            .pagination_max_total_hits(&rtxn)?
-            .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS);
-
-        // Make sure that a user can't get more documents than the hard limit,
-        // we align that on the offset too.
-        let offset = min(query.offset.unwrap_or(0), max_total_hits);
-        let limit = min(query.limit, max_total_hits.saturating_sub(offset));
-
-        search.offset(offset);
-        search.limit(limit);
-
-        if let Some(ref filter) = query.filter {
-            if let Some(facets) = parse_filter(filter)? {
-                search.filter(facets);
-            }
-        }
-
-        if let Some(ref sort) = query.sort {
-            let sort = match sort.iter().map(|s| AscDesc::from_str(s)).collect() {
-                Ok(sorts) => sorts,
-                Err(asc_desc_error) => {
-                    return Err(IndexError::Milli(SortError::from(asc_desc_error).into()))
-                }
-            };
-
-            search.sort_criteria(sort);
-        }
-
-        let milli::SearchResult {
-            documents_ids,
-            matching_words,
-            candidates,
-            ..
-        } = search.execute()?;
-
-        let fields_ids_map = self.fields_ids_map(&rtxn).unwrap();
-
-        let displayed_ids = self
-            .displayed_fields_ids(&rtxn)?
-            .map(|fields| fields.into_iter().collect::<BTreeSet<_>>())
-            .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect());
-
-        let fids = |attrs: &BTreeSet<String>| {
-            let mut ids = BTreeSet::new();
-            for attr in attrs {
-                if attr == "*" {
-                    ids = displayed_ids.clone();
-                    break;
-                }
-
-                if let Some(id) = fields_ids_map.id(attr) {
-                    ids.insert(id);
-                }
-            }
-            ids
-        };
-
-        // The attributes to retrieve are the ones explicitly marked as to retrieve (all by default),
-        // but these attributes must be also be present
-        // - in the fields_ids_map
-        // - in the the displayed attributes
-        let to_retrieve_ids: BTreeSet<_> = query
-            .attributes_to_retrieve
-            .as_ref()
-            .map(fids)
-            .unwrap_or_else(|| displayed_ids.clone())
-            .intersection(&displayed_ids)
-            .cloned()
-            .collect();
-
-        let attr_to_highlight = query.attributes_to_highlight.unwrap_or_default();
-
-        let attr_to_crop = query.attributes_to_crop.unwrap_or_default();
-
-        // Attributes in `formatted_options` correspond to the attributes that will be in `_formatted`
-        // These attributes are:
-        // - the attributes asked to be highlighted or cropped (with `attributesToCrop` or `attributesToHighlight`)
-        // - the attributes asked to be retrieved: these attributes will not be highlighted/cropped
-        // But these attributes must be also present in displayed attributes
-        let formatted_options = compute_formatted_options(
-            &attr_to_highlight,
-            &attr_to_crop,
-            query.crop_length,
-            &to_retrieve_ids,
-            &fields_ids_map,
-            &displayed_ids,
-        );
-
-        let tokenizer = TokenizerBuilder::default().build();
-
-        let mut formatter_builder = MatcherBuilder::new(matching_words, tokenizer);
-        formatter_builder.crop_marker(query.crop_marker);
-        formatter_builder.highlight_prefix(query.highlight_pre_tag);
-        formatter_builder.highlight_suffix(query.highlight_post_tag);
-
-        let mut documents = Vec::new();
-
-        let documents_iter = self.documents(&rtxn, documents_ids)?;
-
-        for (_id, obkv) in documents_iter {
-            // First generate a document with all the displayed fields
-            let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?;
-
-            // select the attributes to retrieve
-            let attributes_to_retrieve = to_retrieve_ids
-                .iter()
-                .map(|&fid| fields_ids_map.name(fid).expect("Missing field name"));
-            let mut document =
-                permissive_json_pointer::select_values(&displayed_document, attributes_to_retrieve);
-
-            let (matches_position, formatted) = format_fields(
-                &displayed_document,
-                &fields_ids_map,
-                &formatter_builder,
-                &formatted_options,
-                query.show_matches_position,
-                &displayed_ids,
-            )?;
-
-            if let Some(sort) = query.sort.as_ref() {
-                insert_geo_distance(sort, &mut document);
-            }
-
-            let hit = SearchHit {
-                document,
-                formatted,
-                matches_position,
-            };
-            documents.push(hit);
-        }
-
-        let estimated_total_hits = candidates.len();
-
-        let facet_distribution = match query.facets {
-            Some(ref fields) => {
-                let mut facet_distribution = self.facets_distribution(&rtxn);
-
-                let max_values_by_facet = self
-                    .max_values_per_facet(&rtxn)?
-                    .unwrap_or(DEFAULT_VALUES_PER_FACET);
-                facet_distribution.max_values_per_facet(max_values_by_facet);
-
-                if fields.iter().all(|f| f != "*") {
-                    facet_distribution.facets(fields);
-                }
-                let distribution = facet_distribution.candidates(candidates).execute()?;
-
-                Some(distribution)
-            }
-            None => None,
-        };
-
-        let result = SearchResult {
-            hits: documents,
-            estimated_total_hits,
-            query: query.q.clone().unwrap_or_default(),
-            limit: query.limit,
-            offset: query.offset.unwrap_or_default(),
-            processing_time_ms: before_search.elapsed().as_millis(),
-            facet_distribution,
-        };
-        Ok(result)
-    }
+    if let Some(ref query) = query.q {
+        search.query(query);
+    }
+
+    search.terms_matching_strategy(query.matching_strategy.into());
+
+    let max_total_hits = index
+        .pagination_max_total_hits(&rtxn)?
+        .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS);
+
+    // Make sure that a user can't get more documents than the hard limit,
+    // we align that on the offset too.
+    let offset = min(query.offset.unwrap_or(0), max_total_hits);
+    let limit = min(query.limit, max_total_hits.saturating_sub(offset));
+
+    search.offset(offset);
+    search.limit(limit);
+
+    if let Some(ref filter) = query.filter {
+        if let Some(facets) = parse_filter(filter)? {
+            search.filter(facets);
+        }
+    }
+
+    if let Some(ref sort) = query.sort {
+        let sort = match sort.iter().map(|s| AscDesc::from_str(s)).collect() {
+            Ok(sorts) => sorts,
+            Err(asc_desc_error) => {
+                return Err(IndexError::Milli(SortError::from(asc_desc_error).into()))
+            }
+        };
+
+        search.sort_criteria(sort);
+    }
+
+    let milli::SearchResult {
+        documents_ids,
+        matching_words,
+        candidates,
+        ..
+    } = search.execute()?;
+
+    let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
+
+    let displayed_ids = index
+        .displayed_fields_ids(&rtxn)?
+        .map(|fields| fields.into_iter().collect::<BTreeSet<_>>())
+        .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect());
+
+    let fids = |attrs: &BTreeSet<String>| {
+        let mut ids = BTreeSet::new();
+        for attr in attrs {
+            if attr == "*" {
+                ids = displayed_ids.clone();
+                break;
+            }
+
+            if let Some(id) = fields_ids_map.id(attr) {
+                ids.insert(id);
+            }
+        }
+        ids
+    };
+
+    // The attributes to retrieve are the ones explicitly marked as to retrieve (all by default),
+    // but these attributes must be also be present
+    // - in the fields_ids_map
+    // - in the the displayed attributes
+    let to_retrieve_ids: BTreeSet<_> = query
+        .attributes_to_retrieve
+        .as_ref()
+        .map(fids)
+        .unwrap_or_else(|| displayed_ids.clone())
+        .intersection(&displayed_ids)
+        .cloned()
+        .collect();
+
+    let attr_to_highlight = query.attributes_to_highlight.unwrap_or_default();
+
+    let attr_to_crop = query.attributes_to_crop.unwrap_or_default();
+
+    // Attributes in `formatted_options` correspond to the attributes that will be in `_formatted`
+    // These attributes are:
+    // - the attributes asked to be highlighted or cropped (with `attributesToCrop` or `attributesToHighlight`)
+    // - the attributes asked to be retrieved: these attributes will not be highlighted/cropped
+    // But these attributes must be also present in displayed attributes
+    let formatted_options = compute_formatted_options(
+        &attr_to_highlight,
+        &attr_to_crop,
+        query.crop_length,
+        &to_retrieve_ids,
+        &fields_ids_map,
+        &displayed_ids,
+    );
+
+    let tokenizer = TokenizerBuilder::default().build();
+
+    let mut formatter_builder = MatcherBuilder::new(matching_words, tokenizer);
+    formatter_builder.crop_marker(query.crop_marker);
+    formatter_builder.highlight_prefix(query.highlight_pre_tag);
+    formatter_builder.highlight_suffix(query.highlight_post_tag);
+
+    let mut documents = Vec::new();
+
+    let documents_iter = index.documents(&rtxn, documents_ids)?;
+
+    for (_id, obkv) in documents_iter {
+        // First generate a document with all the displayed fields
+        let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?;
+
+        // select the attributes to retrieve
+        let attributes_to_retrieve = to_retrieve_ids
+            .iter()
+            .map(|&fid| fields_ids_map.name(fid).expect("Missing field name"));
+        let mut document =
+            permissive_json_pointer::select_values(&displayed_document, attributes_to_retrieve);
+
+        let (matches_position, formatted) = format_fields(
+            &displayed_document,
+            &fields_ids_map,
+            &formatter_builder,
+            &formatted_options,
+            query.show_matches_position,
+            &displayed_ids,
+        )?;
+
+        if let Some(sort) = query.sort.as_ref() {
+            insert_geo_distance(sort, &mut document);
+        }
+
+        let hit = SearchHit {
+            document,
+            formatted,
+            matches_position,
+        };
+        documents.push(hit);
+    }
+
+    let estimated_total_hits = candidates.len();
+
+    let facet_distribution = match query.facets {
+        Some(ref fields) => {
+            let mut facet_distribution = index.facets_distribution(&rtxn);
+
+            let max_values_by_facet = index
+                .max_values_per_facet(&rtxn)?
+                .unwrap_or(DEFAULT_VALUES_PER_FACET);
+            facet_distribution.max_values_per_facet(max_values_by_facet);
+
+            if fields.iter().all(|f| f != "*") {
+                facet_distribution.facets(fields);
+            }
+            let distribution = facet_distribution.candidates(candidates).execute()?;
+
+            Some(distribution)
+        }
+        None => None,
+    };
+
+    let result = SearchResult {
+        hits: documents,
+        estimated_total_hits,
+        query: query.q.clone().unwrap_or_default(),
+        limit: query.limit,
+        offset: query.offset.unwrap_or_default(),
+        processing_time_ms: before_search.elapsed().as_millis(),
+        facet_distribution,
+    };
+    Ok(result)
 }
 
 fn insert_geo_distance(sorts: &[String], document: &mut Document) {
diff --git a/index/src/updates.rs b/index/src/updates.rs
index be5b9d51a..a6d13d99f 100644
--- a/index/src/updates.rs
+++ b/index/src/updates.rs
@@ -2,18 +2,8 @@ use std::collections::{BTreeMap, BTreeSet};
 use std::marker::PhantomData;
 use std::num::NonZeroUsize;
 
-use log::{debug, info, trace};
-use milli::documents::DocumentsBatchReader;
-use milli::update::{
-    DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
-    Setting,
-};
+use milli::update::Setting;
 use serde::{Deserialize, Serialize, Serializer};
-use uuid::Uuid;
-
-use super::error::{IndexError, Result};
-use super::index::Index;
-use file_store::FileStore;
 
 fn serialize_with_wildcard<S>(
     field: &Setting<Vec<String>>,
@@ -246,126 +236,6 @@ pub struct Facets {
     pub min_level_size: Option<NonZeroUsize>,
 }
 
-impl Index {
-    fn update_primary_key_txn<'a, 'b>(
-        &'a self,
-        txn: &mut milli::heed::RwTxn<'a, 'b>,
-        primary_key: String,
-    ) -> Result<()> {
-        let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref());
-        builder.set_primary_key(primary_key);
-        builder.execute(|_| ())?;
-        Ok(())
-    }
-
-    pub fn update_primary_key(&self, primary_key: String) -> Result<()> {
-        let mut txn = self.write_txn()?;
-        self.update_primary_key_txn(&mut txn, primary_key)?;
-        txn.commit()?;
-        Ok(())
-    }
-
-    /// Deletes `ids` from the index, and returns how many documents were deleted.
-    pub fn delete_documents(&self, ids: &[String]) -> Result<DocumentDeletionResult> {
-        let mut txn = self.write_txn()?;
-        let mut builder = milli::update::DeleteDocuments::new(&mut txn, self)?;
-
-        // We ignore unexisting document ids
-        ids.iter().for_each(|id| {
-            builder.delete_external_id(id);
-        });
-
-        let deleted = builder.execute()?;
-
-        txn.commit()?;
-
-        Ok(deleted)
-    }
-
-    pub fn clear_documents(&self) -> Result<()> {
-        let mut txn = self.write_txn()?;
-        milli::update::ClearDocuments::new(&mut txn, self).execute()?;
-        txn.commit()?;
-
-        Ok(())
-    }
-
-    pub fn update_documents(
-        &self,
-        method: IndexDocumentsMethod,
-        primary_key: Option<String>,
-        file_store: FileStore,
-        contents: impl IntoIterator<Item = Uuid>,
-    ) -> Result<Vec<Result<DocumentAdditionResult>>> {
-        trace!("performing document addition");
-        let mut txn = self.write_txn()?;
-
-        if let Some(primary_key) = primary_key {
-            if self.inner.primary_key(&txn)?.is_none() {
-                self.update_primary_key_txn(&mut txn, primary_key)?;
-            }
-        }
-
-        let config = IndexDocumentsConfig {
-            update_method: method,
-            ..Default::default()
-        };
-
-        let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
-        let mut builder = milli::update::IndexDocuments::new(
-            &mut txn,
-            self,
-            self.indexer_config.as_ref(),
-            config,
-            indexing_callback,
-        )?;
-
-        let mut results = Vec::new();
-        for content_uuid in contents.into_iter() {
-            let content_file = file_store.get_update(content_uuid)?;
-            let reader = DocumentsBatchReader::from_reader(content_file)?;
-            let (new_builder, user_result) = builder.add_documents(reader)?;
-            builder = new_builder;
-
-            let user_result = match user_result {
-                Ok(count) => {
-                    let addition = DocumentAdditionResult {
-                        indexed_documents: count,
-                        number_of_documents: count,
-                    };
-                    info!("document addition done: {:?}", addition);
-                    Ok(addition)
-                }
-                Err(e) => Err(IndexError::from(e)),
-            };
-
-            results.push(user_result);
-        }
-
-        if results.iter().any(Result::is_ok) {
-            let _addition = builder.execute()?;
-            txn.commit()?;
-        }
-
-        Ok(results)
-    }
-
-    pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
-        // We must use the write transaction of the update here.
-        let mut txn = self.write_txn()?;
-        let mut builder =
-            milli::update::Settings::new(&mut txn, self, self.indexer_config.as_ref());
-
-        apply_settings_to_builder(settings, &mut builder);
-
-        builder.execute(|indexing_step| debug!("update: {:?}", indexing_step))?;
-
-        txn.commit()?;
-
-        Ok(())
-    }
-}
-
 pub fn apply_settings_to_builder(
     settings: &Settings<Checked>,
     builder: &mut milli::update::Settings,
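Note: beyond the mechanical rename, the open pattern changes. Instead of the removed
`Index::open` wrapper doing the environment setup, callers now configure and open the
LMDB environment themselves. A minimal sketch of the new pattern, mirroring the
`index_mapper.rs` hunks above; the path and map size below are illustrative values,
not taken from this patch:

    use std::error::Error;
    use std::fs;

    use milli::heed::EnvOpenOptions;
    use milli::Index;

    fn open_index() -> Result<Index, Box<dyn Error>> {
        // heed can only open an LMDB environment in an existing directory,
        // hence the create_dir_all that the old Index::open used to perform.
        let index_path = "indexes/my-index"; // illustrative path
        fs::create_dir_all(index_path)?;

        // map_size reserves the LMDB memory map; 100 MiB is an arbitrary example.
        // In the scheduler, the configured index_size is passed here instead.
        let mut options = EnvOpenOptions::new();
        options.map_size(100 * 1024 * 1024);

        let index = Index::new(options, index_path)?;
        Ok(index)
    }

As in the index_mapper hunks, the resulting `milli::Index` can be cloned cheaply and
cached (the patch inserts it into the open-indexes map with `index.clone()`).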