fix update dedup

This commit is contained in:
mpostma 2021-03-11 20:58:51 +01:00
parent 79a4bc8129
commit 3f68460d6c
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
4 changed files with 79 additions and 71 deletions

1
Cargo.lock generated
View File

@ -1822,6 +1822,7 @@ dependencies = [
"milli", "milli",
"mime", "mime",
"once_cell", "once_cell",
"parking_lot",
"rand 0.7.3", "rand 0.7.3",
"rayon", "rayon",
"regex", "regex",

View File

@ -62,6 +62,7 @@ either = "1.6.1"
async-trait = "0.1.42" async-trait = "0.1.42"
thiserror = "1.0.24" thiserror = "1.0.24"
async-stream = "0.3.0" async-stream = "0.3.0"
parking_lot = "0.11.1"
[dependencies.sentry] [dependencies.sentry]
default-features = false default-features = false

View File

@ -3,18 +3,17 @@ use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use itertools::Itertools;
use log::info;
use super::index_actor::IndexActorHandle; use super::index_actor::IndexActorHandle;
use log::info;
use thiserror::Error; use thiserror::Error;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid; use uuid::Uuid;
use super::get_arc_ownership_blocking;
use crate::index::UpdateResult; use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
use super::get_arc_ownership_blocking;
pub type Result<T> = std::result::Result<T, UpdateError>; pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
@ -68,7 +67,11 @@ where
D: AsRef<[u8]> + Sized + 'static, D: AsRef<[u8]> + Sized + 'static,
S: UpdateStoreStore, S: UpdateStoreStore,
{ {
fn new(store: S, inbox: mpsc::Receiver<UpdateMsg<D>>, path: impl AsRef<Path>) -> anyhow::Result<Self> { fn new(
store: S,
inbox: mpsc::Receiver<UpdateMsg<D>>,
path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned().join("update_files"); let path = path.as_ref().to_owned().join("update_files");
create_dir_all(&path)?; create_dir_all(&path)?;
assert!(path.exists()); assert!(path.exists());
@ -87,12 +90,12 @@ where
meta, meta,
data, data,
ret, ret,
}) => { }) => {
let _ = ret.send(self.handle_update(uuid, meta, data).await); let _ = ret.send(self.handle_update(uuid, meta, data).await);
} }
Some(ListUpdates { uuid, ret }) => { Some(ListUpdates { uuid, ret }) => {
let _ = ret.send(self.handle_list_updates(uuid).await); let _ = ret.send(self.handle_list_updates(uuid).await);
} , }
Some(GetUpdate { uuid, ret, id }) => { Some(GetUpdate { uuid, ret, id }) => {
let _ = ret.send(self.handle_get_update(uuid, id).await); let _ = ret.send(self.handle_get_update(uuid, id).await);
} }
@ -113,13 +116,15 @@ where
let update_store = self.store.get_or_create(uuid).await?; let update_store = self.store.get_or_create(uuid).await?;
let update_file_id = uuid::Uuid::new_v4(); let update_file_id = uuid::Uuid::new_v4();
let path = self.path.join(format!("update_{}", update_file_id)); let path = self.path.join(format!("update_{}", update_file_id));
let mut file = File::create(&path).await let mut file = File::create(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?; .map_err(|e| UpdateError::Error(Box::new(e)))?;
while let Some(bytes) = payload.recv().await { while let Some(bytes) = payload.recv().await {
match bytes { match bytes {
Ok(bytes) => { Ok(bytes) => {
file.write_all(bytes.as_ref()).await file.write_all(bytes.as_ref())
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?; .map_err(|e| UpdateError::Error(Box::new(e)))?;
} }
Err(e) => { Err(e) => {
@ -128,7 +133,8 @@ where
} }
} }
file.flush().await file.flush()
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?; .map_err(|e| UpdateError::Error(Box::new(e)))?;
let file = file.into_std().await; let file = file.into_std().await;
@ -144,50 +150,33 @@ where
.map_err(|e| UpdateError::Error(Box::new(e)))? .map_err(|e| UpdateError::Error(Box::new(e)))?
} }
async fn handle_list_updates( async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
&self, let update_store = self.store.get(&uuid).await?;
uuid: Uuid,
) -> Result<Vec<UpdateStatus>> {
let store = self.store.get(&uuid).await?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = match store { let result = update_store
Some(update_store) => { .ok_or(UpdateError::UnexistingIndex(uuid))?
let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| { .list()
Ok(processing .map_err(|e| UpdateError::Error(e.into()))?;
.map(UpdateStatus::from) Ok(result)
.into_iter() })
.chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u))) .await
.chain(aborted.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.chain(processed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.chain(failed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.sorted_by(|a, b| a.id().cmp(&b.id()))
.collect())
})
.map_err(|e| UpdateError::Error(Box::new(e)))?;
Ok(updates)
}
None => Err(UpdateError::UnexistingIndex(uuid)),
};
result
}).await
.map_err(|e| UpdateError::Error(Box::new(e)))? .map_err(|e| UpdateError::Error(Box::new(e)))?
} }
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<Option<UpdateStatus>> { async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<Option<UpdateStatus>> {
let store = self.store let store = self
.store
.get(&uuid) .get(&uuid)
.await? .await?
.ok_or(UpdateError::UnexistingIndex(uuid))?; .ok_or(UpdateError::UnexistingIndex(uuid))?;
let result = store.meta(id) let result = store
.meta(id)
.map_err(|e| UpdateError::Error(Box::new(e)))?; .map_err(|e| UpdateError::Error(Box::new(e)))?;
Ok(result) Ok(result)
} }
async fn handle_delete(&self, uuid: Uuid) -> Result<()> { async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store let store = self.store.delete(&uuid).await?;
.delete(&uuid)
.await?;
if let Some(store) = store { if let Some(store) = store {
tokio::task::spawn(async move { tokio::task::spawn(async move {

View File

@ -1,6 +1,6 @@
use std::fs::remove_file; use std::fs::remove_file;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
use heed::{Database, Env, EnvOpenOptions}; use heed::{Database, Env, EnvOpenOptions};
@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use std::fs::File; use std::fs::File;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use parking_lot::RwLock;
use crate::index_controller::updates::*; use crate::index_controller::updates::*;
@ -206,7 +207,7 @@ where
// to the update handler. Processing store is non persistent to be able recover // to the update handler. Processing store is non persistent to be able recover
// from a failure // from a failure
let processing = pending.processing(); let processing = pending.processing();
self.processing.write().unwrap().replace(processing.clone()); self.processing.write().replace(processing.clone());
let file = File::open(&content_path)?; let file = File::open(&content_path)?;
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let result = handler.handle_update(processing, file)?; let result = handler.handle_update(processing, file)?;
@ -216,7 +217,7 @@ where
// we must remove the content from the pending and processing stores and // we must remove the content from the pending and processing stores and
// write the *new* meta to the processed-meta store and commit. // write the *new* meta to the processed-meta store and commit.
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
self.processing.write().unwrap().take(); self.processing.write().take();
self.pending_meta.delete(&mut wtxn, &first_id)?; self.pending_meta.delete(&mut wtxn, &first_id)?;
remove_file(&content_path)?; remove_file(&content_path)?;
self.pending.delete(&mut wtxn, &first_id)?; self.pending.delete(&mut wtxn, &first_id)?;
@ -232,36 +233,52 @@ where
} }
} }
/// Execute the user defined function with the meta-store iterators, the first pub fn list(&self) -> anyhow::Result<Vec<UpdateStatus<M, N, E>>> {
/// iterator is the *processed* meta one, the second the *aborted* meta one
/// and, the last is the *pending* meta one.
pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
where
F: for<'a> FnMut(
Option<Processing<M>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<M>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
) -> heed::Result<T>,
{
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let mut updates = Vec::new();
// We get the pending, processed and aborted meta iterators. let processing = self.processing.read();
let processed_iter = self.processed_meta.iter(&rtxn)?; if let Some(ref processing) = *processing {
let aborted_iter = self.aborted_meta.iter(&rtxn)?; let update = UpdateStatus::from(processing.clone());
let pending_iter = self.pending_meta.iter(&rtxn)?; updates.push(update);
let processing = self.processing.read().unwrap().clone(); }
let failed_iter = self.failed_meta.iter(&rtxn)?;
// We execute the user defined function with both iterators. let pending = self
(f)( .pending_meta
processing, .iter(&rtxn)?
processed_iter, .filter_map(Result::ok)
aborted_iter, .filter_map(|(_, p)| (Some(p.id()) != processing.as_ref().map(|p| p.id())).then(|| p))
pending_iter, .map(UpdateStatus::from);
failed_iter,
) updates.extend(pending);
let aborted =
self.aborted_meta.iter(&rtxn)?
.filter_map(Result::ok)
.map(|(_, p)| p)
.map(UpdateStatus::from);
updates.extend(aborted);
let processed =
self.processed_meta.iter(&rtxn)?
.filter_map(Result::ok)
.map(|(_, p)| p)
.map(UpdateStatus::from);
updates.extend(processed);
let failed =
self.failed_meta.iter(&rtxn)?
.filter_map(Result::ok)
.map(|(_, p)| p)
.map(UpdateStatus::from);
updates.extend(failed);
updates.sort_unstable_by(|a, b| a.id().cmp(&b.id()));
Ok(updates)
} }
/// Returns the update associated meta or `None` if the update doesn't exist. /// Returns the update associated meta or `None` if the update doesn't exist.
@ -269,7 +286,7 @@ where
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let key = BEU64::new(update_id); let key = BEU64::new(update_id);
if let Some(ref meta) = *self.processing.read().unwrap() { if let Some(ref meta) = *self.processing.read() {
if meta.id() == update_id { if meta.id() == update_id {
return Ok(Some(UpdateStatus::Processing(meta.clone()))); return Ok(Some(UpdateStatus::Processing(meta.clone())));
} }