use std::ops::DerefMut; use rayon::iter::IndexedParallelIterator; use serde_json::value::RawValue; use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend, RefCellExt}; use crate::documents::PrimaryKey; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; use crate::update::new::document::{DocumentFromVersions, Versions}; use crate::update::new::{DocumentChange, Insertion}; use crate::{Error, InternalError, Result, UserError}; pub struct PartialDump { iter: I, } impl PartialDump { pub fn new_from_jsonlines(iter: I) -> Self { PartialDump { iter } } pub fn into_changes<'index>( self, concurrent_available_ids: &'index ConcurrentAvailableIds, primary_key: &'index PrimaryKey, ) -> PartialDumpChanges<'index, I> { /// Note for future self: /// - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items). PartialDumpChanges { iter: self.iter, concurrent_available_ids, primary_key } } } pub struct PartialDumpChanges<'doc, I> { iter: I, concurrent_available_ids: &'doc ConcurrentAvailableIds, primary_key: &'doc PrimaryKey<'doc>, } impl<'index, Iter> DocumentChanges<'index> for PartialDumpChanges<'index, Iter> where Iter: IndexedParallelIterator> + Clone + Sync + 'index, { type Item = Box; fn iter( &self, chunk_size: usize, ) -> impl IndexedParallelIterator> { self.iter.clone().chunks(chunk_size) } fn item_to_document_change<'doc, T: MostlySend + 'doc>( &'doc self, context: &'doc DocumentChangeContext, document: &'doc Self::Item, ) -> Result>> where 'index: 'doc, { let doc_alloc = &context.doc_alloc; let docid = match self.concurrent_available_ids.next() { Some(id) => id, None => return Err(Error::UserError(UserError::DocumentLimitReached)), }; let mut fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let fields_ids_map = fields_ids_map.deref_mut(); let document = doc_alloc.alloc_str(document.get()); let document: &RawValue = unsafe { std::mem::transmute(document) }; let external_document_id = self.primary_key.extract_fields_and_docid(document, fields_ids_map, doc_alloc)?; let external_document_id = external_document_id.to_de(); let document = raw_collections::RawMap::from_raw_value(document, doc_alloc) .map_err(InternalError::SerdeJson)?; let document = DocumentFromVersions::new(Versions::single(document)); let insertion = Insertion::create(docid, external_document_id, document); Ok(Some(DocumentChange::Insertion(insertion))) } }