diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index 8617e1dbe..be9a4ae46 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -12,7 +12,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; -use milli::heed::types::{OwnedType, SerdeBincode, Str}; +use milli::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use milli::heed::{self, Database, Env}; use milli::{RoaringBitmapCodec, BEU32}; @@ -108,7 +108,7 @@ pub struct IndexScheduler { pub(crate) env: Env, // The main database, it contains all the tasks accessible by their Id. - pub(crate) all_tasks: Database, SerdeBincode>, + pub(crate) all_tasks: Database, SerdeJson>, /// All the tasks ids grouped by their status. pub(crate) status: Database, RoaringBitmapCodec>, @@ -215,7 +215,27 @@ impl IndexScheduler { let tasks = self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?; - Ok(tasks.into_iter().map(|task| task.as_task_view()).collect()) + let (started_at, processing) = self + .processing_tasks + .read() + .map_err(|_| Error::CorruptedTaskQueue)? + .clone(); + + let mut ret = tasks.into_iter().map(|task| task.as_task_view()); + if processing.is_empty() { + Ok(ret.collect()) + } else { + Ok(ret + .map(|task| match processing.contains(task.uid) { + true => TaskView { + status: Status::Processing, + started_at: Some(started_at.clone()), + ..task + }, + false => task, + }) + .collect()) + } } /// Register a new task in the scheduler. If it fails and data was associated with the task @@ -334,15 +354,10 @@ impl IndexScheduler { } } - // TODO: TAMO: do this later - // must delete the file on disk - // in case of error, must update the tasks with the error - // in case of « success » we must update all the task on disk - // self.handle_batch_result(res); + *self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new()); wtxn.commit()?; log::info!("A batch of tasks was successfully completed."); - Ok(()) } @@ -461,6 +476,9 @@ mod tests { index_scheduler.tick().unwrap(); + let task = index_scheduler.get_tasks(Query::default()).unwrap(); + assert_smol_debug_snapshot!(task, @r###"[TaskView { uid: 0, index_uid: Some("doggos"), status: Succeeded, kind: DocumentAddition, details: Some(DocumentAddition { received_documents: 1, indexed_documents: 1 }), error: None, duration: Some(Duration { seconds: 0, nanoseconds: 29654837 }), enqueued_at: OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 202925184 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }, started_at: Some(OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 203190739 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }), finished_at: Some(OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 232845576 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }) }]"###); + let doggos = index_scheduler.index("doggos").unwrap(); let rtxn = doggos.read_txn().unwrap(); diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 3d60be30f..c8330779c 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -42,7 +42,7 @@ pub struct TaskView { pub finished_at: Option, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Task { pub uid: TaskId, @@ -122,7 +122,7 @@ impl FromStr for Status { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum KindWithContent { DocumentAddition { diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 8d767aec5..e725afb4e 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -49,6 +49,12 @@ impl IndexScheduler { .get_task(wtxn, task.uid)? .ok_or(Error::CorruptedTaskQueue)?; + debug_assert_eq!(old_task.uid, task.uid); + + if old_task == *task { + return Ok(()); + } + if old_task.status != task.status { self.update_status(wtxn, old_task.status, |bitmap| { bitmap.remove(task.uid); @@ -67,8 +73,7 @@ impl IndexScheduler { })?; } - // TODO: TAMO: update the task in `all_tasks` - + self.all_tasks.put(wtxn, &BEU32::new(task.uid), &task)?; Ok(()) } diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs index b6f92ae28..651978c00 100644 --- a/meilisearch-http/src/main.rs +++ b/meilisearch-http/src/main.rs @@ -46,6 +46,8 @@ async fn main() -> anyhow::Result<()> { } let meilisearch = setup_meilisearch(&opt)?; + let m = meilisearch.clone(); + tokio::task::spawn_blocking(move || m.run()); let auth_controller = AuthController::new(&opt.db_path, &opt.master_key)?;