mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-27 04:25:06 +08:00
connect the new scheduler to meilisearch-http officially.
I can index documents and do search
This commit is contained in:
parent
cb4feabca2
commit
ce2dfecc03
@ -12,7 +12,7 @@ use std::path::PathBuf;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
|
|
||||||
use milli::heed::types::{OwnedType, SerdeBincode, Str};
|
use milli::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
||||||
use milli::heed::{self, Database, Env};
|
use milli::heed::{self, Database, Env};
|
||||||
|
|
||||||
use milli::{RoaringBitmapCodec, BEU32};
|
use milli::{RoaringBitmapCodec, BEU32};
|
||||||
@ -108,7 +108,7 @@ pub struct IndexScheduler {
|
|||||||
pub(crate) env: Env,
|
pub(crate) env: Env,
|
||||||
|
|
||||||
// The main database, it contains all the tasks accessible by their Id.
|
// The main database, it contains all the tasks accessible by their Id.
|
||||||
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeBincode<Task>>,
|
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
|
||||||
|
|
||||||
/// All the tasks ids grouped by their status.
|
/// All the tasks ids grouped by their status.
|
||||||
pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
|
pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
|
||||||
@ -215,7 +215,27 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
let tasks =
|
let tasks =
|
||||||
self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?;
|
self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?;
|
||||||
Ok(tasks.into_iter().map(|task| task.as_task_view()).collect())
|
let (started_at, processing) = self
|
||||||
|
.processing_tasks
|
||||||
|
.read()
|
||||||
|
.map_err(|_| Error::CorruptedTaskQueue)?
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
let mut ret = tasks.into_iter().map(|task| task.as_task_view());
|
||||||
|
if processing.is_empty() {
|
||||||
|
Ok(ret.collect())
|
||||||
|
} else {
|
||||||
|
Ok(ret
|
||||||
|
.map(|task| match processing.contains(task.uid) {
|
||||||
|
true => TaskView {
|
||||||
|
status: Status::Processing,
|
||||||
|
started_at: Some(started_at.clone()),
|
||||||
|
..task
|
||||||
|
},
|
||||||
|
false => task,
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a new task in the scheduler. If it fails and data was associated with the task
|
/// Register a new task in the scheduler. If it fails and data was associated with the task
|
||||||
@ -334,15 +354,10 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: TAMO: do this later
|
*self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new());
|
||||||
// must delete the file on disk
|
|
||||||
// in case of error, must update the tasks with the error
|
|
||||||
// in case of « success » we must update all the task on disk
|
|
||||||
// self.handle_batch_result(res);
|
|
||||||
|
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
log::info!("A batch of tasks was successfully completed.");
|
log::info!("A batch of tasks was successfully completed.");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -461,6 +476,9 @@ mod tests {
|
|||||||
|
|
||||||
index_scheduler.tick().unwrap();
|
index_scheduler.tick().unwrap();
|
||||||
|
|
||||||
|
let task = index_scheduler.get_tasks(Query::default()).unwrap();
|
||||||
|
assert_smol_debug_snapshot!(task, @r###"[TaskView { uid: 0, index_uid: Some("doggos"), status: Succeeded, kind: DocumentAddition, details: Some(DocumentAddition { received_documents: 1, indexed_documents: 1 }), error: None, duration: Some(Duration { seconds: 0, nanoseconds: 29654837 }), enqueued_at: OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 202925184 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }, started_at: Some(OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 203190739 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }), finished_at: Some(OffsetDateTime { local_datetime: PrimitiveDateTime { date: Date { year: 2022, ordinal: 269 }, time: Time { hour: 11, minute: 34, second: 29, nanosecond: 232845576 } }, offset: UtcOffset { hours: 0, minutes: 0, seconds: 0 } }) }]"###);
|
||||||
|
|
||||||
let doggos = index_scheduler.index("doggos").unwrap();
|
let doggos = index_scheduler.index("doggos").unwrap();
|
||||||
|
|
||||||
let rtxn = doggos.read_txn().unwrap();
|
let rtxn = doggos.read_txn().unwrap();
|
||||||
|
@ -42,7 +42,7 @@ pub struct TaskView {
|
|||||||
pub finished_at: Option<OffsetDateTime>,
|
pub finished_at: Option<OffsetDateTime>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct Task {
|
pub struct Task {
|
||||||
pub uid: TaskId,
|
pub uid: TaskId,
|
||||||
@ -122,7 +122,7 @@ impl FromStr for Status {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub enum KindWithContent {
|
pub enum KindWithContent {
|
||||||
DocumentAddition {
|
DocumentAddition {
|
||||||
|
@ -49,6 +49,12 @@ impl IndexScheduler {
|
|||||||
.get_task(wtxn, task.uid)?
|
.get_task(wtxn, task.uid)?
|
||||||
.ok_or(Error::CorruptedTaskQueue)?;
|
.ok_or(Error::CorruptedTaskQueue)?;
|
||||||
|
|
||||||
|
debug_assert_eq!(old_task.uid, task.uid);
|
||||||
|
|
||||||
|
if old_task == *task {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
if old_task.status != task.status {
|
if old_task.status != task.status {
|
||||||
self.update_status(wtxn, old_task.status, |bitmap| {
|
self.update_status(wtxn, old_task.status, |bitmap| {
|
||||||
bitmap.remove(task.uid);
|
bitmap.remove(task.uid);
|
||||||
@ -67,8 +73,7 @@ impl IndexScheduler {
|
|||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: TAMO: update the task in `all_tasks`
|
self.all_tasks.put(wtxn, &BEU32::new(task.uid), &task)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +46,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let meilisearch = setup_meilisearch(&opt)?;
|
let meilisearch = setup_meilisearch(&opt)?;
|
||||||
|
let m = meilisearch.clone();
|
||||||
|
tokio::task::spawn_blocking(move || m.run());
|
||||||
|
|
||||||
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key)?;
|
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key)?;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user