mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 20:15:07 +08:00
Add IndexStatus::BeingResized
This commit is contained in:
parent
6cc3797aa1
commit
1c670d7fa0
@ -9,10 +9,11 @@ use meilisearch_types::heed::types::Str;
|
|||||||
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
|
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
|
||||||
use meilisearch_types::milli::update::IndexerConfig;
|
use meilisearch_types::milli::update::IndexerConfig;
|
||||||
use meilisearch_types::milli::Index;
|
use meilisearch_types::milli::Index;
|
||||||
|
use synchronoise::SignalEvent;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use self::IndexStatus::{Available, BeingDeleted};
|
use self::IndexStatus::{Available, BeingDeleted, BeingResized};
|
||||||
use crate::uuid_codec::UuidCodec;
|
use crate::uuid_codec::UuidCodec;
|
||||||
use crate::{clamp_to_page_size, Error, Result};
|
use crate::{clamp_to_page_size, Error, Result};
|
||||||
|
|
||||||
@ -45,6 +46,8 @@ pub struct IndexMapper {
|
|||||||
pub enum IndexStatus {
|
pub enum IndexStatus {
|
||||||
/// Do not insert it back in the index map as it is currently being deleted.
|
/// Do not insert it back in the index map as it is currently being deleted.
|
||||||
BeingDeleted,
|
BeingDeleted,
|
||||||
|
/// Temporarily do not insert the index in the index map as it is currently being resized.
|
||||||
|
BeingResized(Arc<SignalEvent>),
|
||||||
/// You can use the index without worrying about anything.
|
/// You can use the index without worrying about anything.
|
||||||
Available(Index),
|
Available(Index),
|
||||||
}
|
}
|
||||||
@ -106,11 +109,12 @@ impl IndexMapper {
|
|||||||
let index = self.create_or_open_index(&index_path, date, self.index_size)?;
|
let index = self.create_or_open_index(&index_path, date, self.index_size)?;
|
||||||
|
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
|
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
|
||||||
|
// This is very unlikely to happen in practice.
|
||||||
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
|
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
|
||||||
if let Some(BeingDeleted) =
|
if self.index_map.write().unwrap().insert(uuid, Available(index.clone())).is_some()
|
||||||
self.index_map.write().unwrap().insert(uuid, Available(index.clone()))
|
|
||||||
{
|
{
|
||||||
panic!("Uuid v4 conflict.");
|
panic!("Uuid v4 conflict: index with UUID {uuid} already exists.");
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
@ -132,13 +136,23 @@ impl IndexMapper {
|
|||||||
|
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
// We remove the index from the in-memory index map.
|
// We remove the index from the in-memory index map.
|
||||||
|
let closing_event = loop {
|
||||||
let mut lock = self.index_map.write().unwrap();
|
let mut lock = self.index_map.write().unwrap();
|
||||||
let closing_event = match lock.insert(uuid, BeingDeleted) {
|
let resize_operation = match lock.insert(uuid, BeingDeleted) {
|
||||||
Some(Available(index)) => Some(index.prepare_for_closing()),
|
Some(Available(index)) => break Some(index.prepare_for_closing()),
|
||||||
_ => None,
|
// The target index is in the middle of a resize operation.
|
||||||
|
// Wait for this operation to complete, then try again.
|
||||||
|
Some(BeingResized(resize_operation)) => resize_operation.clone(),
|
||||||
|
// The index is already being deleted or doesn't exist.
|
||||||
|
// It's OK to remove it from the map again.
|
||||||
|
_ => break None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Avoiding deadlocks: we need to drop the lock before waiting for the end of the resize, which
|
||||||
|
// will involve operations on the very map we're locking.
|
||||||
drop(lock);
|
drop(lock);
|
||||||
|
resize_operation.wait();
|
||||||
|
};
|
||||||
|
|
||||||
let index_map = self.index_map.clone();
|
let index_map = self.index_map.clone();
|
||||||
let index_path = self.base_path.join(uuid.to_string());
|
let index_path = self.base_path.join(uuid.to_string());
|
||||||
@ -180,9 +194,16 @@ impl IndexMapper {
|
|||||||
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
||||||
|
|
||||||
// we clone here to drop the lock before entering the match
|
// we clone here to drop the lock before entering the match
|
||||||
|
let index = loop {
|
||||||
let index = self.index_map.read().unwrap().get(&uuid).cloned();
|
let index = self.index_map.read().unwrap().get(&uuid).cloned();
|
||||||
let index = match index {
|
|
||||||
Some(Available(index)) => index,
|
match index {
|
||||||
|
Some(Available(index)) => break index,
|
||||||
|
Some(BeingResized(ref resize_operation)) => {
|
||||||
|
// Avoiding deadlocks: no lock taken while doing this operation.
|
||||||
|
resize_operation.wait();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())),
|
Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())),
|
||||||
// since we're lazy, it's possible that the index has not been opened yet.
|
// since we're lazy, it's possible that the index has not been opened yet.
|
||||||
None => {
|
None => {
|
||||||
@ -200,14 +221,22 @@ impl IndexMapper {
|
|||||||
let index =
|
let index =
|
||||||
self.create_or_open_index(&index_path, None, self.index_size)?;
|
self.create_or_open_index(&index_path, None, self.index_size)?;
|
||||||
entry.insert(Available(index.clone()));
|
entry.insert(Available(index.clone()));
|
||||||
index
|
break index;
|
||||||
}
|
}
|
||||||
Entry::Occupied(entry) => match entry.get() {
|
Entry::Occupied(entry) => match entry.get() {
|
||||||
Available(index) => index.clone(),
|
Available(index) => break index.clone(),
|
||||||
|
BeingResized(resize_operation) => {
|
||||||
|
// Avoiding the deadlock: we drop the lock before waiting
|
||||||
|
let resize_operation = resize_operation.clone();
|
||||||
|
drop(index_map);
|
||||||
|
resize_operation.wait();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
|
Loading…
Reference in New Issue
Block a user