Merge #5316
Some checks failed
Test suite / Tests on ubuntu-20.04 (push) Failing after 2s
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Run tests in debug (push) Failing after 16s
Test suite / Run Clippy (push) Failing after 12s
Test suite / Run Rustfmt (push) Failing after 32s
Test suite / Tests on macos-13 (push) Has been cancelled
Test suite / Tests on windows-2022 (push) Has been cancelled

5316: Fix the dumpless upgrade corruption r=dureuill a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/5280

## What does this PR do?
- Add a test that ensure we write the version in the index-scheduler even if we have a bug while writing the VERSION file
- Do what was described in the issue


Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-02-10 09:53:57 +00:00 committed by GitHub
commit 0c3e7fe963
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 110 additions and 64 deletions

View File

@ -33,7 +33,7 @@ mod test_utils;
pub mod upgrade; pub mod upgrade;
mod utils; mod utils;
pub mod uuid_codec; pub mod uuid_codec;
mod versioning; pub mod versioning;
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type TaskId = u32; pub type TaskId = u32;

View File

@ -6,8 +6,7 @@ use meili_snap::snapshot;
use meilisearch_types::milli::obkv_to_json; use meilisearch_types::milli::obkv_to_json;
use meilisearch_types::milli::update::IndexDocumentsMethod::*; use meilisearch_types::milli::update::IndexDocumentsMethod::*;
use meilisearch_types::milli::update::Setting; use meilisearch_types::milli::update::Setting;
use meilisearch_types::tasks::Kind; use meilisearch_types::tasks::{Kind, KindWithContent};
use meilisearch_types::tasks::KindWithContent;
use crate::insta_snapshot::snapshot_index_scheduler; use crate::insta_snapshot::snapshot_index_scheduler;
use crate::test_utils::Breakpoint::*; use crate::test_utils::Breakpoint::*;

View File

