144: Concurrent update run loop (refactor) r=MarinPostma a=MarinPostma

This PR allows multiple request to the update store to be performed concurently (i.e, one can list updates while an updates in being written to the update store).


173: Convert UpdateStatus to legacy meilisearch format r=MarinPostma a=MarinPostma

Returns the update statuses with the same format as legacy meilisearch.

The number of documents in a document addition/deletion is not known before processing, so it is only returned when the update is `processed`.

close #78 

associated milli PR: https://github.com/meilisearch/milli/pull/178


Co-authored-by: marin postma <postma.marin@protonmail.com>
Co-authored-by: Marin Postma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2021-06-14 11:30:44 +00:00 committed by GitHub
commit d65b5db97f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 536 additions and 283 deletions

483
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,8 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use async_stream::stream;
use futures::StreamExt;
use log::info;
use oxidized_json_checker::JsonChecker;
use tokio::fs;
@ -18,7 +20,7 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, I> {
path: PathBuf,
store: Arc<UpdateStore>,
inbox: mpsc::Receiver<UpdateMsg<D>>,
inbox: Option<mpsc::Receiver<UpdateMsg<D>>>,
index_handle: I,
must_exit: Arc<AtomicBool>,
}
@ -45,7 +47,7 @@ where
let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
std::fs::create_dir_all(path.join("update_files"))?;
let inbox = Some(inbox);
Ok(Self {
path,
store,
@ -60,43 +62,59 @@ where
info!("Started update actor.");
loop {
let msg = self.inbox.recv().await;
let mut inbox = self
.inbox
.take()
.expect("A receiver should be present by now.");
if self.must_exit.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let must_exit = self.must_exit.clone();
let stream = stream! {
loop {
let msg = inbox.recv().await;
match msg {
Some(Update {
uuid,
meta,
data,
ret,
}) => {
let _ = ret.send(self.handle_update(uuid, meta, data).await);
if must_exit.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
Some(ListUpdates { uuid, ret }) => {
let _ = ret.send(self.handle_list_updates(uuid).await);
match msg {
Some(msg) => yield msg,
None => break,
}
Some(GetUpdate { uuid, ret, id }) => {
let _ = ret.send(self.handle_get_update(uuid, id).await);
}
Some(Delete { uuid, ret }) => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Some(Snapshot { uuids, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuids, path).await);
}
Some(Dump { uuids, path, ret }) => {
let _ = ret.send(self.handle_dump(uuids, path).await);
}
Some(GetInfo { ret }) => {
let _ = ret.send(self.handle_get_info().await);
}
None => break,
}
}
};
stream
.for_each_concurrent(Some(10), |msg| async {
match msg {
Update {
uuid,
meta,
data,
ret,
} => {
let _ = ret.send(self.handle_update(uuid, meta, data).await);
}
ListUpdates { uuid, ret } => {
let _ = ret.send(self.handle_list_updates(uuid).await);
}
GetUpdate { uuid, ret, id } => {
let _ = ret.send(self.handle_get_update(uuid, id).await);
}
Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Snapshot { uuids, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuids, path).await);
}
GetInfo { ret } => {
let _ = ret.send(self.handle_get_info().await);
}
Dump { uuids, path, ret } => {
let _ = ret.send(self.handle_dump(uuids, path).await);
}
}
})
.await;
}
async fn handle_update(

View File

@ -84,6 +84,10 @@ impl Processed {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn meta(&self) -> &UpdateMeta {
self.from.meta()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -132,21 +136,29 @@ impl Aborted {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn meta(&self) -> &UpdateMeta {
self.from.meta()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Failed {
#[serde(flatten)]
from: Processing,
error: UpdateError,
failed_at: DateTime<Utc>,
pub from: Processing,
pub error: UpdateError,
pub failed_at: DateTime<Utc>,
}
impl Failed {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn meta(&self) -> &UpdateMeta {
self.from.meta()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -170,6 +182,16 @@ impl UpdateStatus {
}
}
pub fn meta(&self) -> &UpdateMeta {
match self {
UpdateStatus::Processing(u) => u.meta(),
UpdateStatus::Enqueued(u) => u.meta(),
UpdateStatus::Processed(u) => u.meta(),
UpdateStatus::Aborted(u) => u.meta(),
UpdateStatus::Failed(u) => u.meta(),
}
}
pub fn processed(&self) -> Option<&Processed> {
match self {
UpdateStatus::Processed(p) => Some(p),

View File

@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::error::ResponseError;
use crate::helpers::Authentication;
use crate::routes::IndexParam;
use super::{UpdateStatusResponse, IndexParam};
use crate::Data;
pub fn services(cfg: &mut web::ServiceConfig) {
@ -129,7 +129,10 @@ async fn get_update_status(
.get_update_status(params.index_uid, params.update_id)
.await;
match result {
Ok(meta) => Ok(HttpResponse::Ok().json(meta)),
Ok(meta) => {
let meta = UpdateStatusResponse::from(meta);
Ok(HttpResponse::Ok().json(meta))
},
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}
@ -143,7 +146,14 @@ async fn get_all_updates_status(
) -> Result<HttpResponse, ResponseError> {
let result = data.get_updates_status(path.into_inner().index_uid).await;
match result {
Ok(metas) => Ok(HttpResponse::Ok().json(metas)),
Ok(metas) => {
let metas = metas
.into_iter()
.map(UpdateStatusResponse::from)
.collect::<Vec<_>>();
Ok(HttpResponse::Ok().json(metas))
},
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}

View File

@ -1,6 +1,12 @@
use std::time::Duration;
use actix_web::{get, HttpResponse};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::index::{Settings, Unchecked};
use crate::index_controller::{UpdateMeta, UpdateResult, UpdateStatus};
pub mod document;
pub mod dump;
pub mod health;
@ -11,6 +17,185 @@ pub mod settings;
pub mod stats;
pub mod synonym;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "name")]
pub enum UpdateType {
ClearAll,
Customs,
DocumentsAddition {
#[serde(skip_serializing_if = "Option::is_none")]
number: Option<usize>
},
DocumentsPartial {
#[serde(skip_serializing_if = "Option::is_none")]
number: Option<usize>
},
DocumentsDeletion {
#[serde(skip_serializing_if = "Option::is_none")]
number: Option<usize>
},
Settings { settings: Settings<Unchecked> },
}
impl From<&UpdateStatus> for UpdateType {
fn from(other: &UpdateStatus) -> Self {
use milli::update::IndexDocumentsMethod::*;
match other.meta() {
UpdateMeta::DocumentsAddition { method, .. } => {
let number = match other {
UpdateStatus::Processed(processed) => match processed.success {
UpdateResult::DocumentsAddition(ref addition) => {
Some(addition.nb_documents)
}
_ => None,
},
_ => None,
};
match method {
ReplaceDocuments => UpdateType::DocumentsAddition { number },
UpdateDocuments => UpdateType::DocumentsPartial { number },
_ => unreachable!(),
}
}
UpdateMeta::ClearDocuments => UpdateType::ClearAll,
UpdateMeta::DeleteDocuments { ids } => {
UpdateType::DocumentsDeletion { number: Some(ids.len()) }
}
UpdateMeta::Settings(settings) => UpdateType::Settings {
settings: settings.clone(),
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProcessedUpdateResult {
pub update_id: u64,
#[serde(rename = "type")]
pub update_type: UpdateType,
pub duration: f64, // in seconds
pub enqueued_at: DateTime<Utc>,
pub processed_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FailedUpdateResult {
pub update_id: u64,
#[serde(rename = "type")]
pub update_type: UpdateType,
pub error: String,
pub error_type: String,
pub error_code: String,
pub error_link: String,
pub duration: f64, // in seconds
pub enqueued_at: DateTime<Utc>,
pub processed_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EnqueuedUpdateResult {
pub update_id: u64,
#[serde(rename = "type")]
pub update_type: UpdateType,
pub enqueued_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
pub started_processing_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
pub enum UpdateStatusResponse {
Enqueued {
#[serde(flatten)]
content: EnqueuedUpdateResult,
},
Processing {
#[serde(flatten)]
content: EnqueuedUpdateResult,
},
Failed {
#[serde(flatten)]
content: FailedUpdateResult,
},
Processed {
#[serde(flatten)]
content: ProcessedUpdateResult,
},
}
impl From<UpdateStatus> for UpdateStatusResponse {
fn from(other: UpdateStatus) -> Self {
let update_type = UpdateType::from(&other);
match other {
UpdateStatus::Processing(processing) => {
let content = EnqueuedUpdateResult {
update_id: processing.id(),
update_type,
enqueued_at: processing.from.enqueued_at,
started_processing_at: Some(processing.started_processing_at),
};
UpdateStatusResponse::Processing { content }
}
UpdateStatus::Enqueued(enqueued) => {
let content = EnqueuedUpdateResult {
update_id: enqueued.id(),
update_type,
enqueued_at: enqueued.enqueued_at,
started_processing_at: None,
};
UpdateStatusResponse::Enqueued { content }
}
UpdateStatus::Processed(processed) => {
let duration = processed
.processed_at
.signed_duration_since(processed.from.started_processing_at)
.num_milliseconds();
// necessary since chrono::duration don't expose a f64 secs method.
let duration = Duration::from_millis(duration as u64).as_secs_f64();
let content = ProcessedUpdateResult {
update_id: processed.id(),
update_type,
duration,
enqueued_at: processed.from.from.enqueued_at,
processed_at: processed.processed_at,
};
UpdateStatusResponse::Processed { content }
}
UpdateStatus::Aborted(_) => unreachable!(),
UpdateStatus::Failed(failed) => {
let duration = failed
.failed_at
.signed_duration_since(failed.from.started_processing_at)
.num_milliseconds();
// necessary since chrono::duration don't expose a f64 secs method.
let duration = Duration::from_millis(duration as u64).as_secs_f64();
let content = FailedUpdateResult {
update_id: failed.id(),
update_type,
error: failed.error,
error_type: String::from("todo"),
error_code: String::from("todo"),
error_link: String::from("todo"),
duration,
enqueued_at: failed.from.from.enqueued_at,
processed_at: failed.failed_at,
};
UpdateStatusResponse::Failed { content }
}
}
}
}
#[derive(Deserialize)]
pub struct IndexParam {
index_uid: String,

View File

@ -33,16 +33,14 @@ async fn add_documents_no_index_creation() {
assert_eq!(code, 200);
assert_eq!(response["status"], "processed");
assert_eq!(response["updateId"], 0);
assert_eq!(response["success"]["DocumentsAddition"]["nb_documents"], 1);
assert_eq!(response["type"]["name"], "DocumentsAddition");
assert_eq!(response["type"]["number"], 1);
let processed_at =
DateTime::parse_from_rfc3339(response["processedAt"].as_str().unwrap()).unwrap();
let enqueued_at =
DateTime::parse_from_rfc3339(response["enqueuedAt"].as_str().unwrap()).unwrap();
let started_processing_at =
DateTime::parse_from_rfc3339(response["startedProcessingAt"].as_str().unwrap()).unwrap();
assert!(processed_at > started_processing_at);
assert!(started_processing_at > enqueued_at);
assert!(processed_at > enqueued_at);
// index was created, and primary key was infered.
let (response, code) = index.get().await;
@ -86,7 +84,8 @@ async fn document_addition_with_primary_key() {
assert_eq!(code, 200);
assert_eq!(response["status"], "processed");
assert_eq!(response["updateId"], 0);
assert_eq!(response["success"]["DocumentsAddition"]["nb_documents"], 1);
assert_eq!(response["type"]["name"], "DocumentsAddition");
assert_eq!(response["type"]["number"], 1);
let (response, code) = index.get().await;
assert_eq!(code, 200);
@ -113,7 +112,8 @@ async fn document_update_with_primary_key() {
assert_eq!(code, 200);
assert_eq!(response["status"], "processed");
assert_eq!(response["updateId"], 0);
assert_eq!(response["success"]["DocumentsAddition"]["nb_documents"], 1);
assert_eq!(response["type"]["name"], "DocumentsPartial");
assert_eq!(response["type"]["number"], 1);
let (response, code) = index.get().await;
assert_eq!(code, 200);
@ -282,7 +282,8 @@ async fn add_larger_dataset() {
let (response, code) = index.get_update(update_id).await;
assert_eq!(code, 200);
assert_eq!(response["status"], "processed");
assert_eq!(response["success"]["DocumentsAddition"]["nb_documents"], 77);
assert_eq!(response["type"]["name"], "DocumentsAddition");
assert_eq!(response["type"]["number"], 77);
let (response, code) = index
.get_all_documents(GetAllDocumentsOptions {
limit: Some(1000),
@ -302,8 +303,8 @@ async fn update_larger_dataset() {
index.wait_update_id(0).await;
let (response, code) = index.get_update(0).await;
assert_eq!(code, 200);
assert_eq!(response["status"], "processed");
assert_eq!(response["success"]["DocumentsAddition"]["nb_documents"], 77);
assert_eq!(response["type"]["name"], "DocumentsPartial");
assert_eq!(response["type"]["number"], 77);
let (response, code) = index
.get_all_documents(GetAllDocumentsOptions {
limit: Some(1000),