Generate internal ids only when needed

This commit is contained in:
Clément Renault 2024-11-19 14:46:19 +01:00
parent 6641c3f59b
commit 23f0c2c29b
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 91 additions and 112 deletions

View File

@ -1752,7 +1752,7 @@ pub(crate) mod tests {
let embedders = EmbeddingConfigs::default();
/// TODO: fetch configs from the index
let mut indexer =
indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
indexer::DocumentOperation::new(self.index_documents_config.update_method);
indexer.add_documents(&documents).unwrap();
let indexer_alloc = Bump::new();
@ -1832,7 +1832,7 @@ pub(crate) mod tests {
let embedders = EmbeddingConfigs::default();
let mut indexer =
indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
indexer::DocumentOperation::new(self.index_documents_config.update_method);
let external_document_ids: Vec<_> =
external_document_ids.iter().map(AsRef::as_ref).collect();
indexer.delete_documents(external_document_ids.as_slice());
@ -2429,7 +2429,16 @@ pub(crate) mod tests {
// And adding lots of documents afterwards instead of just one.
// These extra subtests don't add much, but it's better than nothing.
index.add_documents(documents!([{ "primary_key": 38 }, { "primary_key": 39 }, { "primary_key": 41 }, { "primary_key": 40 }, { "primary_key": 41 }, { "primary_key": 42 }])).unwrap();
index
.add_documents(documents!([
{ "primary_key": 38 },
{ "primary_key": 39 },
{ "primary_key": 41 },
{ "primary_key": 40 },
{ "primary_key": 41 },
{ "primary_key": 42 },
]))
.unwrap();
db_snap!(index, documents_ids, @"[0, 1, 2, 3, 4, 5, ]");
db_snap!(index, external_documents_ids, 7, @r###"

View File

@ -57,7 +57,9 @@ impl<'pl> DocumentOperation<'pl> {
let mut primary_key = None;
for operation in operations {
let (bytes, document_count, result) = match operation {
let mut bytes = 0;
let mut document_count = 0;
let result = match operation {
Payload::Addition(payload) => extract_addition_payload_changes(
indexer,
index,
@ -66,6 +68,8 @@ impl<'pl> DocumentOperation<'pl> {
&mut primary_key,
new_fields_ids_map,
&mut available_docids,
&mut bytes,
&mut document_count,
&docids_version_offsets,
method,
payload,
@ -74,6 +78,7 @@ impl<'pl> DocumentOperation<'pl> {
index,
rtxn,
&mut available_docids,
&mut document_count,
&docids_version_offsets,
method,
to_delete,
@ -122,67 +127,38 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
primary_key: &mut Option<PrimaryKey<'r>>,
new_fields_ids_map: &mut FieldsIdsMap,
available_docids: &mut AvailableIds,
bytes: &mut u64,
number_of_documents: &mut u64,
main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
method: MergeMethod,
payload: &'pl [u8],
) -> (u64, u64, Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>>) {
) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
/// TODO manage the error
let mut previous_offset = 0;
let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
loop {
let optdoc = match iter.next().transpose() {
Ok(optdoc) => optdoc,
Err(e) => {
return (
payload.len() as u64,
new_docids_version_offsets.len() as u64,
Err(InternalError::SerdeJson(e).into()),
)
}
};
while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? {
*bytes = previous_offset as u64;
*number_of_documents = new_docids_version_offsets.len() as u64;
// Only guess the primary key if it is the first document
let retrieved_primary_key = if previous_offset == 0 {
let optdoc = match optdoc {
Some(doc) => match RawMap::from_raw_value(doc, indexer) {
Ok(docmap) => Some(docmap),
Err(error) => {
return (
payload.len() as u64,
new_docids_version_offsets.len() as u64,
Err(Error::UserError(UserError::SerdeJson(error))),
)
}
},
None => None,
};
let doc =
RawMap::from_raw_value(doc, indexer).map(Some).map_err(UserError::SerdeJson)?;
let result = retrieve_or_guess_primary_key(
rtxn,
index,
new_fields_ids_map,
primary_key_from_op,
optdoc,
doc,
);
let (pk, _has_been_changed) = match result {
Ok(Ok(pk)) => pk,
Ok(Err(user_error)) => {
return (
payload.len() as u64,
new_docids_version_offsets.len() as u64,
Err(Error::UserError(user_error)),
)
}
Err(error) => {
return (
payload.len() as u64,
new_docids_version_offsets.len() as u64,
Err(error),
)
}
Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
Err(error) => return Err(error),
};
primary_key.get_or_insert(pk)
@ -190,20 +166,13 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
primary_key.as_ref().unwrap()
};
let doc = match optdoc {
Some(doc) => doc,
None => break,
};
let external_id = match retrieved_primary_key.extract_fields_and_docid(
doc,
new_fields_ids_map,
indexer,
) {
Ok(edi) => edi,
Err(e) => {
return (payload.len() as u64, new_docids_version_offsets.len() as u64, Err(e))
}
Err(e) => return Err(e),
};
let external_id = external_id.to_de();
@ -212,40 +181,38 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
match main_docids_version_offsets.get(external_id) {
None => {
let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) {
Ok(Some(docid)) => (docid, false),
Ok(None) => (
match available_docids.next() {
Some(docid) => docid,
None => {
return (
payload.len() as u64,
new_docids_version_offsets.len() as u64,
Err(UserError::DocumentLimitReached.into()),
)
}
},
true,
),
Err(e) => {
return (
payload.len() as u64,
new_docids_version_offsets.len() as u64,
Err(e.into()),
)
}
};
match new_docids_version_offsets.entry(external_id) {
Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset),
Entry::Vacant(entry) => {
entry.insert(PayloadOperations::new_addition(
method,
docid,
is_new,
document_offset,
));
}
match index.external_documents_ids().get(rtxn, external_id) {
Ok(Some(docid)) => match new_docids_version_offsets.entry(external_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().push_addition(document_offset)
}
Entry::Vacant(entry) => {
entry.insert(PayloadOperations::new_addition(
method,
docid,
false, // is new
document_offset,
));
}
},
Ok(None) => match new_docids_version_offsets.entry(external_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().push_addition(document_offset)
}
Entry::Vacant(entry) => {
let docid = match available_docids.next() {
Some(docid) => docid,
None => return Err(UserError::DocumentLimitReached.into()),
};
entry.insert(PayloadOperations::new_addition(
method,
docid,
true, // is new
document_offset,
));
}
},
Err(e) => return Err(e.into()),
}
}
Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
@ -264,46 +231,49 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
previous_offset = iter.byte_offset();
}
(payload.len() as u64, new_docids_version_offsets.len() as u64, Ok(new_docids_version_offsets))
Ok(new_docids_version_offsets)
}
fn extract_deletion_payload_changes<'s, 'pl: 's>(
index: &Index,
rtxn: &RoTxn,
available_docids: &mut AvailableIds,
number_of_documents: &mut u64,
main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
method: MergeMethod,
to_delete: &'pl [&'pl str],
) -> (u64, u64, Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>>) {
) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
let mut document_count = 0;
for external_id in to_delete {
match main_docids_version_offsets.get(external_id) {
None => {
let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) {
Ok(Some(docid)) => (docid, false),
Ok(None) => (
match available_docids.next() {
Some(docid) => docid,
None => {
return (
0,
new_docids_version_offsets.len() as u64,
Err(UserError::DocumentLimitReached.into()),
)
match index.external_documents_ids().get(rtxn, external_id) {
Ok(Some(docid)) => {
match new_docids_version_offsets.entry(external_id) {
Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
Entry::Vacant(entry) => {
entry.insert(PayloadOperations::new_deletion(
method, docid, false, // is new
));
}
},
true,
),
Err(e) => return (0, new_docids_version_offsets.len() as u64, Err(e.into())),
};
match new_docids_version_offsets.entry(external_id) {
Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
Entry::Vacant(entry) => {
entry.insert(PayloadOperations::new_deletion(method, docid, is_new));
}
}
Ok(None) => {
let docid = match available_docids.next() {
Some(docid) => docid,
None => return Err(UserError::DocumentLimitReached.into()),
};
match new_docids_version_offsets.entry(external_id) {
Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
Entry::Vacant(entry) => {
entry.insert(PayloadOperations::new_deletion(
method, docid, true, // is new
));
}
}
}
Err(e) => return Err(e.into()),
}
}
Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
@ -317,10 +287,10 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
}
},
}
document_count += 1;
*number_of_documents += 1;
}
(0, document_count, Ok(new_docids_version_offsets))
Ok(new_docids_version_offsets)
}
fn merge_version_offsets<'s, 'pl>(