mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 20:15:07 +08:00
Merge pull request #249 from meilisearch/display-all-updates
Display enqueued along with processed updates
This commit is contained in:
commit
a136c62208
@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
|
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
|
||||||
|
|
||||||
use meilidb_core::{Database, Highlight, UpdateResult};
|
use meilidb_core::{Database, Highlight, ProcessedUpdateResult};
|
||||||
use meilidb_schema::SchemaAttr;
|
use meilidb_schema::SchemaAttr;
|
||||||
|
|
||||||
const INDEX_NAME: &str = "default";
|
const INDEX_NAME: &str = "default";
|
||||||
@ -97,7 +97,7 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
|
|||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: UpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
||||||
let index = match database.open_index(INDEX_NAME) {
|
let index = match database.open_index(INDEX_NAME) {
|
||||||
Some(index) => index,
|
Some(index) => index,
|
||||||
None => database.create_index(INDEX_NAME).unwrap(),
|
None => database.create_index(INDEX_NAME).unwrap(),
|
||||||
|
@ -11,7 +11,7 @@ use log::{debug, error};
|
|||||||
|
|
||||||
use crate::{store, update, Index, MResult};
|
use crate::{store, update, Index, MResult};
|
||||||
|
|
||||||
pub type BoxUpdateFn = Box<dyn Fn(update::UpdateResult) + Send + Sync + 'static>;
|
pub type BoxUpdateFn = Box<dyn Fn(update::ProcessedUpdateResult) + Send + Sync + 'static>;
|
||||||
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
|
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
|
||||||
|
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
|
@ -24,7 +24,7 @@ pub use self::number::{Number, ParseNumberError};
|
|||||||
pub use self::ranked_map::RankedMap;
|
pub use self::ranked_map::RankedMap;
|
||||||
pub use self::raw_document::RawDocument;
|
pub use self::raw_document::RawDocument;
|
||||||
pub use self::store::Index;
|
pub use self::store::Index;
|
||||||
pub use self::update::{UpdateResult, UpdateStatus, UpdateType};
|
pub use self::update::{EnqueuedUpdateResult, ProcessedUpdateResult, UpdateStatus, UpdateType};
|
||||||
|
|
||||||
use ::serde::{Deserialize, Serialize};
|
use ::serde::{Deserialize, Serialize};
|
||||||
use zerocopy::{AsBytes, FromBytes};
|
use zerocopy::{AsBytes, FromBytes};
|
||||||
|
@ -219,17 +219,29 @@ impl Index {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> {
|
pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> {
|
||||||
match self.updates_results.last_update_id(reader)? {
|
let mut updates = Vec::new();
|
||||||
Some((last_id, _)) => {
|
let mut last_update_result_id = 0;
|
||||||
let mut updates = Vec::with_capacity(last_id as usize + 1);
|
|
||||||
for id in 0..=last_id {
|
// retrieve all updates results
|
||||||
let update = self.update_status(reader, id)?;
|
if let Some((last_id, _)) = self.updates_results.last_update_id(reader)? {
|
||||||
updates.push(update);
|
updates.reserve(last_id as usize);
|
||||||
}
|
|
||||||
Ok(updates)
|
for id in 0..=last_id {
|
||||||
|
let update = self.update_status(reader, id)?;
|
||||||
|
updates.push(update);
|
||||||
|
last_update_result_id = id;
|
||||||
}
|
}
|
||||||
None => Ok(Vec::new()),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// retrieve all enqueued updates
|
||||||
|
if let Some((last_id, _)) = self.updates.last_update_id(reader)? {
|
||||||
|
for id in last_update_result_id + 1..last_id {
|
||||||
|
let update = self.update_status(reader, id)?;
|
||||||
|
updates.push(update);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(updates)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn query_builder(&self) -> QueryBuilder {
|
pub fn query_builder(&self) -> QueryBuilder {
|
||||||
|
@ -26,9 +26,9 @@ impl Updates {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO do not trigger deserialize if possible
|
// TODO do not trigger deserialize if possible
|
||||||
pub fn contains(self, reader: &heed::RoTxn, update_id: u64) -> ZResult<bool> {
|
pub fn get(self, reader: &heed::RoTxn, update_id: u64) -> ZResult<Option<Update>> {
|
||||||
let update_id = BEU64::new(update_id);
|
let update_id = BEU64::new(update_id);
|
||||||
self.updates.get(reader, &update_id).map(|v| v.is_some())
|
self.updates.get(reader, &update_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_update(
|
pub fn put_update(
|
||||||
|
@ -1,15 +1,19 @@
|
|||||||
use super::BEU64;
|
use super::BEU64;
|
||||||
use crate::update::UpdateResult;
|
use crate::update::ProcessedUpdateResult;
|
||||||
use heed::types::{OwnedType, SerdeBincode};
|
use heed::types::{OwnedType, SerdeBincode};
|
||||||
use heed::Result as ZResult;
|
use heed::Result as ZResult;
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct UpdatesResults {
|
pub struct UpdatesResults {
|
||||||
pub(crate) updates_results: heed::Database<OwnedType<BEU64>, SerdeBincode<UpdateResult>>,
|
pub(crate) updates_results:
|
||||||
|
heed::Database<OwnedType<BEU64>, SerdeBincode<ProcessedUpdateResult>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UpdatesResults {
|
impl UpdatesResults {
|
||||||
pub fn last_update_id(self, reader: &heed::RoTxn) -> ZResult<Option<(u64, UpdateResult)>> {
|
pub fn last_update_id(
|
||||||
|
self,
|
||||||
|
reader: &heed::RoTxn,
|
||||||
|
) -> ZResult<Option<(u64, ProcessedUpdateResult)>> {
|
||||||
match self.updates_results.last(reader)? {
|
match self.updates_results.last(reader)? {
|
||||||
Some((key, data)) => Ok(Some((key.get(), data))),
|
Some((key, data)) => Ok(Some((key.get(), data))),
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
@ -20,7 +24,7 @@ impl UpdatesResults {
|
|||||||
self,
|
self,
|
||||||
writer: &mut heed::RwTxn,
|
writer: &mut heed::RwTxn,
|
||||||
update_id: u64,
|
update_id: u64,
|
||||||
update_result: &UpdateResult,
|
update_result: &ProcessedUpdateResult,
|
||||||
) -> ZResult<()> {
|
) -> ZResult<()> {
|
||||||
let update_id = BEU64::new(update_id);
|
let update_id = BEU64::new(update_id);
|
||||||
self.updates_results.put(writer, &update_id, update_result)
|
self.updates_results.put(writer, &update_id, update_result)
|
||||||
@ -30,7 +34,7 @@ impl UpdatesResults {
|
|||||||
self,
|
self,
|
||||||
reader: &heed::RoTxn,
|
reader: &heed::RoTxn,
|
||||||
update_id: u64,
|
update_id: u64,
|
||||||
) -> ZResult<Option<UpdateResult>> {
|
) -> ZResult<Option<ProcessedUpdateResult>> {
|
||||||
let update_id = BEU64::new(update_id);
|
let update_id = BEU64::new(update_id);
|
||||||
self.updates_results.get(reader, &update_id)
|
self.updates_results.get(reader, &update_id)
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,36 @@ pub enum Update {
|
|||||||
StopWordsDeletion(BTreeSet<String>),
|
StopWordsDeletion(BTreeSet<String>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Update {
|
||||||
|
pub fn update_type(&self) -> UpdateType {
|
||||||
|
match self {
|
||||||
|
Update::ClearAll => UpdateType::ClearAll,
|
||||||
|
Update::Schema(schema) => UpdateType::Schema {
|
||||||
|
schema: schema.clone(),
|
||||||
|
},
|
||||||
|
Update::Customs(_) => UpdateType::Customs,
|
||||||
|
Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
|
||||||
|
number: addition.len(),
|
||||||
|
},
|
||||||
|
Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
|
||||||
|
number: deletion.len(),
|
||||||
|
},
|
||||||
|
Update::SynonymsAddition(addition) => UpdateType::SynonymsAddition {
|
||||||
|
number: addition.len(),
|
||||||
|
},
|
||||||
|
Update::SynonymsDeletion(deletion) => UpdateType::SynonymsDeletion {
|
||||||
|
number: deletion.len(),
|
||||||
|
},
|
||||||
|
Update::StopWordsAddition(addition) => UpdateType::StopWordsAddition {
|
||||||
|
number: addition.len(),
|
||||||
|
},
|
||||||
|
Update::StopWordsDeletion(deletion) => UpdateType::StopWordsDeletion {
|
||||||
|
number: deletion.len(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub enum UpdateType {
|
pub enum UpdateType {
|
||||||
ClearAll,
|
ClearAll,
|
||||||
@ -61,17 +91,23 @@ pub struct DetailedDuration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct UpdateResult {
|
pub struct ProcessedUpdateResult {
|
||||||
pub update_id: u64,
|
pub update_id: u64,
|
||||||
pub update_type: UpdateType,
|
pub update_type: UpdateType,
|
||||||
pub result: Result<(), String>,
|
pub result: Result<(), String>,
|
||||||
pub detailed_duration: DetailedDuration,
|
pub detailed_duration: DetailedDuration,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct EnqueuedUpdateResult {
|
||||||
|
pub update_id: u64,
|
||||||
|
pub update_type: UpdateType,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub enum UpdateStatus {
|
pub enum UpdateStatus {
|
||||||
Enqueued,
|
Enqueued(EnqueuedUpdateResult),
|
||||||
Processed(UpdateResult),
|
Processed(ProcessedUpdateResult),
|
||||||
Unknown,
|
Unknown,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,8 +120,11 @@ pub fn update_status(
|
|||||||
match updates_results_store.update_result(reader, update_id)? {
|
match updates_results_store.update_result(reader, update_id)? {
|
||||||
Some(result) => Ok(UpdateStatus::Processed(result)),
|
Some(result) => Ok(UpdateStatus::Processed(result)),
|
||||||
None => {
|
None => {
|
||||||
if updates_store.contains(reader, update_id)? {
|
if let Some(update) = updates_store.get(reader, update_id)? {
|
||||||
Ok(UpdateStatus::Enqueued)
|
Ok(UpdateStatus::Enqueued(EnqueuedUpdateResult {
|
||||||
|
update_id,
|
||||||
|
update_type: update.update_type(),
|
||||||
|
}))
|
||||||
} else {
|
} else {
|
||||||
Ok(UpdateStatus::Unknown)
|
Ok(UpdateStatus::Unknown)
|
||||||
}
|
}
|
||||||
@ -110,7 +149,10 @@ pub fn next_update_id(
|
|||||||
Ok(new_update_id)
|
Ok(new_update_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>> {
|
pub fn update_task(
|
||||||
|
writer: &mut heed::RwTxn,
|
||||||
|
index: store::Index,
|
||||||
|
) -> MResult<Option<ProcessedUpdateResult>> {
|
||||||
let (update_id, update) = match index.updates.pop_front(writer)? {
|
let (update_id, update) = match index.updates.pop_front(writer)? {
|
||||||
Some(value) => value,
|
Some(value) => value,
|
||||||
None => return Ok(None),
|
None => return Ok(None),
|
||||||
@ -259,7 +301,7 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
|
|||||||
);
|
);
|
||||||
|
|
||||||
let detailed_duration = DetailedDuration { main: duration };
|
let detailed_duration = DetailedDuration { main: duration };
|
||||||
let status = UpdateResult {
|
let status = ProcessedUpdateResult {
|
||||||
update_id,
|
update_id,
|
||||||
update_type,
|
update_type,
|
||||||
result: result.map_err(|e| e.to_string()),
|
result: result.map_err(|e| e.to_string()),
|
||||||
|
Loading…
Reference in New Issue
Block a user