Merge pull request #4181 from meilisearch/diff-indexing-parallel-transform

Use rayon to sort entries in parallel
This commit is contained in:
Clément Renault 2023-11-02 15:16:10 +01:00 committed by GitHub
commit 44e9033b3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 55 additions and 95 deletions

5
Cargo.lock generated
View File

@ -1663,12 +1663,13 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]] [[package]]
name = "grenad" name = "grenad"
version = "0.4.4" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5232b2d157b7bf63d7abe1b12177039e58db2f29e377517c0cdee1578cca4c93" checksum = "6a007932af5475ebb5c63bef8812bb1c36f317983bb4ca663e9d6dd58d6a0f8c"
dependencies = [ dependencies = [
"bytemuck", "bytemuck",
"byteorder", "byteorder",
"rayon",
"tempfile", "tempfile",
] ]

View File

@ -24,7 +24,7 @@ use std::fs::{self, File};
use std::io::BufWriter; use std::io::BufWriter;
use dump::IndexMetadata; use dump::IndexMetadata;
use log::{debug, error, info}; use log::{debug, error, info, trace};
use meilisearch_types::error::Code; use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@ -1190,7 +1190,7 @@ impl IndexScheduler {
index, index,
indexer_config, indexer_config,
config, config,
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| trace!("update: {:?}", indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; )?;
@ -1268,7 +1268,7 @@ impl IndexScheduler {
milli::update::Settings::new(index_wtxn, index, indexer_config); milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.reset_primary_key(); builder.reset_primary_key();
builder.execute( builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| trace!("update: {:?}", indexing_step),
|| must_stop_processing.clone().get(), || must_stop_processing.clone().get(),
)?; )?;
} }
@ -1288,7 +1288,7 @@ impl IndexScheduler {
index, index,
indexer_config, indexer_config,
config, config,
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| trace!("update: {:?}", indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; )?;

View File

@ -362,7 +362,7 @@ fn import_dump(
update_method: IndexDocumentsMethod::ReplaceDocuments, update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default() ..Default::default()
}, },
|indexing_step| log::debug!("update: {:?}", indexing_step), |indexing_step| log::trace!("update: {:?}", indexing_step),
|| false, || false,
)?; )?;

View File

@ -26,8 +26,8 @@ flatten-serde-json = { path = "../flatten-serde-json" }
fst = "0.4.7" fst = "0.4.7"
fxhash = "0.2.1" fxhash = "0.2.1"
geoutils = "0.5.1" geoutils = "0.5.1"
grenad = { version = "0.4.4", default-features = false, features = [ grenad = { version = "0.4.5", default-features = false, features = [
"tempfile", "rayon", "tempfile"
] } ] }
heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.7", default-features = false, features = [ heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.7", default-features = false, features = [
"lmdb", "read-txn-no-tls" "lmdb", "read-txn-no-tls"

View File

@ -47,6 +47,7 @@ pub fn create_sorter(
builder.allow_realloc(false); builder.allow_realloc(false);
} }
builder.sort_algorithm(sort_algorithm); builder.sort_algorithm(sort_algorithm);
builder.sort_in_parallel(true);
builder.build() builder.build()
} }

View File

