Make the merger multithreaded

This commit is contained in:
Clément Renault 2024-09-26 11:09:06 +02:00
parent 7d61697f19
commit ac2d54b27c
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 105 additions and 88 deletions

View File

@ -1,4 +1,5 @@
use std::marker::PhantomData;
use std::sync::atomic::Ordering;
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use heed::types::Bytes;
@ -177,22 +178,22 @@ impl IntoIterator for WriterReceiver {
/// Wraps a `Sender<WriterOperation>` together with three counters describing how the
/// channel was used; the counters are printed by the `Drop` implementation.
//
// NOTE(review): each counter field below is declared twice, once as `Cell<usize>` and once
// as `AtomicUsize`. This looks like the before/after lines of a diff rendered without +/-
// markers — confirm which declaration is the current one.
pub struct MergerSender {
sender: Sender<WriterOperation>,
/// The number of messages we sent in total in the channel.
send_count: std::cell::Cell<usize>,
send_count: std::sync::atomic::AtomicUsize,
/// The number of times we sent something in a channel that was full.
writer_contentious_count: std::cell::Cell<usize>,
writer_contentious_count: std::sync::atomic::AtomicUsize,
/// The number of times we sent something in a channel that was empty.
merger_contentious_count: std::cell::Cell<usize>,
merger_contentious_count: std::sync::atomic::AtomicUsize,
}
// Prints the channel usage statistics to stderr when a `MergerSender` is dropped.
impl Drop for MergerSender {
// Reports the total number of sends and, for each contention counter, its raw value and
// its share of `send_count` expressed as a percentage.
fn drop(&mut self) {
// NOTE(review): the argument list below reads every counter twice, once with `.get()`
// and once with `.load(Ordering::SeqCst)` — presumably the old and new lines of a diff
// shown together; confirm which set is current.
// NOTE: both ratios are computed in f32, so a `send_count` of 0 is a division by 0.0 and
// the corresponding percentage prints as NaN (or inf) instead of panicking.
eprintln!(
"Merger channel stats: {} sends, {} writer contentions ({}%), {} merger contentions ({}%)",
self.send_count.get(),
self.writer_contentious_count.get(),
(self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0,
self.merger_contentious_count.get(),
(self.merger_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0
self.send_count.load(Ordering::SeqCst),
self.writer_contentious_count.load(Ordering::SeqCst),
(self.writer_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0,
self.merger_contentious_count.load(Ordering::SeqCst),
(self.merger_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0
)
}
}
@ -227,12 +228,12 @@ impl MergerSender {
fn send(&self, op: WriterOperation) -> StdResult<(), SendError<()>> {
if self.sender.is_full() {
self.writer_contentious_count.set(self.writer_contentious_count.get() + 1);
self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
}
if self.sender.is_empty() {
self.merger_contentious_count.set(self.merger_contentious_count.get() + 1);
self.merger_contentious_count.fetch_add(1, Ordering::SeqCst);
}
self.send_count.set(self.send_count.get() + 1);
self.send_count.fetch_add(1, Ordering::SeqCst);
match self.sender.send(op) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),

View File

@ -14,7 +14,7 @@ use super::channel::*;
use super::document_change::DocumentChange;
use super::extract::*;
use super::merger::merge_grenad_entries;
use super::{StdResult, TopLevelMap};
use super::{ItemsPool, StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::update::new::channel::ExtractorSender;
use crate::update::GrenadParameters;
@ -213,11 +213,12 @@ where
let span =
tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge");
let _entered = span.enter();
let rtxn = index.read_txn().unwrap();
let rtxn_pool = ItemsPool::new(|| index.read_txn().map_err(Into::into));
merge_grenad_entries(
merger_receiver,
merger_sender,
&rtxn,
&rtxn_pool,
index,
global_fields_ids_map_clone,
)

View File

@ -1,33 +1,34 @@
use std::fs::File;
use std::io::{self};
use bincode::ErrorKind;
use grenad::Merger;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use rayon::iter::{ParallelBridge, ParallelIterator as _};
use roaring::RoaringBitmap;
use super::channel::*;
use super::extract::{FacetKind, HashMapMerger};
use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update};
use super::{Deletion, DocumentChange, Insertion, ItemsPool, KvReaderFieldId, Update};
use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::new::word_fst_builder::WordFstBuilder;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Result};
/// TODO: we must return some info/stats.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
pub fn merge_grenad_entries(
pub fn merge_grenad_entries<'t>(
receiver: MergerReceiver,
sender: MergerSender,
rtxn: &RoTxn,
rtxn_pool: &ItemsPool<impl Fn() -> Result<RoTxn<'t>> + Send + Sync, RoTxn<'t>, Error>,
index: &Index,
mut global_fields_ids_map: GlobalFieldsIdsMap<'_>,
) -> Result<()> {
let mut buffer: Vec<u8> = Vec::new();
let mut documents_ids = index.documents_ids(rtxn)?;
let mut geo_extractor = GeoExtractor::new(rtxn, index)?;
let (mut documents_ids, mut geo_extractor) = rtxn_pool.with(|rtxn| {
let documents_ids = index.documents_ids(rtxn)?;
let geo_extractor = GeoExtractor::new(rtxn, index)?;
Ok((documents_ids, geo_extractor))
})?;
for merger_operation in receiver {
match merger_operation {
@ -39,10 +40,10 @@ pub fn merge_grenad_entries(
merger,
/// TODO do a MergerOperation::database(&Index) -> Database<Bytes, Bytes>.
index.exact_word_docids.remap_types(),
rtxn,
rtxn_pool,
&mut buffer,
sender.docids::<ExactWordDocids>(),
|_, _key| Ok(()),
// |_, _key| Ok(()),
)?;
}
MergerOperation::FidWordCountDocidsMerger(merger) => {
@ -51,15 +52,15 @@ pub fn merge_grenad_entries(
merge_and_send_docids(
merger,
index.field_id_word_count_docids.remap_types(),
rtxn,
rtxn_pool,
&mut buffer,
sender.docids::<FidWordCountDocids>(),
|_, _key| Ok(()),
// |_, _key| Ok(()),
)?;
}
MergerOperation::WordDocidsMerger(merger) => {
let words_fst = index.words_fst(rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst, 4)?;
// let words_fst = index.words_fst(rtxn)?;
// let mut word_fst_builder = WordFstBuilder::new(&words_fst, 4)?;
{
let span =
tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
@ -68,10 +69,10 @@ pub fn merge_grenad_entries(
merge_and_send_docids(
merger,
index.word_docids.remap_types(),
rtxn,
rtxn_pool,
&mut buffer,
sender.docids::<WordDocids>(),
|deladd, key| word_fst_builder.register_word(deladd, key),
// |deladd, key| word_fst_builder.register_word(deladd, key),
)?;
}
@ -80,8 +81,8 @@ pub fn merge_grenad_entries(
tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
let _entered = span.enter();
let (word_fst_mmap, prefix_fst_mmap) = word_fst_builder.build()?;
sender.main().write_words_fst(word_fst_mmap).unwrap();
// let (word_fst_mmap, prefix_fst_mmap) = word_fst_builder.build()?;
// sender.main().write_words_fst(word_fst_mmap).unwrap();
}
}
MergerOperation::WordFidDocidsMerger(merger) => {
@ -91,10 +92,10 @@ pub fn merge_grenad_entries(
merge_and_send_docids(
merger,
index.word_fid_docids.remap_types(),
rtxn,
rtxn_pool,
&mut buffer,
sender.docids::<WordFidDocids>(),
|_, _key| Ok(()),
// |_, _key| Ok(()),
)?;
}
MergerOperation::WordPairProximityDocidsMerger(merger) => {
@ -103,10 +104,10 @@ pub fn merge_grenad_entries(
merge_and_send_docids(
merger,
index.word_pair_proximity_docids.remap_types(),
rtxn,
rtxn_pool,
&mut buffer,
sender.docids::<WordPairProximityDocids>(),
|_, _key| Ok(()),
// |_, _key| Ok(()),
)?;
}
MergerOperation::WordPositionDocidsMerger(merger) => {
@ -115,10 +116,10 @@ pub fn merge_grenad_entries(
merge_and_send_docids(
merger,
index.word_position_docids.remap_types(),
rtxn,
rtxn_pool,
&mut buffer,
sender.docids::<WordPositionDocids>(),
|_, _key| Ok(()),
// |_, _key| Ok(()),
)?;
}
MergerOperation::InsertDocument { docid, document } => {
@ -129,15 +130,21 @@ pub fn merge_grenad_entries(
sender.documents().uncompressed(docid, &document).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let current: Option<&KvReaderFieldId> = current.map(Into::into);
let change = match current {
Some(current) => {
DocumentChange::Update(Update::create(docid, current.boxed(), document))
}
None => DocumentChange::Insertion(Insertion::create(docid, document)),
};
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
rtxn_pool.with(|rtxn| {
let current =
index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let current: Option<&KvReaderFieldId> = current.map(Into::into);
let change = match current {
Some(current) => DocumentChange::Update(Update::create(
docid,
current.boxed(),
document,
)),
None => DocumentChange::Insertion(Insertion::create(docid, document)),
};
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
Ok(())
})?;
}
}
MergerOperation::DeleteDocument { docid } => {
@ -150,9 +157,13 @@ pub fn merge_grenad_entries(
sender.documents().delete(docid).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let current = index.document(rtxn, docid)?;
let change = DocumentChange::Deletion(Deletion::create(docid, current.boxed()));
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
rtxn_pool.with(|rtxn| {
let current = index.document(rtxn, docid)?;
let change =
DocumentChange::Deletion(Deletion::create(docid, current.boxed()));
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
Ok(())
})?;
}
}
MergerOperation::FinishedDocument => {
@ -165,7 +176,7 @@ pub fn merge_grenad_entries(
merge_and_send_facet_docids(
merger,
FacetDatabases::new(index),
rtxn,
rtxn_pool,
&mut buffer,
sender.facet_docids(),
)?;
@ -237,56 +248,60 @@ impl GeoExtractor {
}
/// For every `(key, deladd)` entry of `merger`, reads the value currently stored under
/// `key` in `database`, combines it with `deladd.del` / `deladd.add` through
/// `merge_cbo_bitmaps`, and forwards the outcome to `docids_sender`:
/// `Operation::Write` sends the serialized bitmap, `Operation::Delete` sends a deletion,
/// and `Operation::Ignore` sends nothing.
///
/// Errors from the database read or from `merge_cbo_bitmaps` are propagated with `?`.
/// A failed `docids_sender.write` / `delete` panics because the result is unwrapped.
//
// NOTE(review): two signatures and two bodies are interleaved below — a sequential `for`
// loop taking `rtxn` and `register_key`, and a `par_bridge().try_for_each` version taking
// `rtxn_pool`. This looks like the old and new lines of a diff rendered without +/-
// markers; confirm which one is current.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_docids(
fn merge_and_send_docids<'t>(
merger: HashMapMerger,
database: Database<Bytes, Bytes>,
rtxn: &RoTxn<'_>,
rtxn_pool: &ItemsPool<impl Fn() -> Result<RoTxn<'t>> + Send + Sync, RoTxn<'t>, Error>,
buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender,
mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>,
docids_sender: impl DocidsSender + Sync,
// mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()> + Send + Sync,
) -> Result<()> {
// Sequential variant: one shared `rtxn`, the caller-provided `buffer`, and a
// `register_key` callback invoked after every write or delete.
for (key, deladd) in merger.into_iter() {
let current = database.get(rtxn, &key)?;
match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(&key, value).unwrap();
register_key(DelAdd::Addition, &key)?;
// Parallel variant: the entries are bridged into a rayon parallel iterator and each one is
// handled inside `rtxn_pool.with`, which hands the closure a `rtxn`
// (presumably a pooled read transaction — confirm against `ItemsPool`).
merger.into_iter().par_bridge().try_for_each(|(key, deladd)| {
rtxn_pool.with(|rtxn| {
// A fresh Vec is created per entry; it shadows the `buffer` parameter, which this
// variant therefore never touches.
let mut buffer = Vec::new();
let current = database.get(rtxn, &key)?;
match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
docids_sender.write(&key, value).unwrap();
// register_key(DelAdd::Addition, &key)?;
}
Operation::Delete => {
docids_sender.delete(&key).unwrap();
// register_key(DelAdd::Deletion, &key)?;
}
Operation::Ignore => (),
}
Operation::Delete => {
docids_sender.delete(&key).unwrap();
register_key(DelAdd::Deletion, &key)?;
}
Operation::Ignore => (),
}
}
Ok(())
Ok(())
})
})
}
/// Facet counterpart of the docids merge: for every `(key, deladd)` entry of `merger`,
/// reads the current value through `FacetDatabases::get`, combines it with `deladd.del` /
/// `deladd.add` through `merge_cbo_bitmaps`, and sends a write or a delete to
/// `docids_sender` (`Operation::Ignore` sends nothing).
///
/// Errors from the database read or from `merge_cbo_bitmaps` are propagated with `?`.
/// A failed `docids_sender.write` / `delete` panics because the result is unwrapped.
//
// NOTE(review): two signatures and two bodies are interleaved below — a sequential `for`
// loop taking `rtxn`, and a `par_bridge().try_for_each` version taking `rtxn_pool`. This
// looks like the old and new lines of a diff rendered without +/- markers; confirm which
// one is current.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_facet_docids(
fn merge_and_send_facet_docids<'t>(
merger: HashMapMerger,
database: FacetDatabases,
rtxn: &RoTxn<'_>,
rtxn_pool: &ItemsPool<impl Fn() -> Result<RoTxn<'t>> + Send + Sync, RoTxn<'t>, Error>,
buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender,
docids_sender: impl DocidsSender + Sync,
) -> Result<()> {
// Sequential variant: one shared `rtxn` and the caller-provided `buffer`.
for (key, deladd) in merger.into_iter() {
let current = database.get(rtxn, &key)?;
match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(&key, value).unwrap();
// Parallel variant: the entries are bridged into a rayon parallel iterator and each one is
// handled inside `rtxn_pool.with`, which hands the closure a `rtxn`
// (presumably a pooled read transaction — confirm against `ItemsPool`).
merger.into_iter().par_bridge().try_for_each(|(key, deladd)| {
rtxn_pool.with(|rtxn| {
// A fresh Vec is created per entry; it shadows the `buffer` parameter, which this
// variant therefore never touches.
let mut buffer = Vec::new();
let current = database.get(rtxn, &key)?;
match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
docids_sender.write(&key, value).unwrap();
}
Operation::Delete => {
docids_sender.delete(&key).unwrap();
}
Operation::Ignore => (),
}
Operation::Delete => {
docids_sender.delete(&key).unwrap();
}
Operation::Ignore => (),
}
}
Ok(())
Ok(())
})
})
}
struct FacetDatabases {