rename RegisterUpdate to store::Update

This commit is contained in:
mpostma 2021-09-28 20:20:13 +02:00
parent 6f8e670dee
commit 7a27cbcc78
8 changed files with 41 additions and 42 deletions

View File

@ -6,7 +6,7 @@ use log::debug;
use meilisearch_lib::index_controller::updates::status::{UpdateResult, UpdateStatus}; use meilisearch_lib::index_controller::updates::status::{UpdateResult, UpdateStatus};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use meilisearch_lib::{MeiliSearch, RegisterUpdate}; use meilisearch_lib::{MeiliSearch, Update};
use meilisearch_lib::index::{Settings, Unchecked}; use meilisearch_lib::index::{Settings, Unchecked};
use crate::error::ResponseError; use crate::error::ResponseError;
@ -52,7 +52,7 @@ impl From<&UpdateStatus> for UpdateType {
fn from(other: &UpdateStatus) -> Self { fn from(other: &UpdateStatus) -> Self {
use milli::update::IndexDocumentsMethod::*; use milli::update::IndexDocumentsMethod::*;
match other.meta() { match other.meta() {
RegisterUpdate::DocumentAddition{ method, .. } => { Update::DocumentAddition{ method, .. } => {
let number = match other { let number = match other {
UpdateStatus::Processed(processed) => match processed.success { UpdateStatus::Processed(processed) => match processed.success {
UpdateResult::DocumentsAddition(ref addition) => { UpdateResult::DocumentsAddition(ref addition) => {
@ -69,11 +69,11 @@ impl From<&UpdateStatus> for UpdateType {
_ => unreachable!(), _ => unreachable!(),
} }
} }
RegisterUpdate::Settings(settings) => UpdateType::Settings { Update::Settings(settings) => UpdateType::Settings {
settings: settings.clone(), settings: settings.clone(),
}, },
RegisterUpdate::ClearDocuments => UpdateType::ClearAll, Update::ClearDocuments => UpdateType::ClearAll,
RegisterUpdate::DeleteDocuments(ids) => UpdateType::DocumentsDeletion { Update::DeleteDocuments(ids) => UpdateType::DocumentsDeletion {
number: Some(ids.len()), number: Some(ids.len()),
}, },
} }

View File

@ -8,7 +8,7 @@ use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder};
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid; use uuid::Uuid;
use crate::RegisterUpdate; use crate::Update;
use crate::index_controller::updates::status::{Failed, Processed, Processing, UpdateResult}; use crate::index_controller::updates::status::{Failed, Processed, Processing, UpdateResult};
use super::{Index, IndexMeta}; use super::{Index, IndexMeta};
@ -170,19 +170,19 @@ impl Index {
let result = (|| { let result = (|| {
let mut txn = self.write_txn()?; let mut txn = self.write_txn()?;
let result = match update.meta() { let result = match update.meta() {
RegisterUpdate::DocumentAddition { primary_key, content_uuid, method } => { Update::DocumentAddition { primary_key, content_uuid, method } => {
self.update_documents(&mut txn, *method, *content_uuid, update_builder, primary_key.as_deref()) self.update_documents(&mut txn, *method, *content_uuid, update_builder, primary_key.as_deref())
} }
RegisterUpdate::Settings(settings) => { Update::Settings(settings) => {
let settings = settings.clone().check(); let settings = settings.clone().check();
self.update_settings(&mut txn, &settings, update_builder) self.update_settings(&mut txn, &settings, update_builder)
}, },
RegisterUpdate::ClearDocuments => { Update::ClearDocuments => {
let builder = update_builder.clear_documents(&mut txn, self); let builder = update_builder.clear_documents(&mut txn, self);
let _count = builder.execute()?; let _count = builder.execute()?;
Ok(UpdateResult::Other) Ok(UpdateResult::Other)
}, },
RegisterUpdate::DeleteDocuments(ids) => { Update::DeleteDocuments(ids) => {
let mut builder = update_builder.delete_documents(&mut txn, self)?; let mut builder = update_builder.delete_documents(&mut txn, self)?;
// We ignore unexisting document ids // We ignore unexisting document ids

View File

@ -36,7 +36,7 @@ impl IndexResolver<HeedUuidStore, MapIndexStore> {
let indexes_path = src.as_ref().join("indexes"); let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?; let indexes = indexes_path.read_dir()?;
let update_handler = UpdateHandler::new(indexer_opts).unwrap(); let update_handler = UpdateHandler::new(indexer_opts)?;
for index in indexes { for index in indexes {
let index = index?; let index = index?;
Index::load_dump(&index.path(), &dst, index_db_size, &update_handler)?; Index::load_dump(&index.path(), &dst, index_db_size, &update_handler)?;

View File

@ -45,18 +45,6 @@ pub fn create_update_handler(
Ok(sender) Ok(sender)
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RegisterUpdate {
DeleteDocuments(Vec<String>),
DocumentAddition {
primary_key: Option<String>,
method: IndexDocumentsMethod,
content_uuid: Uuid,
},
Settings(Settings<Unchecked>),
ClearDocuments,
}
/// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`. /// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`.
struct StreamReader<S> { struct StreamReader<S> {
stream: S, stream: S,
@ -209,15 +197,15 @@ impl UpdateLoop {
}).await??; }).await??;
RegisterUpdate::DocumentAddition { store::Update::DocumentAddition {
primary_key, primary_key,
method, method,
content_uuid, content_uuid,
} }
} }
Update::Settings(settings) => RegisterUpdate::Settings(settings), Update::Settings(settings) => store::Update::Settings(settings),
Update::ClearDocuments => RegisterUpdate::ClearDocuments, Update::ClearDocuments => store::Update::ClearDocuments,
Update::DeleteDocuments(ids) => RegisterUpdate::DeleteDocuments(ids), Update::DeleteDocuments(ids) => store::Update::DeleteDocuments(ids),
}; };
let store = self.store.clone(); let store = self.store.clone();

View File

@ -6,7 +6,7 @@ use meilisearch_error::{Code, ErrorCode};
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{RegisterUpdate, index::{Settings, Unchecked}}; use crate::{Update, index::{Settings, Unchecked}};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult { pub enum UpdateResult {
@ -34,12 +34,12 @@ pub enum UpdateMeta {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Enqueued { pub struct Enqueued {
pub update_id: u64, pub update_id: u64,
pub meta: RegisterUpdate, pub meta: Update,
pub enqueued_at: DateTime<Utc>, pub enqueued_at: DateTime<Utc>,
} }
impl Enqueued { impl Enqueued {
pub fn new(meta: RegisterUpdate, update_id: u64) -> Self { pub fn new(meta: Update, update_id: u64) -> Self {
Self { Self {
enqueued_at: Utc::now(), enqueued_at: Utc::now(),
meta, meta,
@ -61,7 +61,7 @@ impl Enqueued {
} }
} }
pub fn meta(&self) -> &RegisterUpdate { pub fn meta(&self) -> &Update {
&self.meta &self.meta
} }
@ -84,7 +84,7 @@ impl Processed {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &RegisterUpdate { pub fn meta(&self) -> &Update {
self.from.meta() self.from.meta()
} }
} }
@ -102,7 +102,7 @@ impl Processing {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &RegisterUpdate { pub fn meta(&self) -> &Update {
self.from.meta() self.from.meta()
} }
@ -139,7 +139,7 @@ impl Aborted {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &RegisterUpdate { pub fn meta(&self) -> &Update {
self.from.meta() self.from.meta()
} }
} }
@ -173,7 +173,7 @@ impl Failed {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &RegisterUpdate { pub fn meta(&self) -> &Update {
self.from.meta() self.from.meta()
} }
} }
@ -199,7 +199,7 @@ impl UpdateStatus {
} }
} }
pub fn meta(&self) -> &RegisterUpdate { pub fn meta(&self) -> &Update {
match self { match self {
UpdateStatus::Processing(u) => u.meta(), UpdateStatus::Processing(u) => u.meta(),
UpdateStatus::Enqueued(u) => u.meta(), UpdateStatus::Enqueued(u) => u.meta(),

View File

@ -11,7 +11,7 @@ use tempfile::{NamedTempFile, TempDir};
use uuid::Uuid; use uuid::Uuid;
use super::{Result, State, UpdateStore}; use super::{Result, State, UpdateStore};
use crate::{RegisterUpdate, index::Index, index_controller::{update_file_store::UpdateFileStore, updates::status::{Enqueued, UpdateStatus}}}; use crate::{Update, index::Index, index_controller::{update_file_store::UpdateFileStore, updates::status::{Enqueued, UpdateStatus}}};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct UpdateEntry { struct UpdateEntry {
@ -74,7 +74,7 @@ impl UpdateStore {
let update = data.decode()?; let update = data.decode()?;
if let Enqueued { if let Enqueued {
meta: RegisterUpdate::DocumentAddition { meta: Update::DocumentAddition {
content_uuid, .. content_uuid, ..
}, .. }, ..
} = update { } = update {

View File

@ -26,7 +26,6 @@ use rayon::prelude::*;
use codec::*; use codec::*;
use super::RegisterUpdate;
use super::error::Result; use super::error::Result;
use super::status::{Enqueued, Processing}; use super::status::{Enqueued, Processing};
use crate::EnvSizer; use crate::EnvSizer;
@ -37,6 +36,18 @@ use crate::index::Index;
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>; type BEU64 = U64<heed::byteorder::BE>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Update {
DeleteDocuments(Vec<String>),
DocumentAddition {
primary_key: Option<String>,
method: IndexDocumentsMethod,
content_uuid: Uuid,
},
Settings(Settings<Unchecked>),
ClearDocuments,
}
#[derive(Debug)] #[derive(Debug)]
pub struct UpdateStoreInfo { pub struct UpdateStoreInfo {
/// Size of the update store in bytes. /// Size of the update store in bytes.
@ -242,7 +253,7 @@ impl UpdateStore {
pub fn register_update( pub fn register_update(
&self, &self,
index_uuid: Uuid, index_uuid: Uuid,
update: RegisterUpdate, update: Update,
) -> heed::Result<Enqueued> { ) -> heed::Result<Enqueued> {
let mut txn = self.env.write_txn()?; let mut txn = self.env.write_txn()?;
let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
@ -512,7 +523,7 @@ impl UpdateStore {
let ((_, uuid, _), pending) = entry?; let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) { if uuids.contains(&uuid) {
if let Enqueued { if let Enqueued {
meta: RegisterUpdate::DocumentAddition { meta: Update::DocumentAddition {
content_uuid, .. content_uuid, ..
}, },
.. ..

View File

@ -5,7 +5,7 @@ pub mod options;
pub mod index; pub mod index;
pub mod index_controller; pub mod index_controller;
pub use index_controller::{IndexController as MeiliSearch, updates::RegisterUpdate}; pub use index_controller::{IndexController as MeiliSearch, updates::store::Update};
mod compression; mod compression;
mod document_formats; mod document_formats;