diff --git a/meilidb-core/src/query_builder.rs b/meilidb-core/src/query_builder.rs index 97a750d18..b436f8604 100644 --- a/meilidb-core/src/query_builder.rs +++ b/meilidb-core/src/query_builder.rs @@ -316,128 +316,130 @@ fn multiword_rewrite_matches( } impl<'c, S, FI> QueryBuilder<'c, S, FI> -where S: 'static + Store + Send + Clone, +where S: Store + Sync, S::Error: Send, { fn query_all(&self, query: &str) -> Result, S::Error> { let (automatons, query_enhancer) = generate_automatons(query, &self.store)?; - let searchables = self.searchable_attrs.clone(); - let store = self.store.clone(); + let searchables = self.searchable_attrs.as_ref(); + let store = &self.store; - let mut matches = Vec::new(); - let mut highlights = Vec::new(); - - let recv_end_time = Instant::now() + Duration::from_millis(30); - let start = Instant::now(); - - let (sender, receiver) = crossbeam_channel::bounded(10); - - rayon::spawn(move || { + rayon::scope(move |s| { enum Error { SendError, StoreError(E), } - let result = automatons - .into_iter() - .par_bridge() - .try_for_each_with((sender, store, searchables.as_ref()), |data, automaton| { - let (sender, store, searchables) = data; - let Automaton { index, is_exact, query_len, dfa, .. } = automaton; + let mut matches = Vec::new(); + let mut highlights = Vec::new(); - let words = store.words().map_err(Error::StoreError)?; - let mut stream = words.search(&dfa).into_stream(); + let recv_end_time = Instant::now() + Duration::from_millis(30); + let start = Instant::now(); - let mut matches = Vec::new(); - let mut highlights = Vec::new(); + let (sender, receiver) = crossbeam_channel::bounded(10); - while let Some(input) = stream.next() { - let distance = dfa.eval(input).to_u8(); - let is_exact = is_exact && distance == 0 && input.len() == query_len; + s.spawn(move |_| { + let result = automatons + .into_iter() + .par_bridge() + .try_for_each_with((sender, store, searchables), |data, automaton| { + let (sender, store, searchables) = data; + let Automaton { index, is_exact, query_len, dfa, .. } = automaton; - let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?; - let doc_indexes = match doc_indexes { - Some(doc_indexes) => doc_indexes, - None => continue, - }; + let words = store.words().map_err(Error::StoreError)?; + let mut stream = words.search(&dfa).into_stream(); - matches.reserve(doc_indexes.len()); - highlights.reserve(doc_indexes.len()); + let mut matches = Vec::new(); + let mut highlights = Vec::new(); - for di in doc_indexes.as_slice() { + while let Some(input) = stream.next() { + let distance = dfa.eval(input).to_u8(); + let is_exact = is_exact && distance == 0 && input.len() == query_len; - let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute)); - if let Some(attribute) = attribute { - let match_ = TmpMatch { - query_index: index as u32, - distance, - attribute, - word_index: di.word_index, - is_exact, - }; + let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?; + let doc_indexes = match doc_indexes { + Some(doc_indexes) => doc_indexes, + None => continue, + }; - let highlight = Highlight { - attribute: di.attribute, - char_index: di.char_index, - char_length: di.char_length, - }; + matches.reserve(doc_indexes.len()); + highlights.reserve(doc_indexes.len()); - matches.push((di.document_id, match_)); - highlights.push((di.document_id, highlight)); + for di in doc_indexes.as_slice() { + + let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute)); + if let Some(attribute) = attribute { + let match_ = TmpMatch { + query_index: index as u32, + distance, + attribute, + word_index: di.word_index, + is_exact, + }; + + let highlight = Highlight { + attribute: di.attribute, + char_index: di.char_index, + char_length: di.char_length, + }; + + matches.push((di.document_id, match_)); + highlights.push((di.document_id, highlight)); + } } } - } - sender.send((matches, highlights)).map_err(|_| Error::SendError) - }); + sender.send((matches, highlights)).map_err(|_| Error::SendError) + }); if let Err(Error::StoreError(e)) = result { error!("{}", e); } - }); + }); - let iter = receiver.recv().into_iter().chain(iter::from_fn(|| { - match recv_end_time.checked_duration_since(Instant::now()) { - Some(timeout) => receiver.recv_timeout(timeout).ok(), - None => None, + let iter = receiver.recv().into_iter().chain(iter::from_fn(|| { + match recv_end_time.checked_duration_since(Instant::now()) { + Some(timeout) => receiver.recv_timeout(timeout).ok(), + None => None, + } + })); + + for (mut rcv_matches, mut rcv_highlights) in iter { + matches.append(&mut rcv_matches); + highlights.append(&mut rcv_highlights); } - })); - for (mut rcv_matches, mut rcv_highlights) in iter { - matches.append(&mut rcv_matches); - highlights.append(&mut rcv_highlights); - } + drop(receiver); - drop(receiver); + info!("main query all took {:.2?}", start.elapsed()); + info!("{} total matches to rewrite", matches.len()); - info!("main query all took {:.2?}", start.elapsed()); - info!("{} total matches to rewrite", matches.len()); + let start = Instant::now(); + let matches = multiword_rewrite_matches(matches, &query_enhancer); + info!("multiword rewrite took {:.2?}", start.elapsed()); - let start = Instant::now(); - let matches = multiword_rewrite_matches(matches, &query_enhancer); - info!("multiword rewrite took {:.2?}", start.elapsed()); + let start = Instant::now(); + let highlights = { + highlights.par_sort_unstable_by_key(|(id, _)| *id); + SetBuf::new_unchecked(highlights) + }; + info!("sorting highlights took {:.2?}", start.elapsed()); - let start = Instant::now(); - let highlights = { - highlights.par_sort_unstable_by_key(|(id, _)| *id); - SetBuf::new_unchecked(highlights) - }; - info!("sorting highlights took {:.2?}", start.elapsed()); + info!("{} total matches to classify", matches.len()); - info!("{} total matches to classify", matches.len()); + let start = Instant::now(); + let raw_documents = raw_documents_from(matches, highlights); + info!("making raw documents took {:.2?}", start.elapsed()); - let start = Instant::now(); - let raw_documents = raw_documents_from(matches, highlights); - info!("making raw documents took {:.2?}", start.elapsed()); + info!("{} total documents to classify", raw_documents.len()); - info!("{} total documents to classify", raw_documents.len()); - - Ok(raw_documents) + Ok(raw_documents) + }) } } impl<'c, S, FI> QueryBuilder<'c, S, FI> -where S: 'static + Store + Send + Clone, +where S: Store + Sync, S::Error: Send, FI: Fn(DocumentId) -> bool, { @@ -515,7 +517,7 @@ impl<'c, I, FI, FD> DistinctQueryBuilder<'c, I, FI, FD> } impl<'c, S, FI, FD, K> DistinctQueryBuilder<'c, S, FI, FD> -where S: 'static + Store + Send + Clone, +where S: Store + Sync, S::Error: Send, FI: Fn(DocumentId) -> bool, FD: Fn(DocumentId) -> Option,