connect the new scheduler to meilisearch-http officially.

I can index documents and do search
This commit is contained in:
Tamo 2022-09-26 13:46:34 +02:00 committed by Clément Renault
parent f84cbee170
commit 42f5c1fc3f
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 38 additions and 13 deletions

View File

@ -12,7 +12,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use milli::heed::types::{OwnedType, SerdeBincode, Str};
use milli::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use milli::heed::{self, Database, Env};
use milli::{RoaringBitmapCodec, BEU32};
@ -108,7 +108,7 @@ pub struct IndexScheduler {
pub(crate) env: Env,
// The main database, it contains all the tasks accessible by their Id.
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeBincode<Task>>,
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
/// All the tasks ids grouped by their status.
pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
@ -215,7 +215,27 @@ impl IndexScheduler {
let tasks =
self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?;
Ok(tasks.into_iter().map(|task| task.as_task_view()).collect())
let (started_at, processing) = self
.processing_tasks
.read()
.map_err(|_| Error::CorruptedTaskQueue)?
.clone();
let mut ret = tasks.into_iter().map(|task| task.as_task_view());
if processing.is_empty() {
Ok(ret.collect())
} else {
Ok(ret
.map(|task| match processing.contains(task.uid) {
true => TaskView {
status: Status::Processing,
started_at: Some(started_at.clone()),
..task
},
false => task,
})
.collect())
}
}
/// Register a new task in the scheduler. If it fails and data was associated with the task
@ -334,15 +354,10 @@ impl IndexScheduler {
}
}
// TODO: TAMO: do this later
// must delete the file on disk
// in case of error, must update the tasks with the error
// in case of « success » we must update all the task on disk
// self.handle_batch_result(res);
*self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new());
wtxn.commit()?;
log::info!("A batch of tasks was successfully completed.");
Ok(())
}
@ -461,6 +476,9 @@ mod tests {
index_scheduler.tick().unwrap();
let task = index_scheduler.get_tasks(Query::default()).unwrap();
assert_smol_debug_snapshot!(task, @r###"[TaskView { uid: 0, index_uid: Some("doggos"), status: Succeeded, kind: DocumentAddition, details: Some(DocumentAddition { received_documents: 1, indexed_documents: 1 }), error: None, duration: Some(Duration { seconds: 0, nanoseconds: 29654837 }), enqueued_at: OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 202925184 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }, started_at: Some(OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 203190739 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }), finished_at: Some(OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 232845576 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }) }]"###);
let doggos = index_scheduler.index("doggos").unwrap();
let rtxn = doggos.read_txn().unwrap();

View File

@ -42,7 +42,7 @@ pub struct TaskView {
pub finished_at: Option<OffsetDateTime>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Task {
pub uid: TaskId,
@ -122,7 +122,7 @@ impl FromStr for Status {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum KindWithContent {
DocumentAddition {

View File

@ -49,6 +49,12 @@ impl IndexScheduler {
.get_task(wtxn, task.uid)?
.ok_or(Error::CorruptedTaskQueue)?;
debug_assert_eq!(old_task.uid, task.uid);
if old_task == *task {
return Ok(());
}
if old_task.status != task.status {
self.update_status(wtxn, old_task.status, |bitmap| {
bitmap.remove(task.uid);
@ -67,8 +73,7 @@ impl IndexScheduler {
})?;
}
// TODO: TAMO: update the task in `all_tasks`
self.all_tasks.put(wtxn, &BEU32::new(task.uid), &task)?;
Ok(())
}

View File

@ -46,6 +46,8 @@ async fn main() -> anyhow::Result<()> {
}
let meilisearch = setup_meilisearch(&opt)?;
let m = meilisearch.clone();
tokio::task::spawn_blocking(move || m.run());
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key)?;