diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 3264bda7a..7b884e0a4 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -61,6 +61,8 @@ pub enum Error { SwapDuplicateIndexesFound(Vec), #[error("Index `{0}` not found.")] SwapIndexNotFound(String), + #[error("Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.")] + NoSpaceLeftInTaskQueue, #[error( "Indexes {} not found.", .0.iter().map(|s| format!("`{}`", s)).collect::>().join(", ") @@ -152,6 +154,8 @@ impl ErrorCode for Error { Error::TaskNotFound(_) => Code::TaskNotFound, Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, + // TODO: not sure of the Code to use + Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice, Error::Dump(e) => e.error_code(), Error::Milli(e) => e.error_code(), Error::ProcessBatchPanicked => Code::Internal, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 0f82fb47d..3bde39040 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -820,6 +820,13 @@ impl IndexScheduler { pub fn register(&self, kind: KindWithContent) -> Result { let mut wtxn = self.env.write_txn()?; + // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task + if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) + && (self.env.real_disk_size()? * 100) / self.env.map_size()? as u64 > 50 + { + return Err(Error::NoSpaceLeftInTaskQueue); + } + let mut task = Task { uid: self.next_task_id(&wtxn)?, enqueued_at: OffsetDateTime::now_utc(), diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 0c6457e7a..563bc3496 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -68,7 +68,7 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000; // The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be // opened simultaneously. pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB -pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB +pub const TASK_DB_SIZE: u64 = 20 * 1024 * 1024 * 1024; // 20 GiB #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] diff --git a/meilisearch/tests/tasks/mod.rs b/meilisearch/tests/tasks/mod.rs index e9b5a2325..88f83bb70 100644 --- a/meilisearch/tests/tasks/mod.rs +++ b/meilisearch/tests/tasks/mod.rs @@ -1,11 +1,14 @@ mod errors; +use byte_unit::{Byte, ByteUnit}; use meili_snap::insta::assert_json_snapshot; +use meili_snap::{json_string, snapshot}; use serde_json::json; +use tempfile::TempDir; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use crate::common::Server; +use crate::common::{default_settings, Server}; #[actix_rt::test] async fn error_get_unexisting_task_status() { @@ -1000,3 +1003,101 @@ async fn test_summarized_dump_creation() { } "###); } + +#[actix_web::test] +async fn test_task_queue_is_full() { + let dir = TempDir::new().unwrap(); + let mut options = default_settings(dir.path()); + options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap(); + + let server = Server::new_with_options(options).await.unwrap(); + + // the first task should be enqueued without issue + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###" + { + "taskUid": 0, + "indexUid": "doggo", + "status": "enqueued", + "type": "indexCreation", + "enqueuedAt": "[date]" + } + "###); + + loop { + let (res, code) = server.create_index(json!({ "uid": "doggo" })).await; + if code == 422 { + break; + } + if res["taskUid"] == json!(null) { + panic!( + "Encountered the strange case:\n{}", + serde_json::to_string_pretty(&res).unwrap() + ); + } + } + + let (result, code) = server.create_index(json!({ "uid": "doggo" })).await; + snapshot!(code, @"422 Unprocessable Entity"); + snapshot!(json_string!(result), @r###" + { + "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", + "code": "no_space_left_on_device", + "type": "system", + "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" + } + "###); + + // But we should still be able to register tasks deletion IF they delete something + let (result, code) = server.delete_tasks("uids=0").await; + snapshot!(code, @"200 OK"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": null, + "status": "enqueued", + "type": "taskDeletion", + "enqueuedAt": "[date]" + } + "###); + + // we're going to fill up the queue once again + loop { + let (res, code) = server.delete_tasks("uids=0").await; + if code == 422 { + break; + } + if res["taskUid"] == json!(null) { + panic!( + "Encountered the strange case:\n{}", + serde_json::to_string_pretty(&res).unwrap() + ); + } + } + + // But we should NOT be able to register this task because it doesn't match any tasks + let (result, code) = server.delete_tasks("uids=0").await; + snapshot!(code, @"422 Unprocessable Entity"); + snapshot!(json_string!(result), @r###" + { + "message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.", + "code": "no_space_left_on_device", + "type": "system", + "link": "https://docs.meilisearch.com/errors#no_space_left_on_device" + } + "###); + + // The deletion still works + let (result, code) = server.delete_tasks("uids=*").await; + snapshot!(code, @"200 OK"); + snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###" + { + "taskUid": "uid", + "indexUid": null, + "status": "enqueued", + "type": "taskDeletion", + "enqueuedAt": "[date]" + } + "###); +}