From b66bf049b59bafde13f8344b5fa6e607b14984c9 Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Mon, 7 Aug 2023 17:02:51 +0200
Subject: [PATCH] Create a task on the ZooKeeper side when a task is created
 locally

---
 Cargo.lock                                  |   2 +
 index-scheduler/Cargo.toml                  |   2 +
 index-scheduler/src/lib.rs                  | 240 +++++++++++++-------
 meilisearch-auth/src/lib.rs                 |   2 +-
 meilisearch/src/lib.rs                      |  58 ++---
 meilisearch/src/routes/dump.rs              |   3 +-
 meilisearch/src/routes/indexes/documents.rs |  14 +-
 meilisearch/src/routes/indexes/mod.rs       |   9 +-
 meilisearch/src/routes/indexes/settings.rs  |  16 +-
 meilisearch/src/routes/swap_indexes.rs      |   2 +-
 meilisearch/src/routes/tasks.rs             |   4 +-
 11 files changed, 213 insertions(+), 139 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1005b3d9d..ec92fe58e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1994,7 +1994,9 @@ dependencies = [
  "tempfile",
  "thiserror",
  "time",
+ "tokio",
  "uuid 1.4.1",
+ "zookeeper-client",
 ]
 
 [[package]]
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index 9e7c2ae4b..b97545b20 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -31,6 +31,8 @@ tempfile = "3.5.0"
 thiserror = "1.0.40"
 time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
 uuid = { version = "1.3.1", features = ["serde", "v4"] }
+tokio = { version = "1.27.0", features = ["full"] }
+zookeeper-client = "0.5.0"
 
 [dev-dependencies]
 big_s = "1.0.2"
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index b7b8685aa..7644685a6 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -58,6 +58,7 @@ use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
 use uuid::Uuid;
+use zookeeper_client as zk;
 
 use crate::index_mapper::IndexMapper;
 use crate::utils::{check_index_swap_validity, clamp_to_page_size};
@@ -258,6 +259,8 @@ pub struct IndexSchedulerOptions {
     pub max_number_of_tasks: usize,
     /// The experimental features enabled for this instance.
     pub instance_features: InstanceTogglableFeatures,
+    /// The ZooKeeper client.
+    pub zk: Option<zk::Client>,
 }
 
 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -326,6 +329,9 @@ pub struct IndexScheduler {
     /// The path to the version file of Meilisearch.
     pub(crate) version_file_path: PathBuf,
 
+    /// The client connected to the ZooKeeper cluster.
+    pub(crate) zk: Option<zk::Client>,
+
     // ================= test
     // The next entry is dedicated to the tests.
     /// Provide a way to set a breakpoint in multiple part of the scheduler.
@@ -367,6 +373,7 @@ impl IndexScheduler {
             snapshots_path: self.snapshots_path.clone(),
             dumps_path: self.dumps_path.clone(),
             auth_path: self.auth_path.clone(),
+            zk: self.zk.clone(),
             version_file_path: self.version_file_path.clone(),
             #[cfg(test)]
             test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@@ -381,7 +388,7 @@ impl IndexScheduler {
 
 impl IndexScheduler {
     /// Create an index scheduler and start its run loop.
-    pub fn new(
+    pub async fn new(
         options: IndexSchedulerOptions,
         #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
         #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
@@ -463,7 +470,7 @@ impl IndexScheduler {
             snapshots_path: options.snapshots_path,
             auth_path: options.auth_path,
             version_file_path: options.version_file_path,
-
+            zk: options.zk,
             #[cfg(test)]
             test_breakpoint_sdr,
             #[cfg(test)]
@@ -473,7 +480,7 @@ impl IndexScheduler {
             features,
         };
 
-        this.run();
+        this.run().await;
         Ok(this)
     }
@@ -561,31 +568,63 @@ impl IndexScheduler {
     ///
     /// This function will execute in a different thread and must be called
     /// only once per index scheduler.
-    fn run(&self) {
+    async fn run(&self) {
         let run = self.private_clone();
-        std::thread::Builder::new()
-            .name(String::from("scheduler"))
-            .spawn(move || {
-                #[cfg(test)]
-                run.breakpoint(Breakpoint::Init);
+        tokio::task::spawn(async move {
+            #[cfg(test)]
+            run.breakpoint(Breakpoint::Init);
 
-                run.wake_up.wait();
+            let wake_up = run.wake_up.clone();
+            tokio::task::spawn_blocking(move || wake_up.wait()).await;
 
-                loop {
-                    match run.tick() {
-                        Ok(TickOutcome::TickAgain(_)) => (),
-                        Ok(TickOutcome::WaitForSignal) => run.wake_up.wait(),
-                        Err(e) => {
-                            log::error!("{}", e);
-                            // Wait one second when an irrecoverable error occurs.
-                            if !e.is_recoverable() {
-                                std::thread::sleep(Duration::from_secs(1));
-                            }
+            loop {
+                match run.tick().await {
+                    Ok(TickOutcome::TickAgain(_)) => (),
+                    Ok(TickOutcome::WaitForSignal) => {
+                        let wake_up = run.wake_up.clone();
+                        tokio::task::spawn_blocking(move || wake_up.wait()).await;
+                    }
+                    Err(e) => {
+                        log::error!("{}", e);
+                        // Wait one second when an irrecoverable error occurs.
+                        if !e.is_recoverable() {
+                            std::thread::sleep(Duration::from_secs(1));
                         }
                     }
                 }
-            })
-            .unwrap();
+            }
+        });
+
+        if let Some(ref zk) = &self.zk {
+            let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
+            match zk.create("/tasks", &[], &options).await {
+                Ok(_) => (),
+                Err(zk::Error::NodeExists) => {
+                    todo!("Synchronize with the cluster")
+                }
+                Err(e) => panic!("{e}"),
+            }
+
+            // TODO: fix unwrap by returning a clear error.
+            let mut watcher =
+                zk.watch("/tasks", zk::AddWatchMode::PersistentRecursive).await.unwrap();
+            let czk = zk.clone();
+            tokio::spawn(async move {
+                let zk = czk;
+                loop {
+                    let zk::WatchedEvent { event_type, session_state, path } =
                        dbg!(watcher.changed().await);
+                    match event_type {
+                        zk::EventType::Session => panic!("Session error {:?}", session_state),
+                        // A task has been added.
+                        zk::EventType::NodeDataChanged => {
+                            // Add the raw task content to the local DB.
+                        }
+                        _ => (),
+                    }
+                }
+            });
+        }
     }
 
     pub fn indexer_config(&self) -> &IndexerConfig {
@@ -920,74 +959,118 @@ impl IndexScheduler {
     /// Register a new task in the scheduler.
     ///
     /// If it fails and data was associated with the task, it tries to delete the associated data.
-    pub fn register(&self, kind: KindWithContent) -> Result<Task> {
-        let mut wtxn = self.env.write_txn()?;
+    pub async fn register(&self, kind: KindWithContent) -> Result<Task> {
+        let id = match self.zk {
+            Some(ref zk) => {
+                // Reserve a unique ID on ZooKeeper and give it to the blocking task below.
+                let options =
+                    zk::CreateMode::PersistentSequential.with_acls(zk::Acls::anyone_all());
+                match zk.create("/tasks/task-", &[], &options).await {
+                    Ok((_stats, id)) => Some(id),
+                    Err(e) => panic!("{e}"),
+                }
+            }
+            None => None,
+        };
 
-        // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
-        if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
-            && (self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64 > 50
-        {
-            return Err(Error::NoSpaceLeftInTaskQueue);
-        }
+        let this = self.private_clone();
+        let task = tokio::task::spawn_blocking(move || {
+            let mut wtxn = this.env.write_txn()?;
+
+            // If the task doesn't delete anything and 50% of the task queue is full,
+            // we must refuse to enqueue the incoming task.
+            if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
+                && (this.env.non_free_pages_size()? * 100) / this.env.map_size()? as u64 > 50
+            {
+                return Err(Error::NoSpaceLeftInTaskQueue);
+            }
+
+            // Get the id generated by ZooKeeper, or generate a local id.
+            let id = match id {
+                Some(id) => id.0 as u32,
+                None => this.next_task_id(&wtxn)?,
+            };
+
+            let mut task = Task {
+                uid: id,
+                enqueued_at: OffsetDateTime::now_utc(),
+                started_at: None,
+                finished_at: None,
+                error: None,
+                canceled_by: None,
+                details: kind.default_details(),
+                status: Status::Enqueued,
+                kind: kind.clone(),
+            };
+            // For deletion and cancelation tasks, we want to make extra sure that they
+            // don't attempt to delete/cancel tasks that are newer than themselves.
+            filter_out_references_to_newer_tasks(&mut task);
+            // If the registered task is an index swap task, verify that it is well-formed
+            // (that it does not contain duplicate indexes).
+            check_index_swap_validity(&task)?;
+
+            this.register_raw_task(&mut wtxn, &task)?;
+
+            if let Err(e) = wtxn.commit() {
+                this.delete_persisted_task_data(&task)?;
+                return Err(e.into());
+            }
+
+            // If the registered task is a task cancelation
+            // we inform the processing tasks to stop (if necessary).
+            if let KindWithContent::TaskCancelation { tasks, .. } = kind {
+                let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
+                if this
+                    .processing_tasks
+                    .read()
+                    .unwrap()
+                    .must_cancel_processing_tasks(&tasks_to_cancel)
+                {
+                    this.must_stop_processing.must_stop();
+                }
+            }
+
+            // Notify the scheduler loop to execute a new tick.
+            this.wake_up.signal();
+
+            Ok(task)
+        })
+        .await
+        .unwrap()?;
+
+        // TODO: send the task to ZooKeeper as raw JSON.
+        if let Some(ref zk) = self.zk {
+            let id = id.unwrap();
+            // TODO: ugly unwrap.
+            zk.set_data(
+                &format!("/tasks/task-{}", id),
+                &serde_json::to_vec_pretty(&task).unwrap(),
+                None,
+            )
+            .await
+            .unwrap();
+        }
 
-        let mut task = Task {
-            uid: self.next_task_id(&wtxn)?,
-            enqueued_at: OffsetDateTime::now_utc(),
-            started_at: None,
-            finished_at: None,
-            error: None,
-            canceled_by: None,
-            details: kind.default_details(),
-            status: Status::Enqueued,
-            kind: kind.clone(),
-        };
-        // For deletion and cancelation tasks, we want to make extra sure that they
-        // don't attempt to delete/cancel tasks that are newer than themselves.
-        filter_out_references_to_newer_tasks(&mut task);
-        // If the register task is an index swap task, verify that it is well-formed
-        // (that it does not contain duplicate indexes).
-        check_index_swap_validity(&task)?;
+        Ok(task)
+    }
 
-        // Get rid of the mutability.
-        let task = task;
-
-        self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
+    pub fn register_raw_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
+        self.all_tasks.append(wtxn, &BEU32::new(task.uid), &task)?;
 
         for index in task.indexes() {
-            self.update_index(&mut wtxn, index, |bitmap| {
+            self.update_index(wtxn, index, |bitmap| {
                 bitmap.insert(task.uid);
             })?;
         }
 
-        self.update_status(&mut wtxn, Status::Enqueued, |bitmap| {
+        self.update_status(wtxn, Status::Enqueued, |bitmap| {
             bitmap.insert(task.uid);
         })?;
 
-        self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
+        self.update_kind(wtxn, task.kind.as_kind(), |bitmap| {
             bitmap.insert(task.uid);
         })?;
 
-        utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)?;
-
-        if let Err(e) = wtxn.commit() {
-            self.delete_persisted_task_data(&task)?;
-            return Err(e.into());
-        }
-
-        // If the registered task is a task cancelation
-        // we inform the processing tasks to stop (if necessary).
-        if let KindWithContent::TaskCancelation { tasks, .. } = kind {
-            let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
-            if self.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel)
-            {
-                self.must_stop_processing.must_stop();
-            }
-        }
-
-        // notify the scheduler loop to execute a new tick
-        self.wake_up.signal();
-
-        Ok(task)
+        utils::insert_task_datetime(wtxn, self.enqueued_at, task.enqueued_at, task.uid)
     }
 
     /// Register a new task coming from a dump in the scheduler.
@@ -1046,7 +1129,7 @@ impl IndexScheduler {
     /// 6. Reset the in-memory list of processed tasks.
     ///
     /// Returns the number of processed tasks.
-    fn tick(&self) -> Result<TickOutcome> {
+    async fn tick(&self) -> Result<TickOutcome> {
         #[cfg(test)]
         {
             *self.run_loop_iteration.write().unwrap() += 1;
@@ -1055,7 +1138,7 @@ impl IndexScheduler {
 
         puffin::GlobalProfiler::lock().new_frame();
 
-        self.cleanup_task_queue()?;
+        self.cleanup_task_queue().await?;
 
         let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
         let batch =
@@ -1194,7 +1277,7 @@ impl IndexScheduler {
     }
 
     /// Register a task to cleanup the task queue if needed
-    fn cleanup_task_queue(&self) -> Result<()> {
+    async fn cleanup_task_queue(&self) -> Result<()> {
         let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
 
         let nb_tasks = self.all_task_ids(&rtxn)?.len();
@@ -1237,7 +1320,8 @@ impl IndexScheduler {
                 delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?,
             ),
             tasks: to_delete,
-        })?;
+        })
+        .await?;
 
         Ok(())
     }
diff --git a/meilisearch-auth/src/lib.rs b/meilisearch-auth/src/lib.rs
index 4e6afd0b8..d88ce24f8 100644
--- a/meilisearch-auth/src/lib.rs
+++ b/meilisearch-auth/src/lib.rs
@@ -110,7 +110,7 @@ impl AuthController {
                     dbg!(controller_clone.store.delete_api_key(uuid).unwrap());
                 }
                 zk::EventType::NodeCreated | zk::EventType::NodeDataChanged => {
-                    let (key, stat) = zk.get_data(&path).await.unwrap();
+                    let (key, _stat) = zk.get_data(&path).await.unwrap();
                     let key: Key = serde_json::from_slice(&key).unwrap();
                     log::info!("The key {} has been deleted", key.uid);
diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs
index 51e919f71..1b277fe90 100644
--- a/meilisearch/src/lib.rs
+++ b/meilisearch/src/lib.rs
@@ -203,15 +203,16 @@ pub async fn setup_meilisearch(
     if let ScheduleSnapshot::Enabled(snapshot_delay) = opt.schedule_snapshot {
         let snapshot_delay = Duration::from_secs(snapshot_delay);
         let index_scheduler = index_scheduler.clone();
-        thread::Builder::new()
-            .name(String::from("register-snapshot-tasks"))
-            .spawn(move || loop {
-                thread::sleep(snapshot_delay);
-                if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
-                    error!("Error while registering snapshot: {}", e);
-                }
-            })
-            .unwrap();
+        tokio::task::spawn(async move {
+            loop {
+                tokio::time::sleep(snapshot_delay).await;
+                if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation).await {
+                    error!("Error while registering snapshot: {}", e);
+                }
+            }
+        });
     }
 
     Ok((index_scheduler, auth_controller))
 }
@@ -225,31 +226,32 @@ async fn open_or_create_database_unchecked(
 ) -> anyhow::Result<(IndexScheduler, AuthController)> {
     // we don't want to create anything in the data.ms yet, thus we
     // wrap our two builders in a closure that'll be executed later.
-    let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zk);
+    let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zk.clone());
     let instance_features = opt.to_instance_features();
-    let index_scheduler_builder = || -> anyhow::Result<_> {
-        Ok(IndexScheduler::new(IndexSchedulerOptions {
-            version_file_path: opt.db_path.join(VERSION_FILE_NAME),
-            auth_path: opt.db_path.join("auth"),
-            tasks_path: opt.db_path.join("tasks"),
-            update_file_path: opt.db_path.join("update_files"),
-            indexes_path: opt.db_path.join("indexes"),
-            snapshots_path: opt.snapshot_dir.clone(),
-            dumps_path: opt.dump_dir.clone(),
-            task_db_size: opt.max_task_db_size.get_bytes() as usize,
-            index_base_map_size: opt.max_index_size.get_bytes() as usize,
-            enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
-            indexer_config: (&opt.indexer_options).try_into()?,
-            autobatching_enabled: true,
-            max_number_of_tasks: 1_000_000,
-            index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
-            index_count: DEFAULT_INDEX_COUNT,
-            instance_features,
-        })?)
-    };
+    let index_scheduler = IndexScheduler::new(IndexSchedulerOptions {
+        version_file_path: opt.db_path.join(VERSION_FILE_NAME),
+        auth_path: opt.db_path.join("auth"),
+        tasks_path: opt.db_path.join("tasks"),
+        update_file_path: opt.db_path.join("update_files"),
+        indexes_path: opt.db_path.join("indexes"),
+        snapshots_path: opt.snapshot_dir.clone(),
+        dumps_path: opt.dump_dir.clone(),
+        task_db_size: opt.max_task_db_size.get_bytes() as usize,
+        index_base_map_size: opt.max_index_size.get_bytes() as usize,
+        enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
+        indexer_config: (&opt.indexer_options).try_into()?,
+        autobatching_enabled: true,
+        max_number_of_tasks: 1_000_000,
+        index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
+        index_count: DEFAULT_INDEX_COUNT,
+        instance_features,
+        zk: zk.clone(),
+    })
+    .await
+    .map_err(anyhow::Error::from);
 
     match (
-        index_scheduler_builder(),
+        index_scheduler,
         auth_controller.await.map_err(anyhow::Error::from),
         create_version_file(&opt.db_path).map_err(anyhow::Error::from),
     ) {
diff --git a/meilisearch/src/routes/dump.rs b/meilisearch/src/routes/dump.rs
index 0aabd2aa6..e61b40ff1 100644
--- a/meilisearch/src/routes/dump.rs
+++ b/meilisearch/src/routes/dump.rs
@@ -29,8 +29,7 @@ pub async fn create_dump(
         keys: auth_controller.list_keys()?,
         instance_uid: analytics.instance_uid().cloned(),
     };
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index 2afc1b5fb..a99f57911 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -129,8 +129,7 @@ pub async fn delete_document(
         index_uid: index_uid.to_string(),
         documents_ids: vec![document_id],
     };
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
 }
@@ -445,7 +444,7 @@ async fn document_addition(
     };
 
     let scheduler = index_scheduler.clone();
-    let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
+    let task = match scheduler.register(task).await {
         Ok(task) => task,
         Err(e) => {
             index_scheduler.delete_update_file(uuid)?;
@@ -476,8 +475,7 @@ pub async fn delete_documents_batch(
 
     let task =
         KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
@@ -512,8 +510,7 @@ pub async fn delete_documents_by_filter(
         .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
     let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
 
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
@@ -529,8 +526,7 @@ pub async fn clear_all_documents(
     analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
 
     let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs
index 81b5c3f2e..fee7310e0 100644
--- a/meilisearch/src/routes/indexes/mod.rs
+++ b/meilisearch/src/routes/indexes/mod.rs
@@ -135,8 +135,7 @@ pub async fn create_index(
         );
 
         let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
-        let task: SummarizedTaskView =
-            tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+        let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
         Ok(HttpResponse::Accepted().json(task))
     } else {
@@ -203,8 +202,7 @@ pub async fn update_index(
         primary_key: body.primary_key,
     };
 
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
@@ -216,8 +214,7 @@ pub async fn delete_index(
 ) -> Result<HttpResponse, ResponseError> {
     let index_uid = IndexUid::try_from(index_uid.into_inner())?;
     let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
     Ok(HttpResponse::Accepted().json(task))
 }
diff --git a/meilisearch/src/routes/indexes/settings.rs b/meilisearch/src/routes/indexes/settings.rs
index 1b48ae745..df2748cf1 100644
--- a/meilisearch/src/routes/indexes/settings.rs
+++ b/meilisearch/src/routes/indexes/settings.rs
@@ -55,10 +55,7 @@ macro_rules! make_setting_route {
                     is_deletion: true,
                     allow_index_creation,
                 };
-                let task: SummarizedTaskView =
-                    tokio::task::spawn_blocking(move || index_scheduler.register(task))
-                        .await??
-                        .into();
+                let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
                 debug!("returns: {:?}", task);
                 Ok(HttpResponse::Accepted().json(task))
@@ -97,10 +94,7 @@ macro_rules! make_setting_route {
                     is_deletion: false,
                     allow_index_creation,
                 };
-                let task: SummarizedTaskView =
-                    tokio::task::spawn_blocking(move || index_scheduler.register(task))
-                        .await??
-                        .into();
+                let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
                 debug!("returns: {:?}", task);
                 Ok(HttpResponse::Accepted().json(task))
@@ -586,8 +580,7 @@ pub async fn update_all(
         is_deletion: false,
         allow_index_creation,
     };
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
@@ -622,8 +615,7 @@ pub async fn delete_all(
         is_deletion: true,
         allow_index_creation,
     };
-    let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+    let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
 
     debug!("returns: {:?}", task);
     Ok(HttpResponse::Accepted().json(task))
diff --git a/meilisearch/src/routes/swap_indexes.rs b/meilisearch/src/routes/swap_indexes.rs
index c4e204c09..acaffea33 100644
--- a/meilisearch/src/routes/swap_indexes.rs
+++ b/meilisearch/src/routes/swap_indexes.rs
@@ -61,7 +61,7 @@ pub async fn swap_indexes(
 
     let task = KindWithContent::IndexSwap { swaps };
 
-    let task = index_scheduler.register(task)?;
+    let task = index_scheduler.register(task).await?;
     let task: SummarizedTaskView = task.into();
     Ok(HttpResponse::Accepted().json(task))
 }
diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs
index f7d4c44d7..9c9fd2aa4 100644
--- a/meilisearch/src/routes/tasks.rs
+++ b/meilisearch/src/routes/tasks.rs
@@ -333,7 +333,7 @@ async fn cancel_tasks(
     let task_cancelation =
         KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
 
-    let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
+    let task = index_scheduler.register(task_cancelation).await?;
     let task: SummarizedTaskView = task.into();
 
     Ok(HttpResponse::Ok().json(task))
@@ -378,7 +378,7 @@ async fn delete_tasks(
     let task_deletion =
         KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
 
-    let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
+    let task = index_scheduler.register(task_deletion).await?;
     let task: SummarizedTaskView = task.into();
 
     Ok(HttpResponse::Ok().json(task))
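
Note: the protocol this patch introduces (reserve a sequential znode to obtain a cluster-unique
task id, publish the task body with set_data, and let every instance pick it up through a
persistent recursive watch) can be exercised in isolation. The sketch below is a minimal,
self-contained program that sticks to the zookeeper-client 0.5 calls visible in the diff; the
cluster address, the Task struct, and the "snapshotCreation" kind are illustrative placeholders,
not Meilisearch code.

// Assumed dependencies for this sketch: tokio (features = ["full"]),
// zookeeper-client = "0.5", serde (with derive), and serde_json.
use zookeeper_client as zk;

/// Placeholder stand-in for Meilisearch's `Task`; just enough to serialize.
#[derive(serde::Serialize)]
struct Task {
    uid: u32,
    kind: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address; point this at a real ZooKeeper cluster.
    let zk = zk::Client::connect("127.0.0.1:2181").await?;

    // 1. Ensure the parent node exists, tolerating concurrent creation,
    //    as the patch does at the top of `run()`.
    let persistent = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
    match zk.create("/tasks", &[], &persistent).await {
        Ok(_) | Err(zk::Error::NodeExists) => (),
        Err(e) => return Err(e.into()),
    }

    // 2. Subscribe before publishing: a single persistent recursive watch
    //    covers every child of /tasks and keeps firing across events.
    let mut watcher = zk.watch("/tasks", zk::AddWatchMode::PersistentRecursive).await?;
    let reader = zk.clone();
    tokio::spawn(async move {
        loop {
            let zk::WatchedEvent { event_type, path, .. } = watcher.changed().await;
            // A `set_data` on a reserved task node arrives as NodeDataChanged;
            // a follower would insert the payload into its local queue here.
            if let zk::EventType::NodeDataChanged = event_type {
                let (payload, _stat) = reader.get_data(&path).await.unwrap();
                println!("{path} -> {}", String::from_utf8_lossy(&payload));
            }
        }
    });

    // 3. Reserve a cluster-unique id: ZooKeeper appends a monotonically
    //    increasing, zero-padded sequence number to the "task-" prefix.
    let sequential = zk::CreateMode::PersistentSequential.with_acls(zk::Acls::anyone_all());
    let (_stat, seq) = zk.create("/tasks/task-", &[], &sequential).await?;
    let task = Task { uid: seq.0 as u32, kind: "snapshotCreation".into() };

    // 4. Publish the task body; formatting with `seq` reuses the padded
    //    suffix, mirroring the patch's `format!("/tasks/task-{}", id)`.
    zk.set_data(&format!("/tasks/task-{seq}"), &serde_json::to_vec_pretty(&task)?, None).await?;

    // Give the watcher a moment to print before the demo exits.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Ok(())
}

Reserving the id before writing the payload is what keeps register() usable without ZooKeeper:
when self.zk is None the scheduler falls back to next_task_id(), and otherwise the sequence
numbers give every instance the same total order of tasks.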