Introduce the ProcessingTasks struct

This commit is contained in:
Kerollmops 2022-10-17 13:54:35 +02:00 committed by Clément Renault
parent c9523c6f39
commit 703ba7a1fb
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 54 additions and 15 deletions

View File

@ -939,7 +939,7 @@ impl IndexScheduler {
// 1. Remove from this list the tasks that we are not allowed to delete // 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().1; let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
let all_task_ids = self.all_task_ids(&wtxn)?; let all_task_ids = self.all_task_ids(&wtxn)?;
let mut to_delete_tasks = all_task_ids & matched_tasks; let mut to_delete_tasks = all_task_ids & matched_tasks;

View File

@ -11,10 +11,9 @@ pub type TaskId = u32;
use dump::{KindDump, TaskDump, UpdateFile}; use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error; pub use error::Error;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use file_store::FileStore; use file_store::FileStore;
@ -27,8 +26,10 @@ use uuid::Uuid;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env}; use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32}; use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
@ -117,6 +118,37 @@ impl Query {
} }
} }
/// State describing the batch of tasks currently being processed.
///
/// It bundles the timestamp of the last start/stop transition, the set of
/// task ids that are processing, and a stop flag.
///
/// Cloning this struct clones the `Arc`, so a clone shares the same
/// `must_stop` flag as the value it was cloned from.
#[derive(Debug, Clone)]
struct ProcessingTasks {
/// The date and time at which the indexation started.
///
/// Note that `stop_processing_at` also overwrites this field with the
/// time at which processing stopped.
started_at: OffsetDateTime,
/// The list of tasks ids that are currently running.
///
/// Replaced by `start_processing_at` and reset to an empty bitmap by
/// `stop_processing_at`.
processing: RoaringBitmap,
/// A boolean that can be set to true to stop the currently processing tasks.
///
/// Set to `true` by `cancel_processing_tasks` when at least one of the
/// canceled tasks is in `processing`.
must_stop: Arc<AtomicBool>,
}
impl ProcessingTasks {
    /// Registers a new set of processing tasks together with the time at
    /// which their processing started, replacing whatever was stored before.
    fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
        self.processing = processing;
        self.started_at = started_at;
    }

    /// Marks the end of processing: the set of processing tasks becomes empty
    /// and the stored timestamp is overwritten with `stopped_at`.
    fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
        self.processing = RoaringBitmap::new();
        self.started_at = stopped_at;
    }

    /// Raises the `must_stop` flag when at least one of `canceled_tasks` is
    /// currently processing.
    ///
    /// Returns `true` if the flag was raised, `false` when none of the
    /// canceled tasks is processing (in which case the flag is left untouched).
    fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
        // Nothing in common with the running tasks: there is nothing to interrupt.
        if self.processing.is_disjoint(canceled_tasks) {
            return false;
        }

        // NOTE(review): nothing in this impl resets the flag back to `false`;
        // confirm that whoever consumes `must_stop` clears it between batches.
        self.must_stop.store(true, Relaxed);
        true
    }
}
/// Database const names for the `IndexScheduler`. /// Database const names for the `IndexScheduler`.
mod db_name { mod db_name {
pub const ALL_TASKS: &str = "all-tasks"; pub const ALL_TASKS: &str = "all-tasks";
@ -129,14 +161,12 @@ mod db_name {
/// 1. Resolve the name of the indexes. /// 1. Resolve the name of the indexes.
/// 2. Schedule the tasks. /// 2. Schedule the tasks.
pub struct IndexScheduler { pub struct IndexScheduler {
/// The list of tasks currently processing and their starting date.
pub(crate) processing_tasks: Arc<RwLock<(OffsetDateTime, RoaringBitmap)>>,
pub(crate) file_store: FileStore,
/// The LMDB environment which the DBs are associated with. /// The LMDB environment which the DBs are associated with.
pub(crate) env: Env, pub(crate) env: Env,
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
pub(crate) file_store: FileStore,
// The main database, it contains all the tasks accessible by their Id. // The main database, it contains all the tasks accessible by their Id.
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>, pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
@ -153,7 +183,7 @@ pub struct IndexScheduler {
/// Get a signal when a batch needs to be processed. /// Get a signal when a batch needs to be processed.
pub(crate) wake_up: Arc<SignalEvent>, pub(crate) wake_up: Arc<SignalEvent>,
/// Weither autobatching is enabled or not. /// Whether auto-batching is enabled or not.
pub(crate) autobatching_enabled: bool, pub(crate) autobatching_enabled: bool,
/// The path used to create the dumps. /// The path used to create the dumps.
@ -195,12 +225,15 @@ impl IndexScheduler {
options.max_dbs(6); options.max_dbs(6);
let env = options.open(tasks_path)?; let env = options.open(tasks_path)?;
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new()); let processing_tasks = ProcessingTasks {
started_at: OffsetDateTime::now_utc(),
processing: RoaringBitmap::new(),
must_stop: Arc::new(AtomicBool::new(false)),
};
let file_store = FileStore::new(&update_file_path)?; let file_store = FileStore::new(&update_file_path)?;
// allow unreachable_code to get rids of the warning in the case of a test build. // allow unreachable_code to get rids of the warning in the case of a test build.
let this = Self { let this = Self {
// by default there is no processing tasks
processing_tasks: Arc::new(RwLock::new(processing_tasks)), processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store, file_store,
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?, all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
@ -321,7 +354,7 @@ impl IndexScheduler {
.take(query.limit.unwrap_or(u32::MAX) as usize), .take(query.limit.unwrap_or(u32::MAX) as usize),
)?; )?;
let (started_at, processing) = self let ProcessingTasks { started_at, processing, .. } = self
.processing_tasks .processing_tasks
.read() .read()
.map_err(|_| Error::CorruptedTaskQueue)? .map_err(|_| Error::CorruptedTaskQueue)?
@ -556,7 +589,10 @@ impl IndexScheduler {
let processed_tasks = ids.len(); let processed_tasks = ids.len();
let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap(); let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
let started_at = OffsetDateTime::now_utc(); let started_at = OffsetDateTime::now_utc();
*self.processing_tasks.write().unwrap() = (started_at, processing_tasks); self.processing_tasks
.write()
.unwrap()
.start_processing_at(started_at, processing_tasks);
#[cfg(test)] #[cfg(test)]
{ {
@ -596,7 +632,10 @@ impl IndexScheduler {
} }
} }
} }
*self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new()); self.processing_tasks
.write()
.unwrap()
.stop_processing_at(finished_at);
wtxn.commit()?; wtxn.commit()?;
#[cfg(test)] #[cfg(test)]

View File

@ -31,7 +31,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let mut snap = String::new(); let mut snap = String::new();
let (_time, processing_tasks) = processing_tasks.read().unwrap().clone(); let processing_tasks = processing_tasks.read().unwrap().processing;
snap.push_str(&format!( snap.push_str(&format!(
"### Autobatching Enabled = {autobatching_enabled}\n" "### Autobatching Enabled = {autobatching_enabled}\n"
)); ));