diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 83b0d56ab..880dc1197 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -1,3 +1,10 @@ +/*! +The autobatcher is responsible for combining the next enqueued +tasks affecting a single index into a [batch](crate::batch::Batch). + +The main function of the autobatcher is [`next_autobatch`]. +*/ + use meilisearch_types::milli::update::IndexDocumentsMethod::{ self, ReplaceDocuments, UpdateDocuments, }; @@ -6,8 +13,10 @@ use std::ops::ControlFlow::{self, Break, Continue}; use crate::KindWithContent; -/// This enum contain the minimal necessary informations -/// to make the autobatcher works. +/// Succinctly describes a task's [`Kind`](meilisearch_types::tasks::Kind) +/// for the purpose of simplifying the implementation of the autobatcher. +/// +/// Only the non-prioritised tasks that can be grouped in a batch have a corresponding [`AutobatchKind`] enum AutobatchKind { DocumentImport { method: IndexDocumentsMethod, @@ -387,6 +396,16 @@ impl BatchKind { } } +/// Create a batch from an ordered list of tasks. +/// +/// ## Preconditions +/// 1. The tasks must be enqueued and given in the order in which they were enqueued +/// 2. The tasks must not be prioritised tasks (e.g. task cancellation, dump, snapshot, task deletion) +/// 3. The tasks must all be related to the same index +/// +/// ## Return +/// `None` if the list of tasks is empty. Otherwise, an [`AutoBatch`] that represents +/// a subset of the given tasks. pub fn autobatch(enqueued: Vec<(TaskId, KindWithContent)>) -> Option { let mut enqueued = enqueued.into_iter(); let (id, kind) = enqueued.next()?; diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 607557a1c..e5c16b0b2 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,3 +1,22 @@ +/*! +This module handles the creation and processing of batch operations. + +A batch is a combination of multiple tasks that can be processed at once. +Executing a batch operation should always be functionally equivalent to +executing each of its tasks' operations individually and in order. + +For example, if the user sends two tasks: +1. import documents X +2. import documents Y + +We can combine the two tasks in a single batch: +1. import documents X and Y + +Processing this batch is functionally equivalent to processing the two +tasks individally, but should be much faster since we are only performing +one indexing operation. +*/ + use std::collections::HashSet; use std::fs::File; use std::io::BufWriter; @@ -26,6 +45,11 @@ use roaring::RoaringBitmap; use time::OffsetDateTime; use uuid::Uuid; +/// Represents a combination of tasks that can all be processed at the same time. +/// +/// A batch contains the set of tasks that it represents (accessible through +/// [`self.ids()`](Batch::ids)), as well as additional information on how to +/// be processed. #[derive(Debug)] pub(crate) enum Batch { TaskCancelation(Task), @@ -49,6 +73,7 @@ pub(crate) enum Batch { }, } +/// A [batch](Batch) that combines multiple tasks operating on an index. #[derive(Debug)] pub(crate) enum IndexOperation { DocumentImport { @@ -102,6 +127,7 @@ pub(crate) enum IndexOperation { } impl Batch { + /// Return the task ids associated with this batch. pub fn ids(&self) -> Vec { match self { Batch::TaskCancelation(task) @@ -135,6 +161,12 @@ impl Batch { } impl IndexScheduler { + /// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`]. + /// + /// ## Arguments + /// - `rtxn`: read transaction + /// - `index_uid`: name of the index affected by the operations of the autobatch + /// - `batch`: the result of the autobatcher pub(crate) fn create_next_batch_index( &self, rtxn: &RoTxn, @@ -456,6 +488,12 @@ impl IndexScheduler { Ok(None) } + /// Apply the operation associated with the given batch. + /// + /// ## Return + /// The list of tasks that were processed. The metadata of each task in the returned + /// list is updated accordingly, with the exception of the its date fields + /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). pub(crate) fn process_batch(&self, batch: Batch) -> Result> { match batch { Batch::TaskCancelation(mut task) => { @@ -741,6 +779,10 @@ impl IndexScheduler { } } + /// Process the index operation on the given index. + /// + /// ## Return + /// The list of processed tasks. fn apply_index_operation<'txn, 'i>( &self, index_wtxn: &'txn mut RwTxn<'i, '_>, diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index b096ece1f..0e80213c1 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -16,23 +16,29 @@ use crate::{Error, Result}; const INDEX_MAPPING: &str = "index-mapping"; +/// Structure managing meilisearch's indexes. +/// +/// It is responsible for: +/// 1. Creating new indexes +/// 2. Opening indexes and storing references to these opened indexes +/// 3. Accessing indexes through their uuid +/// 4. Mapping a user-defined name to each index uuid. #[derive(Clone)] pub struct IndexMapper { - // Keep track of the opened indexes and is used - // mainly by the index resolver. + /// Keep track of the opened indexes. Used mainly by the index resolver. index_map: Arc>>, // TODO create a UUID Codec that uses the 16 bytes representation - // Map an index name with an index uuid currently available on disk. + /// Map an index name with an index uuid currently available on disk. index_mapping: Database>, + /// Path to the folder where the LMDB environments of each index are. base_path: PathBuf, index_size: usize, pub indexer_config: Arc, } -/// Weither the index must not be inserted back -/// or it is available for use. +/// Whether the index is available for use or is forbidden to be inserted back in the index map #[derive(Clone)] pub enum IndexStatus { /// Do not insert it back in the index map as it is currently being deleted. @@ -167,6 +173,7 @@ impl IndexMapper { Ok(index) } + /// Return all indexes, may open them if they weren't already opened. pub fn indexes(&self, rtxn: &RoTxn) -> Result> { self.index_mapping .iter(rtxn)? diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 9463cd9f9..42a69ed2b 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1,3 +1,23 @@ +/*! +This crate defines the index scheduler, which is responsible for: +1. Keeping references to meilisearch's indexes and mapping them to their +user-defined names. +2. Scheduling tasks given by the user and executing them, in batch if possible. + +When an `IndexScheduler` is created, a new thread containing a reference to the +scheduler is created. This thread runs the scheduler's run loop, where the +scheduler waits to be woken up to process new tasks. It wakes up when: + +1. it is launched for the first time +2. a new task is registered +3. a batch of tasks has been processed + +It is only within this thread that the scheduler is allowed to process tasks. +On the other hand, the publicly accessible methods of the scheduler can be +called asynchronously from any thread. These methods can either query the +content of the scheduler or enqueue new tasks. +*/ + mod autobatcher; mod batch; pub mod error; @@ -36,26 +56,50 @@ use crate::index_mapper::IndexMapper; type BEI128 = meilisearch_types::heed::zerocopy::I128; +/// Defines a subset of tasks to be retrieved from the [`IndexScheduler`]. +/// +/// An empty/default query (where each field is set to `None`) matches all tasks. +/// Each non-null field restricts the set of tasks further. #[derive(Default, Debug, Clone, PartialEq, Eq)] pub struct Query { + /// The maximum number of tasks to be matched pub limit: Option, + /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched pub from: Option, + /// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasls pub status: Option>, + /// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks. + /// + /// The kind of a task is given by: + /// ``` + /// # use meilisearch_types::tasks::{Task, Kind}; + /// # fn doc_func(task: Task) -> Kind { + /// task.kind.as_kind() + /// # } + /// ``` pub kind: Option>, + /// The allowed [index ids](meilisearch_types::tasks::Task::index_uid) of the matched tasks pub index_uid: Option>, + /// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched pub uid: Option>, + /// Exclusive upper bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field. pub before_enqueued_at: Option, + /// Exclusive lower bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field. pub after_enqueued_at: Option, + /// Exclusive upper bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field. pub before_started_at: Option, + /// Exclusive lower bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field. pub after_started_at: Option, + /// Exclusive upper bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field. pub before_finished_at: Option, + /// Exclusive lower bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field. pub after_finished_at: Option, } impl Query { /// Return `true` iff every field of the query is set to `None`, such that the query - /// would match all tasks. + /// matches all tasks. pub fn is_empty(&self) -> bool { matches!( self, @@ -75,24 +119,8 @@ impl Query { } ) } - pub fn with_status(self, status: Status) -> Self { - let mut status_vec = self.status.unwrap_or_default(); - status_vec.push(status); - Self { - status: Some(status_vec), - ..self - } - } - - pub fn with_kind(self, kind: Kind) -> Self { - let mut kind_vec = self.kind.unwrap_or_default(); - kind_vec.push(kind); - Self { - kind: Some(kind_vec), - ..self - } - } + /// Add an [index id](meilisearch_types::tasks::Task::index_uid) to the list of permitted indexes. pub fn with_index(self, index_uid: String) -> Self { let mut index_vec = self.index_uid.unwrap_or_default(); index_vec.push(index_uid); @@ -101,22 +129,6 @@ impl Query { ..self } } - - pub fn with_uid(self, uid: TaskId) -> Self { - let mut task_vec = self.uid.unwrap_or_default(); - task_vec.push(uid); - Self { - uid: Some(task_vec), - ..self - } - } - - pub fn with_limit(self, limit: u32) -> Self { - Self { - limit: Some(limit), - ..self - } - } } #[derive(Debug, Clone)] @@ -182,16 +194,19 @@ mod db_name { pub const FINISHED_AT: &str = "finished-at"; } -/// This module is responsible for two things; -/// 1. Resolve the name of the indexes. -/// 2. Schedule the tasks. +/// Structure which holds meilisearch's indexes and schedules the tasks +/// to be performed on them. pub struct IndexScheduler { /// The LMDB environment which the DBs are associated with. pub(crate) env: Env, /// A boolean that can be set to true to stop the currently processing tasks. pub(crate) must_stop_processing: MustStopProcessing, + + /// The list of tasks currently processing pub(crate) processing_tasks: Arc>, + + /// The list of files referenced by the tasks pub(crate) file_store: FileStore, // The main database, it contains all the tasks accessible by their Id. @@ -248,6 +263,17 @@ pub enum Breakpoint { } impl IndexScheduler { + /// Create an index scheduler and start its run loop. + /// + /// ## Arguments + /// - `tasks_path`: the path to the folder containing the task databases + /// - `update_file_path`: the path to the file store containing the files associated to the tasks + /// - `indexes_path`: the path to the folder containing meilisearch's indexes + /// - `dumps_path`: the path to the folder containing the dumps + /// - `index_size`: the maximum size, in bytes, of each meilisearch index + /// - `indexer_config`: configuration used during indexing for each meilisearch index + /// - `autobatching_enabled`: `true` iff the index scheduler is allowed to automatically batch tasks + /// together, to process multiple tasks at once. pub fn new( tasks_path: PathBuf, update_file_path: PathBuf, @@ -296,7 +322,10 @@ impl IndexScheduler { Ok(this) } - /// This function will execute in a different thread and must be called only once. + /// Start the run loop for the given index scheduler. + /// + /// This function will execute in a different thread and must be called + /// only once per index scheduler. fn run(&self) { let run = Self { must_stop_processing: MustStopProcessing::default(), @@ -334,9 +363,10 @@ impl IndexScheduler { &self.index_mapper.indexer_config } - /// Return the index corresponding to the name. If it wasn't opened before - /// it'll be opened. But if it doesn't exist on disk it'll throw an - /// `IndexNotFound` error. + /// Return the index corresponding to the name. + /// + /// * If the index wasn't opened before, the index will be opened. + /// * If the index doesn't exist on disk, the `IndexNotFoundError` is thrown. pub fn index(&self, name: &str) -> Result { let rtxn = self.env.read_txn()?; self.index_mapper.index(&rtxn, name) @@ -348,7 +378,7 @@ impl IndexScheduler { self.index_mapper.indexes(&rtxn) } - /// Return the task ids corresponding to the query + /// Return the task ids matched by the given query. pub fn get_task_ids(&self, query: &Query) -> Result { let rtxn = self.env.read_txn()?; @@ -410,7 +440,7 @@ impl IndexScheduler { Ok(tasks) } - /// Returns the tasks corresponding to the query. + /// Returns the tasks matched by the given query. pub fn get_tasks(&self, query: Query) -> Result> { let tasks = self.get_task_ids(&query)?; let rtxn = self.env.read_txn()?; @@ -450,8 +480,9 @@ impl IndexScheduler { } } - /// Register a new task in the scheduler. If it fails and data was associated with the task - /// it tries to delete the file. + /// Register a new task in the scheduler. + /// + /// If it fails and data was associated with the task, it tries to delete the associated data. pub fn register(&self, kind: KindWithContent) -> Result { let mut wtxn = self.env.write_txn()?; @@ -645,6 +676,11 @@ impl IndexScheduler { Ok(index) } + /// Create a file and register it in the index scheduler. + /// + /// The returned file and uuid can be used to associate + /// some data to a task. The file will be kept until + /// the task has been fully processed. pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> { Ok(self.file_store.new_update()?) } @@ -654,11 +690,23 @@ impl IndexScheduler { Ok(self.file_store.new_update_with_uuid(uuid)?) } + /// Delete a file from the index scheduler. + /// + /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method. pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> { Ok(self.file_store.delete(uuid)?) } - /// Create and execute and store the result of one batch of registered tasks. + /// Perform one iteration of the run loop. + /// + /// 1. Find the next batch of tasks to be processed. + /// 2. Update the information of these tasks following the start of their processing. + /// 3. Update the in-memory list of processed tasks accordingly. + /// 4. Process the batch: + /// - perform the actions of each batched task + /// - update the information of each batched task following the end + /// of their processing. + /// 5. Reset the in-memory list of processed tasks. /// /// Returns the number of processed tasks. fn tick(&self) -> Result { diff --git a/meilisearch-http/src/routes/indexes/mod.rs b/meilisearch-http/src/routes/indexes/mod.rs index 97e8ca3d6..e79f02244 100644 --- a/meilisearch-http/src/routes/indexes/mod.rs +++ b/meilisearch-http/src/routes/indexes/mod.rs @@ -215,12 +215,12 @@ impl IndexStats { index_uid: String, ) -> Result { // we check if there is currently a task processing associated with this index. - let processing_task = index_scheduler.get_tasks( - Query::default() - .with_status(Status::Processing) - .with_index(index_uid.clone()) - .with_limit(1), - )?; + let processing_task = index_scheduler.get_tasks(Query { + status: Some(vec![Status::Processing]), + index_uid: Some(vec![index_uid.clone()]), + limit: Some(1), + ..Query::default() + })?; let is_processing = !processing_task.is_empty(); let index = index_scheduler.index(&index_uid)?; diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index b189e934d..b816fabb4 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -293,11 +293,11 @@ pub fn create_all_stats( let mut last_task: Option = None; let mut indexes = BTreeMap::new(); let mut database_size = 0; - let processing_task = index_scheduler.get_tasks( - Query::default() - .with_status(Status::Processing) - .with_limit(1), - )?; + let processing_task = index_scheduler.get_tasks(Query { + status: Some(vec![Status::Processing]), + limit: Some(1), + ..Query::default() + })?; let processing_index = processing_task .first() .and_then(|task| task.index_uid().clone());