Introduce the PartialDumpIndexer indexer that generates document ids in parallel

Clément Renault 2024-08-30 15:07:21 +02:00
parent 6487a67f2b
commit b625d31c7d

@@ -21,6 +21,7 @@ mod indexer {
     use heed::types::Bytes;
     use heed::{RoTxn, RwTxn};
     use memmap2::Mmap;
+    use obkv::KvWriter;
     use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
     use rayon::ThreadPool;
     use roaring::RoaringBitmap;
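
The only change in this first hunk is the new `obkv::KvWriter` import, which the last hunk below uses to serialize each JSON object into the obkv document format. As a reminder of how that format behaves, here is a minimal sketch; the `KvWriterFieldId` alias and the exact obkv API surface are assumptions, mirroring the `KvReaderFieldId` alias visible in the next hunk and the `memory()`/`insert()`/`into_boxed()` calls visible in the last one:

use obkv::KvWriter;

type FieldId = u16; // Meilisearch field ids are u16
// Assumed alias, by analogy with `KvReaderFieldId` below.
type KvWriterFieldId = KvWriter<Vec<u8>, FieldId>;

fn main() {
    // Keys must be inserted in ascending field-id order;
    // values are raw JSON bytes.
    let mut writer = KvWriterFieldId::memory();
    writer.insert(0, b"123").unwrap(); // e.g. field 0 = `"id": 123`
    writer.insert(1, b"\"kefir\"").unwrap(); // e.g. field 1 = `"name": "kefir"`

    // `into_boxed()` finishes the buffer into a readable document,
    // as the new `document_changes` method does below.
    let document = writer.into_boxed();
    assert_eq!(document.get(0), Some(&b"123"[..]));
}
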
@@ -35,14 +36,13 @@ mod indexer {
     use crate::documents::{
         obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
     };
+    use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
     use crate::update::del_add::DelAdd;
     use crate::update::new::channel::MergerOperation;
-    use crate::update::{
-        AvailableDocumentsIds, IndexDocumentsMethod, MergeDeladdCboRoaringBitmaps,
-    };
+    use crate::update::{AvailableIds, IndexDocumentsMethod, MergeDeladdCboRoaringBitmaps};
     use crate::{
-        CboRoaringBitmapCodec, DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError,
-        Result, UserError,
+        all_obkv_to_json, obkv_to_json, CboRoaringBitmapCodec, DocumentId, Error, FieldId,
+        FieldsIdsMap, Index, InternalError, Object, Result, UserError,
     };
 
     pub type KvReaderFieldId = obkv::KvReader<FieldId>;
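
The interesting newcomer here is `ConcurrentAvailableIds`: unlike the single-threaded `AvailableIds` used in the next hunk, it must hand out internal document ids from many rayon workers at once, which is what the commit title refers to. The diff only confirms a `next()` method returning `Option<DocumentId>`, with `None` signalling `DocumentLimitReached`; everything else in this sketch is an assumption about how such a lock-free allocator could look:

use std::sync::atomic::{AtomicU64, Ordering};

use roaring::RoaringBitmap;

/// Hypothetical shape of a lock-free id allocator: recycle the ids freed
/// by deletions first, then mint fresh ids past the highest used one.
pub struct ConcurrentAvailableIds {
    /// Ids freed by previous deletions, selectable by rank.
    available: RoaringBitmap,
    /// Number of ids handed out so far; a single atomic cursor.
    cursor: AtomicU64,
    /// First id never used by any document.
    first_fresh: u32,
}

impl ConcurrentAvailableIds {
    pub fn new(used: &RoaringBitmap) -> Self {
        let first_fresh = used.max().map_or(0, |id| id + 1);
        // Every id below `first_fresh` that is not in `used` can be reused.
        let available = RoaringBitmap::from_sorted_iter(0..first_fresh).unwrap() - used;
        Self { available, cursor: AtomicU64::new(0), first_fresh }
    }

    /// Returns a unique id, or `None` once the u32 id space is exhausted.
    /// The `fetch_add` makes the reservation a single atomic step, so any
    /// number of rayon workers can call this concurrently without locks.
    pub fn next(&self) -> Option<u32> {
        let n = self.cursor.fetch_add(1, Ordering::Relaxed);
        match u32::try_from(n).ok().and_then(|rank| self.available.select(rank)) {
            Some(recycled) => Some(recycled),
            None => {
                let fresh = u64::from(self.first_fresh) + (n - self.available.len());
                u32::try_from(fresh).ok()
            }
        }
    }
}
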
@@ -108,7 +108,7 @@ mod indexer {
             primary_key: &'a PrimaryKey<'a>,
         ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'a> {
             let documents_ids = index.documents_ids(rtxn)?;
-            let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
+            let mut available_docids = AvailableIds::new(&documents_ids);
             let mut docids_version_offsets = HashMap::<String, _>::new();
 
             for operation in self.operations {
@@ -127,7 +127,7 @@ mod indexer {
                         let mut offset: u32 = 0;
                         while let Some(document) = batch_cursor.next_document()? {
                             let external_document_id =
-                                match primary_key.document_id(&document, &batch_index)? {
+                                match primary_key.document_id(document, &batch_index)? {
                                     Ok(document_id) => Ok(document_id),
                                     Err(DocumentIdExtractionError::InvalidDocumentId(
                                         user_error,
@@ -135,13 +135,13 @@ mod indexer {
                                     Err(DocumentIdExtractionError::MissingDocumentId) => {
                                         Err(UserError::MissingDocumentId {
                                             primary_key: primary_key.name().to_string(),
-                                            document: obkv_to_object(&document, &batch_index)?,
+                                            document: obkv_to_object(document, &batch_index)?,
                                         })
                                     }
                                     Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
                                         Err(UserError::TooManyDocumentIds {
                                             primary_key: primary_key.name().to_string(),
-                                            document: obkv_to_object(&document, &batch_index)?,
+                                            document: obkv_to_object(document, &batch_index)?,
                                         })
                                     }
                                 }?;
@@ -163,7 +163,7 @@ mod indexer {
                             };
 
                             docids_version_offsets.insert(
-                                external_document_id.into(),
+                                external_document_id,
                                 (docid, vec![document_operation]),
                             );
                         }
@@ -275,43 +275,71 @@ mod indexer {
         }
     }
 
-    pub struct DumpIndexer;
+    pub struct PartialDumpIndexer<I> {
+        iter: I,
+    }
 
-    impl DumpIndexer {
-        pub fn new() -> Self {
-            todo!()
+    impl<I> PartialDumpIndexer<I>
+    where
+        I: IntoIterator<Item = Object>,
+        I::IntoIter: Send,
+        I::Item: Send,
+    {
+        pub fn new_from_jsonlines(iter: I) -> Self {
+            PartialDumpIndexer { iter }
         }
 
-        pub fn document_changes_from_json_iter<I>(
+        /// Note for future self:
+        /// - the field ids map must already be valid, so you have to generate it beforehand.
+        /// - we should probably expose another method that generates the fields ids map from an iterator of JSON objects.
+        /// - we recommend sending chunks of documents to this `PartialDumpIndexer`; we therefore need to create a custom `take_while_size` method (one that doesn't drop items).
+        pub fn document_changes<'a>(
             self,
-            iter: I,
-            index: &Index,
-        ) -> impl ParallelIterator<Item = DocumentChange>
+            fields_ids_map: &'a FieldsIdsMap,
+            concurrent_available_ids: &'a ConcurrentAvailableIds,
+            primary_key: &'a PrimaryKey<'a>,
+        ) -> impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'a
         where
-            I: IntoIterator<Item = Value>,
+            // I don't like this; it will not fit in the future trait easily.
+            I::IntoIter: 'a,
         {
-            // let items = Arc::new(ItemsPool::new(|| {
-            //     let rtxn = index.read_txn()?;
-            //     let fields = index.fields_ids_map(&rtxn)?;
-            //     let primary_key =
-            //         index.primary_key(&rtxn)?.ok_or(InternalError::DatabaseMissingEntry {
-            //             db_name: db_name::MAIN,
-            //             key: Some(main_key::PRIMARY_KEY_KEY),
-            //         })?;
-            //     let primary_key = PrimaryKey::new(primary_key, &fields).ok_or_else(|| {
-            //         InternalError::FieldIdMapMissingEntry(
-            //             crate::FieldIdMapMissingEntry::FieldName {
-            //                 field_name: primary_key.to_owned(),
-            //                 process: "external_id_of",
-            //             },
-            //         )
-            //     })?;
-            //     Ok(DeleteDocumentExternalDocumentIdGetter { rtxn, fields, primary_key })
-            //         as crate::Result<_>
-            // }));
-
-            todo!();
-            vec![].into_par_iter()
+            self.iter.into_iter().par_bridge().map(|object| {
+                let docid = match concurrent_available_ids.next() {
+                    Some(id) => id,
+                    None => return Err(Error::UserError(UserError::DocumentLimitReached)),
+                };
+
+                let mut writer = KvWriterFieldId::memory();
+                object.iter().for_each(|(key, value)| {
+                    let key = fields_ids_map.id(key).unwrap();
+                    // TODO: better error management.
+                    let value = serde_json::to_vec(&value).unwrap();
+                    writer.insert(key, value).unwrap();
+                });
+
+                let document = writer.into_boxed();
+                let external_docid = match primary_key.document_id(&document, fields_ids_map)? {
+                    Ok(document_id) => Ok(document_id),
+                    Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => {
+                        Err(user_error)
+                    }
+                    Err(DocumentIdExtractionError::MissingDocumentId) => {
+                        Err(UserError::MissingDocumentId {
+                            primary_key: primary_key.name().to_string(),
+                            document: all_obkv_to_json(&document, fields_ids_map)?,
+                        })
+                    }
+                    Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
+                        Err(UserError::TooManyDocumentIds {
+                            primary_key: primary_key.name().to_string(),
+                            document: all_obkv_to_json(&document, fields_ids_map)?,
+                        })
+                    }
+                }?;
+
+                let insertion = Insertion::create(docid, external_docid, document);
+                Ok(Some(DocumentChange::Insertion(insertion)))
+            })
        }
    }
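
To see how these pieces could fit together, here is a hypothetical call site. None of this wiring appears in the commit: the transaction handling, the `ConcurrentAvailableIds::new` constructor, and the prepared `fields_ids_map` and `primary_key` are all assumptions, in line with the "Note for future self" above:

use rayon::iter::ParallelIterator;

// Assumed to be prepared beforehand: `index`, `rtxn`, a `fields_ids_map`
// already containing every field of the dump, and a `primary_key`.
let documents_ids = index.documents_ids(&rtxn)?;
let available_ids = ConcurrentAvailableIds::new(&documents_ids); // assumed constructor
let objects: Vec<Object> =
    vec![serde_json::from_str(r#"{ "id": 1, "name": "kefir" }"#)?];

PartialDumpIndexer::new_from_jsonlines(objects)
    .document_changes(&fields_ids_map, &available_ids, &primary_key)
    .try_for_each(|change| {
        match change? {
            // Each rayon worker reserved its docid concurrently via `next()`.
            Some(DocumentChange::Insertion(insertion)) => {
                let _ = insertion; // e.g. forward to the merger channel
            }
            _ => (),
        }
        Ok(())
    })?;

Because `document_changes` returns a `ParallelIterator` of `Result`s, id exhaustion and primary-key errors surface per document instead of aborting the whole bridge, which is what lets the dump keep streaming in parallel.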