Persistently save to DB the status of experimental features

Louis Dureuil 2023-06-22 22:56:44 +02:00
parent 29ec02d4d4
commit 072d81843f
5 changed files with 140 additions and 1 deletion

View File

@@ -123,6 +123,8 @@ pub enum Error {
     #[error(transparent)]
     IoError(#[from] std::io::Error),
     #[error(transparent)]
     Persist(#[from] tempfile::PersistError),
+    #[error(transparent)]
+    FeatureNotEnabled(#[from] FeatureNotEnabledError),
     #[error(transparent)]
     Anyhow(#[from] anyhow::Error),
@@ -142,6 +144,16 @@ pub enum Error {
     PlannedFailure,
 }
 
+#[derive(Debug, thiserror::Error)]
+#[error(
+    "{disabled_action} requires enabling the `{feature}` experimental feature. See {issue_link}"
+)]
+pub struct FeatureNotEnabledError {
+    pub disabled_action: &'static str,
+    pub feature: &'static str,
+    pub issue_link: &'static str,
+}
+
 impl Error {
     pub fn is_recoverable(&self) -> bool {
         match self {
@@ -170,6 +182,7 @@ impl Error {
             | Error::FileStore(_)
             | Error::IoError(_)
             | Error::Persist(_)
+            | Error::FeatureNotEnabled(_)
             | Error::Anyhow(_) => true,
             Error::CreateBatch(_)
             | Error::CorruptedTaskQueue
@@ -214,6 +227,7 @@ impl ErrorCode for Error {
             Error::FileStore(e) => e.error_code(),
             Error::IoError(e) => e.error_code(),
             Error::Persist(e) => e.error_code(),
+            Error::FeatureNotEnabled(_) => Code::FeatureNotEnabled,
 
             // Irrecoverable errors
             Error::Anyhow(_) => Code::Internal,
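
For context, the `#[error(...)]` attribute above makes thiserror derive `Display` by interpolating the three fields. Below is a standalone sketch, not part of the diff, of what the rendered message looks like; the struct is copied verbatim from the hunk above so the example compiles on its own, and `thiserror` is the only assumed dependency.

// Standalone sketch: struct copied from the diff so this compiles alone.
#[derive(Debug, thiserror::Error)]
#[error(
    "{disabled_action} requires enabling the `{feature}` experimental feature. See {issue_link}"
)]
pub struct FeatureNotEnabledError {
    pub disabled_action: &'static str,
    pub feature: &'static str,
    pub issue_link: &'static str,
}

fn main() {
    // Values mirror the `check_metrics` branch introduced in features.rs below.
    let err = FeatureNotEnabledError {
        disabled_action: "Getting metrics",
        feature: "metrics",
        issue_link: "https://github.com/meilisearch/meilisearch/discussions/3518",
    };
    // Prints a single line: Getting metrics requires enabling the `metrics` experimental feature. See https://github.com/meilisearch/meilisearch/discussions/3518
    println!("{err}");
}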

View File

@@ -0,0 +1,98 @@
+use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
+use meilisearch_types::heed::types::{SerdeJson, Str};
+use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
+
+use crate::error::FeatureNotEnabledError;
+use crate::Result;
+
+const EXPERIMENTAL_FEATURES: &str = "experimental-features";
+
+#[derive(Clone)]
+pub(crate) struct FeatureData {
+    runtime: Database<Str, SerdeJson<RuntimeTogglableFeatures>>,
+    instance: InstanceTogglableFeatures,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct RoFeatures {
+    runtime: RuntimeTogglableFeatures,
+    instance: InstanceTogglableFeatures,
+}
+
+impl RoFeatures {
+    fn new(txn: RoTxn<'_>, data: &FeatureData) -> Result<Self> {
+        let runtime = data.runtime_features(txn)?;
+        Ok(Self { runtime, instance: data.instance })
+    }
+
+    pub fn runtime_features(&self) -> RuntimeTogglableFeatures {
+        self.runtime
+    }
+
+    pub fn check_score_details(&self) -> Result<()> {
+        if self.runtime.score_details {
+            Ok(())
+        } else {
+            Err(FeatureNotEnabledError {
+                disabled_action: "Computing score details",
+                feature: "score details",
+                issue_link: "https://github.com/meilisearch/product/discussions/674",
+            }
+            .into())
+        }
+    }
+
+    pub fn check_metrics(&self) -> Result<()> {
+        if self.instance.metrics {
+            Ok(())
+        } else {
+            Err(FeatureNotEnabledError {
+                disabled_action: "Getting metrics",
+                feature: "metrics",
+                issue_link: "https://github.com/meilisearch/meilisearch/discussions/3518",
+            }
+            .into())
+        }
+    }
+
+    pub fn check_vector(&self) -> Result<()> {
+        if self.runtime.vector_store {
+            Ok(())
+        } else {
+            Err(FeatureNotEnabledError {
+                disabled_action: "Passing `vector` as a query parameter",
+                feature: "vector store",
+                issue_link: "https://github.com/meilisearch/meilisearch/discussions/TODO",
+            }
+            .into())
+        }
+    }
+}
+
+impl FeatureData {
+    pub fn new(env: &Env, instance_features: InstanceTogglableFeatures) -> Result<Self> {
+        let mut wtxn = env.write_txn()?;
+        let runtime_features = env.create_database(&mut wtxn, Some(EXPERIMENTAL_FEATURES))?;
+        wtxn.commit()?;
+
+        Ok(Self { runtime: runtime_features, instance: instance_features })
+    }
+
+    pub fn put_runtime_features(
+        &self,
+        mut wtxn: RwTxn,
+        features: RuntimeTogglableFeatures,
+    ) -> Result<()> {
+        self.runtime.put(&mut wtxn, EXPERIMENTAL_FEATURES, &features)?;
+        wtxn.commit()?;
+        Ok(())
+    }
+
+    fn runtime_features(&self, txn: RoTxn) -> Result<RuntimeTogglableFeatures> {
+        Ok(self.runtime.get(&txn, EXPERIMENTAL_FEATURES)?.unwrap_or_default())
+    }
+
+    pub fn features(&self, txn: RoTxn) -> Result<RoFeatures> {
+        RoFeatures::new(txn, self)
+    }
+}
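
The commit itself does not show call sites for these checks, so here is a hedged usage sketch: a hypothetical helper that gates the `vector` search parameter. It assumes `IndexScheduler` and the crate's `Result` alias are importable from `index_scheduler`, and relies only on the `features()` accessor and `check_vector()` method introduced in this commit.

use index_scheduler::{IndexScheduler, Result};

/// Hypothetical helper (not in the diff): reject a request that uses the
/// `vector` parameter while the vector store toggle is off.
fn ensure_vector_allowed(scheduler: &IndexScheduler, uses_vector: bool) -> Result<()> {
    if uses_vector {
        // `features()` opens a read transaction, loads the persisted
        // RuntimeTogglableFeatures, and returns a RoFeatures snapshot;
        // `check_vector()` turns a disabled toggle into Error::FeatureNotEnabled.
        scheduler.features()?.check_vector()?;
    }
    Ok(())
}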

View File

@@ -28,6 +28,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         started_at,
         finished_at,
         index_mapper,
+        features: _,
         max_number_of_tasks: _,
         wake_up: _,
         dumps_path: _,

View File

@@ -21,6 +21,7 @@ content of the scheduler or enqueue new tasks.
 mod autobatcher;
 mod batch;
 pub mod error;
+mod features;
 mod index_mapper;
 #[cfg(test)]
 mod insta_snapshot;
@@ -41,8 +42,10 @@ use std::time::Duration;
 use dump::{KindDump, TaskDump, UpdateFile};
 pub use error::Error;
+pub use features::RoFeatures;
 use file_store::FileStore;
 use meilisearch_types::error::ResponseError;
+use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
 use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
 use meilisearch_types::milli::documents::DocumentsBatchBuilder;
@@ -247,6 +250,8 @@ pub struct IndexSchedulerOptions {
     /// The maximum number of tasks stored in the task queue before starting
     /// to auto schedule task deletions.
     pub max_number_of_tasks: usize,
+    /// The experimental features enabled for this instance.
+    pub instance_features: InstanceTogglableFeatures,
 }
 
 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -290,6 +295,9 @@ pub struct IndexScheduler {
     /// In charge of creating, opening, storing and returning indexes.
     pub(crate) index_mapper: IndexMapper,
 
+    /// In charge of fetching and setting the status of experimental features.
+    features: features::FeatureData,
+
     /// Get a signal when a batch needs to be processed.
     pub(crate) wake_up: Arc<SignalEvent>,
@@ -360,6 +368,7 @@ impl IndexScheduler {
             planned_failures: self.planned_failures.clone(),
             #[cfg(test)]
             run_loop_iteration: self.run_loop_iteration.clone(),
+            features: self.features.clone(),
         }
     }
 }
@@ -398,9 +407,12 @@ impl IndexScheduler {
         };
 
         let env = heed::EnvOpenOptions::new()
-            .max_dbs(10)
+            .max_dbs(11)
             .map_size(budget.task_db_size)
             .open(options.tasks_path)?;
 
+        let features = features::FeatureData::new(&env, options.instance_features)?;
+
         let file_store = FileStore::new(&options.update_file_path)?;
 
         let mut wtxn = env.write_txn()?;
@@ -452,6 +464,7 @@ impl IndexScheduler {
             planned_failures,
             #[cfg(test)]
             run_loop_iteration: Arc::new(RwLock::new(0)),
+            features,
         };
 
         this.run();
@@ -1214,6 +1227,17 @@ impl IndexScheduler {
         Ok(IndexStats { is_indexing, inner_stats: index_stats })
     }
 
+    pub fn features(&self) -> Result<RoFeatures> {
+        let rtxn = self.read_txn()?;
+        self.features.features(rtxn)
+    }
+
+    pub fn put_runtime_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
+        let wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
+        self.features.put_runtime_features(wtxn, features)?;
+        Ok(())
+    }
+
     pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
         match task.content_uuid() {
             Some(content_file) => self.delete_update_file(content_file),
@@ -1534,6 +1558,7 @@ mod tests {
             indexer_config,
             autobatching_enabled: true,
             max_number_of_tasks: 1_000_000,
+            instance_features: Default::default(),
         };
 
         configuration(&mut options);
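
As a hedged end-to-end sketch, not part of the diff: how a settings handler might flip a runtime toggle and have it survive restarts. It assumes the fields of `RuntimeTogglableFeatures` (such as `score_details`, which `check_score_details` reads above) are public, and uses only the two `IndexScheduler` methods added in the `@@ -1214` hunk.

use index_scheduler::{IndexScheduler, Result};
use meilisearch_types::features::RuntimeTogglableFeatures;

/// Hypothetical handler body (not in the diff): enable score details and
/// persist the choice.
fn enable_score_details(scheduler: &IndexScheduler) -> Result<()> {
    // Read the currently persisted toggles (defaults if nothing was stored yet).
    let mut runtime: RuntimeTogglableFeatures = scheduler.features()?.runtime_features();
    runtime.score_details = true;
    // Serializes the struct as JSON under the "experimental-features" key in its
    // own heed database and commits, so FeatureData::new reloads it on restart.
    scheduler.put_runtime_features(runtime)?;
    Ok(())
}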

View File

@@ -271,6 +271,7 @@ InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ;
 InvalidTaskTypes             , InvalidRequest , BAD_REQUEST ;
 InvalidTaskUids              , InvalidRequest , BAD_REQUEST ;
 IoError                      , System         , UNPROCESSABLE_ENTITY ;
+FeatureNotEnabled            , InvalidRequest , BAD_REQUEST ;
 MalformedPayload             , InvalidRequest , BAD_REQUEST ;
 MaxFieldsLimitExceeded       , InvalidRequest , BAD_REQUEST ;
 MissingApiKeyActions         , InvalidRequest , BAD_REQUEST ;