2024-09-02 10:42:19 +02:00
use rayon ::iter ::{ ParallelBridge , ParallelIterator } ;
2024-09-02 15:10:21 +02:00
use super ::DocumentChanges ;
2024-09-02 10:42:19 +02:00
use crate ::documents ::{ DocumentIdExtractionError , PrimaryKey } ;
use crate ::update ::concurrent_available_ids ::ConcurrentAvailableIds ;
use crate ::update ::new ::{ DocumentChange , Insertion , KvWriterFieldId } ;
use crate ::{ all_obkv_to_json , Error , FieldsIdsMap , Object , Result , UserError } ;
2024-09-02 14:42:27 +02:00
/// A source of documents made of already-parsed JSON objects.
///
/// `I` is left unconstrained here; turning it into document changes requires it to be an
/// `IntoIterator` over JSON objects.
pub struct PartialDump<I> {
    /// The wrapped documents (an iterator or a collection of JSON objects).
    iter: I,
}
2024-09-02 14:42:27 +02:00
impl < I > PartialDump < I > {
2024-09-02 10:42:19 +02:00
pub fn new_from_jsonlines ( iter : I ) -> Self {
2024-09-02 14:42:27 +02:00
PartialDump { iter }
2024-09-02 10:42:19 +02:00
}
}
2024-09-02 15:10:21 +02:00
impl < ' p , I > DocumentChanges < ' p > for PartialDump < I >
2024-09-02 10:42:19 +02:00
where
I : IntoIterator < Item = Object > ,
2024-09-02 19:39:48 +02:00
I ::IntoIter : Send + Clone + ' p ,
2024-09-02 10:42:19 +02:00
I ::Item : Send ,
{
type Parameter = ( & ' p FieldsIdsMap , & ' p ConcurrentAvailableIds , & ' p PrimaryKey < ' p > ) ;
/// Note for future self:
/// - the field ids map must already be valid so you must have to generate it beforehand.
/// - We should probably expose another method that generates the fields ids map from an iterator of JSON objects.
/// - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items).
fn document_changes (
self ,
2024-09-03 12:01:01 +02:00
_fields_ids_map : & mut FieldsIdsMap ,
2024-09-02 10:42:19 +02:00
param : Self ::Parameter ,
2024-09-02 19:39:48 +02:00
) -> Result < impl ParallelIterator < Item = Result < DocumentChange > > + Clone + ' p > {
2024-09-02 10:42:19 +02:00
let ( fields_ids_map , concurrent_available_ids , primary_key ) = param ;
Ok ( self . iter . into_iter ( ) . par_bridge ( ) . map ( | object | {
let docid = match concurrent_available_ids . next ( ) {
Some ( id ) = > id ,
None = > return Err ( Error ::UserError ( UserError ::DocumentLimitReached ) ) ,
} ;
let mut writer = KvWriterFieldId ::memory ( ) ;
object . iter ( ) . for_each ( | ( key , value ) | {
let key = fields_ids_map . id ( key ) . unwrap ( ) ;
/// TODO better error management
let value = serde_json ::to_vec ( & value ) . unwrap ( ) ;
2024-09-02 14:42:27 +02:00
/// TODO it is not ordered
2024-09-02 10:42:19 +02:00
writer . insert ( key , value ) . unwrap ( ) ;
} ) ;
let document = writer . into_boxed ( ) ;
let external_docid = match primary_key . document_id ( & document , fields_ids_map ) ? {
Ok ( document_id ) = > Ok ( document_id ) ,
Err ( DocumentIdExtractionError ::InvalidDocumentId ( user_error ) ) = > Err ( user_error ) ,
Err ( DocumentIdExtractionError ::MissingDocumentId ) = > {
Err ( UserError ::MissingDocumentId {
primary_key : primary_key . name ( ) . to_string ( ) ,
document : all_obkv_to_json ( & document , fields_ids_map ) ? ,
} )
}
Err ( DocumentIdExtractionError ::TooManyDocumentIds ( _ ) ) = > {
Err ( UserError ::TooManyDocumentIds {
primary_key : primary_key . name ( ) . to_string ( ) ,
document : all_obkv_to_json ( & document , fields_ids_map ) ? ,
} )
}
} ? ;
2024-09-12 18:01:02 +02:00
let insertion = Insertion ::create ( docid , document ) ;
2024-09-02 15:21:00 +02:00
Ok ( DocumentChange ::Insertion ( insertion ) )
2024-09-02 10:42:19 +02:00
} ) )
}
}