199: fix flaky tests r=irevoire a=MarinPostma

fixes:
- index creation bug
- fix store lock
- fix decoding error
- fix stats


Co-authored-by: mpostma <postma.marin@protonmail.com>
Co-authored-by: Irevoire <tamo@meilisearch.com>
This commit is contained in:
bors[bot] 2021-06-10 14:13:25 +00:00 committed by GitHub
commit e6220a1346
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 205 additions and 171 deletions

View File

@ -81,7 +81,7 @@ impl UpdateHandler {
primary_key.as_deref(), primary_key.as_deref(),
), ),
ClearDocuments => index.clear_documents(update_builder), ClearDocuments => index.clear_documents(update_builder),
DeleteDocuments => index.delete_documents(content, update_builder), DeleteDocuments { ids } => index.delete_documents(ids, update_builder),
Settings(settings) => index.update_settings(&settings.clone().check(), update_builder), Settings(settings) => index.update_settings(&settings.clone().check(), update_builder),
}; };

View File

@ -298,18 +298,14 @@ impl Index {
pub fn delete_documents( pub fn delete_documents(
&self, &self,
document_ids: Option<impl io::Read>, document_ids: &[String],
update_builder: UpdateBuilder, update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> { ) -> anyhow::Result<UpdateResult> {
let ids = match document_ids {
Some(reader) => serde_json::from_reader(reader)?,
None => Vec::<String>::new(),
};
let mut txn = self.write_txn()?; let mut txn = self.write_txn()?;
let mut builder = update_builder.delete_documents(&mut txn, self)?; let mut builder = update_builder.delete_documents(&mut txn, self)?;
// We ignore unexisting document ids // We ignore unexisting document ids
ids.iter().for_each(|id| { document_ids.iter().for_each(|id| {
builder.delete_external_id(id); builder.delete_external_id(id);
}); });

View File

@ -40,6 +40,13 @@ impl MapIndexStore {
#[async_trait::async_trait] #[async_trait::async_trait]
impl IndexStore for MapIndexStore { impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> IndexResult<Index> { async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> IndexResult<Index> {
// We need to keep the lock until we are sure the db file has been opened correclty, to
// ensure that another db is not created at the same time.
let mut lock = self.index_store.write().await;
if let Some(index) = lock.get(&uuid) {
return Ok(index.clone());
}
let path = self.path.join(format!("index-{}", uuid)); let path = self.path.join(format!("index-{}", uuid));
if path.exists() { if path.exists() {
return Err(IndexError::IndexAlreadyExists); return Err(IndexError::IndexAlreadyExists);
@ -57,7 +64,7 @@ impl IndexStore for MapIndexStore {
}) })
.await??; .await??;
self.index_store.write().await.insert(uuid, index.clone()); lock.insert(uuid, index.clone());
Ok(index) Ok(index)
} }

View File

@ -200,18 +200,11 @@ impl IndexController {
pub async fn delete_documents( pub async fn delete_documents(
&self, &self,
uid: String, uid: String,
document_ids: Vec<String>, documents: Vec<String>,
) -> anyhow::Result<UpdateStatus> { ) -> anyhow::Result<UpdateStatus> {
let uuid = self.uuid_resolver.get(uid).await?; let uuid = self.uuid_resolver.get(uid).await?;
let meta = UpdateMeta::DeleteDocuments; let meta = UpdateMeta::DeleteDocuments { ids: documents };
let (sender, receiver) = mpsc::channel(10); let (_, receiver) = mpsc::channel(1);
tokio::task::spawn(async move {
let json = serde_json::to_vec(&document_ids).unwrap();
let bytes = Bytes::from(json);
let _ = sender.send(Ok(bytes)).await;
});
let status = self.update_handle.update(meta, receiver, uuid).await?; let status = self.update_handle.update(meta, receiver, uuid).await?;
Ok(status) Ok(status)
} }
@ -249,8 +242,9 @@ impl IndexController {
) -> anyhow::Result<IndexMetadata> { ) -> anyhow::Result<IndexMetadata> {
let IndexSettings { uid, primary_key } = index_settings; let IndexSettings { uid, primary_key } = index_settings;
let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?; let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?;
let uuid = self.uuid_resolver.create(uid.clone()).await?; let uuid = Uuid::new_v4();
let meta = self.index_handle.create_index(uuid, primary_key).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?;
self.uuid_resolver.insert(uid.clone(), uuid).await?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid, uuid,
name: uid.clone(), name: uid.clone(),

View File

@ -1,6 +1,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use log::info; use log::info;
@ -19,6 +20,7 @@ pub struct UpdateActor<D, I> {
store: Arc<UpdateStore>, store: Arc<UpdateStore>,
inbox: mpsc::Receiver<UpdateMsg<D>>, inbox: mpsc::Receiver<UpdateMsg<D>>,
index_handle: I, index_handle: I,
must_exit: Arc<AtomicBool>,
} }
impl<D, I> UpdateActor<D, I> impl<D, I> UpdateActor<D, I>
@ -39,14 +41,17 @@ where
let mut options = heed::EnvOpenOptions::new(); let mut options = heed::EnvOpenOptions::new();
options.map_size(update_db_size); options.map_size(update_db_size);
let store = UpdateStore::open(options, &path, index_handle.clone())?; let must_exit = Arc::new(AtomicBool::new(false));
let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
std::fs::create_dir_all(path.join("update_files"))?; std::fs::create_dir_all(path.join("update_files"))?;
assert!(path.exists());
Ok(Self { Ok(Self {
path, path,
store, store,
inbox, inbox,
index_handle, index_handle,
must_exit,
}) })
} }
@ -56,7 +61,13 @@ where
info!("Started update actor."); info!("Started update actor.");
loop { loop {
match self.inbox.recv().await { let msg = self.inbox.recv().await;
if self.must_exit.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
match msg {
Some(Update { Some(Update {
uuid, uuid,
meta, meta,
@ -95,7 +106,7 @@ where
mut payload: mpsc::Receiver<PayloadData<D>>, mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> { ) -> Result<UpdateStatus> {
let file_path = match meta { let file_path = match meta {
UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => { UpdateMeta::DocumentsAddition { .. } => {
let update_file_id = uuid::Uuid::new_v4(); let update_file_id = uuid::Uuid::new_v4();
let path = self let path = self
.path .path
@ -170,10 +181,13 @@ where
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> { async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self.store.clone(); let store = self.store.clone();
tokio::task::spawn_blocking(move || {
let result = store let result = store
.meta(uuid, id)? .meta(uuid, id)?
.ok_or(UpdateError::UnexistingUpdate(id))?; .ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result) Ok(result)
})
.await?
} }
async fn handle_delete(&self, uuid: Uuid) -> Result<()> { async fn handle_delete(&self, uuid: Uuid) -> Result<()> {

View File

@ -7,7 +7,8 @@ use uuid::Uuid;
use crate::index_controller::{IndexActorHandle, UpdateStatus}; use crate::index_controller::{IndexActorHandle, UpdateStatus};
use super::{ use super::{
PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateError, UpdateMeta, UpdateMsg,
UpdateStoreInfo,
}; };
#[derive(Clone)] #[derive(Clone)]
@ -47,42 +48,72 @@ where
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret }; let msg = UpdateMsg::ListUpdates { uuid, ret };
let _ = self.sender.send(msg).await; self.sender
receiver.await.expect("update actor killed.") .send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
} }
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> { async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetUpdate { uuid, id, ret }; let msg = UpdateMsg::GetUpdate { uuid, id, ret };
let _ = self.sender.send(msg).await; self.sender
receiver.await.expect("update actor killed.") .send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
} }
async fn delete(&self, uuid: Uuid) -> Result<()> { async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Delete { uuid, ret }; let msg = UpdateMsg::Delete { uuid, ret };
let _ = self.sender.send(msg).await; self.sender
receiver.await.expect("update actor killed.") .send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
} }
async fn snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> { async fn snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Snapshot { uuids, path, ret }; let msg = UpdateMsg::Snapshot { uuids, path, ret };
let _ = self.sender.send(msg).await; self.sender
receiver.await.expect("update actor killed.") .send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
} }
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> { async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Dump { uuids, path, ret }; let msg = UpdateMsg::Dump { uuids, path, ret };
let _ = self.sender.send(msg).await; self.sender
receiver.await.expect("update actor killed.") .send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
} }
async fn get_info(&self) -> Result<UpdateStoreInfo> { async fn get_info(&self) -> Result<UpdateStoreInfo> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetInfo { ret }; let msg = UpdateMsg::GetInfo { ret };
let _ = self.sender.send(msg).await; self.sender
receiver.await.expect("update actor killed.") .send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
} }
async fn update( async fn update(
@ -98,7 +129,12 @@ where
meta, meta,
ret, ret,
}; };
let _ = self.sender.send(msg).await; self.sender
receiver.await.expect("update actor killed.") .send(msg)
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?;
receiver
.await
.map_err(|_| UpdateError::FatalUpdateStoreError)?
} }
} }

View File

@ -30,6 +30,10 @@ pub enum UpdateError {
UnexistingUpdate(u64), UnexistingUpdate(u64),
#[error("Internal error processing update: {0}")] #[error("Internal error processing update: {0}")]
Internal(String), Internal(String),
#[error(
"Update store was shut down due to a fatal error, please check your logs for more info."
)]
FatalUpdateStoreError,
} }
macro_rules! internal_error { macro_rules! internal_error {

View File

@ -9,8 +9,7 @@ use heed::{EnvOpenOptions, RoTxn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use super::UpdateStore; use super::{State, UpdateStore};
use super::{codec::UpdateKeyCodec, State};
use crate::index_controller::{ use crate::index_controller::{
index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued, index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued,
UpdateStatus, UpdateStatus,
@ -105,11 +104,7 @@ impl UpdateStore {
uuids: &HashSet<Uuid>, uuids: &HashSet<Uuid>,
mut file: &mut File, mut file: &mut File,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let updates = self let updates = self.updates.iter(txn)?.lazily_decode_data();
.updates
.iter(txn)?
.remap_key_type::<UpdateKeyCodec>()
.lazily_decode_data();
for update in updates { for update in updates {
let ((uuid, _), data) = update?; let ((uuid, _), data) = update?;

View File

@ -3,6 +3,7 @@ pub mod dump;
use std::fs::{copy, create_dir_all, remove_file, File}; use std::fs::{copy, create_dir_all, remove_file, File};
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{ use std::{
collections::{BTreeMap, HashSet}, collections::{BTreeMap, HashSet},
@ -96,9 +97,9 @@ pub struct UpdateStore {
/// The keys are built as follow: /// The keys are built as follow:
/// | Uuid | id | /// | Uuid | id |
/// | 16-bytes | 8-bytes | /// | 16-bytes | 8-bytes |
updates: Database<ByteSlice, SerdeJson<UpdateStatus>>, updates: Database<UpdateKeyCodec, SerdeJson<UpdateStatus>>,
/// Indicates the current state of the update store, /// Indicates the current state of the update store,
pub state: Arc<StateLock>, state: Arc<StateLock>,
/// Wake up the loop when a new event occurs. /// Wake up the loop when a new event occurs.
notification_sender: mpsc::Sender<()>, notification_sender: mpsc::Sender<()>,
path: PathBuf, path: PathBuf,
@ -138,6 +139,7 @@ impl UpdateStore {
options: EnvOpenOptions, options: EnvOpenOptions,
path: impl AsRef<Path>, path: impl AsRef<Path>,
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
must_exit: Arc<AtomicBool>,
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>> {
let (update_store, mut notification_receiver) = Self::new(options, path)?; let (update_store, mut notification_receiver) = Self::new(options, path)?;
let update_store = Arc::new(update_store); let update_store = Arc::new(update_store);
@ -171,7 +173,11 @@ impl UpdateStore {
match res { match res {
Ok(Some(_)) => (), Ok(Some(_)) => (),
Ok(None) => break, Ok(None) => break,
Err(e) => error!("error while processing update: {}", e), Err(e) => {
error!("Fatal error while processing an update that requires the update store to shutdown: {}", e);
must_exit.store(true, Ordering::SeqCst);
break 'outer;
}
} }
} }
// the ownership on the arc has been taken, we need to exit. // the ownership on the arc has been taken, we need to exit.
@ -179,6 +185,8 @@ impl UpdateStore {
} }
} }
} }
error!("Update store loop exited.");
}); });
Ok(update_store) Ok(update_store)
@ -261,7 +269,7 @@ impl UpdateStore {
} }
_ => { _ => {
let _update_id = self.next_update_id_raw(wtxn, index_uuid)?; let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
self.updates.remap_key_type::<UpdateKeyCodec>().put( self.updates.put(
wtxn, wtxn,
&(index_uuid, update.id()), &(index_uuid, update.id()),
&update, &update,
@ -286,18 +294,38 @@ impl UpdateStore {
// If there is a pending update we process and only keep // If there is a pending update we process and only keep
// a reader while processing it, not a writer. // a reader while processing it, not a writer.
match first_meta { match first_meta {
Some(((global_id, index_uuid, update_id), mut pending)) => { Some(((global_id, index_uuid, _), mut pending)) => {
let content_path = pending.content.take(); let content = pending.content.take();
let processing = pending.processing(); let processing = pending.processing();
// Acquire the state lock and set the current state to processing. // Acquire the state lock and set the current state to processing.
// txn must *always* be acquired after state lock, or it will dead lock. // txn must *always* be acquired after state lock, or it will dead lock.
let state = self.state.write(); let state = self.state.write();
state.swap(State::Processing(index_uuid, processing.clone())); state.swap(State::Processing(index_uuid, processing.clone()));
let result =
self.perform_update(content, processing, index_handle, index_uuid, global_id);
state.swap(State::Idle);
result
}
None => Ok(None),
}
}
fn perform_update(
&self,
content: Option<Uuid>,
processing: Processing,
index_handle: impl IndexActorHandle,
index_uuid: Uuid,
global_id: u64,
) -> anyhow::Result<Option<()>> {
let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid));
let update_id = processing.id();
let file = match content_path { let file = match content_path {
Some(uuid) => { Some(ref path) => {
let path = update_uuid_to_file_path(&self.path, uuid);
let file = File::open(path)?; let file = File::open(path)?;
Some(file) Some(file)
} }
@ -306,7 +334,8 @@ impl UpdateStore {
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let handle = Handle::current(); let handle = Handle::current();
let result = match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { let result =
match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) {
Ok(result) => result, Ok(result) => result,
Err(e) => Err(processing.fail(e.to_string())), Err(e) => Err(processing.fail(e.to_string())),
}; };
@ -318,30 +347,25 @@ impl UpdateStore {
self.pending_queue self.pending_queue
.delete(&mut wtxn, &(global_id, index_uuid, update_id))?; .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;
if let Some(uuid) = content_path {
let path = update_uuid_to_file_path(&self.path, uuid);
remove_file(&path)?;
}
let result = match result { let result = match result {
Ok(res) => res.into(), Ok(res) => res.into(),
Err(res) => res.into(), Err(res) => res.into(),
}; };
self.updates.remap_key_type::<UpdateKeyCodec>().put( self.updates.put(
&mut wtxn, &mut wtxn,
&(index_uuid, update_id), &(index_uuid, update_id),
&result, &result,
)?; )?;
wtxn.commit()?; wtxn.commit()?;
state.swap(State::Idle);
if let Some(ref path) = content_path {
remove_file(&path)?;
}
Ok(Some(())) Ok(Some(()))
} }
None => Ok(None),
}
}
/// List the updates for `index_uuid`. /// List the updates for `index_uuid`.
pub fn list(&self, index_uuid: Uuid) -> anyhow::Result<Vec<UpdateStatus>> { pub fn list(&self, index_uuid: Uuid) -> anyhow::Result<Vec<UpdateStatus>> {
@ -357,7 +381,11 @@ impl UpdateStore {
} }
} }
let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?; let updates = self
.updates
.remap_key_type::<ByteSlice>()
.prefix_iter(&txn, index_uuid.as_bytes())?;
for entry in updates { for entry in updates {
let (_, update) = entry?; let (_, update) = entry?;
update_list.insert(update.id(), update); update_list.insert(update.id(), update);
@ -388,24 +416,17 @@ impl UpdateStore {
let txn = self.env.read_txn()?; let txn = self.env.read_txn()?;
// Else, check if it is in the updates database: // Else, check if it is in the updates database:
let update = self let update = self.updates.get(&txn, &(index_uuid, update_id))?;
.updates
.remap_key_type::<UpdateKeyCodec>()
.get(&txn, &(index_uuid, update_id))?;
if let Some(update) = update { if let Some(update) = update {
return Ok(Some(update)); return Ok(Some(update));
} }
// If nothing was found yet, we resolve to iterate over the pending queue. // If nothing was found yet, we resolve to iterate over the pending queue.
let pendings = self let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
.pending_queue
.remap_key_type::<UpdateKeyCodec>()
.iter(&txn)?
.lazily_decode_data();
for entry in pendings { for entry in pendings {
let ((uuid, id), pending) = entry?; let ((_, uuid, id), pending) = entry?;
if uuid == index_uuid && id == update_id { if uuid == index_uuid && id == update_id {
return Ok(Some(pending.decode()?.into())); return Ok(Some(pending.decode()?.into()));
} }
@ -437,6 +458,7 @@ impl UpdateStore {
let mut updates = self let mut updates = self
.updates .updates
.remap_key_type::<ByteSlice>()
.prefix_iter_mut(&mut txn, index_uuid.as_bytes())? .prefix_iter_mut(&mut txn, index_uuid.as_bytes())?
.lazily_decode_data(); .lazily_decode_data();
@ -561,7 +583,13 @@ mod test {
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
let handle = Arc::new(MockIndexActorHandle::new()); let handle = Arc::new(MockIndexActorHandle::new());
options.map_size(4096 * 100); options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); let update_store = UpdateStore::open(
options,
dir.path(),
handle,
Arc::new(AtomicBool::new(false)),
)
.unwrap();
let index1_uuid = Uuid::new_v4(); let index1_uuid = Uuid::new_v4();
let index2_uuid = Uuid::new_v4(); let index2_uuid = Uuid::new_v4();
@ -588,7 +616,13 @@ mod test {
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
let handle = Arc::new(MockIndexActorHandle::new()); let handle = Arc::new(MockIndexActorHandle::new());
options.map_size(4096 * 100); options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); let update_store = UpdateStore::open(
options,
dir.path(),
handle,
Arc::new(AtomicBool::new(false)),
)
.unwrap();
let meta = UpdateMeta::ClearDocuments; let meta = UpdateMeta::ClearDocuments;
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let store_clone = update_store.clone(); let store_clone = update_store.clone();
@ -626,7 +660,13 @@ mod test {
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100); options.map_size(4096 * 100);
let store = UpdateStore::open(options, dir.path(), handle.clone()).unwrap(); let store = UpdateStore::open(
options,
dir.path(),
handle.clone(),
Arc::new(AtomicBool::new(false)),
)
.unwrap();
// wait a bit for the event loop exit. // wait a bit for the event loop exit.
tokio::time::sleep(std::time::Duration::from_millis(50)).await; tokio::time::sleep(std::time::Duration::from_millis(50)).await;
@ -665,7 +705,6 @@ mod test {
assert!(store.pending_queue.first(&txn).unwrap().is_none()); assert!(store.pending_queue.first(&txn).unwrap().is_none());
let update = store let update = store
.updates .updates
.remap_key_type::<UpdateKeyCodec>()
.get(&txn, &(uuid, 0)) .get(&txn, &(uuid, 0))
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -673,7 +712,6 @@ mod test {
assert!(matches!(update, UpdateStatus::Processed(_))); assert!(matches!(update, UpdateStatus::Processed(_)));
let update = store let update = store
.updates .updates
.remap_key_type::<UpdateKeyCodec>()
.get(&txn, &(uuid, 1)) .get(&txn, &(uuid, 1))
.unwrap() .unwrap()
.unwrap(); .unwrap();

View File

@ -3,7 +3,7 @@ use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::index::{Unchecked, Settings}; use crate::index::{Settings, Unchecked};
pub type UpdateError = String; pub type UpdateError = String;
@ -23,7 +23,9 @@ pub enum UpdateMeta {
primary_key: Option<String>, primary_key: Option<String>,
}, },
ClearDocuments, ClearDocuments,
DeleteDocuments, DeleteDocuments {
ids: Vec<String>
},
Settings(Settings<Unchecked>), Settings(Settings<Unchecked>),
} }

View File

@ -23,9 +23,6 @@ impl<S: UuidStore> UuidResolverActor<S> {
loop { loop {
match self.inbox.recv().await { match self.inbox.recv().await {
Some(Create { uid: name, ret }) => {
let _ = ret.send(self.handle_create(name).await);
}
Some(Get { uid: name, ret }) => { Some(Get { uid: name, ret }) => {
let _ = ret.send(self.handle_get(name).await); let _ = ret.send(self.handle_get(name).await);
} }
@ -55,13 +52,6 @@ impl<S: UuidStore> UuidResolverActor<S> {
warn!("exiting uuid resolver loop"); warn!("exiting uuid resolver loop");
} }
async fn handle_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, true).await
}
async fn handle_get(&self, uid: String) -> Result<Uuid> { async fn handle_get(&self, uid: String) -> Result<Uuid> {
self.store self.store
.get_uuid(uid.clone()) .get_uuid(uid.clone())

View File

@ -32,15 +32,6 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?) .expect("Uuid resolver actor has been killed")?)
} }
async fn create(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Create { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn delete(&self, name: String) -> anyhow::Result<Uuid> { async fn delete(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Delete { uid: name, ret }; let msg = UuidResolveMsg::Delete { uid: name, ret };

View File

@ -11,10 +11,6 @@ pub enum UuidResolveMsg {
uid: String, uid: String,
ret: oneshot::Sender<Result<Uuid>>, ret: oneshot::Sender<Result<Uuid>>,
}, },
Create {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Delete { Delete {
uid: String, uid: String,
ret: oneshot::Sender<Result<Uuid>>, ret: oneshot::Sender<Result<Uuid>>,

View File

@ -28,7 +28,6 @@ pub type Result<T> = std::result::Result<T, UuidResolverError>;
pub trait UuidResolverHandle { pub trait UuidResolverHandle {
async fn get(&self, name: String) -> Result<Uuid>; async fn get(&self, name: String) -> Result<Uuid>;
async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>; async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>;
async fn create(&self, name: String) -> anyhow::Result<Uuid>;
async fn delete(&self, name: String) -> anyhow::Result<Uuid>; async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>; async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>; async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;

View File

@ -23,7 +23,6 @@ const UUIDS_DB_PATH: &str = "index_uuids";
pub trait UuidStore: Sized { pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise. // the uuid otherwise.
async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>; async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>;
async fn delete(&self, uid: String) -> Result<Option<Uuid>>; async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>; async fn list(&self) -> Result<Vec<(String, Uuid)>>;
@ -50,27 +49,6 @@ impl HeedUuidStore {
Ok(Self { env, db }) Ok(Self { env, db })
} }
pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidResolverError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
}
}
None => {
let uuid = Uuid::new_v4();
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(uuid)
}
}
}
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone(); let env = self.env.clone();
let db = self.db; let db = self.db;
@ -116,6 +94,11 @@ impl HeedUuidStore {
let env = self.env.clone(); let env = self.env.clone();
let db = self.db; let db = self.db;
let mut txn = env.write_txn()?; let mut txn = env.write_txn()?;
if db.get(&txn, &name)?.is_some() {
return Err(UuidResolverError::NameAlreadyExist);
}
db.put(&mut txn, &name, uuid.as_bytes())?; db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?; txn.commit()?;
Ok(()) Ok(())
@ -205,11 +188,6 @@ impl HeedUuidStore {
#[async_trait::async_trait] #[async_trait::async_trait]
impl UuidStore for HeedUuidStore { impl UuidStore for HeedUuidStore {
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.create_uuid(name, err)).await?
}
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let this = self.clone(); let this = self.clone();
tokio::task::spawn_blocking(move || this.get_uuid(name)).await? tokio::task::spawn_blocking(move || this.get_uuid(name)).await?

View File

@ -114,7 +114,7 @@ async fn delete_no_document_batch() {
index.add_documents(json!([{ "id": 1, "content": "foobar" }, { "id": 0, "content": "foobar" }, { "id": 3, "content": "foobar" }]), Some("id")).await; index.add_documents(json!([{ "id": 1, "content": "foobar" }, { "id": 0, "content": "foobar" }, { "id": 3, "content": "foobar" }]), Some("id")).await;
index.wait_update_id(0).await; index.wait_update_id(0).await;
let (_response, code) = index.delete_batch(vec![]).await; let (_response, code) = index.delete_batch(vec![]).await;
assert_eq!(code, 202); assert_eq!(code, 202, "{}", _response);
let _update = index.wait_update_id(1).await; let _update = index.wait_update_id(1).await;
let (response, code) = index let (response, code) = index

View File

@ -35,11 +35,6 @@ async fn stats() {
assert_eq!(code, 202); assert_eq!(code, 202);
assert_eq!(response["updateId"], 0); assert_eq!(response["updateId"], 0);
let (response, code) = index.stats().await;
assert_eq!(code, 200);
assert_eq!(response["isIndexing"], true);
index.wait_update_id(0).await; index.wait_update_id(0).await;
let (response, code) = index.stats().await; let (response, code) = index.stats().await;

View File

@ -53,13 +53,12 @@ async fn stats() {
]); ]);
let (response, code) = index.add_documents(documents, None).await; let (response, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202); assert_eq!(code, 202, "{}", response);
assert_eq!(response["updateId"], 0); assert_eq!(response["updateId"], 0);
let (response, code) = server.stats().await; let (response, code) = server.stats().await;
assert_eq!(code, 200); assert_eq!(code, 200, "{}", response);
assert_eq!(response["indexes"]["test"]["isIndexing"], true);
index.wait_update_id(0).await; index.wait_update_id(0).await;