fix race condition in document addition

This commit is contained in:
mpostma 2021-01-09 13:26:23 +01:00
parent fa40c6e3d4
commit 430a5f902b
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
4 changed files with 84 additions and 57 deletions

View File

@ -23,6 +23,8 @@ pub struct DocumentsAddition<D> {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: UpdateEventsEmitter, updates_notifier: UpdateEventsEmitter,
// Whether the user explicitly set the primary key in the update
primary_key: Option<String>,
documents: Vec<D>, documents: Vec<D>,
is_partial: bool, is_partial: bool,
} }
@ -39,6 +41,7 @@ impl<D> DocumentsAddition<D> {
updates_notifier, updates_notifier,
documents: Vec::new(), documents: Vec::new(),
is_partial: false, is_partial: false,
primary_key: None,
} }
} }
@ -53,9 +56,14 @@ impl<D> DocumentsAddition<D> {
updates_notifier, updates_notifier,
documents: Vec::new(), documents: Vec::new(),
is_partial: true, is_partial: true,
primary_key: None,
} }
} }
pub fn set_primary_key(&mut self, primary_key: String) {
self.primary_key = Some(primary_key);
}
pub fn update_document(&mut self, document: D) { pub fn update_document(&mut self, document: D) {
self.documents.push(document); self.documents.push(document);
} }
@ -71,6 +79,7 @@ impl<D> DocumentsAddition<D> {
self.updates_results_store, self.updates_results_store,
self.documents, self.documents,
self.is_partial, self.is_partial,
self.primary_key,
)?; )?;
Ok(update_id) Ok(update_id)
} }
@ -88,6 +97,7 @@ pub fn push_documents_addition<D: serde::Serialize>(
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
addition: Vec<D>, addition: Vec<D>,
is_partial: bool, is_partial: bool,
primary_key: Option<String>,
) -> MResult<u64> { ) -> MResult<u64> {
let mut values = Vec::with_capacity(addition.len()); let mut values = Vec::with_capacity(addition.len());
for add in addition { for add in addition {
@ -99,9 +109,9 @@ pub fn push_documents_addition<D: serde::Serialize>(
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = if is_partial { let update = if is_partial {
Update::documents_partial(values) Update::documents_partial(primary_key, values)
} else { } else {
Update::documents_addition(values) Update::documents_addition(primary_key, values)
}; };
updates_store.put_update(writer, last_update_id, &update)?; updates_store.put_update(writer, last_update_id, &update)?;
@ -149,7 +159,8 @@ pub fn apply_addition(
writer: &mut heed::RwTxn<MainT>, writer: &mut heed::RwTxn<MainT>,
index: &store::Index, index: &store::Index,
new_documents: Vec<IndexMap<String, Value>>, new_documents: Vec<IndexMap<String, Value>>,
partial: bool partial: bool,
primary_key: Option<String>,
) -> MResult<()> ) -> MResult<()>
{ {
let mut schema = match index.main.schema(writer)? { let mut schema = match index.main.schema(writer)? {
@ -162,7 +173,14 @@ pub fn apply_addition(
let internal_docids = index.main.internal_docids(writer)?; let internal_docids = index.main.internal_docids(writer)?;
let mut available_ids = DiscoverIds::new(&internal_docids); let mut available_ids = DiscoverIds::new(&internal_docids);
let primary_key = schema.primary_key().ok_or(Error::MissingPrimaryKey)?; let primary_key = match schema.primary_key() {
Some(primary_key) => primary_key.to_string(),
None => {
let name = primary_key.ok_or(Error::MissingPrimaryKey)?;
schema.set_primary_key(&name)?;
name
}
};
// 1. store documents ids for future deletion // 1. store documents ids for future deletion
let mut documents_additions = HashMap::new(); let mut documents_additions = HashMap::new();
@ -275,16 +293,18 @@ pub fn apply_documents_partial_addition(
writer: &mut heed::RwTxn<MainT>, writer: &mut heed::RwTxn<MainT>,
index: &store::Index, index: &store::Index,
new_documents: Vec<IndexMap<String, Value>>, new_documents: Vec<IndexMap<String, Value>>,
primary_key: Option<String>,
) -> MResult<()> { ) -> MResult<()> {
apply_addition(writer, index, new_documents, true) apply_addition(writer, index, new_documents, true, primary_key)
} }
pub fn apply_documents_addition( pub fn apply_documents_addition(
writer: &mut heed::RwTxn<MainT>, writer: &mut heed::RwTxn<MainT>,
index: &store::Index, index: &store::Index,
new_documents: Vec<IndexMap<String, Value>>, new_documents: Vec<IndexMap<String, Value>>,
primary_key: Option<String>,
) -> MResult<()> { ) -> MResult<()> {
apply_addition(writer, index, new_documents, false) apply_addition(writer, index, new_documents, false, primary_key)
} }
pub fn reindex_all_documents(writer: &mut heed::RwTxn<MainT>, index: &store::Index) -> MResult<()> { pub fn reindex_all_documents(writer: &mut heed::RwTxn<MainT>, index: &store::Index) -> MResult<()> {

View File

@ -52,16 +52,16 @@ impl Update {
} }
} }
fn documents_addition(documents: Vec<IndexMap<String, Value>>) -> Update { fn documents_addition(primary_key: Option<String>, documents: Vec<IndexMap<String, Value>>) -> Update {
Update { Update {
data: UpdateData::DocumentsAddition(documents), data: UpdateData::DocumentsAddition{ documents, primary_key },
enqueued_at: Utc::now(), enqueued_at: Utc::now(),
} }
} }
fn documents_partial(documents: Vec<IndexMap<String, Value>>) -> Update { fn documents_partial(primary_key: Option<String>, documents: Vec<IndexMap<String, Value>>) -> Update {
Update { Update {
data: UpdateData::DocumentsPartial(documents), data: UpdateData::DocumentsPartial{ documents, primary_key },
enqueued_at: Utc::now(), enqueued_at: Utc::now(),
} }
} }
@ -85,8 +85,15 @@ impl Update {
pub enum UpdateData { pub enum UpdateData {
ClearAll, ClearAll,
Customs(Vec<u8>), Customs(Vec<u8>),
DocumentsAddition(Vec<IndexMap<String, Value>>), // (primary key, documents)
DocumentsPartial(Vec<IndexMap<String, Value>>), DocumentsAddition {
primary_key: Option<String>,
documents: Vec<IndexMap<String, Value>>
},
DocumentsPartial {
primary_key: Option<String>,
documents: Vec<IndexMap<String, Value>>,
},
DocumentsDeletion(Vec<String>), DocumentsDeletion(Vec<String>),
Settings(Box<SettingsUpdate>) Settings(Box<SettingsUpdate>)
} }
@ -96,11 +103,11 @@ impl UpdateData {
match self { match self {
UpdateData::ClearAll => UpdateType::ClearAll, UpdateData::ClearAll => UpdateType::ClearAll,
UpdateData::Customs(_) => UpdateType::Customs, UpdateData::Customs(_) => UpdateType::Customs,
UpdateData::DocumentsAddition(addition) => UpdateType::DocumentsAddition { UpdateData::DocumentsAddition{ documents, .. } => UpdateType::DocumentsAddition {
number: addition.len(), number: documents.len(),
}, },
UpdateData::DocumentsPartial(addition) => UpdateType::DocumentsPartial { UpdateData::DocumentsPartial{ documents, .. } => UpdateType::DocumentsPartial {
number: addition.len(), number: documents.len(),
}, },
UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion { UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
number: deletion.len(), number: deletion.len(),
@ -239,25 +246,25 @@ pub fn update_task(
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
} }
UpdateData::DocumentsAddition(documents) => { UpdateData::DocumentsAddition { documents, primary_key } => {
let start = Instant::now(); let start = Instant::now();
let update_type = UpdateType::DocumentsAddition { let update_type = UpdateType::DocumentsAddition {
number: documents.len(), number: documents.len(),
}; };
let result = apply_documents_addition(writer, index, documents); let result = apply_documents_addition(writer, index, documents, primary_key);
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
} }
UpdateData::DocumentsPartial(documents) => { UpdateData::DocumentsPartial{ documents, primary_key } => {
let start = Instant::now(); let start = Instant::now();
let update_type = UpdateType::DocumentsPartial { let update_type = UpdateType::DocumentsPartial {
number: documents.len(), number: documents.len(),
}; };
let result = apply_documents_partial_addition(writer, index, documents); let result = apply_documents_partial_addition(writer, index, documents, primary_key);
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
} }

View File

@ -130,13 +130,13 @@ fn import_index_v1(
// if buffer is full, create and apply a batch, and clean buffer // if buffer is full, create and apply a batch, and clean buffer
if values.len() == document_batch_size { if values.len() == document_batch_size {
let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size)); let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size));
apply_documents_addition(write_txn, &index, batch)?; apply_documents_addition(write_txn, &index, batch, None)?;
} }
} }
// apply documents remaining in the buffer // apply documents remaining in the buffer
if !values.is_empty() { if !values.is_empty() {
apply_documents_addition(write_txn, &index, values)?; apply_documents_addition(write_txn, &index, values, None)?;
} }
// sync index information: stats, updated_at, last_update // sync index information: stats, updated_at, last_update
@ -289,7 +289,6 @@ fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, dir_path: &
/// Write error with a context. /// Write error with a context.
fn fail_dump_process<E: std::error::Error>(data: &web::Data<Data>, dump_info: DumpInfo, context: &str, error: E) { fn fail_dump_process<E: std::error::Error>(data: &web::Data<Data>, dump_info: DumpInfo, context: &str, error: E) {
let error_message = format!("{}; {}", context, error); let error_message = format!("{}; {}", context, error);
error!("Something went wrong during dump process: {}", &error_message); error!("Something went wrong during dump process: {}", &error_message);
data.set_current_dump_info(dump_info.with_error(Error::dump_failed(error_message).into())) data.set_current_dump_info(dump_info.with_error(Error::dump_failed(error_message).into()))
} }

View File

@ -145,15 +145,6 @@ async fn get_all_documents(
Ok(HttpResponse::Ok().json(documents)) Ok(HttpResponse::Ok().json(documents))
} }
fn find_primary_key(document: &IndexMap<String, Value>) -> Option<String> {
for key in document.keys() {
if key.to_lowercase().contains("id") {
return Some(key.to_string());
}
}
None
}
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)] #[serde(rename_all = "camelCase", deny_unknown_fields)]
struct UpdateDocumentsQuery { struct UpdateDocumentsQuery {
@ -168,26 +159,6 @@ async fn update_multiple_documents(
is_partial: bool, is_partial: bool,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let update_id = data.get_or_create_index(&path.index_uid, |index| { let update_id = data.get_or_create_index(&path.index_uid, |index| {
let reader = data.db.main_read_txn()?;
let mut schema = index
.main
.schema(&reader)?
.ok_or(meilisearch_core::Error::SchemaMissing)?;
if schema.primary_key().is_none() {
let id = match &params.primary_key {
Some(id) => id.to_string(),
None => body
.first()
.and_then(find_primary_key)
.ok_or(meilisearch_core::Error::MissingPrimaryKey)?,
};
schema.set_primary_key(&id).map_err(Error::bad_request)?;
data.db.main_write(|w| index.main.put_schema(w, &schema))?;
}
let mut document_addition = if is_partial { let mut document_addition = if is_partial {
index.documents_partial_addition() index.documents_partial_addition()
@ -195,6 +166,27 @@ async fn update_multiple_documents(
index.documents_addition() index.documents_addition()
}; };
// Return an early error if primary key is already set, otherwise, try to set it up in the
// update later.
let reader = data.db.main_read_txn()?;
let schema = index
.main
.schema(&reader)?
.ok_or(meilisearch_core::Error::SchemaMissing)?;
match (params.into_inner().primary_key, schema.primary_key()) {
(Some(_), Some(_)) => return Err(meilisearch_schema::Error::PrimaryKeyAlreadyPresent)?,
(Some(key), None) => document_addition.set_primary_key(key),
(None, None) => {
let key = body
.first()
.and_then(find_primary_key)
.ok_or(meilisearch_core::Error::MissingPrimaryKey)?;
document_addition.set_primary_key(key);
}
(None, Some(_)) => ()
}
for document in body.into_inner() { for document in body.into_inner() {
document_addition.update_document(document); document_addition.update_document(document);
} }
@ -204,6 +196,15 @@ async fn update_multiple_documents(
return Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id))); return Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)));
} }
fn find_primary_key(document: &IndexMap<String, Value>) -> Option<String> {
for key in document.keys() {
if key.to_lowercase().contains("id") {
return Some(key.to_string());
}
}
None
}
#[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] #[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")]
async fn add_documents( async fn add_documents(
data: web::Data<Data>, data: web::Data<Data>,