@ -150,6 +150,7 @@ impl<'a, 'i> Transform<'a, 'i> {
}) })
} }
#[logging_timer::time]
pub fn read_documents<R, FP, FA>( pub fn read_documents<R, FP, FA>(
&mut self, &mut self,
reader: EnrichedDocumentsBatchReader<R>, reader: EnrichedDocumentsBatchReader<R>,
@ -162,6 +163,8 @@ impl<'a, 'i> Transform<'a, 'i> {
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
{ {
puffin::profile_function!();
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
let external_documents_ids = self.index.external_documents_ids(); let external_documents_ids = self.index.external_documents_ids();
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
@ -212,13 +215,12 @@ impl<'a, 'i> Transform<'a, 'i> {
field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2)); field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2));
// Build the new obkv document. // Build the new obkv document.
let mut writer = obkv::KvWriter::new(&mut obkv_buffer); let mut writer = KvWriter::new(&mut obkv_buffer);
for (k, v) in field_buffer_cache.iter() { for (k, v) in field_buffer_cache.iter() {
writer.insert(*k, v)?; writer.insert(*k, v)?;
} }
let mut original_docid = None; let mut original_docid = None;
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
HEntry::Occupied(entry) => *entry.get() as u32, HEntry::Occupied(entry) => *entry.get() as u32,
HEntry::Vacant(entry) => { HEntry::Vacant(entry) => {
@ -275,8 +277,8 @@ impl<'a, 'i> Transform<'a, 'i> {
&mut document_sorter_buffer, &mut document_sorter_buffer,
)?; )?;
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { let base_obkv = KvReader::new(base_obkv);
Some(flattened_obkv) => { if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? {
// we recreate our buffer with the flattened documents // we recreate our buffer with the flattened documents
document_sorter_buffer.clear(); document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8); document_sorter_buffer.push(Operation::Addition as u8);
@ -286,13 +288,8 @@ impl<'a, 'i> Transform<'a, 'i> {
keep_original_version, keep_original_version,
&mut document_sorter_buffer, &mut document_sorter_buffer,
)?; )?;
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
None => self
.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
} }
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
} }
} }
@ -310,23 +307,18 @@ impl<'a, 'i> Transform<'a, 'i> {
// We use the extracted/generated user id as the key for this document. // We use the extracted/generated user id as the key for this document.
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { let flattened_obkv = KvReader::new(&obkv_buffer);
Some(flattened_obkv) => { if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
document_sorter_buffer.clear(); document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8); document_sorter_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&flattened_obkv), KvReaderU16::new(&obkv),
false, false,
true, true,
&mut document_sorter_buffer, &mut document_sorter_buffer,
)?; )?
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
None => self
.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
} }
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
} }
documents_count += 1; documents_count += 1;
@ -361,6 +353,7 @@ impl<'a, 'i> Transform<'a, 'i> {
/// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db,
/// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids.
/// - If the document to remove was not present in either the db or the transform we do nothing. /// - If the document to remove was not present in either the db or the transform we do nothing.
#[logging_timer::time]
pub fn remove_documents<FA>( pub fn remove_documents<FA>(
&mut self, &mut self,
mut to_remove: Vec<String>, mut to_remove: Vec<String>,
@ -370,6 +363,8 @@ impl<'a, 'i> Transform<'a, 'i> {
where where
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
{ {
puffin::profile_function!();
// there may be duplicates in the documents to remove. // there may be duplicates in the documents to remove.
to_remove.sort_unstable(); to_remove.sort_unstable();
to_remove.dedup(); to_remove.dedup();
@ -439,24 +434,19 @@ impl<'a, 'i> Transform<'a, 'i> {
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
// flatten it and push it as to delete in the flattened_sorter // flatten it and push it as to delete in the flattened_sorter
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { let flattened_obkv = KvReader::new(base_obkv);
Some(flattened_obkv) => { if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
// we recreate our buffer with the flattened documents // we recreate our buffer with the flattened documents
document_sorter_buffer.clear(); document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8); document_sorter_buffer.push(Operation::Deletion as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&flattened_obkv), KvReaderU16::new(&obkv),
true, true,
false, false,
&mut document_sorter_buffer, &mut document_sorter_buffer,
)?; )?;
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
None => self
.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
} }
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
true true
} }
@ -591,42 +581,10 @@ impl<'a, 'i> Transform<'a, 'i> {
Ok(()) Ok(())
} }
fn remove_deleted_documents_from_field_distribution(
&self,
rtxn: &RoTxn,
field_distribution: &mut FieldDistribution,
) -> Result<()> {
for deleted_docid in self.replaced_documents_ids.iter() {
let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
for (key, _) in obkv.iter() {
let name =
self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
field_id: key,
process: "Computing field distribution in transform.",
})?;
// We checked that the document was in the db earlier. If we can't find it it means
// there is an inconsistency between the field distribution and the field id map.
let field =
field_distribution.get_mut(name).ok_or(FieldIdMapMissingEntry::FieldId {
field_id: key,
process: "Accessing field distribution in transform.",
})?;
*field -= 1;
if *field == 0 {
// since we were able to get the field right before it's safe to unwrap here
field_distribution.remove(name).unwrap();
}
}
}
Ok(())
}
/// Generate the `TransformOutput` based on the given sorter that can be generated from any /// Generate the `TransformOutput` based on the given sorter that can be generated from any
/// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
/// id for the user side and the value must be an obkv where keys are valid fields ids. /// id for the user side and the value must be an obkv where keys are valid fields ids.
#[logging_timer::time]
pub(crate) fn output_from_sorter<F>( pub(crate) fn output_from_sorter<F>(
self, self,
wtxn: &mut heed::RwTxn, wtxn: &mut heed::RwTxn,
@ -816,7 +774,7 @@ impl<'a, 'i> Transform<'a, 'i> {
let (docid, obkv) = result?; let (docid, obkv) = result?;
obkv_buffer.clear(); obkv_buffer.clear();
let mut obkv_writer = obkv::KvWriter::<_, FieldId>::new(&mut obkv_buffer); let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer);
// We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv. // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
for (id, name) in new_fields_ids_map.iter() { for (id, name) in new_fields_ids_map.iter() {