@ -1,9 +1,10 @@
use crate::{upgrade::upgrade_index_scheduler, Result}; use meilisearch_types::heed::types::Str;
use meilisearch_types::{ use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
heed::{types::Str, Database, Env, RoTxn, RwTxn}, use meilisearch_types::milli::heed_codec::version::VersionCodec;
milli::heed_codec::version::VersionCodec, use meilisearch_types::versioning;
versioning,
}; use crate::upgrade::upgrade_index_scheduler;
use crate::Result;
/// The number of database used by queue itself /// The number of database used by queue itself
const NUMBER_OF_DATABASES: u32 = 1; const NUMBER_OF_DATABASES: u32 = 1;
@ -21,30 +22,38 @@ pub struct Versioning {
} }
impl Versioning { impl Versioning {
pub(crate) const fn nb_db() -> u32 { pub const fn nb_db() -> u32 {
NUMBER_OF_DATABASES NUMBER_OF_DATABASES
} }
pub fn get_version(&self, rtxn: &RoTxn) -> Result<Option<(u32, u32, u32)>> { pub fn get_version(&self, rtxn: &RoTxn) -> Result<Option<(u32, u32, u32)>, heed::Error> {
Ok(self.version.get(rtxn, entry_name::MAIN)?) self.version.get(rtxn, entry_name::MAIN)
} }
pub fn set_version(&self, wtxn: &mut RwTxn, version: (u32, u32, u32)) -> Result<()> { pub fn set_version(
Ok(self.version.put(wtxn, entry_name::MAIN, &version)?) &self,
wtxn: &mut RwTxn,
version: (u32, u32, u32),
) -> Result<(), heed::Error> {
self.version.put(wtxn, entry_name::MAIN, &version)
} }
pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<()> { pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<(), heed::Error> {
let major = versioning::VERSION_MAJOR.parse().unwrap(); let major = versioning::VERSION_MAJOR.parse().unwrap();
let minor = versioning::VERSION_MINOR.parse().unwrap(); let minor = versioning::VERSION_MINOR.parse().unwrap();
let patch = versioning::VERSION_PATCH.parse().unwrap(); let patch = versioning::VERSION_PATCH.parse().unwrap();
self.set_version(wtxn, (major, minor, patch)) self.set_version(wtxn, (major, minor, patch))
} }
/// Create an index scheduler and start its run loop. /// Return `Self` without checking anything about the version
pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
let version = env.create_database(wtxn, Some(db_name::VERSION))?;
Ok(Self { version })
}
pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> { pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
let mut wtxn = env.write_txn()?; let mut wtxn = env.write_txn()?;
let version = env.create_database(&mut wtxn, Some(db_name::VERSION))?; let this = Self::raw_new(env, &mut wtxn)?;
let this = Self { version };
let from = match this.get_version(&wtxn)? { let from = match this.get_version(&wtxn)? {
Some(version) => version, Some(version) => version,
// fresh DB: use the db version // fresh DB: use the db version

View File

@ -1,7 +1,10 @@
use std::fs; use std::fs;
use std::io::{self, ErrorKind}; use std::io::{ErrorKind, Write};
use std::path::Path; use std::path::Path;
use milli::heed;
use tempfile::NamedTempFile;
/// The name of the file that contains the version of the database. /// The name of the file that contains the version of the database.
pub const VERSION_FILE_NAME: &str = "VERSION"; pub const VERSION_FILE_NAME: &str = "VERSION";
@ -10,37 +13,7 @@ pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
/// Persists the version of the current Meilisearch binary to a VERSION file /// Persists the version of the current Meilisearch binary to a VERSION file
pub fn update_version_file_for_dumpless_upgrade( pub fn create_current_version_file(db_path: &Path) -> anyhow::Result<()> {
db_path: &Path,
from: (u32, u32, u32),
to: (u32, u32, u32),
) -> Result<(), VersionFileError> {
let (from_major, from_minor, from_patch) = from;
let (to_major, to_minor, to_patch) = to;
if from_major > to_major
|| (from_major == to_major && from_minor > to_minor)
|| (from_major == to_major && from_minor == to_minor && from_patch > to_patch)
{
Err(VersionFileError::DowngradeNotSupported {
major: from_major,
minor: from_minor,
patch: from_patch,
})
} else if from_major < 1 || (from_major == to_major && from_minor < 12) {
Err(VersionFileError::TooOldForAutomaticUpgrade {
major: from_major,
minor: from_minor,
patch: from_patch,
})
} else {
create_current_version_file(db_path)?;
Ok(())
}
}
/// Persists the version of the current Meilisearch binary to a VERSION file
pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
} }
@ -49,9 +22,14 @@ pub fn create_version_file(
major: &str, major: &str,
minor: &str, minor: &str,
patch: &str, patch: &str,
) -> io::Result<()> { ) -> anyhow::Result<()> {
let version_path = db_path.join(VERSION_FILE_NAME); let version_path = db_path.join(VERSION_FILE_NAME);
fs::write(version_path, format!("{}.{}.{}", major, minor, patch)) // In order to persist the file later we must create it in the `data.ms` and not in `/tmp`
let mut file = NamedTempFile::new_in(db_path)?;
file.write_all(format!("{}.{}.{}", major, minor, patch).as_bytes())?;
file.flush()?;
file.persist(version_path)?;
Ok(())
} }
pub fn get_version(db_path: &Path) -> Result<(u32, u32, u32), VersionFileError> { pub fn get_version(db_path: &Path) -> Result<(u32, u32, u32), VersionFileError> {
@ -61,7 +39,7 @@ pub fn get_version(db_path: &Path) -> Result<(u32, u32, u32), VersionFileError>
Ok(version) => parse_version(&version), Ok(version) => parse_version(&version),
Err(error) => match error.kind() { Err(error) => match error.kind() {
ErrorKind::NotFound => Err(VersionFileError::MissingVersionFile), ErrorKind::NotFound => Err(VersionFileError::MissingVersionFile),
_ => Err(error.into()), _ => Err(anyhow::Error::from(error).into()),
}, },
} }
} }
@ -112,7 +90,9 @@ pub enum VersionFileError {
DowngradeNotSupported { major: u32, minor: u32, patch: u32 }, DowngradeNotSupported { major: u32, minor: u32, patch: u32 },
#[error("Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and import it in the v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}")] #[error("Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and import it in the v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}")]
TooOldForAutomaticUpgrade { major: u32, minor: u32, patch: u32 }, TooOldForAutomaticUpgrade { major: u32, minor: u32, patch: u32 },
#[error("Error while modifying the database: {0}")]
ErrorWhileModifyingTheDatabase(#[from] heed::Error),
#[error(transparent)] #[error(transparent)]
IoError(#[from] std::io::Error), AnyhowError(#[from] anyhow::Error),
} }

View File

@ -32,6 +32,7 @@ use analytics::Analytics;
use anyhow::bail; use anyhow::bail;
use error::PayloadError; use error::PayloadError;
use extractors::payload::PayloadConfig; use extractors::payload::PayloadConfig;
use index_scheduler::versioning::Versioning;
use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::constants::VERSION_MAJOR;
@ -40,10 +41,9 @@ use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMetho
use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::KindWithContent; use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::versioning::{ use meilisearch_types::versioning::{
create_current_version_file, get_version, update_version_file_for_dumpless_upgrade, create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH,
VersionFileError, VERSION_MINOR, VERSION_PATCH,
}; };
use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; use meilisearch_types::{compression, heed, milli, VERSION_FILE_NAME};
pub use option::Opt; pub use option::Opt;
use option::ScheduleSnapshot; use option::ScheduleSnapshot;
use search_queue::SearchQueue; use search_queue::SearchQueue;
@ -356,14 +356,19 @@ fn open_or_create_database_unchecked(
/// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch. /// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch.
/// Returns the version that was contained in the version file /// Returns the version that was contained in the version file
fn check_version(opt: &Opt, binary_version: (u32, u32, u32)) -> anyhow::Result<(u32, u32, u32)> { fn check_version(
opt: &Opt,
index_scheduler_opt: &IndexSchedulerOptions,
binary_version: (u32, u32, u32),
) -> anyhow::Result<(u32, u32, u32)> {
let (bin_major, bin_minor, bin_patch) = binary_version; let (bin_major, bin_minor, bin_patch) = binary_version;
let (db_major, db_minor, db_patch) = get_version(&opt.db_path)?; let (db_major, db_minor, db_patch) = get_version(&opt.db_path)?;
if db_major != bin_major || db_minor != bin_minor || db_patch > bin_patch { if db_major != bin_major || db_minor != bin_minor || db_patch > bin_patch {
if opt.experimental_dumpless_upgrade { if opt.experimental_dumpless_upgrade {
update_version_file_for_dumpless_upgrade( update_version_file_for_dumpless_upgrade(
&opt.db_path, opt,
index_scheduler_opt,
(db_major, db_minor, db_patch), (db_major, db_minor, db_patch),
(bin_major, bin_minor, bin_patch), (bin_major, bin_minor, bin_patch),
)?; )?;
@ -380,6 +385,57 @@ fn check_version(opt: &Opt, binary_version: (u32, u32, u32)) -> anyhow::Result<(
Ok((db_major, db_minor, db_patch)) Ok((db_major, db_minor, db_patch))
} }
/// Persists the version of the current Meilisearch binary to a VERSION file
pub fn update_version_file_for_dumpless_upgrade(
opt: &Opt,
index_scheduler_opt: &IndexSchedulerOptions,
from: (u32, u32, u32),
to: (u32, u32, u32),
) -> Result<(), VersionFileError> {
let (from_major, from_minor, from_patch) = from;
let (to_major, to_minor, to_patch) = to;
// Early exit in case of error
if from_major > to_major
|| (from_major == to_major && from_minor > to_minor)
|| (from_major == to_major && from_minor == to_minor && from_patch > to_patch)
{
return Err(VersionFileError::DowngradeNotSupported {
major: from_major,
minor: from_minor,
patch: from_patch,
});
} else if from_major < 1 || (from_major == to_major && from_minor < 12) {
return Err(VersionFileError::TooOldForAutomaticUpgrade {
major: from_major,
minor: from_minor,
patch: from_patch,
});
}
// In the case of v1.12, the index-scheduler didn't store its internal version at the time.
// => We must write it immediately **in the index-scheduler** otherwise we'll update the version file
// there is a risk of DB corruption if a restart happens after writing the version file but before
// writing the version in the index-scheduler. See <https://github.com/meilisearch/meilisearch/issues/5280>
if from_major == 1 && from_minor == 12 {
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(Versioning::nb_db())
.map_size(index_scheduler_opt.task_db_size)
.open(&index_scheduler_opt.tasks_path)
}?;
let mut wtxn = env.write_txn()?;
let versioning = Versioning::raw_new(&env, &mut wtxn)?;
versioning.set_version(&mut wtxn, (from_major, from_minor, from_patch))?;
wtxn.commit()?;
// Should be instant since we're the only one using the env
env.prepare_for_closing().wait();
}
create_current_version_file(&opt.db_path)?;
Ok(())
}
/// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you.
fn open_or_create_database( fn open_or_create_database(
opt: &Opt, opt: &Opt,
@ -387,7 +443,11 @@ fn open_or_create_database(
empty_db: bool, empty_db: bool,
binary_version: (u32, u32, u32), binary_version: (u32, u32, u32),
) -> anyhow::Result<(IndexScheduler, AuthController)> { ) -> anyhow::Result<(IndexScheduler, AuthController)> {
let version = if !empty_db { check_version(opt, binary_version)? } else { binary_version }; let version = if !empty_db {
check_version(opt, &index_scheduler_opt, binary_version)?
} else {
binary_version
};
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version) open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
} }

View File

@ -1,5 +1,5 @@
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::RwLock; use std::sync::{Once, RwLock};
use std::thread::{self, Builder}; use std::thread::{self, Builder};
use big_s::S; use big_s::S;
@ -21,7 +21,6 @@ use crate::progress::Progress;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
use crate::vector::{ArroyWrapper, EmbeddingConfigs}; use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
use std::sync::Once;
pub(crate) mod de; pub(crate) mod de;
pub mod document_changes; pub mod document_changes;

View File

@ -1,11 +1,10 @@
use heed::RwTxn; use heed::RwTxn;
use super::UpgradeIndex;
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::Progress; use crate::progress::Progress;
use crate::{make_enum_progress, Index, Result}; use crate::{make_enum_progress, Index, Result};
use super::UpgradeIndex;
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub(super) struct V1_12_To_V1_12_3 {} pub(super) struct V1_12_To_V1_12_3 {}