mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
external changes
This commit is contained in:
parent
6ad3f57bc1
commit
c01ee7b732
@ -5,9 +5,8 @@ use roaring::RoaringBitmap;
|
|||||||
|
|
||||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
||||||
use crate::documents::PrimaryKey;
|
use crate::documents::PrimaryKey;
|
||||||
use crate::index::db_name::EXTERNAL_DOCUMENTS_IDS;
|
|
||||||
use crate::update::new::{Deletion, DocumentChange};
|
use crate::update::new::{Deletion, DocumentChange};
|
||||||
use crate::{DocumentId, InternalError, Result};
|
use crate::{DocumentId, Result};
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct DocumentDeletion {
|
pub struct DocumentDeletion {
|
||||||
@ -61,12 +60,15 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
|
|||||||
'pl: 'doc, // the payload must survive the process calls
|
'pl: 'doc, // the payload must survive the process calls
|
||||||
{
|
{
|
||||||
let current = context.index.document(&context.txn, docid)?;
|
let current = context.index.document(&context.txn, docid)?;
|
||||||
let new_fields_ids_map = context.new_fields_ids_map.borrow();
|
|
||||||
let new_fields_ids_map = new_fields_ids_map.local_map();
|
let external_document_id = self.primary_key.extract_docid_from_db(
|
||||||
let external_document_id =
|
current,
|
||||||
self.primary_key.document_id(current, new_fields_ids_map)?.map_err(|_| {
|
&context.db_fields_ids_map,
|
||||||
InternalError::DatabaseMissingEntry { db_name: EXTERNAL_DOCUMENTS_IDS, key: None }
|
&context.doc_alloc,
|
||||||
})?;
|
)?;
|
||||||
|
|
||||||
|
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
|
||||||
|
|
||||||
Ok(DocumentChange::Deletion(Deletion::create(docid, external_document_id)))
|
Ok(DocumentChange::Deletion(Deletion::create(docid, external_document_id)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,10 +11,10 @@ use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend
|
|||||||
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
|
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
|
||||||
use crate::update::new::document::DocumentFromVersions;
|
use crate::update::new::document::DocumentFromVersions;
|
||||||
use crate::update::new::document_change::Versions;
|
use crate::update::new::document_change::Versions;
|
||||||
use crate::update::new::indexer::de::DocumentVisitor;
|
use crate::update::new::indexer::de::FieldAndDocidExtractor;
|
||||||
use crate::update::new::{Deletion, Insertion, Update};
|
use crate::update::new::{Deletion, Insertion, Update};
|
||||||
use crate::update::{AvailableIds, IndexDocumentsMethod};
|
use crate::update::{AvailableIds, IndexDocumentsMethod};
|
||||||
use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
|
use crate::{external_documents_ids, DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
|
||||||
|
|
||||||
pub struct DocumentOperation<'pl> {
|
pub struct DocumentOperation<'pl> {
|
||||||
operations: Vec<Payload<'pl>>,
|
operations: Vec<Payload<'pl>>,
|
||||||
@ -98,7 +98,7 @@ impl<'pl> DocumentOperation<'pl> {
|
|||||||
iter.next().transpose().map_err(UserError::SerdeJson)?
|
iter.next().transpose().map_err(UserError::SerdeJson)?
|
||||||
{
|
{
|
||||||
let res = document
|
let res = document
|
||||||
.deserialize_map(DocumentVisitor::new(
|
.deserialize_map(FieldAndDocidExtractor::new(
|
||||||
new_fields_ids_map,
|
new_fields_ids_map,
|
||||||
primary_key,
|
primary_key,
|
||||||
indexer,
|
indexer,
|
||||||
@ -122,6 +122,8 @@ impl<'pl> DocumentOperation<'pl> {
|
|||||||
}
|
}
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
|
let external_document_id = external_document_id.to_de();
|
||||||
|
|
||||||
let current_offset = iter.byte_offset();
|
let current_offset = iter.byte_offset();
|
||||||
let document_operation = InnerDocOp::Addition(DocumentOffset {
|
let document_operation = InnerDocOp::Addition(DocumentOffset {
|
||||||
content: &payload[previous_offset..current_offset],
|
content: &payload[previous_offset..current_offset],
|
||||||
@ -310,23 +312,14 @@ impl MergeChanges for MergeDocumentForReplacement {
|
|||||||
let document = DocumentFromVersions::new(Versions::Single(document));
|
let document = DocumentFromVersions::new(Versions::Single(document));
|
||||||
|
|
||||||
if is_new {
|
if is_new {
|
||||||
Ok(DocumentChange::Insertion(Insertion::create(
|
Ok(DocumentChange::Insertion(Insertion::create(docid, external_doc, document)))
|
||||||
docid,
|
|
||||||
external_doc.to_owned(),
|
|
||||||
document,
|
|
||||||
)))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(DocumentChange::Update(Update::create(
|
Ok(DocumentChange::Update(Update::create(docid, external_doc, document, true)))
|
||||||
docid,
|
|
||||||
external_doc.to_owned(),
|
|
||||||
document,
|
|
||||||
true,
|
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(InnerDocOp::Deletion) => {
|
Some(InnerDocOp::Deletion) => {
|
||||||
let deletion = if is_new {
|
let deletion = if is_new {
|
||||||
Deletion::create(docid, external_doc.to_owned())
|
Deletion::create(docid, external_doc)
|
||||||
} else {
|
} else {
|
||||||
todo!("Do that with Louis")
|
todo!("Do that with Louis")
|
||||||
};
|
};
|
||||||
@ -373,7 +366,7 @@ impl MergeChanges for MergeDocumentForUpdates {
|
|||||||
|
|
||||||
if operations.is_empty() {
|
if operations.is_empty() {
|
||||||
let deletion = if !is_new {
|
let deletion = if !is_new {
|
||||||
Deletion::create(docid, external_docid.to_owned())
|
Deletion::create(docid, external_docid)
|
||||||
} else {
|
} else {
|
||||||
todo!("Do that with Louis")
|
todo!("Do that with Louis")
|
||||||
};
|
};
|
||||||
@ -408,15 +401,11 @@ impl MergeChanges for MergeDocumentForUpdates {
|
|||||||
let document = DocumentFromVersions::new(versions);
|
let document = DocumentFromVersions::new(versions);
|
||||||
|
|
||||||
if is_new {
|
if is_new {
|
||||||
Ok(DocumentChange::Insertion(Insertion::create(
|
Ok(DocumentChange::Insertion(Insertion::create(docid, external_docid, document)))
|
||||||
docid,
|
|
||||||
external_docid.to_owned(),
|
|
||||||
document,
|
|
||||||
)))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(DocumentChange::Update(Update::create(
|
Ok(DocumentChange::Update(Update::create(
|
||||||
docid,
|
docid,
|
||||||
external_docid.to_owned(),
|
external_docid,
|
||||||
document,
|
document,
|
||||||
has_deletion,
|
has_deletion,
|
||||||
)))
|
)))
|
||||||
|
@ -4,7 +4,7 @@ use rayon::iter::IndexedParallelIterator;
|
|||||||
use serde::Deserializer;
|
use serde::Deserializer;
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
|
|
||||||
use super::de::DocumentVisitor;
|
use super::de::FieldAndDocidExtractor;
|
||||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
||||||
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
|
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
|
||||||
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
||||||
@ -66,36 +66,20 @@ where
|
|||||||
let mut fields_ids_map = context.new_fields_ids_map.borrow_mut();
|
let mut fields_ids_map = context.new_fields_ids_map.borrow_mut();
|
||||||
let fields_ids_map = fields_ids_map.deref_mut();
|
let fields_ids_map = fields_ids_map.deref_mut();
|
||||||
|
|
||||||
let res = document
|
|
||||||
.deserialize_map(DocumentVisitor::new(fields_ids_map, self.primary_key, doc_alloc))
|
|
||||||
.map_err(UserError::SerdeJson)?;
|
|
||||||
|
|
||||||
let external_document_id = match res {
|
|
||||||
Ok(document_id) => Ok(document_id),
|
|
||||||
Err(DocumentIdExtractionError::InvalidDocumentId(e)) => Err(e),
|
|
||||||
Err(DocumentIdExtractionError::MissingDocumentId) => {
|
|
||||||
Err(UserError::MissingDocumentId {
|
|
||||||
primary_key: self.primary_key.name().to_string(),
|
|
||||||
document: serde_json::from_str(document.get()).unwrap(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
|
|
||||||
Err(UserError::TooManyDocumentIds {
|
|
||||||
primary_key: self.primary_key.name().to_string(),
|
|
||||||
document: serde_json::from_str(document.get()).unwrap(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}?;
|
|
||||||
let document = doc_alloc.alloc_str(document.get());
|
let document = doc_alloc.alloc_str(document.get());
|
||||||
let document: &RawValue = unsafe { std::mem::transmute(document) };
|
let document: &RawValue = unsafe { std::mem::transmute(document) };
|
||||||
|
|
||||||
|
let external_document_id =
|
||||||
|
self.primary_key.extract_fields_and_docid(document, fields_ids_map, doc_alloc)?;
|
||||||
|
let external_document_id = external_document_id.to_de();
|
||||||
|
|
||||||
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
|
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
|
||||||
.map_err(InternalError::SerdeJson)?;
|
.map_err(InternalError::SerdeJson)?;
|
||||||
|
|
||||||
let document = document.into_bump_slice();
|
let document = document.into_bump_slice();
|
||||||
let document = DocumentFromVersions::new(Versions::Single(document));
|
let document = DocumentFromVersions::new(Versions::Single(document));
|
||||||
|
|
||||||
let insertion = Insertion::create(docid, external_document_id.to_owned(), document);
|
let insertion = Insertion::create(docid, external_document_id, document);
|
||||||
Ok(DocumentChange::Insertion(insertion))
|
Ok(DocumentChange::Insertion(insertion))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user