89 lines
3.1 KiB
Rust
Raw Normal View History

2024-10-03 18:08:09 +02:00
use std::ops::DerefMut;
use rayon::iter::IndexedParallelIterator;
2024-10-03 18:08:09 +02:00
use serde::Deserializer;
use serde_json::value::RawValue;
2024-10-14 15:41:31 +02:00
use super::de::FieldAndDocidExtractor;
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend, RefCellExt};
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
2024-10-03 18:08:09 +02:00
use crate::update::new::document::DocumentFromVersions;
use crate::update::new::document_change::Versions;
use crate::update::new::{DocumentChange, Insertion};
use crate::{Error, InternalError, Result, UserError};
2024-09-02 14:42:27 +02:00
/// Thin wrapper around an iterator `I` of documents.
///
/// Built with `new_from_jsonlines` and turned into a [`PartialDumpChanges`] by
/// `into_changes`.
pub struct PartialDump<I> {
    // The wrapped iterator; handed over unchanged by `into_changes`.
    iter: I,
}
2024-09-02 14:42:27 +02:00
impl<I> PartialDump<I> {
pub fn new_from_jsonlines(iter: I) -> Self {
2024-09-02 14:42:27 +02:00
PartialDump { iter }
}
2024-10-03 18:08:09 +02:00
pub fn into_changes<'index>(
self,
concurrent_available_ids: &'index ConcurrentAvailableIds,
primary_key: &'index PrimaryKey,
) -> PartialDumpChanges<'index, I> {
/// Note for future self:
/// - We recommend sending chunks of documents in this `PartialDumpIndexer` we therefore need to create a custom take_while_size method (that doesn't drop items).
PartialDumpChanges { iter: self.iter, concurrent_available_ids, primary_key }
}
}
/// Iterator of documents bundled with the state required to convert every item into
/// a document change.
///
/// Created by `PartialDump::into_changes`. The `'index` lifetime is the one of the
/// borrowed id allocator and primary key; it is named like the lifetime used by the
/// constructor and by the `DocumentChanges` implementation.
pub struct PartialDumpChanges<'index, I> {
    /// Source of the documents to insert.
    iter: I,
    /// Provider of the internal document ids, queried once per document.
    concurrent_available_ids: &'index ConcurrentAvailableIds,
    /// Primary key used to extract the external document id of each document.
    primary_key: &'index PrimaryKey<'index>,
}
2024-10-03 18:08:09 +02:00
impl<'index, Iter> DocumentChanges<'index> for PartialDumpChanges<'index, Iter>
where
2024-10-03 18:08:09 +02:00
Iter: IndexedParallelIterator<Item = Box<RawValue>> + Clone + Sync + 'index,
{
2024-10-03 18:08:09 +02:00
type Item = Box<RawValue>;
2024-10-16 15:44:04 +02:00
fn iter(
&self,
chunk_size: usize,
) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
self.iter.clone().chunks(chunk_size)
2024-10-03 18:08:09 +02:00
}
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
2024-10-16 15:44:04 +02:00
document: &'doc Self::Item,
2024-10-14 15:41:10 +02:00
) -> Result<Option<DocumentChange<'doc>>>
2024-10-03 18:08:09 +02:00
where
'index: 'doc,
{
let doc_alloc = &context.doc_alloc;
let docid = match self.concurrent_available_ids.next() {
Some(id) => id,
None => return Err(Error::UserError(UserError::DocumentLimitReached)),
};
let mut fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
2024-10-03 18:08:09 +02:00
let fields_ids_map = fields_ids_map.deref_mut();
let document = doc_alloc.alloc_str(document.get());
let document: &RawValue = unsafe { std::mem::transmute(document) };
2024-10-14 15:41:31 +02:00
let external_document_id =
self.primary_key.extract_fields_and_docid(document, fields_ids_map, doc_alloc)?;
let external_document_id = external_document_id.to_de();
2024-10-03 18:08:09 +02:00
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
.map_err(InternalError::SerdeJson)?;
2024-10-03 18:08:09 +02:00
let document = document.into_bump_slice();
let document = DocumentFromVersions::new(Versions::Single(document));
2024-10-14 15:41:31 +02:00
let insertion = Insertion::create(docid, external_document_id, document);
2024-10-14 15:41:10 +02:00
Ok(Some(DocumentChange::Insertion(insertion)))
}
}