Compare commits

...

14 Commits

Author SHA1 Message Date
Clément Renault
53cb9de39b
Merge e6295c9c5f02df14134322b831c7cd329578a4f1 into 42257eec53e66d6360d3ca61fc1b6106646cc56b 2025-01-29 09:18:29 +01:00
meili-bors[bot]
42257eec53
Merge #5272
Some checks failed
Test suite / Tests on ubuntu-20.04 (push) Failing after 1s
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Run tests in debug (push) Failing after 0s
Test suite / Tests on windows-2022 (push) Failing after 14s
Test suite / Run Rustfmt (push) Successful in 1m59s
Test suite / Run Clippy (push) Successful in 5m48s
Test suite / Tests on macos-13 (push) Has been cancelled
5272: Fix Batches Deletion and flaky tests r=irevoire a=Kerollmops

- This PR fixes #5263 by also removing deleted batches from the date and time databases.
- It also introduces a new `enqueued_at` field in the batch object so that batches can be located quickly in the `batches.enqueued_at` database (see the sketch after this list).
- Finally, it probably fixes all the flaky batch tests: https://github.com/meilisearch/meilisearch/issues/5256
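
For readers skimming the compare, here is a minimal sketch of the new shape, condensed from the `meilisearch-types` and `index-scheduler` diffs further down this page (field meanings inferred from how the diffs update them; this is not the complete upstream definition):

```rust
// Sketch only: condensed from the diffs below, not the full upstream code.
use time::OffsetDateTime;

/// Bounds of the `enqueued_at` dates of the tasks contained in a batch.
#[derive(Clone, Copy, Debug)]
pub struct BatchEnqueuedAt {
    /// Updated with `.max()` in the diff below, i.e. the most recently enqueued task date.
    pub earliest: OffsetDateTime,
    /// Updated with `.min()` in the diff below, i.e. the oldest task date.
    pub oldest: OffsetDateTime,
}

// With these two bounds stored on the batch itself, deleting a batch only has
// to remove two precise entries from the `batches.enqueued_at` database,
// instead of scanning it backwards from `started_at` as the previous code did.
```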

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Tamo <tamo@meilisearch.com>
2025-01-28 16:14:11 +00:00
Tamo
1beda3b9af
fix another flaky test 2025-01-28 16:53:50 +01:00
Tamo
8676e94f5c
fix the flaky tests 2025-01-28 16:53:50 +01:00
Tamo
ef47a0d820
apply review comment 2025-01-28 16:53:50 +01:00
Tamo
e0f0da57e2
make sure the batches we snapshot actually all contain an enqueued_at 2025-01-28 16:53:50 +01:00
Tamo
485e3127c7
use the remove_n_tasks_datetime_earlier_than function when updating batches 2025-01-28 16:53:50 +01:00
Tamo
58f90b70c7
store the enqueued_at to ease batch deletion 2025-01-28 16:53:50 +01:00
Tamo
508db9020d
update the snapshots 2025-01-28 16:53:50 +01:00
Kerollmops
6ff37c6fc4
Fix the insta snapshots 2025-01-28 16:53:50 +01:00
Kerollmops
f21ae1f5d1
Remove the batch id from the date time databases 2025-01-28 16:53:50 +01:00
meili-bors[bot]
483c52f07b
Merge #5289
5289: Fix workload files after removing the vectorStore experimental feature r=Kerollmops a=dureuill

Running the bench [currently fails](https://github.com/meilisearch/meilisearch/actions/runs/12990029453) on embedding-related workloads because of the call to `/experimental-features` that was used to enable the vector store.

In v1.13, `vectorStore` is no longer an experimental feature, so trying to enable it now causes a 400 error (see the removed precommand below).
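
Concretely, this is the precommand that each of the three workload diffs at the end of this compare removes (copied from those diffs):

```json
{
  "route": "experimental-features",
  "method": "PATCH",
  "body": {
    "inline": {
      "vectorStore": true
    }
  },
  "synchronous": "DontWait"
}
```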

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2025-01-28 10:28:21 +00:00
Louis Dureuil
f88f415a00
Fix workload files after removing the vectorStore experimental feature 2025-01-27 14:39:28 +01:00
Kerollmops
e6295c9c5f
Introduce a meilitool subcommand to compact an index 2025-01-22 16:37:00 +01:00
14 changed files with 275 additions and 125 deletions

View File

@@ -1,7 +1,7 @@
 use std::collections::BTreeSet;
 use std::fmt::Write;
 
-use meilisearch_types::batches::Batch;
+use meilisearch_types::batches::{Batch, BatchEnqueuedAt};
 use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, RoTxn};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@@ -341,10 +341,14 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database<BEU32, RoaringBitmapCodec
 pub fn snapshot_batch(batch: &Batch) -> String {
     let mut snap = String::new();
-    let Batch { uid, details, stats, started_at, finished_at, progress: _ } = batch;
+    let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch;
     if let Some(finished_at) = finished_at {
         assert!(finished_at > started_at);
     }
+    let BatchEnqueuedAt { earliest, oldest } = enqueued_at.unwrap();
+    assert!(*started_at > earliest);
+    assert!(earliest >= oldest);
     snap.push('{');
     snap.push_str(&format!("uid: {uid}, "));
     snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap()));

View File

@@ -12,8 +12,8 @@ use time::OffsetDateTime;
 use super::{Query, Queue};
 use crate::processing::ProcessingTasks;
 use crate::utils::{
-    insert_task_datetime, keep_ids_within_datetimes, map_bound, remove_task_datetime,
-    ProcessingBatch,
+    insert_task_datetime, keep_ids_within_datetimes, map_bound,
+    remove_n_tasks_datetime_earlier_than, remove_task_datetime, ProcessingBatch,
 };
 use crate::{Error, Result, BEI128};
@@ -181,6 +181,7 @@ impl BatchQueue {
                 stats: batch.stats,
                 started_at: batch.started_at,
                 finished_at: batch.finished_at,
+                enqueued_at: batch.enqueued_at,
             },
         )?;
@@ -234,34 +235,25 @@ impl BatchQueue {
         // What we know, though, is that the task date is from before the enqueued_at, and max two timestamps have been written
         // to the DB per batches.
         if let Some(ref old_batch) = old_batch {
-            let started_at = old_batch.started_at.unix_timestamp_nanos();
-
-            // We have either one or two enqueued at to remove
-            let mut exit = old_batch.stats.total_nb_tasks.clamp(0, 2);
-            let mut iterator = self.enqueued_at.rev_iter_mut(wtxn)?;
-            while let Some(entry) = iterator.next() {
-                let (key, mut value) = entry?;
-                if key > started_at {
-                    continue;
-                }
-                if value.remove(old_batch.uid) {
-                    exit = exit.saturating_sub(1);
-                    // Safe because the key and value are owned
-                    unsafe {
-                        iterator.put_current(&key, &value)?;
-                    }
-                    if exit == 0 {
-                        break;
-                    }
-                }
-            }
-        }
-        if let Some(enqueued_at) = batch.oldest_enqueued_at {
-            insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?;
-        }
-        if let Some(enqueued_at) = batch.earliest_enqueued_at {
-            insert_task_datetime(wtxn, self.enqueued_at, enqueued_at, batch.uid)?;
-        }
+            if let Some(enqueued_at) = old_batch.enqueued_at {
+                remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, old_batch.uid)?;
+                remove_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, old_batch.uid)?;
+            } else {
+                // If we don't have the enqueued at in the batch it means the database comes from the v1.12
+                // and we still need to find the date by scrolling the database
+                remove_n_tasks_datetime_earlier_than(
+                    wtxn,
+                    self.enqueued_at,
+                    old_batch.started_at,
+                    old_batch.stats.total_nb_tasks.clamp(1, 2) as usize,
+                    old_batch.uid,
+                )?;
+            }
+        }
+        // A finished batch MUST contains at least one task and have an enqueued_at
+        let enqueued_at = batch.enqueued_at.as_ref().unwrap();
+        insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.earliest, batch.uid)?;
+        insert_task_datetime(wtxn, self.enqueued_at, enqueued_at.oldest, batch.uid)?;
 
         // Update the started at and finished at
         if let Some(ref old_batch) = old_batch {

View File

@@ -102,6 +102,8 @@ fn query_batches_simple() {
         .unwrap();
     assert_eq!(batches.len(), 1);
     batches[0].started_at = OffsetDateTime::UNIX_EPOCH;
+    assert!(batches[0].enqueued_at.is_some());
+    batches[0].enqueued_at = None;
     // Insta cannot snapshot our batches because the batch stats contains an enum as key: https://github.com/mitsuhiko/insta/issues/689
     let batch = serde_json::to_string_pretty(&batches[0]).unwrap();
     snapshot!(batch, @r#"
@@ -123,7 +125,8 @@ fn query_batches_simple() {
         }
       },
       "startedAt": "1970-01-01T00:00:00Z",
-      "finishedAt": null
+      "finishedAt": null,
+      "enqueuedAt": null
     }
     "#);

View File

@@ -2,7 +2,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
 use std::panic::{catch_unwind, AssertUnwindSafe};
 use std::sync::atomic::Ordering;
 
-use meilisearch_types::batches::BatchId;
+use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
 use meilisearch_types::milli::{self};
@@ -16,7 +16,10 @@ use crate::processing::{
     InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
     UpdateIndexProgress,
 };
-use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
+use crate::utils::{
+    self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
+    ProcessingBatch,
+};
 use crate::{Error, IndexScheduler, Result, TaskId};
 
 impl IndexScheduler {
@@ -418,7 +421,6 @@ impl IndexScheduler {
         to_delete_tasks -= &enqueued_tasks;
 
         // 2. We now have a list of tasks to delete, delete them
-
         let mut affected_indexes = HashSet::new();
         let mut affected_statuses = HashSet::new();
         let mut affected_kinds = HashSet::new();
@@ -515,9 +517,51 @@ impl IndexScheduler {
             tasks -= &to_delete_tasks;
 
             // We must remove the batch entirely
             if tasks.is_empty() {
+                if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? {
+                    if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
+                        remove_task_datetime(
+                            wtxn,
+                            self.queue.batches.enqueued_at,
+                            earliest,
+                            batch_id,
+                        )?;
+                        remove_task_datetime(
+                            wtxn,
+                            self.queue.batches.enqueued_at,
+                            oldest,
+                            batch_id,
+                        )?;
+                    } else {
+                        // If we don't have the enqueued at in the batch it means the database comes from the v1.12
+                        // and we still need to find the date by scrolling the database
+                        remove_n_tasks_datetime_earlier_than(
+                            wtxn,
+                            self.queue.batches.enqueued_at,
+                            batch.started_at,
+                            batch.stats.total_nb_tasks.clamp(1, 2) as usize,
+                            batch_id,
+                        )?;
+                    }
+                    remove_task_datetime(
+                        wtxn,
+                        self.queue.batches.started_at,
+                        batch.started_at,
+                        batch_id,
+                    )?;
+                    if let Some(finished_at) = batch.finished_at {
+                        remove_task_datetime(
+                            wtxn,
+                            self.queue.batches.finished_at,
+                            finished_at,
+                            batch_id,
+                        )?;
+                    }
                 self.queue.batches.all_batches.delete(wtxn, &batch_id)?;
                 self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?;
+                }
             }
 
             // Anyway, we must remove the batch from all its reverse indexes.
             // The only way to do that is to check

View File

@@ -56,16 +56,13 @@ succeeded [1,]
 ### Batches Index Tasks:
 ----------------------------------------------------------------------
 ### Batches Enqueued At:
-[timestamp] [0,]
 [timestamp] [1,]
 [timestamp] [1,]
 ----------------------------------------------------------------------
 ### Batches Started At:
-[timestamp] [0,]
 [timestamp] [1,]
 ----------------------------------------------------------------------
 ### Batches Finished At:
-[timestamp] [0,]
 [timestamp] [1,]
 ----------------------------------------------------------------------
 ### File Store:

View File

@@ -54,15 +54,12 @@ succeeded [1,]
 ### Batches Index Tasks:
 ----------------------------------------------------------------------
 ### Batches Enqueued At:
-[timestamp] [0,]
 [timestamp] [1,]
 ----------------------------------------------------------------------
 ### Batches Started At:
-[timestamp] [0,]
 [timestamp] [1,]
 ----------------------------------------------------------------------
 ### Batches Finished At:
-[timestamp] [0,]
 [timestamp] [1,]
 ----------------------------------------------------------------------
 ### File Store:

View File

@@ -87,7 +87,6 @@ doggo [2,3,]
 girafo [4,]
 ----------------------------------------------------------------------
 ### Batches Enqueued At:
-[timestamp] [0,]
 [timestamp] [1,]
 [timestamp] [2,]
 [timestamp] [3,]
@@ -95,7 +94,6 @@ girafo [4,]
 [timestamp] [5,]
 ----------------------------------------------------------------------
 ### Batches Started At:
-[timestamp] [0,]
 [timestamp] [1,]
 [timestamp] [2,]
 [timestamp] [3,]
@@ -103,7 +101,6 @@ girafo [4,]
 [timestamp] [5,]
 ----------------------------------------------------------------------
 ### Batches Finished At:
-[timestamp] [0,]
 [timestamp] [1,]
 [timestamp] [2,]
 [timestamp] [3,]

View File

@@ -3,7 +3,7 @@
 use std::collections::{BTreeSet, HashSet};
 use std::ops::Bound;
 
-use meilisearch_types::batches::{Batch, BatchId, BatchStats};
+use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::CboRoaringBitmapCodec;
 use meilisearch_types::task_view::DetailsView;
@@ -30,8 +30,7 @@ pub struct ProcessingBatch {
     pub kinds: HashSet<Kind>,
     pub indexes: HashSet<String>,
     pub canceled_by: HashSet<TaskId>,
-    pub oldest_enqueued_at: Option<OffsetDateTime>,
-    pub earliest_enqueued_at: Option<OffsetDateTime>,
+    pub enqueued_at: Option<BatchEnqueuedAt>,
     pub started_at: OffsetDateTime,
     pub finished_at: Option<OffsetDateTime>,
 }
@@ -51,8 +50,7 @@ impl ProcessingBatch {
             kinds: HashSet::default(),
             indexes: HashSet::default(),
             canceled_by: HashSet::default(),
-            oldest_enqueued_at: None,
-            earliest_enqueued_at: None,
+            enqueued_at: None,
             started_at: OffsetDateTime::now_utc(),
             finished_at: None,
         }
@@ -80,14 +78,18 @@
         if let Some(canceled_by) = task.canceled_by {
             self.canceled_by.insert(canceled_by);
         }
-        self.oldest_enqueued_at =
-            Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| {
-                task.enqueued_at.min(oldest_enqueued_at)
-            }));
-        self.earliest_enqueued_at =
-            Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| {
-                task.enqueued_at.max(earliest_enqueued_at)
-            }));
+        match self.enqueued_at.as_mut() {
+            Some(BatchEnqueuedAt { earliest, oldest }) => {
+                *oldest = task.enqueued_at.min(*oldest);
+                *earliest = task.enqueued_at.max(*earliest);
+            }
+            None => {
+                self.enqueued_at = Some(BatchEnqueuedAt {
+                    earliest: task.enqueued_at,
+                    oldest: task.enqueued_at,
+                });
+            }
+        }
     }
 }
@@ -138,6 +140,7 @@ impl ProcessingBatch {
             stats: self.stats.clone(),
             started_at: self.started_at,
             finished_at: self.finished_at,
+            enqueued_at: self.enqueued_at,
         }
     }
 }
@@ -174,6 +177,33 @@ pub(crate) fn remove_task_datetime(
     Ok(())
 }
 
+pub(crate) fn remove_n_tasks_datetime_earlier_than(
+    wtxn: &mut RwTxn,
+    database: Database<BEI128, CboRoaringBitmapCodec>,
+    earlier_than: OffsetDateTime,
+    mut count: usize,
+    task_id: TaskId,
+) -> Result<()> {
+    let earlier_than = earlier_than.unix_timestamp_nanos();
+    let mut iter = database.rev_range_mut(wtxn, &(..earlier_than))?;
+    while let Some((current, mut existing)) = iter.next().transpose()? {
+        count -= existing.remove(task_id) as usize;
+        if existing.is_empty() {
+            // safety: We don't keep references to the database
+            unsafe { iter.del_current()? };
+        } else {
+            // safety: We don't keep references to the database
+            unsafe { iter.put_current(&current, &existing)? };
+        }
+        if count == 0 {
+            break;
+        }
+    }
+
+    Ok(())
+}
+
 pub(crate) fn keep_ids_within_datetimes(
     rtxn: &RoTxn,
     ids: &mut RoaringBitmap,
@@ -329,14 +359,27 @@ impl crate::IndexScheduler {
                 kind,
             } = task;
             assert_eq!(uid, task.uid);
-            if let Some(ref batch) = batch_uid {
+            if task.status != Status::Enqueued {
+                let batch_uid = batch_uid.expect("All non enqueued tasks must be part of a batch");
                 assert!(self
                     .queue
                     .batch_to_tasks_mapping
-                    .get(&rtxn, batch)
+                    .get(&rtxn, &batch_uid)
                     .unwrap()
                     .unwrap()
                     .contains(uid));
+                let batch = self.queue.batches.get_batch(&rtxn, batch_uid).unwrap().unwrap();
+                assert_eq!(batch.uid, batch_uid);
+                if task.status == Status::Processing {
+                    assert!(batch.progress.is_some());
+                } else {
+                    assert!(batch.progress.is_none());
+                }
+                assert_eq!(batch.started_at, task.started_at.unwrap());
+                assert_eq!(batch.finished_at, task.finished_at);
+                let enqueued_at = batch.enqueued_at.unwrap();
+                assert!(task.enqueued_at >= enqueued_at.oldest);
+                assert!(task.enqueued_at <= enqueued_at.earliest);
             }
             if let Some(task_index_uid) = &task_index_uid {
                 assert!(self

View File

@@ -24,6 +24,18 @@ pub struct Batch {
     pub started_at: OffsetDateTime,
     #[serde(with = "time::serde::rfc3339::option")]
     pub finished_at: Option<OffsetDateTime>,
+    // Enqueued at is never displayed and is only required when removing a batch.
+    // It's always some except when upgrading from a database pre v1.12
+    pub enqueued_at: Option<BatchEnqueuedAt>,
+}
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct BatchEnqueuedAt {
+    #[serde(with = "time::serde::rfc3339")]
+    pub earliest: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    pub oldest: OffsetDateTime,
 }
 
 #[derive(Default, Debug, Clone, Serialize, Deserialize, ToSchema)]

View File

@@ -41,9 +41,8 @@ async fn list_batches() {
     let index = server.index("test");
     let (task, _status_code) = index.create(None).await;
     index.wait_task(task.uid()).await.succeeded();
-    index
-        .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
-        .await;
+    let (task, _status_code) = index.create(None).await;
+    index.wait_task(task.uid()).await.failed();
     let (response, code) = index.list_batches().await;
     assert_eq!(code, 200);
     assert_eq!(
@@ -96,11 +95,12 @@ async fn list_batches_pagination_and_reverse() {
 async fn list_batches_with_star_filters() {
     let server = Server::new().await;
     let index = server.index("test");
-    let (batch, _code) = index.create(None).await;
-    index.wait_task(batch.uid()).await.succeeded();
-    index
-        .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
-        .await;
+    let (task, _code) = index.create(None).await;
+    index.wait_task(task.uid()).await.succeeded();
+    let index = server.index("test");
+    let (task, _code) = index.create(None).await;
+    index.wait_task(task.uid()).await.failed();
 
     let (response, code) = index.service.get("/batches?indexUids=test").await;
     assert_eq!(code, 200);
     assert_eq!(response["results"].as_array().unwrap().len(), 2);
@@ -187,9 +187,6 @@ async fn list_batches_invalid_canceled_by_filter() {
     let index = server.index("test");
     let (task, _status_code) = index.create(None).await;
     index.wait_task(task.uid()).await.succeeded();
-    index
-        .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
-        .await;
 
     let (response, code) = index.filtered_batches(&[], &[], &["0"]).await;
     assert_eq!(code, 200, "{}", response);
@@ -202,9 +199,8 @@ async fn list_batches_status_and_type_filtered() {
     let index = server.index("test");
     let (task, _status_code) = index.create(None).await;
     index.wait_task(task.uid()).await.succeeded();
-    index
-        .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
-        .await;
+    let (task, _status_code) = index.update(Some("id")).await;
+    index.wait_task(task.uid()).await.succeeded();
 
     let (response, code) = index.filtered_batches(&["indexCreation"], &["failed"], &[]).await;
     assert_eq!(code, 200, "{}", response);
@@ -212,7 +208,7 @@ async fn list_batches_status_and_type_filtered() {
     let (response, code) = index
         .filtered_batches(
-            &["indexCreation", "documentAdditionOrUpdate"],
+            &["indexCreation", "IndexUpdate"],
             &["succeeded", "processing", "enqueued"],
             &[],
         )

View File

@@ -1,6 +1,7 @@
 use std::fs::{read_dir, read_to_string, remove_file, File};
 use std::io::BufWriter;
 use std::path::PathBuf;
+use std::time::Instant;
 
 use anyhow::Context;
 use clap::{Parser, Subcommand};
@@ -8,7 +9,9 @@ use dump::{DumpWriter, IndexMetadata};
 use file_store::FileStore;
 use meilisearch_auth::AuthController;
 use meilisearch_types::heed::types::{SerdeJson, Str};
-use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
+use meilisearch_types::heed::{
+    CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
+};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 use meilisearch_types::milli::{obkv_to_json, BEU32};
 use meilisearch_types::tasks::{Status, Task};
@@ -78,6 +81,27 @@ enum Command {
         #[arg(long)]
         target_version: String,
     },
+
+    /// Compact the index by using LMDB.
+    ///
+    /// You must run this command while Meilisearch is off. The reason is that Meilisearch keep the
+    /// indexes opened and this compaction operation writes into another file. Meilisearch will not
+    /// switch to the new file.
+    ///
+    /// **Another possibility** is to keep Meilisearch running to serve search requests, run the
+    /// compaction and once done, close and immediately reopen Meilisearch. This way Meilisearch
+    /// will reopened the data.mdb file when rebooting and see the newly compacted file, ignoring
+    /// the previous non-compacted data.
+    ///
+    /// Note that the compaction will open the index, copy and compact the index into another file
+    /// **on the same disk as the index** and replace the previous index with the newly compacted
+    /// one. Which means that the disk must have enough room for at most two time the index size.
+    ///
+    /// To make sure not to loose any data, this tool takes a mutable transaction on the index
+    /// before running the copy and compaction. This way the current indexation must finish before
+    /// the compaction operation can start. Once the compaction is done, the big index is replaced
+    /// by the compacted one and the mutable transaction is released.
+    CompactIndex { index_name: String },
 }
 
 fn main() -> anyhow::Result<()> {
@@ -94,6 +118,7 @@ fn main() -> anyhow::Result<()> {
             let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
             OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
         }
+        Command::CompactIndex { index_name } => compact_index(db_path, &index_name),
     }
 }
@@ -347,3 +372,73 @@
     Ok(())
 }
 
+fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
+    let index_scheduler_path = db_path.join("tasks");
+    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
+        .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
+
+    let rtxn = env.read_txn()?;
+    let index_mapping: Database<Str, UuidCodec> =
+        try_opening_database(&env, &rtxn, "index-mapping")?;
+
+    for result in index_mapping.iter(&rtxn)? {
+        let (uid, uuid) = result?;
+        if uid != index_name {
+            eprintln!("Found index {uid} and skipping it");
+            continue;
+        } else {
+            eprintln!("Found index {uid} 🎉");
+        }
+
+        let index_path = db_path.join("indexes").join(uuid.to_string());
+        let index = Index::new(EnvOpenOptions::new(), &index_path).with_context(|| {
+            format!("While trying to open the index at path {:?}", index_path.display())
+        })?;
+
+        eprintln!("Awaiting for a mutable transaction...");
+        let _wtxn = index.write_txn().context("While awaiting for a write transaction")?;
+
+        // We create and immediately drop the file because the
+        let non_compacted_index_file_path = index_path.join("data.mdb");
+        let compacted_index_file_path = index_path.join("data.mdb.cpy");
+
+        eprintln!("Compacting the index...");
+        let before_compaction = Instant::now();
+        let new_file = index
+            .copy_to_file(&compacted_index_file_path, CompactionOption::Enabled)
+            .with_context(|| format!("While compacting {}", compacted_index_file_path.display()))?;
+
+        let after_size = new_file.metadata()?.len();
+        let before_size = std::fs::metadata(&non_compacted_index_file_path)
+            .with_context(|| {
+                format!(
+                    "While retrieving the metadata of {}",
+                    non_compacted_index_file_path.display(),
+                )
+            })?
+            .len();
+
+        let reduction = before_size as f64 / after_size as f64;
+        println!("Compaction successful. Took around {:.2?}", before_compaction.elapsed());
+        eprintln!("The index went from {before_size} bytes to {after_size} bytes ({reduction:.2}x reduction)");
+
+        eprintln!("Replacing the non-compacted index by the compacted one...");
+        std::fs::rename(&compacted_index_file_path, &non_compacted_index_file_path).with_context(
+            || {
+                format!(
+                    "While renaming {} into {}",
+                    compacted_index_file_path.display(),
+                    non_compacted_index_file_path.display(),
+                )
+            },
+        )?;
+
+        drop(new_file);
+
+        println!("Everything's done 🎉");
+    }
+
+    Ok(())
+}

View File

@@ -12,16 +12,6 @@
       }
     },
     "precommands": [
-      {
-        "route": "experimental-features",
-        "method": "PATCH",
-        "body": {
-          "inline": {
-            "vectorStore": true
-          }
-        },
-        "synchronous": "DontWait"
-      },
       {
         "route": "indexes/movies/settings",
         "method": "PATCH",

View File

@@ -12,16 +12,6 @@
       }
     },
     "precommands": [
-      {
-        "route": "experimental-features",
-        "method": "PATCH",
-        "body": {
-          "inline": {
-            "vectorStore": true
-          }
-        },
-        "synchronous": "DontWait"
-      },
      {
        "route": "indexes/movies/settings",
        "method": "PATCH",

View File

@@ -13,16 +13,6 @@
       }
     },
     "precommands": [
-      {
-        "route": "experimental-features",
-        "method": "PATCH",
-        "body": {
-          "inline": {
-            "vectorStore": true
-          }
-        },
-        "synchronous": "DontWait"
-      },
       {
         "route": "indexes/movies/settings",
         "method": "PATCH",