mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-27 04:25:06 +08:00
feat: Try to simplify Store trait bound with a rayon scope
This commit is contained in:
parent
b7b60b5fe5
commit
0ee56314fb
@ -316,128 +316,130 @@ fn multiword_rewrite_matches(
|
||||
}
|
||||
|
||||
impl<'c, S, FI> QueryBuilder<'c, S, FI>
|
||||
where S: 'static + Store + Send + Clone,
|
||||
where S: Store + Sync,
|
||||
S::Error: Send,
|
||||
{
|
||||
fn query_all(&self, query: &str) -> Result<Vec<RawDocument>, S::Error> {
|
||||
let (automatons, query_enhancer) = generate_automatons(query, &self.store)?;
|
||||
let searchables = self.searchable_attrs.clone();
|
||||
let store = self.store.clone();
|
||||
let searchables = self.searchable_attrs.as_ref();
|
||||
let store = &self.store;
|
||||
|
||||
let mut matches = Vec::new();
|
||||
let mut highlights = Vec::new();
|
||||
|
||||
let recv_end_time = Instant::now() + Duration::from_millis(30);
|
||||
let start = Instant::now();
|
||||
|
||||
let (sender, receiver) = crossbeam_channel::bounded(10);
|
||||
|
||||
rayon::spawn(move || {
|
||||
rayon::scope(move |s| {
|
||||
enum Error<E> {
|
||||
SendError,
|
||||
StoreError(E),
|
||||
}
|
||||
|
||||
let result = automatons
|
||||
.into_iter()
|
||||
.par_bridge()
|
||||
.try_for_each_with((sender, store, searchables.as_ref()), |data, automaton| {
|
||||
let (sender, store, searchables) = data;
|
||||
let Automaton { index, is_exact, query_len, dfa, .. } = automaton;
|
||||
let mut matches = Vec::new();
|
||||
let mut highlights = Vec::new();
|
||||
|
||||
let words = store.words().map_err(Error::StoreError)?;
|
||||
let mut stream = words.search(&dfa).into_stream();
|
||||
let recv_end_time = Instant::now() + Duration::from_millis(30);
|
||||
let start = Instant::now();
|
||||
|
||||
let mut matches = Vec::new();
|
||||
let mut highlights = Vec::new();
|
||||
let (sender, receiver) = crossbeam_channel::bounded(10);
|
||||
|
||||
while let Some(input) = stream.next() {
|
||||
let distance = dfa.eval(input).to_u8();
|
||||
let is_exact = is_exact && distance == 0 && input.len() == query_len;
|
||||
s.spawn(move |_| {
|
||||
let result = automatons
|
||||
.into_iter()
|
||||
.par_bridge()
|
||||
.try_for_each_with((sender, store, searchables), |data, automaton| {
|
||||
let (sender, store, searchables) = data;
|
||||
let Automaton { index, is_exact, query_len, dfa, .. } = automaton;
|
||||
|
||||
let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?;
|
||||
let doc_indexes = match doc_indexes {
|
||||
Some(doc_indexes) => doc_indexes,
|
||||
None => continue,
|
||||
};
|
||||
let words = store.words().map_err(Error::StoreError)?;
|
||||
let mut stream = words.search(&dfa).into_stream();
|
||||
|
||||
matches.reserve(doc_indexes.len());
|
||||
highlights.reserve(doc_indexes.len());
|
||||
let mut matches = Vec::new();
|
||||
let mut highlights = Vec::new();
|
||||
|
||||
for di in doc_indexes.as_slice() {
|
||||
while let Some(input) = stream.next() {
|
||||
let distance = dfa.eval(input).to_u8();
|
||||
let is_exact = is_exact && distance == 0 && input.len() == query_len;
|
||||
|
||||
let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
|
||||
if let Some(attribute) = attribute {
|
||||
let match_ = TmpMatch {
|
||||
query_index: index as u32,
|
||||
distance,
|
||||
attribute,
|
||||
word_index: di.word_index,
|
||||
is_exact,
|
||||
};
|
||||
let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?;
|
||||
let doc_indexes = match doc_indexes {
|
||||
Some(doc_indexes) => doc_indexes,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let highlight = Highlight {
|
||||
attribute: di.attribute,
|
||||
char_index: di.char_index,
|
||||
char_length: di.char_length,
|
||||
};
|
||||
matches.reserve(doc_indexes.len());
|
||||
highlights.reserve(doc_indexes.len());
|
||||
|
||||
matches.push((di.document_id, match_));
|
||||
highlights.push((di.document_id, highlight));
|
||||
for di in doc_indexes.as_slice() {
|
||||
|
||||
let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
|
||||
if let Some(attribute) = attribute {
|
||||
let match_ = TmpMatch {
|
||||
query_index: index as u32,
|
||||
distance,
|
||||
attribute,
|
||||
word_index: di.word_index,
|
||||
is_exact,
|
||||
};
|
||||
|
||||
let highlight = Highlight {
|
||||
attribute: di.attribute,
|
||||
char_index: di.char_index,
|
||||
char_length: di.char_length,
|
||||
};
|
||||
|
||||
matches.push((di.document_id, match_));
|
||||
highlights.push((di.document_id, highlight));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sender.send((matches, highlights)).map_err(|_| Error::SendError)
|
||||
});
|
||||
sender.send((matches, highlights)).map_err(|_| Error::SendError)
|
||||
});
|
||||
|
||||
if let Err(Error::StoreError(e)) = result {
|
||||
error!("{}", e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
let iter = receiver.recv().into_iter().chain(iter::from_fn(|| {
|
||||
match recv_end_time.checked_duration_since(Instant::now()) {
|
||||
Some(timeout) => receiver.recv_timeout(timeout).ok(),
|
||||
None => None,
|
||||
let iter = receiver.recv().into_iter().chain(iter::from_fn(|| {
|
||||
match recv_end_time.checked_duration_since(Instant::now()) {
|
||||
Some(timeout) => receiver.recv_timeout(timeout).ok(),
|
||||
None => None,
|
||||
}
|
||||
}));
|
||||
|
||||
for (mut rcv_matches, mut rcv_highlights) in iter {
|
||||
matches.append(&mut rcv_matches);
|
||||
highlights.append(&mut rcv_highlights);
|
||||
}
|
||||
}));
|
||||
|
||||
for (mut rcv_matches, mut rcv_highlights) in iter {
|
||||
matches.append(&mut rcv_matches);
|
||||
highlights.append(&mut rcv_highlights);
|
||||
}
|
||||
drop(receiver);
|
||||
|
||||
drop(receiver);
|
||||
info!("main query all took {:.2?}", start.elapsed());
|
||||
info!("{} total matches to rewrite", matches.len());
|
||||
|
||||
info!("main query all took {:.2?}", start.elapsed());
|
||||
info!("{} total matches to rewrite", matches.len());
|
||||
let start = Instant::now();
|
||||
let matches = multiword_rewrite_matches(matches, &query_enhancer);
|
||||
info!("multiword rewrite took {:.2?}", start.elapsed());
|
||||
|
||||
let start = Instant::now();
|
||||
let matches = multiword_rewrite_matches(matches, &query_enhancer);
|
||||
info!("multiword rewrite took {:.2?}", start.elapsed());
|
||||
let start = Instant::now();
|
||||
let highlights = {
|
||||
highlights.par_sort_unstable_by_key(|(id, _)| *id);
|
||||
SetBuf::new_unchecked(highlights)
|
||||
};
|
||||
info!("sorting highlights took {:.2?}", start.elapsed());
|
||||
|
||||
let start = Instant::now();
|
||||
let highlights = {
|
||||
highlights.par_sort_unstable_by_key(|(id, _)| *id);
|
||||
SetBuf::new_unchecked(highlights)
|
||||
};
|
||||
info!("sorting highlights took {:.2?}", start.elapsed());
|
||||
info!("{} total matches to classify", matches.len());
|
||||
|
||||
info!("{} total matches to classify", matches.len());
|
||||
let start = Instant::now();
|
||||
let raw_documents = raw_documents_from(matches, highlights);
|
||||
info!("making raw documents took {:.2?}", start.elapsed());
|
||||
|
||||
let start = Instant::now();
|
||||
let raw_documents = raw_documents_from(matches, highlights);
|
||||
info!("making raw documents took {:.2?}", start.elapsed());
|
||||
info!("{} total documents to classify", raw_documents.len());
|
||||
|
||||
info!("{} total documents to classify", raw_documents.len());
|
||||
|
||||
Ok(raw_documents)
|
||||
Ok(raw_documents)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'c, S, FI> QueryBuilder<'c, S, FI>
|
||||
where S: 'static + Store + Send + Clone,
|
||||
where S: Store + Sync,
|
||||
S::Error: Send,
|
||||
FI: Fn(DocumentId) -> bool,
|
||||
{
|
||||
@ -515,7 +517,7 @@ impl<'c, I, FI, FD> DistinctQueryBuilder<'c, I, FI, FD>
|
||||
}
|
||||
|
||||
impl<'c, S, FI, FD, K> DistinctQueryBuilder<'c, S, FI, FD>
|
||||
where S: 'static + Store + Send + Clone,
|
||||
where S: Store + Sync,
|
||||
S::Error: Send,
|
||||
FI: Fn(DocumentId) -> bool,
|
||||
FD: Fn(DocumentId) -> Option<K>,
|
||||
|
Loading…
Reference in New Issue
Block a user