mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
Merge pull request #280 from meilisearch/format-updates-json
Format updates json
This commit is contained in:
commit
6803a8fad0
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -864,6 +864,7 @@ dependencies = [
|
|||||||
"assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"csv 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"csv 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"deunicode 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"deunicode 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -8,6 +8,7 @@ edition = "2018"
|
|||||||
arc-swap = "0.4.3"
|
arc-swap = "0.4.3"
|
||||||
bincode = "1.1.4"
|
bincode = "1.1.4"
|
||||||
byteorder = "1.3.2"
|
byteorder = "1.3.2"
|
||||||
|
chrono = { version = "0.4.9", features = ["serde"] }
|
||||||
crossbeam-channel = "0.4.0"
|
crossbeam-channel = "0.4.0"
|
||||||
deunicode = "1.0.0"
|
deunicode = "1.0.0"
|
||||||
env_logger = "0.7.0"
|
env_logger = "0.7.0"
|
||||||
|
@ -68,7 +68,7 @@ fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc<ArcSwap
|
|||||||
let status = break_try!(result, "update task failed");
|
let status = break_try!(result, "update task failed");
|
||||||
|
|
||||||
// commit the nested transaction if the update was successful, abort it otherwise
|
// commit the nested transaction if the update was successful, abort it otherwise
|
||||||
if status.result.is_ok() {
|
if status.error.is_none() {
|
||||||
break_try!(nested_writer.commit(), "commit nested transaction failed");
|
break_try!(nested_writer.commit(), "commit nested transaction failed");
|
||||||
} else {
|
} else {
|
||||||
nested_writer.abort()
|
nested_writer.abort()
|
||||||
@ -323,7 +323,7 @@ mod tests {
|
|||||||
|
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -384,11 +384,11 @@ mod tests {
|
|||||||
|
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_err());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn ignored_words_to_long() {
|
fn ignored_words_too_long() {
|
||||||
let dir = tempfile::tempdir().unwrap();
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
|
||||||
let database = Database::open_or_create(dir.path()).unwrap();
|
let database = Database::open_or_create(dir.path()).unwrap();
|
||||||
@ -434,7 +434,7 @@ mod tests {
|
|||||||
|
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -524,7 +524,7 @@ mod tests {
|
|||||||
// check if it has been accepted
|
// check if it has been accepted
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
|
||||||
reader.abort();
|
reader.abort();
|
||||||
|
|
||||||
let mut additions = index.documents_addition();
|
let mut additions = index.documents_addition();
|
||||||
@ -558,7 +558,7 @@ mod tests {
|
|||||||
// check if it has been accepted
|
// check if it has been accepted
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
|
||||||
|
|
||||||
// even try to search for a document
|
// even try to search for a document
|
||||||
let results = index.query_builder().query(&reader, "21 ", 0..20).unwrap();
|
let results = index.query_builder().query(&reader, "21 ", 0..20).unwrap();
|
||||||
@ -604,7 +604,7 @@ mod tests {
|
|||||||
// check if it has been accepted
|
// check if it has been accepted
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_err());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -668,7 +668,7 @@ mod tests {
|
|||||||
|
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
|
||||||
|
|
||||||
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
|
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
|
||||||
assert!(document.is_none());
|
assert!(document.is_none());
|
||||||
@ -748,7 +748,7 @@ mod tests {
|
|||||||
|
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
|
||||||
|
|
||||||
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
|
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
|
||||||
assert!(document.is_none());
|
assert!(document.is_none());
|
||||||
@ -791,7 +791,7 @@ mod tests {
|
|||||||
|
|
||||||
let reader = env.read_txn().unwrap();
|
let reader = env.read_txn().unwrap();
|
||||||
let result = index.update_status(&reader, update_id).unwrap();
|
let result = index.update_status(&reader, update_id).unwrap();
|
||||||
assert_matches!(result, UpdateStatus::Processed(status) if status.result.is_ok());
|
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
|
||||||
|
|
||||||
let document: Option<serde_json::Value> = index
|
let document: Option<serde_json::Value> = index
|
||||||
.document(&reader, None, DocumentId(7900334843754999545))
|
.document(&reader, None, DocumentId(7900334843754999545))
|
||||||
|
@ -221,7 +221,7 @@ impl Index {
|
|||||||
&self,
|
&self,
|
||||||
reader: &heed::RoTxn,
|
reader: &heed::RoTxn,
|
||||||
update_id: u64,
|
update_id: u64,
|
||||||
) -> MResult<update::UpdateStatus> {
|
) -> MResult<Option<update::UpdateStatus>> {
|
||||||
update::update_status(reader, self.updates, self.updates_results, update_id)
|
update::update_status(reader, self.updates, self.updates_results, update_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -234,17 +234,19 @@ impl Index {
|
|||||||
updates.reserve(last_id as usize);
|
updates.reserve(last_id as usize);
|
||||||
|
|
||||||
for id in 0..=last_id {
|
for id in 0..=last_id {
|
||||||
let update = self.update_status(reader, id)?;
|
if let Some(update) = self.update_status(reader, id)? {
|
||||||
updates.push(update);
|
updates.push(update);
|
||||||
last_update_result_id = id;
|
last_update_result_id = id;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// retrieve all enqueued updates
|
// retrieve all enqueued updates
|
||||||
if let Some((last_id, _)) = self.updates.last_update_id(reader)? {
|
if let Some((last_id, _)) = self.updates.last_update_id(reader)? {
|
||||||
for id in last_update_result_id + 1..last_id {
|
for id in last_update_result_id + 1..=last_id {
|
||||||
let update = self.update_status(reader, id)?;
|
if let Some(update) = self.update_status(reader, id)? {
|
||||||
updates.push(update);
|
updates.push(update);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,12 +1,11 @@
|
|||||||
use super::BEU64;
|
use super::BEU64;
|
||||||
use crate::update::ProcessedUpdateResult;
|
use crate::update::ProcessedUpdateResult;
|
||||||
use heed::types::{OwnedType, SerdeBincode};
|
use heed::types::{OwnedType, SerdeJson};
|
||||||
use heed::Result as ZResult;
|
use heed::Result as ZResult;
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct UpdatesResults {
|
pub struct UpdatesResults {
|
||||||
pub(crate) updates_results:
|
pub(crate) updates_results: heed::Database<OwnedType<BEU64>, SerdeJson<ProcessedUpdateResult>>,
|
||||||
heed::Database<OwnedType<BEU64>, SerdeBincode<ProcessedUpdateResult>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UpdatesResults {
|
impl UpdatesResults {
|
||||||
|
@ -26,7 +26,7 @@ pub fn push_clear_all(
|
|||||||
updates_results_store: store::UpdatesResults,
|
updates_results_store: store::UpdatesResults,
|
||||||
) -> MResult<u64> {
|
) -> MResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
let update = Update::ClearAll;
|
let update = Update::clear_all();
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -18,7 +18,7 @@ pub fn push_customs_update(
|
|||||||
) -> ZResult<u64> {
|
) -> ZResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = Update::Customs(customs);
|
let update = Update::customs(customs);
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -91,9 +91,9 @@ pub fn push_documents_addition<D: serde::Serialize>(
|
|||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = if is_partial {
|
let update = if is_partial {
|
||||||
Update::DocumentsPartial(values)
|
Update::documents_partial(values)
|
||||||
} else {
|
} else {
|
||||||
Update::DocumentsAddition(values)
|
Update::documents_addition(values)
|
||||||
};
|
};
|
||||||
|
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
@ -76,7 +76,7 @@ pub fn push_documents_deletion(
|
|||||||
) -> MResult<u64> {
|
) -> MResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = Update::DocumentsDeletion(deletion);
|
let update = Update::documents_deletion(deletion);
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -22,8 +22,9 @@ pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion};
|
|||||||
|
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use heed::Result as ZResult;
|
use heed::Result as ZResult;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -32,7 +33,85 @@ use crate::{store, DocumentId, MResult};
|
|||||||
use meilidb_schema::Schema;
|
use meilidb_schema::Schema;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub enum Update {
|
pub struct Update {
|
||||||
|
data: UpdateData,
|
||||||
|
enqueued_at: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Update {
|
||||||
|
fn clear_all() -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::ClearAll,
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn schema(data: Schema) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::Schema(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn customs(data: Vec<u8>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::Customs(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn documents_addition(data: Vec<HashMap<String, serde_json::Value>>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::DocumentsAddition(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn documents_partial(data: Vec<HashMap<String, serde_json::Value>>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::DocumentsPartial(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn documents_deletion(data: Vec<DocumentId>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::DocumentsDeletion(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn synonyms_addition(data: BTreeMap<String, Vec<String>>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::SynonymsAddition(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn synonyms_deletion(data: BTreeMap<String, Option<Vec<String>>>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::SynonymsDeletion(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop_words_addition(data: BTreeSet<String>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::StopWordsAddition(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop_words_deletion(data: BTreeSet<String>) -> Update {
|
||||||
|
Update {
|
||||||
|
data: UpdateData::StopWordsDeletion(data),
|
||||||
|
enqueued_at: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum UpdateData {
|
||||||
ClearAll,
|
ClearAll,
|
||||||
Schema(Schema),
|
Schema(Schema),
|
||||||
Customs(Vec<u8>),
|
Customs(Vec<u8>),
|
||||||
@ -45,33 +124,31 @@ pub enum Update {
|
|||||||
StopWordsDeletion(BTreeSet<String>),
|
StopWordsDeletion(BTreeSet<String>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Update {
|
impl UpdateData {
|
||||||
pub fn update_type(&self) -> UpdateType {
|
pub fn update_type(&self) -> UpdateType {
|
||||||
match self {
|
match self {
|
||||||
Update::ClearAll => UpdateType::ClearAll,
|
UpdateData::ClearAll => UpdateType::ClearAll,
|
||||||
Update::Schema(schema) => UpdateType::Schema {
|
UpdateData::Schema(_) => UpdateType::Schema,
|
||||||
schema: schema.clone(),
|
UpdateData::Customs(_) => UpdateType::Customs,
|
||||||
},
|
UpdateData::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
|
||||||
Update::Customs(_) => UpdateType::Customs,
|
|
||||||
Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
|
|
||||||
number: addition.len(),
|
number: addition.len(),
|
||||||
},
|
},
|
||||||
Update::DocumentsPartial(addition) => UpdateType::DocumentsPartial {
|
UpdateData::DocumentsPartial(addition) => UpdateType::DocumentsPartial {
|
||||||
number: addition.len(),
|
number: addition.len(),
|
||||||
},
|
},
|
||||||
Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
|
UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
|
||||||
number: deletion.len(),
|
number: deletion.len(),
|
||||||
},
|
},
|
||||||
Update::SynonymsAddition(addition) => UpdateType::SynonymsAddition {
|
UpdateData::SynonymsAddition(addition) => UpdateType::SynonymsAddition {
|
||||||
number: addition.len(),
|
number: addition.len(),
|
||||||
},
|
},
|
||||||
Update::SynonymsDeletion(deletion) => UpdateType::SynonymsDeletion {
|
UpdateData::SynonymsDeletion(deletion) => UpdateType::SynonymsDeletion {
|
||||||
number: deletion.len(),
|
number: deletion.len(),
|
||||||
},
|
},
|
||||||
Update::StopWordsAddition(addition) => UpdateType::StopWordsAddition {
|
UpdateData::StopWordsAddition(addition) => UpdateType::StopWordsAddition {
|
||||||
number: addition.len(),
|
number: addition.len(),
|
||||||
},
|
},
|
||||||
Update::StopWordsDeletion(deletion) => UpdateType::StopWordsDeletion {
|
UpdateData::StopWordsDeletion(deletion) => UpdateType::StopWordsDeletion {
|
||||||
number: deletion.len(),
|
number: deletion.len(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -79,9 +156,10 @@ impl Update {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "name")]
|
||||||
pub enum UpdateType {
|
pub enum UpdateType {
|
||||||
ClearAll,
|
ClearAll,
|
||||||
Schema { schema: Schema },
|
Schema,
|
||||||
Customs,
|
Customs,
|
||||||
DocumentsAddition { number: usize },
|
DocumentsAddition { number: usize },
|
||||||
DocumentsPartial { number: usize },
|
DocumentsPartial { number: usize },
|
||||||
@ -92,30 +170,36 @@ pub enum UpdateType {
|
|||||||
StopWordsDeletion { number: usize },
|
StopWordsDeletion { number: usize },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct DetailedDuration {
|
|
||||||
pub main: Duration,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct ProcessedUpdateResult {
|
pub struct ProcessedUpdateResult {
|
||||||
pub update_id: u64,
|
pub update_id: u64,
|
||||||
|
#[serde(rename = "type")]
|
||||||
pub update_type: UpdateType,
|
pub update_type: UpdateType,
|
||||||
pub result: Result<(), String>,
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub detailed_duration: DetailedDuration,
|
pub error: Option<String>,
|
||||||
|
pub duration: f64, // in seconds
|
||||||
|
pub enqueued_at: DateTime<Utc>,
|
||||||
|
pub processed_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct EnqueuedUpdateResult {
|
pub struct EnqueuedUpdateResult {
|
||||||
pub update_id: u64,
|
pub update_id: u64,
|
||||||
pub update_type: UpdateType,
|
pub update_type: UpdateType,
|
||||||
|
pub enqueued_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase", tag = "status")]
|
||||||
pub enum UpdateStatus {
|
pub enum UpdateStatus {
|
||||||
Enqueued(EnqueuedUpdateResult),
|
Enqueued {
|
||||||
Processed(ProcessedUpdateResult),
|
#[serde(flatten)]
|
||||||
Unknown,
|
content: EnqueuedUpdateResult,
|
||||||
|
},
|
||||||
|
Processed {
|
||||||
|
#[serde(flatten)]
|
||||||
|
content: ProcessedUpdateResult,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_status(
|
pub fn update_status(
|
||||||
@ -123,19 +207,19 @@ pub fn update_status(
|
|||||||
updates_store: store::Updates,
|
updates_store: store::Updates,
|
||||||
updates_results_store: store::UpdatesResults,
|
updates_results_store: store::UpdatesResults,
|
||||||
update_id: u64,
|
update_id: u64,
|
||||||
) -> MResult<UpdateStatus> {
|
) -> MResult<Option<UpdateStatus>> {
|
||||||
match updates_results_store.update_result(reader, update_id)? {
|
match updates_results_store.update_result(reader, update_id)? {
|
||||||
Some(result) => Ok(UpdateStatus::Processed(result)),
|
Some(result) => Ok(Some(UpdateStatus::Processed { content: result })),
|
||||||
None => {
|
None => match updates_store.get(reader, update_id)? {
|
||||||
if let Some(update) = updates_store.get(reader, update_id)? {
|
Some(update) => Ok(Some(UpdateStatus::Enqueued {
|
||||||
Ok(UpdateStatus::Enqueued(EnqueuedUpdateResult {
|
content: EnqueuedUpdateResult {
|
||||||
update_id,
|
update_id,
|
||||||
update_type: update.update_type(),
|
update_type: update.data.update_type(),
|
||||||
}))
|
enqueued_at: update.enqueued_at,
|
||||||
} else {
|
},
|
||||||
Ok(UpdateStatus::Unknown)
|
})),
|
||||||
}
|
None => Ok(None),
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,8 +248,10 @@ pub fn update_task<'a, 'b>(
|
|||||||
) -> MResult<ProcessedUpdateResult> {
|
) -> MResult<ProcessedUpdateResult> {
|
||||||
debug!("Processing update number {}", update_id);
|
debug!("Processing update number {}", update_id);
|
||||||
|
|
||||||
let (update_type, result, duration) = match update {
|
let Update { enqueued_at, data } = update;
|
||||||
Update::ClearAll => {
|
|
||||||
|
let (update_type, result, duration) = match data {
|
||||||
|
UpdateData::ClearAll => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::ClearAll;
|
let update_type = UpdateType::ClearAll;
|
||||||
@ -180,12 +266,10 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::Schema(schema) => {
|
UpdateData::Schema(schema) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::Schema {
|
let update_type = UpdateType::Schema;
|
||||||
schema: schema.clone(),
|
|
||||||
};
|
|
||||||
let result = apply_schema_update(
|
let result = apply_schema_update(
|
||||||
writer,
|
writer,
|
||||||
&schema,
|
&schema,
|
||||||
@ -198,7 +282,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::Customs(customs) => {
|
UpdateData::Customs(customs) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::Customs;
|
let update_type = UpdateType::Customs;
|
||||||
@ -206,7 +290,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::DocumentsAddition(documents) => {
|
UpdateData::DocumentsAddition(documents) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::DocumentsAddition {
|
let update_type = UpdateType::DocumentsAddition {
|
||||||
@ -225,7 +309,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::DocumentsPartial(documents) => {
|
UpdateData::DocumentsPartial(documents) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::DocumentsPartial {
|
let update_type = UpdateType::DocumentsPartial {
|
||||||
@ -244,7 +328,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::DocumentsDeletion(documents) => {
|
UpdateData::DocumentsDeletion(documents) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::DocumentsDeletion {
|
let update_type = UpdateType::DocumentsDeletion {
|
||||||
@ -263,7 +347,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::SynonymsAddition(synonyms) => {
|
UpdateData::SynonymsAddition(synonyms) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::SynonymsAddition {
|
let update_type = UpdateType::SynonymsAddition {
|
||||||
@ -274,7 +358,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::SynonymsDeletion(synonyms) => {
|
UpdateData::SynonymsDeletion(synonyms) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::SynonymsDeletion {
|
let update_type = UpdateType::SynonymsDeletion {
|
||||||
@ -285,7 +369,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::StopWordsAddition(stop_words) => {
|
UpdateData::StopWordsAddition(stop_words) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::StopWordsAddition {
|
let update_type = UpdateType::StopWordsAddition {
|
||||||
@ -297,7 +381,7 @@ pub fn update_task<'a, 'b>(
|
|||||||
|
|
||||||
(update_type, result, start.elapsed())
|
(update_type, result, start.elapsed())
|
||||||
}
|
}
|
||||||
Update::StopWordsDeletion(stop_words) => {
|
UpdateData::StopWordsDeletion(stop_words) => {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let update_type = UpdateType::StopWordsDeletion {
|
let update_type = UpdateType::StopWordsDeletion {
|
||||||
@ -323,12 +407,13 @@ pub fn update_task<'a, 'b>(
|
|||||||
update_id, update_type, result
|
update_id, update_type, result
|
||||||
);
|
);
|
||||||
|
|
||||||
let detailed_duration = DetailedDuration { main: duration };
|
|
||||||
let status = ProcessedUpdateResult {
|
let status = ProcessedUpdateResult {
|
||||||
update_id,
|
update_id,
|
||||||
update_type,
|
update_type,
|
||||||
result: result.map_err(|e| e.to_string()),
|
error: result.map_err(|e| e.to_string()).err(),
|
||||||
detailed_duration,
|
duration: duration.as_secs_f64(),
|
||||||
|
enqueued_at,
|
||||||
|
processed_at: Utc::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(status)
|
Ok(status)
|
||||||
|
@ -68,7 +68,7 @@ pub fn push_schema_update(
|
|||||||
) -> MResult<u64> {
|
) -> MResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = Update::Schema(schema);
|
let update = Update::schema(schema);
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -53,7 +53,7 @@ pub fn push_stop_words_addition(
|
|||||||
) -> MResult<u64> {
|
) -> MResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = Update::StopWordsAddition(addition);
|
let update = Update::stop_words_addition(addition);
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -54,7 +54,7 @@ pub fn push_stop_words_deletion(
|
|||||||
) -> MResult<u64> {
|
) -> MResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = Update::StopWordsDeletion(deletion);
|
let update = Update::stop_words_deletion(deletion);
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -63,7 +63,7 @@ pub fn push_synonyms_addition(
|
|||||||
) -> MResult<u64> {
|
) -> MResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = Update::SynonymsAddition(addition);
|
let update = Update::synonyms_addition(addition);
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -70,7 +70,7 @@ pub fn push_synonyms_deletion(
|
|||||||
) -> MResult<u64> {
|
) -> MResult<u64> {
|
||||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||||
|
|
||||||
let update = Update::SynonymsDeletion(deletion);
|
let update = Update::synonyms_deletion(deletion);
|
||||||
updates_store.put_update(writer, last_update_id, &update)?;
|
updates_store.put_update(writer, last_update_id, &update)?;
|
||||||
|
|
||||||
Ok(last_update_id)
|
Ok(last_update_id)
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
use meilidb_core::{ProcessedUpdateResult, UpdateStatus};
|
use meilidb_core::ProcessedUpdateResult;
|
||||||
use meilidb_schema::Schema;
|
use meilidb_schema::Schema;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use tide::response::IntoResponse;
|
use tide::response::IntoResponse;
|
||||||
@ -150,17 +150,10 @@ pub async fn get_update_status(ctx: Context<Data>) -> SResult<Response> {
|
|||||||
.map_err(ResponseError::internal)?;
|
.map_err(ResponseError::internal)?;
|
||||||
|
|
||||||
let response = match status {
|
let response = match status {
|
||||||
UpdateStatus::Enqueued(data) => {
|
Some(status) => tide::response::json(status)
|
||||||
tide::response::json(json!({ "status": "enqueued", "data": data }))
|
.with_status(StatusCode::OK)
|
||||||
.with_status(StatusCode::OK)
|
.into_response(),
|
||||||
.into_response()
|
None => tide::response::json(json!({ "message": "unknown update id" }))
|
||||||
}
|
|
||||||
UpdateStatus::Processed(data) => {
|
|
||||||
tide::response::json(json!({ "status": "processed", "data": data }))
|
|
||||||
.with_status(StatusCode::OK)
|
|
||||||
.into_response()
|
|
||||||
}
|
|
||||||
UpdateStatus::Unknown => tide::response::json(json!({ "message": "unknown update id" }))
|
|
||||||
.with_status(StatusCode::NOT_FOUND)
|
.with_status(StatusCode::NOT_FOUND)
|
||||||
.into_response(),
|
.into_response(),
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user