mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-27 04:25:06 +08:00
Merging the main, word docids and words pairs proximity docids in parallel
This commit is contained in:
parent
99705deb7d
commit
9af946a306
@ -616,6 +616,12 @@ fn documents_merge(key: &[u8], _values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
|
|||||||
panic!("merging documents is an error ({:?})", key.as_bstr())
|
panic!("merging documents is an error ({:?})", key.as_bstr())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn merge_readers(sources: Vec<Reader<Mmap>>, merge: MergeFn) -> Merger<Mmap, MergeFn> {
|
||||||
|
let mut builder = Merger::builder(merge);
|
||||||
|
builder.extend(sources);
|
||||||
|
builder.build()
|
||||||
|
}
|
||||||
|
|
||||||
fn merge_into_lmdb_database(
|
fn merge_into_lmdb_database(
|
||||||
wtxn: &mut heed::RwTxn,
|
wtxn: &mut heed::RwTxn,
|
||||||
database: heed::PolyDatabase,
|
database: heed::PolyDatabase,
|
||||||
@ -625,11 +631,28 @@ fn merge_into_lmdb_database(
|
|||||||
debug!("Merging {} MTBL stores...", sources.len());
|
debug!("Merging {} MTBL stores...", sources.len());
|
||||||
let before = Instant::now();
|
let before = Instant::now();
|
||||||
|
|
||||||
let mut builder = Merger::builder(merge);
|
let merger = merge_readers(sources, merge);
|
||||||
builder.extend(sources);
|
|
||||||
let merger = builder.build();
|
|
||||||
|
|
||||||
let mut in_iter = merger.into_merge_iter()?;
|
let mut in_iter = merger.into_merge_iter()?;
|
||||||
|
|
||||||
|
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
|
||||||
|
while let Some(result) = in_iter.next() {
|
||||||
|
let (k, v) = result?;
|
||||||
|
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("MTBL stores merged in {:.02?}!", before.elapsed());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_into_lmdb_database(
|
||||||
|
wtxn: &mut heed::RwTxn,
|
||||||
|
database: heed::PolyDatabase,
|
||||||
|
reader: Reader<Mmap>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
debug!("Writing MTBL stores...");
|
||||||
|
let before = Instant::now();
|
||||||
|
|
||||||
|
let mut in_iter = reader.into_iter()?;
|
||||||
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
|
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
|
||||||
while let Some(result) = in_iter.next() {
|
while let Some(result) = in_iter.next() {
|
||||||
let (k, v) = result?;
|
let (k, v) = result?;
|
||||||
@ -742,39 +765,61 @@ fn main() -> anyhow::Result<()> {
|
|||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
let mut main_stores = Vec::with_capacity(readers.len());
|
let mut main_readers = Vec::with_capacity(readers.len());
|
||||||
let mut word_docids_stores = Vec::with_capacity(readers.len());
|
let mut word_docids_readers = Vec::with_capacity(readers.len());
|
||||||
let mut docid_word_positions_stores = Vec::with_capacity(readers.len());
|
let mut docid_word_positions_readers = Vec::with_capacity(readers.len());
|
||||||
let mut words_pairs_proximities_docids_stores = Vec::with_capacity(readers.len());
|
let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len());
|
||||||
let mut documents_stores = Vec::with_capacity(readers.len());
|
let mut documents_readers = Vec::with_capacity(readers.len());
|
||||||
readers.into_iter().for_each(|readers| {
|
readers.into_iter().for_each(|readers| {
|
||||||
main_stores.push(readers.main);
|
main_readers.push(readers.main);
|
||||||
word_docids_stores.push(readers.word_docids);
|
word_docids_readers.push(readers.word_docids);
|
||||||
docid_word_positions_stores.push(readers.docid_word_positions);
|
docid_word_positions_readers.push(readers.docid_word_positions);
|
||||||
words_pairs_proximities_docids_stores.push(readers.words_pairs_proximities_docids);
|
words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids);
|
||||||
documents_stores.push(readers.documents);
|
documents_readers.push(readers.documents);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let merge_readers = |readers, merge| {
|
||||||
|
let mut writer = tempfile().map(|f| {
|
||||||
|
create_writer(chunk_compression_type, chunk_compression_level, f)
|
||||||
|
})?;
|
||||||
|
let merger = merge_readers(readers, merge);
|
||||||
|
merger.write_into(&mut writer)?;
|
||||||
|
writer_into_reader(writer)
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!("Merging the main, word docids and words pairs proximity docids in parallel...");
|
||||||
|
let (main, (word_docids, words_pairs_proximities_docids)) = rayon::join(move || {
|
||||||
|
merge_readers(main_readers, main_merge)
|
||||||
|
}, || rayon::join(|| {
|
||||||
|
merge_readers(word_docids_readers, word_docids_merge)
|
||||||
|
}, || {
|
||||||
|
merge_readers(words_pairs_proximities_docids_readers, words_pairs_proximities_docids_merge)
|
||||||
|
}));
|
||||||
|
|
||||||
|
let main = main?;
|
||||||
|
let word_docids = word_docids?;
|
||||||
|
let words_pairs_proximities_docids = words_pairs_proximities_docids?;
|
||||||
|
|
||||||
let mut wtxn = env.write_txn()?;
|
let mut wtxn = env.write_txn()?;
|
||||||
|
|
||||||
debug!("Writing the main elements into LMDB on disk...");
|
debug!("Writing the main elements into LMDB on disk...");
|
||||||
merge_into_lmdb_database(&mut wtxn, index.main, main_stores, main_merge)?;
|
write_into_lmdb_database(&mut wtxn, index.main, main)?;
|
||||||
|
|
||||||
debug!("Writing the words docids into LMDB on disk...");
|
debug!("Writing the words docids into LMDB on disk...");
|
||||||
let db = *index.word_docids.as_polymorph();
|
let db = *index.word_docids.as_polymorph();
|
||||||
merge_into_lmdb_database(&mut wtxn, db, word_docids_stores, word_docids_merge)?;
|
write_into_lmdb_database(&mut wtxn, db, word_docids)?;
|
||||||
|
|
||||||
debug!("Writing the docid word positions into LMDB on disk...");
|
debug!("Writing the docid word positions into LMDB on disk...");
|
||||||
let db = *index.docid_word_positions.as_polymorph();
|
let db = *index.docid_word_positions.as_polymorph();
|
||||||
merge_into_lmdb_database(&mut wtxn, db, docid_word_positions_stores, docid_word_positions_merge)?;
|
merge_into_lmdb_database(&mut wtxn, db, docid_word_positions_readers, docid_word_positions_merge)?;
|
||||||
|
|
||||||
debug!("Writing the words pairs proximities docids into LMDB on disk...");
|
debug!("Writing the words pairs proximities docids into LMDB on disk...");
|
||||||
let db = *index.word_pair_proximity_docids.as_polymorph();
|
let db = *index.word_pair_proximity_docids.as_polymorph();
|
||||||
merge_into_lmdb_database(&mut wtxn, db, words_pairs_proximities_docids_stores, words_pairs_proximities_docids_merge)?;
|
write_into_lmdb_database(&mut wtxn, db, words_pairs_proximities_docids)?;
|
||||||
|
|
||||||
debug!("Writing the documents into LMDB on disk...");
|
debug!("Writing the documents into LMDB on disk...");
|
||||||
let db = *index.documents.as_polymorph();
|
let db = *index.documents.as_polymorph();
|
||||||
merge_into_lmdb_database(&mut wtxn, db, documents_stores, documents_merge)?;
|
merge_into_lmdb_database(&mut wtxn, db, documents_readers, documents_merge)?;
|
||||||
|
|
||||||
debug!("Retrieving the number of documents...");
|
debug!("Retrieving the number of documents...");
|
||||||
let count = index.number_of_documents(&wtxn)?;
|
let count = index.number_of_documents(&wtxn)?;
|
||||||
|
Loading…
Reference in New Issue
Block a user