mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-19 17:38:28 +08:00
implements most operations
This commit is contained in:
parent
9aa31cd391
commit
ef4594c078
@ -9,16 +9,19 @@ mod utils;
|
|||||||
use batch::Batch;
|
use batch::Batch;
|
||||||
pub use error::Error;
|
pub use error::Error;
|
||||||
use index::Index;
|
use index::Index;
|
||||||
use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
|
|
||||||
pub use task::Task;
|
pub use task::Task;
|
||||||
use task::{Kind, KindWithContent, Status};
|
use task::{Kind, KindWithContent, Status};
|
||||||
|
use time::OffsetDateTime;
|
||||||
|
use update_file_store::UpdateFileStore;
|
||||||
|
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{collections::HashMap, sync::RwLock};
|
use std::{collections::HashMap, sync::RwLock};
|
||||||
|
|
||||||
|
use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
|
||||||
use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
|
use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
|
||||||
|
use milli::update::IndexDocumentsMethod;
|
||||||
use milli::{RoaringBitmapCodec, BEU32};
|
use milli::{RoaringBitmapCodec, BEU32};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
@ -54,6 +57,8 @@ pub struct IndexScheduler {
|
|||||||
/// The list of tasks currently processing.
|
/// The list of tasks currently processing.
|
||||||
processing_tasks: Arc<RwLock<RoaringBitmap>>,
|
processing_tasks: Arc<RwLock<RoaringBitmap>>,
|
||||||
|
|
||||||
|
file_store: UpdateFileStore,
|
||||||
|
|
||||||
/// The LMDB environment which the DBs are associated with.
|
/// The LMDB environment which the DBs are associated with.
|
||||||
env: Env,
|
env: Env,
|
||||||
|
|
||||||
@ -326,7 +331,66 @@ impl IndexScheduler {
|
|||||||
Batch::Cancel(_) => todo!(),
|
Batch::Cancel(_) => todo!(),
|
||||||
Batch::Snapshot(_) => todo!(),
|
Batch::Snapshot(_) => todo!(),
|
||||||
Batch::Dump(_) => todo!(),
|
Batch::Dump(_) => todo!(),
|
||||||
Batch::Contiguous { tasks, kind } => todo!(),
|
Batch::Contiguous { tasks, kind } => {
|
||||||
|
// it's safe because you can't batch 0 contiguous tasks.
|
||||||
|
let first_task = &tasks[0];
|
||||||
|
// and the two kind of tasks we batch MUST have ONE index name.
|
||||||
|
let index_name = first_task.indexes().unwrap()[0];
|
||||||
|
let index = self.index(index_name)?;
|
||||||
|
|
||||||
|
match kind {
|
||||||
|
Kind::DocumentAddition => {
|
||||||
|
let content_files = tasks.iter().map(|task| match &task.kind {
|
||||||
|
KindWithContent::DocumentAddition { content_file, .. } => {
|
||||||
|
content_file.clone()
|
||||||
|
}
|
||||||
|
k => unreachable!(
|
||||||
|
"Internal error, `{:?}` is not supposed to be reachable here",
|
||||||
|
k.as_kind()
|
||||||
|
),
|
||||||
|
});
|
||||||
|
let results = index.update_documents(
|
||||||
|
IndexDocumentsMethod::UpdateDocuments,
|
||||||
|
None,
|
||||||
|
self.file_store.clone(),
|
||||||
|
content_files,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
for (task, result) in tasks.iter_mut().zip(results) {
|
||||||
|
task.finished_at = Some(OffsetDateTime::now_utc());
|
||||||
|
match result {
|
||||||
|
Ok(_) => task.status = Status::Succeeded,
|
||||||
|
Err(_) => task.status = Status::Succeeded,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Kind::DocumentDeletion => {
|
||||||
|
let ids: Vec<_> = tasks
|
||||||
|
.iter()
|
||||||
|
.flat_map(|task| match &task.kind {
|
||||||
|
KindWithContent::DocumentDeletion { documents_ids, .. } => {
|
||||||
|
documents_ids.clone()
|
||||||
|
}
|
||||||
|
k => unreachable!(
|
||||||
|
"Internal error, `{:?}` is not supposed to be reachable here",
|
||||||
|
k.as_kind()
|
||||||
|
),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let result = index.delete_documents(&ids);
|
||||||
|
|
||||||
|
for task in tasks.iter_mut() {
|
||||||
|
task.finished_at = Some(OffsetDateTime::now_utc());
|
||||||
|
match result {
|
||||||
|
Ok(_) => task.status = Status::Succeeded,
|
||||||
|
Err(_) => task.status = Status::Succeeded,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
Batch::Empty => todo!(),
|
Batch::Empty => todo!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ use anyhow::Result;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::TaskId;
|
use crate::TaskId;
|
||||||
|
|
||||||
@ -56,7 +57,7 @@ pub enum KindWithContent {
|
|||||||
Snapshot,
|
Snapshot,
|
||||||
DocumentAddition {
|
DocumentAddition {
|
||||||
index_name: String,
|
index_name: String,
|
||||||
content_file: String,
|
content_file: Uuid,
|
||||||
},
|
},
|
||||||
DocumentDeletion {
|
DocumentDeletion {
|
||||||
index_name: String,
|
index_name: String,
|
||||||
|
Loading…
Reference in New Issue
Block a user