Compare commits


No commits in common. "45823fc820cc04d036f9f49919aa2b78ad4793fa" and "4224edea28bc66e8de30d6ad69cdab5aaa922335" have entirely different histories.

5 changed files with 10 additions and 143 deletions

View File

@@ -127,8 +127,8 @@ pub enum Error {
         _ => format!("{error}")
     })]
     Milli { error: milli::Error, index_uid: Option<String> },
-    #[error("An unexpected crash occurred when processing the task: {0}")]
-    ProcessBatchPanicked(String),
+    #[error("An unexpected crash occurred when processing the task.")]
+    ProcessBatchPanicked,
     #[error(transparent)]
     FileStore(#[from] file_store::Error),
     #[error(transparent)]
@@ -196,7 +196,7 @@ impl Error {
             | Error::Dump(_)
             | Error::Heed(_)
             | Error::Milli { .. }
-            | Error::ProcessBatchPanicked(_)
+            | Error::ProcessBatchPanicked
             | Error::FileStore(_)
             | Error::IoError(_)
             | Error::Persist(_)
@@ -257,7 +257,7 @@ impl ErrorCode for Error {
             Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
             Error::Dump(e) => e.error_code(),
             Error::Milli { error, .. } => error.error_code(),
-            Error::ProcessBatchPanicked(_) => Code::Internal,
+            Error::ProcessBatchPanicked => Code::Internal,
             Error::Heed(e) => e.error_code(),
             Error::HeedTransaction(e) => e.error_code(),
             Error::FileStore(e) => e.error_code(),

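Aside: the change above turns ProcessBatchPanicked from a tuple variant that carried the panic message into a payload-free unit variant with a fixed message. A minimal sketch of what the two #[error(...)] attributes render to, assuming only the thiserror crate (SketchError is illustrative, not Meilisearch's actual enum):

use thiserror::Error;

#[derive(Debug, Error)]
enum SketchError {
    // Old shape: the panic message travels in the variant and is interpolated via {0}.
    #[error("An unexpected crash occurred when processing the task: {0}")]
    PanickedWithMessage(String),
    // New shape: no payload, fixed message.
    #[error("An unexpected crash occurred when processing the task.")]
    Panicked,
}

fn main() {
    assert_eq!(
        SketchError::PanickedWithMessage("simulated panic".into()).to_string(),
        "An unexpected crash occurred when processing the task: simulated panic"
    );
    assert_eq!(
        SketchError::Panicked.to_string(),
        "An unexpected crash occurred when processing the task."
    );
}

The snapshot change further down is the visible consequence: the task's error message loses the ": simulated panic" suffix.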
View File

@@ -166,41 +166,13 @@ impl IndexScheduler {
                 let processing_batch = &mut processing_batch;
                 let progress = progress.clone();
                 std::thread::scope(|s| {
-                    let p = progress.clone();
                     let handle = std::thread::Builder::new()
                         .name(String::from("batch-operation"))
                         .spawn_scoped(s, move || {
-                            cloned_index_scheduler.process_batch(batch, processing_batch, p)
+                            cloned_index_scheduler.process_batch(batch, processing_batch, progress)
                         })
                         .unwrap();
-
-                    match handle.join() {
-                        Ok(ret) => {
-                            if ret.is_err() {
-                                if let Ok(progress_view) =
-                                    serde_json::to_string(&progress.as_progress_view())
-                                {
-                                    tracing::warn!("Batch failed while doing: {progress_view}")
-                                }
-                            }
-                            ret
-                        }
-                        Err(panic) => {
-                            if let Ok(progress_view) =
-                                serde_json::to_string(&progress.as_progress_view())
-                            {
-                                tracing::warn!("Batch failed while doing: {progress_view}")
-                            }
-                            let msg = match panic.downcast_ref::<&'static str>() {
-                                Some(s) => *s,
-                                None => match panic.downcast_ref::<String>() {
-                                    Some(s) => &s[..],
-                                    None => "Box<dyn Any>",
-                                },
-                            };
-                            Err(Error::ProcessBatchPanicked(msg.to_string()))
-                        }
-                    }
+                    handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
                 })
             };

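Aside: the one-liner that replaces the match works because JoinHandle::join returns Err(Box<dyn Any + Send>) when the spawned closure panics, so unwrap_or(Err(Error::ProcessBatchPanicked)) folds a panic into a fixed error while a normal return passes through unchanged. The removed branch instead downcast the panic payload to recover a readable message. A standalone sketch of that downcast chain using only the standard library (panic_message is a hypothetical helper, not part of the codebase):

use std::any::Any;

// panic!("literal") stores a &'static str, panic!("{x}") stores a String,
// and std::panic::panic_any can store anything else.
fn panic_message(payload: &(dyn Any + Send)) -> &str {
    if let Some(s) = payload.downcast_ref::<&'static str>() {
        s
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s
    } else {
        "Box<dyn Any>"
    }
}

fn main() {
    let handle = std::thread::spawn(|| -> Result<(), String> {
        panic!("simulated panic");
    });
    match handle.join() {
        Ok(ret) => println!("thread finished: {ret:?}"),
        Err(payload) => println!("thread panicked: {}", panic_message(&*payload)),
    }
}

Note that the rewrite also drops the tracing::warn! of the serialized progress view, so a panicking batch no longer logs what it was doing when it crashed.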
View File

@@ -326,17 +326,8 @@ impl IndexScheduler {
         match ret {
             Ok(Ok(())) => (),
             Ok(Err(e)) => return Err(Error::DatabaseUpgrade(Box::new(e))),
-            Err(e) => {
-                let msg = match e.downcast_ref::<&'static str>() {
-                    Some(s) => *s,
-                    None => match e.downcast_ref::<String>() {
-                        Some(s) => &s[..],
-                        None => "Box<dyn Any>",
-                    },
-                };
-                return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked(
-                    msg.to_string(),
-                ))));
+            Err(_e) => {
+                return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked)));
             }
         }

View File

@@ -7,7 +7,7 @@ snapshot_kind: text
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task: simulated panic", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
+0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 ----------------------------------------------------------------------
 ### Status:
 enqueued []

View File

@@ -1,7 +1,6 @@
 use std::fs::{read_dir, read_to_string, remove_file, File};
 use std::io::BufWriter;
 use std::path::PathBuf;
-use std::time::Instant;
 
 use anyhow::Context;
 use clap::{Parser, Subcommand};
@@ -9,9 +8,7 @@ use dump::{DumpWriter, IndexMetadata};
 use file_store::FileStore;
 use meilisearch_auth::AuthController;
 use meilisearch_types::heed::types::{SerdeJson, Str};
-use meilisearch_types::heed::{
-    CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
-};
+use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 use meilisearch_types::milli::{obkv_to_json, BEU32};
 use meilisearch_types::tasks::{Status, Task};
@@ -81,27 +78,6 @@ enum Command {
         #[arg(long)]
         target_version: String,
     },
-
-    /// Compact the index by using LMDB.
-    ///
-    /// You must run this command while Meilisearch is off. The reason is that Meilisearch keep the
-    /// indexes opened and this compaction operation writes into another file. Meilisearch will not
-    /// switch to the new file.
-    ///
-    /// **Another possibility** is to keep Meilisearch running to serve search requests, run the
-    /// compaction and once done, close and immediately reopen Meilisearch. This way Meilisearch
-    /// will reopened the data.mdb file when rebooting and see the newly compacted file, ignoring
-    /// the previous non-compacted data.
-    ///
-    /// Note that the compaction will open the index, copy and compact the index into another file
-    /// **on the same disk as the index** and replace the previous index with the newly compacted
-    /// one. This means that the disk must have enough room for at most two times the index size.
-    ///
-    /// To make sure not to lose any data, this tool takes a mutable transaction on the index
-    /// before running the copy and compaction. This way the current indexation must finish before
-    /// the compaction operation can start. Once the compaction is done, the big index is replaced
-    /// by the compacted one and the mutable transaction is released.
-    CompactIndex { index_name: String },
 }
 
 fn main() -> anyhow::Result<()> {
@@ -118,7 +94,6 @@ fn main() -> anyhow::Result<()> {
             let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
             OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
         }
-        Command::CompactIndex { index_name } => compact_index(db_path, &index_name),
     }
 }
@@ -372,74 +347,3 @@ fn export_a_dump(
     Ok(())
 }
-
-fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
-    let index_scheduler_path = db_path.join("tasks");
-    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
-        .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
-
-    let rtxn = env.read_txn()?;
-    let index_mapping: Database<Str, UuidCodec> =
-        try_opening_database(&env, &rtxn, "index-mapping")?;
-
-    for result in index_mapping.iter(&rtxn)? {
-        let (uid, uuid) = result?;
-        if uid != index_name {
-            eprintln!("Found index {uid} and skipping it");
-            continue;
-        } else {
-            eprintln!("Found index {uid} 🎉");
-        }
-
-        let index_path = db_path.join("indexes").join(uuid.to_string());
-        let index = Index::new(EnvOpenOptions::new(), &index_path).with_context(|| {
-            format!("While trying to open the index at path {:?}", index_path.display())
-        })?;
-
-        eprintln!("Awaiting for a mutable transaction...");
-        let _wtxn = index.write_txn().context("While awaiting for a write transaction")?;
-
-        // We create and immediately drop the file because the
-        let non_compacted_index_file_path = index_path.join("data.mdb");
-        let compacted_index_file_path = index_path.join("data.mdb.cpy");
-
-        eprintln!("Compacting the index...");
-        let before_compaction = Instant::now();
-        let new_file = index
-            .copy_to_file(&compacted_index_file_path, CompactionOption::Enabled)
-            .with_context(|| format!("While compacting {}", compacted_index_file_path.display()))?;
-
-        let after_size = new_file.metadata()?.len();
-        let before_size = std::fs::metadata(&non_compacted_index_file_path)
-            .with_context(|| {
-                format!(
-                    "While retrieving the metadata of {}",
-                    non_compacted_index_file_path.display(),
-                )
-            })?
-            .len();
-
-        let reduction = before_size as f64 / after_size as f64;
-        println!("Compaction successful. Took around {:.2?}", before_compaction.elapsed());
-        eprintln!("The index went from {before_size} bytes to {after_size} bytes ({reduction:.2}x reduction)");
-
-        eprintln!("Replacing the non-compacted index by the compacted one...");
-        std::fs::rename(&compacted_index_file_path, &non_compacted_index_file_path).with_context(
-            || {
-                format!(
-                    "While renaming {} into {}",
-                    compacted_index_file_path.display(),
-                    non_compacted_index_file_path.display(),
-                )
-            },
-        )?;
-        drop(new_file);
-
-        println!("Everything's done 🎉");
-        return Ok(());
-    }
-
-    bail!("Target index {index_name} not found!")
-}
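
Aside: the CompactIndex command deleted above (and its compact_index implementation) wrapped LMDB's copy-with-compaction. A minimal sketch of the same pattern against plain heed, the LMDB wrapper Meilisearch uses (compact is a hypothetical helper, the paths are illustrative, and error handling is trimmed to anyhow):

use std::path::Path;

use heed::{CompactionOption, EnvOpenOptions};

fn compact(env_path: &Path) -> anyhow::Result<()> {
    // Opening an environment is unsafe in recent heed versions: the caller
    // must guarantee no other process holds it with incompatible options.
    let env = unsafe { EnvOpenOptions::new().open(env_path)? };

    // Take a write transaction first: any in-flight write must finish before
    // the copy starts, which is how the removed command avoided losing data.
    let _wtxn = env.write_txn()?;

    let src = env_path.join("data.mdb");
    let dst = env_path.join("data.mdb.cpy");

    // CompactionOption::Enabled maps to LMDB's MDB_CP_COMPACT: the B-tree is
    // rewritten without its free pages, so the copy is usually smaller.
    let new_file = env.copy_to_file(&dst, CompactionOption::Enabled)?;
    let after = new_file.metadata()?.len();
    let before = std::fs::metadata(&src)?.len();
    eprintln!("{before} bytes -> {after} bytes ({:.2}x)", before as f64 / after as f64);

    // Swap the compacted file in. Until this point the disk holds both copies,
    // hence the "two times the index size" requirement in the removed docs.
    std::fs::rename(&dst, &src)?;
    Ok(())
}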