mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-02-24 03:25:43 +08:00)

Compare commits: 4224edea28...45823fc820 (8 commits)

- 45823fc820
- 62ced0e3f1
- 71bb24f17e
- c72f114b33
- 8ed39f5de0
- bdd3005d10
- 8439aeb7cf
- e6295c9c5f
```diff
@@ -127,8 +127,8 @@ pub enum Error {
         _ => format!("{error}")
     })]
     Milli { error: milli::Error, index_uid: Option<String> },
-    #[error("An unexpected crash occurred when processing the task.")]
-    ProcessBatchPanicked,
+    #[error("An unexpected crash occurred when processing the task: {0}")]
+    ProcessBatchPanicked(String),
     #[error(transparent)]
    FileStore(#[from] file_store::Error),
    #[error(transparent)]
```
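The `{0}` in the new attribute is `thiserror`'s positional interpolation of the variant's first field, which is how the panic message ends up in the user-facing error. A minimal standalone sketch (the `DemoError` type is hypothetical; only the `thiserror` crate is assumed):

```rust
use thiserror::Error;

// `thiserror` substitutes `{0}` with the tuple variant's first field
// when formatting the error's Display representation.
#[derive(Debug, Error)]
enum DemoError {
    #[error("An unexpected crash occurred when processing the task: {0}")]
    ProcessBatchPanicked(String),
}

fn main() {
    let err = DemoError::ProcessBatchPanicked("simulated panic".into());
    assert_eq!(
        err.to_string(),
        "An unexpected crash occurred when processing the task: simulated panic"
    );
}
```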
```diff
@@ -196,7 +196,7 @@ impl Error {
             | Error::Dump(_)
             | Error::Heed(_)
             | Error::Milli { .. }
-            | Error::ProcessBatchPanicked
+            | Error::ProcessBatchPanicked(_)
             | Error::FileStore(_)
             | Error::IoError(_)
             | Error::Persist(_)
```
```diff
@@ -257,7 +257,7 @@ impl ErrorCode for Error {
             Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
             Error::Dump(e) => e.error_code(),
             Error::Milli { error, .. } => error.error_code(),
-            Error::ProcessBatchPanicked => Code::Internal,
+            Error::ProcessBatchPanicked(_) => Code::Internal,
             Error::Heed(e) => e.error_code(),
             Error::HeedTransaction(e) => e.error_code(),
             Error::FileStore(e) => e.error_code(),
```
```diff
@@ -166,13 +166,41 @@ impl IndexScheduler {
                 let processing_batch = &mut processing_batch;
                 let progress = progress.clone();
                 std::thread::scope(|s| {
+                    let p = progress.clone();
                     let handle = std::thread::Builder::new()
                         .name(String::from("batch-operation"))
                         .spawn_scoped(s, move || {
-                            cloned_index_scheduler.process_batch(batch, processing_batch, progress)
+                            cloned_index_scheduler.process_batch(batch, processing_batch, p)
                         })
                         .unwrap();
-                    handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
+
+                    match handle.join() {
+                        Ok(ret) => {
+                            if ret.is_err() {
+                                if let Ok(progress_view) =
+                                    serde_json::to_string(&progress.as_progress_view())
+                                {
+                                    tracing::warn!("Batch failed while doing: {progress_view}")
+                                }
+                            }
+                            ret
+                        }
+                        Err(panic) => {
+                            if let Ok(progress_view) =
+                                serde_json::to_string(&progress.as_progress_view())
+                            {
+                                tracing::warn!("Batch failed while doing: {progress_view}")
+                            }
+                            let msg = match panic.downcast_ref::<&'static str>() {
+                                Some(s) => *s,
+                                None => match panic.downcast_ref::<String>() {
+                                    Some(s) => &s[..],
+                                    None => "Box<dyn Any>",
+                                },
+                            };
+                            Err(Error::ProcessBatchPanicked(msg.to_string()))
+                        }
+                    }
                 })
             };
 
```
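The `downcast_ref` chain works because `JoinHandle::join` hands back the panic payload as `Box<dyn Any + Send>`: `panic!("literal")` stores a `&'static str`, `panic!("{x}")` stores a formatted `String`, and anything else falls through to the `"Box<dyn Any>"` placeholder. A standalone sketch of the same extraction (plain `std` only; the `panic_message` helper is hypothetical, not part of this changeset):

```rust
use std::any::Any;

// Recovers a human-readable message from a panic payload, mirroring the
// downcast order used in the diff above.
fn panic_message(payload: Box<dyn Any + Send>) -> String {
    match payload.downcast_ref::<&'static str>() {
        Some(s) => (*s).to_string(),
        None => match payload.downcast_ref::<String>() {
            Some(s) => s.clone(),
            // Payload is neither string type, e.g. from `panic_any(42)`.
            None => "Box<dyn Any>".to_string(),
        },
    }
}

fn main() {
    let handle = std::thread::spawn(|| panic!("simulated panic"));
    if let Err(payload) = handle.join() {
        assert_eq!(panic_message(payload), "simulated panic");
    }
}
```

The same chain reappears in the upgrade-failure hunk just below, where the recovered message is wrapped in `Error::DatabaseUpgrade`.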
```diff
@@ -326,8 +326,17 @@ impl IndexScheduler {
         match ret {
             Ok(Ok(())) => (),
             Ok(Err(e)) => return Err(Error::DatabaseUpgrade(Box::new(e))),
-            Err(_e) => {
-                return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked)));
+            Err(e) => {
+                let msg = match e.downcast_ref::<&'static str>() {
+                    Some(s) => *s,
+                    None => match e.downcast_ref::<String>() {
+                        Some(s) => &s[..],
+                        None => "Box<dyn Any>",
+                    },
+                };
+                return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked(
+                    msg.to_string(),
+                ))));
             }
         }
 
```
```diff
@@ -7,7 +7,7 @@ snapshot_kind: text
 []
 ----------------------------------------------------------------------
 ### All Tasks:
-0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
+0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "An unexpected crash occurred when processing the task: simulated panic", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
 ----------------------------------------------------------------------
 ### Status:
 enqueued []
```
```diff
@@ -1,6 +1,7 @@
 use std::fs::{read_dir, read_to_string, remove_file, File};
 use std::io::BufWriter;
 use std::path::PathBuf;
+use std::time::Instant;
 
 use anyhow::Context;
 use clap::{Parser, Subcommand};
@@ -8,7 +9,9 @@ use dump::{DumpWriter, IndexMetadata};
 use file_store::FileStore;
 use meilisearch_auth::AuthController;
 use meilisearch_types::heed::types::{SerdeJson, Str};
-use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
+use meilisearch_types::heed::{
+    CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
+};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 use meilisearch_types::milli::{obkv_to_json, BEU32};
 use meilisearch_types::tasks::{Status, Task};
```
```diff
@@ -78,6 +81,27 @@ enum Command {
         #[arg(long)]
         target_version: String,
     },
+
+    /// Compact the index by using LMDB.
+    ///
+    /// You must run this command while Meilisearch is off. The reason is that Meilisearch keeps
+    /// the indexes open, and this compaction operation writes into another file; Meilisearch will
+    /// not switch to the new file.
+    ///
+    /// **Another possibility** is to keep Meilisearch running to serve search requests, run the
+    /// compaction, and once done, close and immediately reopen Meilisearch. This way Meilisearch
+    /// will reopen the data.mdb file when rebooting and see the newly compacted file, ignoring
+    /// the previous non-compacted data.
+    ///
+    /// Note that the compaction will open the index, copy and compact the index into another file
+    /// **on the same disk as the index**, and replace the previous index with the newly compacted
+    /// one. This means that the disk must have enough room for at most two times the index size.
+    ///
+    /// To make sure not to lose any data, this tool takes a mutable transaction on the index
+    /// before running the copy and compaction. This way the current indexation must finish before
+    /// the compaction operation can start. Once the compaction is done, the big index is replaced
+    /// by the compacted one and the mutable transaction is released.
+    CompactIndex { index_name: String },
 }
 
 fn main() -> anyhow::Result<()> {
```
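Assuming clap's default kebab-case naming for variants (an inference, not shown in the diff), the new subcommand would be invoked as something like `meilitool compact-index <INDEX_NAME>`, with `index_name` as a positional argument since it carries no `#[arg(long)]` attribute.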
```diff
@@ -94,6 +118,7 @@ fn main() -> anyhow::Result<()> {
             let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
             OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
         }
+        Command::CompactIndex { index_name } => compact_index(db_path, &index_name),
     }
 }
 
```
```diff
@@ -347,3 +372,74 @@ fn export_a_dump(
 
     Ok(())
 }
+
+fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
+    let index_scheduler_path = db_path.join("tasks");
+    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
+        .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
+
+    let rtxn = env.read_txn()?;
+    let index_mapping: Database<Str, UuidCodec> =
+        try_opening_database(&env, &rtxn, "index-mapping")?;
+
+    for result in index_mapping.iter(&rtxn)? {
+        let (uid, uuid) = result?;
+
+        if uid != index_name {
+            eprintln!("Found index {uid} and skipping it");
+            continue;
+        } else {
+            eprintln!("Found index {uid} 🎉");
+        }
+
+        let index_path = db_path.join("indexes").join(uuid.to_string());
+        let index = Index::new(EnvOpenOptions::new(), &index_path).with_context(|| {
+            format!("While trying to open the index at path {:?}", index_path.display())
+        })?;
+
+        eprintln!("Waiting for a mutable transaction...");
+        let _wtxn = index.write_txn().context("While waiting for a write transaction")?;
+
+        // The path of the live index file and of the compacted copy we are about to write.
+        let non_compacted_index_file_path = index_path.join("data.mdb");
+        let compacted_index_file_path = index_path.join("data.mdb.cpy");
+
+        eprintln!("Compacting the index...");
+        let before_compaction = Instant::now();
+        let new_file = index
+            .copy_to_file(&compacted_index_file_path, CompactionOption::Enabled)
+            .with_context(|| format!("While compacting {}", compacted_index_file_path.display()))?;
+
+        let after_size = new_file.metadata()?.len();
+        let before_size = std::fs::metadata(&non_compacted_index_file_path)
+            .with_context(|| {
+                format!(
+                    "While retrieving the metadata of {}",
+                    non_compacted_index_file_path.display(),
+                )
+            })?
+            .len();
+
+        let reduction = before_size as f64 / after_size as f64;
+        println!("Compaction successful. Took around {:.2?}", before_compaction.elapsed());
+        eprintln!("The index went from {before_size} bytes to {after_size} bytes ({reduction:.2}x reduction)");
+
+        eprintln!("Replacing the non-compacted index by the compacted one...");
+        std::fs::rename(&compacted_index_file_path, &non_compacted_index_file_path).with_context(
+            || {
+                format!(
+                    "While renaming {} into {}",
+                    compacted_index_file_path.display(),
+                    non_compacted_index_file_path.display(),
+                )
+            },
+        )?;
+
+        drop(new_file);
+
+        println!("Everything's done 🎉");
+        return Ok(());
+    }
+
+    bail!("Target index {index_name} not found!")
+}
```
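The heart of the new function is heed's `copy_to_file` with `CompactionOption::Enabled`, which asks LMDB to squeeze free pages out of the environment while copying it. A condensed sketch of the same copy-then-rename swap on a bare environment (the `compact_in_place` helper and its paths are illustrative; only the heed re-exports already imported in the diff are assumed):

```rust
use std::path::Path;

use meilisearch_types::heed::{CompactionOption, EnvOpenOptions};

// Compacts an LMDB environment in place: write a compacted copy next to the
// live file, then rename it over the original. `rename` is only atomic within
// one filesystem, hence the doc comment's "same disk as the index" requirement.
fn compact_in_place(env_dir: &Path) -> anyhow::Result<()> {
    let env = unsafe { EnvOpenOptions::new().open(env_dir)? };

    let live = env_dir.join("data.mdb");
    let copy = env_dir.join("data.mdb.cpy");

    // CompactionOption::Enabled drops free pages while writing the copy.
    let file = env.copy_to_file(&copy, CompactionOption::Enabled)?;
    drop(file);

    std::fs::rename(&copy, &live)?;
    Ok(())
}
```

Taking the write transaction (`_wtxn`) before the copy, as the diff does, blocks until any in-flight indexation commits, so the compacted snapshot cannot capture a half-written state.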