Merge pull request #253 from meilisearch/fix-updates-system

Fix the updates system
This commit is contained in:
Clément Renault 2019-11-04 13:46:37 +01:00 committed by GitHub
commit 70589c136f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 207 additions and 57 deletions

View File

@ -12,7 +12,7 @@ crossbeam-channel = "0.3.9"
deunicode = "1.0.0"
env_logger = "0.7.0"
hashbrown = { version = "0.6.0", features = ["serde"] }
heed = "0.3.0"
heed = "0.5.0"
log = "0.4.8"
meilidb-schema = { path = "../meilidb-schema", version = "0.6.0" }
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.6.0" }

View File

@ -7,7 +7,7 @@ use std::{fs, thread};
use crossbeam_channel::Receiver;
use heed::types::{Str, Unit};
use heed::{CompactionOption, Result as ZResult};
use log::{debug, error};
use log::debug;
use crate::{store, update, Index, MResult};
@ -21,43 +21,62 @@ pub struct Database {
indexes: RwLock<HashMap<String, (Index, Arc<ArcSwapFn>, thread::JoinHandle<()>)>>,
}
macro_rules! r#break_try {
($expr:expr, $msg:tt) => {
match $expr {
core::result::Result::Ok(val) => val,
core::result::Result::Err(err) => {
log::error!(concat!($msg, ": {}"), err);
break;
}
}
};
}
fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc<ArcSwapFn>, index: Index) {
for () in receiver {
// consume all updates in order (oldest first)
loop {
let mut writer = match env.write_txn() {
Ok(writer) => writer,
Err(e) => {
error!("LMDB writer transaction begin failed: {}", e);
break;
}
};
// instantiate a main/parent transaction
let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed");
match update::update_task(&mut writer, index.clone()) {
Ok(Some(status)) => {
match status.result {
Ok(_) => {
if let Err(e) = writer.commit() {
error!("update transaction failed: {}", e)
}
}
Err(_) => writer.abort(),
}
if let Some(ref callback) = *update_fn.load() {
(callback)(status);
}
}
// no more updates to handle for now
Ok(None) => {
// retrieve the update that needs to be processed
let result = index.updates.pop_front(&mut writer);
let (update_id, update) = match break_try!(result, "pop front update failed") {
Some(value) => value,
None => {
debug!("no more updates");
writer.abort();
break;
}
Err(e) => {
error!("update task failed: {}", e);
writer.abort()
}
};
// instantiate a nested transaction
let result = env.nested_write_txn(&mut writer);
let mut nested_writer = break_try!(result, "LMDB nested write transaction failed");
// try to apply the update to the database using the nested transaction
let result = update::update_task(&mut nested_writer, index.clone(), update_id, update);
let status = break_try!(result, "update task failed");
// commit the nested transaction if the update was successful, abort it otherwise
if status.result.is_ok() {
break_try!(nested_writer.commit(), "commit nested transaction failed");
} else {
nested_writer.abort()
}
// write the result of the update in the updates-results store
let updates_results = index.updates_results;
let result = updates_results.put_update_result(&mut writer, update_id, &status);
// always commit the main/parent transaction, even if the update was unsuccessful
break_try!(result, "update result store commit failed");
break_try!(writer.commit(), "update parent transaction failed");
// call the user callback when the update and the result are written consistently
if let Some(ref callback) = *update_fn.load() {
(callback)(status);
}
}
}
@ -203,3 +222,141 @@ impl Database {
self.common_store
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::update::{ProcessedUpdateResult, UpdateStatus};
use std::sync::mpsc;
#[test]
fn valid_updates() {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
"#;
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
// don't forget to commit...
writer.commit().unwrap();
let mut additions = index.documents_addition();
let doc1 = serde_json::json!({
"id": 123,
"name": "Marvin",
"description": "My name is Marvin",
});
let doc2 = serde_json::json!({
"id": 234,
"name": "Kevin",
"description": "My name is Kevin",
});
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
// don't forget to commit...
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
}
#[test]
fn invalid_updates() {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = database.create_index("test").unwrap();
let done = database.set_update_callback("test", Box::new(update_fn));
assert!(done, "could not set the index update function");
let schema = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
"#;
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
// don't forget to commit...
writer.commit().unwrap();
let mut additions = index.documents_addition();
let doc1 = serde_json::json!({
"id": 123,
"name": "Marvin",
"description": "My name is Marvin",
});
let doc2 = serde_json::json!({
"name": "Kevin",
"description": "My name is Kevin",
});
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
// don't forget to commit...
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_err());
}
}

View File

@ -7,8 +7,8 @@ use crate::{DocumentId, RankedMap};
use super::{ConvertToNumber, ConvertToString, Indexer, SerializerError};
pub struct Serializer<'a> {
pub txn: &'a mut heed::RwTxn,
pub struct Serializer<'a, 'b> {
pub txn: &'a mut heed::RwTxn<'b>,
pub schema: &'a Schema,
pub document_store: DocumentsFields,
pub document_fields_counts: DocumentsFieldsCounts,
@ -17,15 +17,15 @@ pub struct Serializer<'a> {
pub document_id: DocumentId,
}
impl<'a> ser::Serializer for Serializer<'a> {
impl<'a, 'b> ser::Serializer for Serializer<'a, 'b> {
type Ok = ();
type Error = SerializerError;
type SerializeSeq = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTuple = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleStruct = ser::Impossible<Self::Ok, Self::Error>;
type SerializeTupleVariant = ser::Impossible<Self::Ok, Self::Error>;
type SerializeMap = MapSerializer<'a>;
type SerializeStruct = StructSerializer<'a>;
type SerializeMap = MapSerializer<'a, 'b>;
type SerializeStruct = StructSerializer<'a, 'b>;
type SerializeStructVariant = ser::Impossible<Self::Ok, Self::Error>;
forward_to_unserializable_type! {
@ -190,8 +190,8 @@ impl<'a> ser::Serializer for Serializer<'a> {
}
}
pub struct MapSerializer<'a> {
txn: &'a mut heed::RwTxn,
pub struct MapSerializer<'a, 'b> {
txn: &'a mut heed::RwTxn<'b>,
schema: &'a Schema,
document_id: DocumentId,
document_store: DocumentsFields,
@ -201,7 +201,7 @@ pub struct MapSerializer<'a> {
current_key_name: Option<String>,
}
impl<'a> ser::SerializeMap for MapSerializer<'a> {
impl<'a, 'b> ser::SerializeMap for MapSerializer<'a, 'b> {
type Ok = ();
type Error = SerializerError;
@ -253,8 +253,8 @@ impl<'a> ser::SerializeMap for MapSerializer<'a> {
}
}
pub struct StructSerializer<'a> {
txn: &'a mut heed::RwTxn,
pub struct StructSerializer<'a, 'b> {
txn: &'a mut heed::RwTxn<'b>,
schema: &'a Schema,
document_id: DocumentId,
document_store: DocumentsFields,
@ -263,7 +263,7 @@ pub struct StructSerializer<'a> {
ranked_map: &'a mut RankedMap,
}
impl<'a> ser::SerializeStruct for StructSerializer<'a> {
impl<'a, 'b> ser::SerializeStruct for StructSerializer<'a, 'b> {
type Ok = ();
type Error = SerializerError;

View File

@ -77,8 +77,8 @@ pub fn push_documents_addition<D: serde::Serialize>(
Ok(last_update_id)
}
pub fn apply_documents_addition(
writer: &mut heed::RwTxn,
pub fn apply_documents_addition<'a, 'b>(
writer: &'a mut heed::RwTxn<'b>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,

View File

@ -149,15 +149,12 @@ pub fn next_update_id(
Ok(new_update_id)
}
pub fn update_task(
writer: &mut heed::RwTxn,
pub fn update_task<'a, 'b>(
writer: &'a mut heed::RwTxn<'b>,
index: store::Index,
) -> MResult<Option<ProcessedUpdateResult>> {
let (update_id, update) = match index.updates.pop_front(writer)? {
Some(value) => value,
None => return Ok(None),
};
update_id: u64,
update: Update,
) -> MResult<ProcessedUpdateResult> {
debug!("Processing update number {}", update_id);
let (update_type, result, duration) = match update {
@ -308,9 +305,5 @@ pub fn update_task(
detailed_duration,
};
index
.updates_results
.put_update_result(writer, update_id, &status)?;
Ok(Some(status))
Ok(status)
}

View File

@ -13,7 +13,7 @@ chrono = { version = "0.4.9", features = ["serde"] }
crossbeam-channel = "0.3.9"
envconfig = "0.5.1"
envconfig_derive = "0.5.1"
heed = "0.3.0"
heed = "0.5.0"
http = "0.1.19"
indexmap = { version = "1.3.0", features = ["serde-1"] }
jemallocator = "0.3.2"