diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index aa95b7075..8237c8540 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -43,7 +43,7 @@ use uuid::Uuid; use crate::autobatcher::{self, BatchKind}; use crate::utils::{self, swap_index_uid_in_task}; -use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId}; +use crate::{Error, IndexSchedulerInner, ProcessingTasks, Result, TaskId}; /// Represents a combination of tasks that can all be processed at the same time. /// @@ -213,7 +213,7 @@ impl IndexOperation { } } -impl IndexScheduler { +impl IndexSchedulerInner { /// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`]. /// /// ## Arguments @@ -480,8 +480,7 @@ impl IndexScheduler { if let Some(task_id) = to_cancel.max() { // We retrieve the tasks that were processing before this tasks cancelation started. // We must *not* reset the processing tasks before calling this method. - let ProcessingTasks { started_at, processing, .. } = - &*self.processing_tasks.read().unwrap(); + let ProcessingTasks { started_at, processing, .. } = &*self.processing_tasks.read(); return Ok(Some(Batch::TaskCancelation { task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?, previous_started_at: *started_at, @@ -1392,7 +1391,7 @@ impl IndexScheduler { fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result { // 1. Remove from this list the tasks that we are not allowed to delete let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; - let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); + let processing_tasks = &self.processing_tasks.read().processing.clone(); let all_task_ids = self.all_task_ids(wtxn)?; let mut to_delete_tasks = all_task_ids & matched_tasks; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 57f45b5c3..adbc06763 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -281,93 +281,10 @@ pub struct IndexSchedulerOptions { /// Structure which holds meilisearch's indexes and schedules the tasks /// to be performed on them. #[derive(Clone)] -pub struct IndexScheduler(Arc>>); - -/// This is the internal structure that keeps the indexes alive. -struct IndexSchedulerInner { - /// The LMDB environment which the DBs are associated with. - pub(crate) env: Env, - - /// A boolean that can be set to true to stop the currently processing tasks. - pub(crate) must_stop_processing: MustStopProcessing, - - /// The list of tasks currently processing - pub(crate) processing_tasks: Arc>, - - /// The list of files referenced by the tasks - pub(crate) file_store: FileStore, - - // The main database, it contains all the tasks accessible by their Id. - pub(crate) all_tasks: Database, SerdeJson>, - - /// All the tasks ids grouped by their status. - // TODO we should not be able to serialize a `Status::Processing` in this database. - pub(crate) status: Database, RoaringBitmapCodec>, - /// All the tasks ids grouped by their kind. - pub(crate) kind: Database, RoaringBitmapCodec>, - /// Store the tasks associated to an index. 
- pub(crate) index_tasks: Database, - - /// Store the tasks that were canceled by a task uid - pub(crate) canceled_by: Database, RoaringBitmapCodec>, - - /// Store the task ids of tasks which were enqueued at a specific date - pub(crate) enqueued_at: Database, CboRoaringBitmapCodec>, - - /// Store the task ids of finished tasks which started being processed at a specific date - pub(crate) started_at: Database, CboRoaringBitmapCodec>, - - /// Store the task ids of tasks which finished at a specific date - pub(crate) finished_at: Database, CboRoaringBitmapCodec>, - - /// In charge of creating, opening, storing and returning indexes. - pub(crate) index_mapper: IndexMapper, - - /// In charge of fetching and setting the status of experimental features. - features: features::FeatureData, - - /// Get a signal when a batch needs to be processed. - pub(crate) wake_up: Arc, - - /// Whether auto-batching is enabled or not. - pub(crate) autobatching_enabled: bool, - - /// The max number of tasks allowed before the scheduler starts to delete - /// the finished tasks automatically. - pub(crate) max_number_of_tasks: usize, - - /// The path used to create the dumps. - pub(crate) dumps_path: PathBuf, - - /// The path used to create the snapshots. - pub(crate) snapshots_path: PathBuf, - - /// The path to the folder containing the auth LMDB env. - pub(crate) auth_path: PathBuf, - - /// The path to the version file of Meilisearch. - pub(crate) version_file_path: PathBuf, - - /// The URL to the ZooKeeper cluster - pub(crate) zookeeper: Option>, - - // ================= test - // The next entry is dedicated to the tests. - /// Provide a way to set a breakpoint in multiple part of the scheduler. - /// - /// See [self.breakpoint()](`IndexScheduler::breakpoint`) for an explanation. - #[cfg(test)] - test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, - - /// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler. - /// - /// The first field is the iteration index and the second field identifies a location in the code. - #[cfg(test)] - planned_failures: Vec<(usize, tests::FailureLocation)>, - - /// A counter that is incremented before every call to [`tick`](IndexScheduler::tick) - #[cfg(test)] - run_loop_iteration: Arc>, +pub struct IndexScheduler { + inner: Arc>>, + zookeeper: Option>, + wake_up: Arc, } impl IndexScheduler { @@ -487,20 +404,27 @@ impl IndexScheduler { } } - let this = IndexScheduler(Arc::new(RwLock::new(inner))); + let this = IndexScheduler { + zookeeper: inner.zookeeper.clone(), + wake_up: inner.wake_up.clone(), + inner: Arc::new(RwLock::new(Some(inner))), + }; + this.run(); + Ok(this) } /// Returns a read lock of the `IndexScheduler` inner structure. pub fn inner(&self) -> MappedRwLockReadGuard { - RwLockReadGuard::map(self.0.read(), |opt| opt.unwrap()) + RwLockReadGuard::map(self.inner.read(), |opt| opt.as_ref().unwrap()) } /// Return `Ok(())` if the index scheduler is able to access one of its database. pub fn health(&self) -> Result<()> { - let rtxn = self.env.read_txn()?; - self.all_tasks.first(&rtxn)?; + let this = self.inner(); + let rtxn = this.env.read_txn()?; + this.all_tasks.first(&rtxn)?; Ok(()) } @@ -573,10 +497,6 @@ impl IndexScheduler { } } - pub fn read_txn(&self) -> Result { - self.env.read_txn().map_err(|e| e.into()) - } - /// Start the run loop for the given index scheduler. 
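// Editor's sketch (not part of the patch): a minimal, self-contained illustration of
// the handle/inner split introduced above. The public `IndexScheduler` becomes a
// cheap-to-clone handle around an `Arc<RwLock<Option<IndexSchedulerInner>>>`, and
// `inner()` projects the read guard onto the inner value with parking_lot's guard
// mapping; note there is no `.unwrap()` on the lock itself, since parking_lot locks
// are not poisonable. `Handle` and `Inner` are stand-in names, assuming the same
// parking_lot `RwLock` the patch uses.
use std::sync::Arc;

use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};

struct Inner {
    processed: u64, // stand-in for the fields of IndexSchedulerInner
}

#[derive(Clone)]
struct Handle {
    inner: Arc<RwLock<Option<Inner>>>, // stand-in for IndexScheduler::inner
}

impl Handle {
    fn new(inner: Inner) -> Self {
        Handle { inner: Arc::new(RwLock::new(Some(inner))) }
    }

    // Mirrors `IndexScheduler::inner()`: panics if the inner scheduler was taken out.
    fn inner(&self) -> MappedRwLockReadGuard<'_, Inner> {
        RwLockReadGuard::map(self.inner.read(), |opt| opt.as_ref().unwrap())
    }
}

fn main() {
    let handle = Handle::new(Inner { processed: 0 });
    let worker = handle.clone(); // clones only the Arc, like the real scheduler handle
    std::thread::spawn(move || println!("processed: {}", worker.inner().processed))
        .join()
        .unwrap();
}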
/// /// This function will execute in a different thread and must be called @@ -585,28 +505,37 @@ impl IndexScheduler { #[cfg(test)] self.breakpoint(Breakpoint::Init); - let latch = match self.zookeeper.clone() { - Some(zookeeper) => { + let latch = match self.zookeeper { + Some(ref zookeeper) => { let id = Uuid::new_v4().to_string(); let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string()); - - let this = self.private_clone(); + let wake_up = self.wake_up.clone(); + let latchc = latch.clone(); zookeeper .add_watch("/election", AddWatchMode::PersistentRecursive, move |_| { + if latchc.has_leadership() { + log::info!("I am the leader!"); + } else { + log::info!("I am *not* the leader!"); + } thread::sleep(Duration::from_secs(1)); - this.wake_up.signal(); + wake_up.signal(); }) .unwrap(); + latch.start().unwrap(); + // Join the potential leaders list. // The lowest in the list is the leader. And if we're not the leader // we watch the node right before us to be notified if he dies. // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection let latchc = latch.clone(); - let this = self.private_clone(); + let this = self.clone(); zookeeper .add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| { if !latchc.has_leadership() { + let inner = this.inner(); + let WatchedEvent { event_type, path, keeper_state: _ } = event; match event_type { WatchedEventType::NodeCreated => { @@ -628,7 +557,7 @@ impl IndexScheduler { // 2. Download all the databases let tasks_file = - tempfile::NamedTempFile::new_in(this.env.path()).unwrap(); + tempfile::NamedTempFile::new_in(inner.env.path()).unwrap(); log::info!("Downloading the index scheduler database."); let tasks_snapshot = snapshot_dir.join("tasks.mdb"); @@ -636,7 +565,7 @@ impl IndexScheduler { log::info!("Downloading the indexes databases"); let indexes_files = - tempfile::TempDir::new_in(&this.index_mapper.base_path) + tempfile::TempDir::new_in(&inner.index_mapper.base_path) .unwrap(); let mut indexes = Vec::new(); @@ -692,6 +621,8 @@ impl IndexScheduler { .expect("Internal, the /tasks directory was deleted during execution."); // TODO change me log::info!("Importing {} tasks", children.len()); + let inner = self.inner(); + let mut wtxn = inner.env.write_txn().unwrap(); for path in children { log::info!(" Importing {}", path); match zookeeper.get_data(&format!("/tasks/{}", &path), false) { @@ -700,9 +631,7 @@ impl IndexScheduler { log::info!(" Task {} was empty, skipping.", path); } else { let task = serde_json::from_slice(&task).unwrap(); - let mut wtxn = self.env.write_txn().unwrap(); - self.register_raw_task(&mut wtxn, &task).unwrap(); - wtxn.commit().unwrap(); + inner.register_raw_task(&mut wtxn, &task).unwrap(); // we received a new tasks, we must wake up self.wake_up.signal(); } @@ -713,12 +642,14 @@ impl IndexScheduler { // TODO: What happens if someone updates the files before we have the time // to setup the watcher } + wtxn.commit().unwrap(); } Err(e) => panic!("{e}"), } // TODO: fix unwrap by returning a clear error. 
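// Editor's sketch (hypothetical helper, not in the patch): the leader-election recipe
// the comment above refers to. ZooKeeper hands each candidate a sequential child node
// under /election; the candidate owning the lowest sequence number is the leader, and
// every other candidate watches the node immediately before its own so it only reacts
// when its predecessor disappears. This pure-string helper only illustrates the
// ordering rule; the actual election and watch registration still go through the
// `zookeeper` client and `LeaderLatch` as in the code above.

/// Given the children of `/election` and our own node name, return `None` if we are
/// the leader, or `Some(node_to_watch)` otherwise.
fn node_to_watch(mut children: Vec<String>, own: &str) -> Option<String> {
    // Sequence suffixes are zero-padded, so lexicographic order matches numeric order.
    children.sort();
    let pos = children.iter().position(|c| c.as_str() == own)?;
    if pos == 0 {
        None // lowest node: we are the leader
    } else {
        Some(children[pos - 1].clone())
    }
}

fn main() {
    let children = vec![
        "latch-0000000003".to_string(),
        "latch-0000000001".to_string(),
        "latch-0000000002".to_string(),
    ];
    assert_eq!(node_to_watch(children.clone(), "latch-0000000001"), None);
    assert_eq!(
        node_to_watch(children, "latch-0000000003"),
        Some("latch-0000000002".to_string())
    );
    println!("leader election ordering ok");
}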
- let this = self.private_clone(); + let this = self.clone(); + let zookeeperc = zookeeper.clone(); zookeeper .add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| { let WatchedEvent { event_type, path, keeper_state: _ } = event; @@ -727,15 +658,11 @@ impl IndexScheduler { let path = path.unwrap(); // Add raw task content in local DB log::info!("Received a new task from the cluster at {}", path); - let (data, _stat) = this - .zookeeper - .as_ref() - .unwrap() - .get_data(&path, false) - .unwrap(); + let inner = this.inner(); + let mut wtxn = inner.env.write_txn().unwrap(); + let (data, _stat) = zookeeperc.get_data(&path, false).unwrap(); let task = serde_json::from_slice(data.as_slice()).unwrap(); - let mut wtxn = this.env.write_txn().unwrap(); - this.register_raw_task(&mut wtxn, &task).unwrap(); + inner.register_raw_task(&mut wtxn, &task).unwrap(); wtxn.commit().unwrap(); } WatchedEventType::None @@ -755,28 +682,29 @@ impl IndexScheduler { None => None, }; - let this = self.private_clone(); + let this = self.clone(); thread::spawn(move || { loop { // we're either a leader or not running in a cluster, // either way we should wait until we receive a task. this.wake_up.wait(); - // TODO watch the /election node and send a signal once it changes (be careful about atomics ordering) if latch.as_ref().map_or(true, |latch| latch.has_leadership()) { - match this.tick() { + let inner = this.inner(); + match inner.tick() { Ok(TickOutcome::TickAgain(n)) => { // We must tick again. this.wake_up.signal(); // if we're in a cluster that means we're the leader // and should share a snapshot of what we've done. - if let Some(ref zookeeper) = this.zookeeper { + if let Some(ref zookeeper) = inner.zookeeper { // if nothing was processed we have nothing to do. if n == 0 { continue; } + let rtxn = inner.env.read_txn().unwrap(); let snapshot_path = zookeeper .create( "/snapshots/snapshot-", @@ -799,11 +727,11 @@ impl IndexScheduler { // 1. Snapshot the version file. let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME); - std::fs::copy(&this.version_file_path, dst).unwrap(); + std::fs::copy(&inner.version_file_path, dst).unwrap(); // 2. 
Snapshot the index-scheduler LMDB env log::info!("Snapshotting the tasks"); - let env = this.env.clone(); + let env = inner.env.clone(); env.copy_to_path( snapshot_dir.join("tasks.mdb"), heed::CompactionOption::Enabled, @@ -815,9 +743,7 @@ impl IndexScheduler { let dst = snapshot_dir.join("indexes"); std::fs::create_dir_all(&dst).unwrap(); - let this = this.private_clone(); - let rtxn = this.env.read_txn().unwrap(); - let indexes = this + let indexes = inner .index_mapper .index_mapping .iter(&rtxn) @@ -828,10 +754,8 @@ impl IndexScheduler { for (name, uuid) in indexes { log::info!(" Snapshotting index {name}"); - let this = this.private_clone(); let dst = dst.clone(); - let rtxn = this.env.read_txn().unwrap(); - let index = this.index_mapper.index(&rtxn, &name).unwrap(); + let index = inner.index_mapper.index(&rtxn, &name).unwrap(); index .copy_to_path( dst.join(format!("{uuid}.mdb")), @@ -851,12 +775,9 @@ impl IndexScheduler { ); // We can now delete all the tasks that has been processed - let processed = this - .processing_tasks - .read() - .unwrap() - .processed_previously() - .clone(); // we don't want to hold the mutex + // we don't want to hold the mutex + let processed = + inner.processing_tasks.read().processed_previously().clone(); log::info!("Deleting {} processed tasks", processed.len()); for task in processed { let node = format!("/tasks/task-{:0>10?}", task as i32); @@ -881,18 +802,14 @@ impl IndexScheduler { }); } - pub fn indexer_config(&self) -> &IndexerConfig { - &self.index_mapper.indexer_config - } - /// Return the real database size (i.e.: The size **with** the free pages) pub fn size(&self) -> Result { - Ok(self.env.real_disk_size()?) + Ok(self.inner().env.real_disk_size()?) } /// Return the used database size (i.e.: The size **without** the free pages) pub fn used_size(&self) -> Result { - Ok(self.env.non_free_pages_size()?) + Ok(self.inner().env.non_free_pages_size()?) } /// Return the index corresponding to the name. @@ -909,14 +826,16 @@ impl IndexScheduler { /// If you need to fetch information from or perform an action on all indexes, /// see the `try_for_each_index` function. pub fn index(&self, name: &str) -> Result { - let rtxn = self.env.read_txn()?; - self.index_mapper.index(&rtxn, name) + let this = self.inner(); + let rtxn = this.env.read_txn()?; + this.index_mapper.index(&rtxn, name) } /// Return the name of all indexes without opening them. pub fn index_names(&self) -> Result> { - let rtxn = self.env.read_txn()?; - self.index_mapper.index_names(&rtxn) + let this = self.inner(); + let rtxn = this.env.read_txn()?; + this.index_mapper.index_names(&rtxn) } /// Attempts `f` for each index that exists known to the index scheduler. @@ -933,15 +852,626 @@ impl IndexScheduler { where V: FromIterator, { - let rtxn = self.env.read_txn()?; - self.index_mapper.try_for_each_index(&rtxn, f) + let this = self.inner(); + let rtxn = this.env.read_txn()?; + this.index_mapper.try_for_each_index(&rtxn, f) + } + + /// The returned structure contains: + /// 1. The name of the property being observed can be `statuses`, `types`, or `indexes`. + /// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example. + /// 3. The number of times the properties appeared. 
+ pub fn get_stats(&self) -> Result>> { + let this = self.inner(); + let rtxn = this.read_txn()?; + + let mut res = BTreeMap::new(); + + res.insert( + "statuses".to_string(), + enum_iterator::all::() + .map(|s| Ok((s.to_string(), this.get_status(&rtxn, s)?.len()))) + .collect::>>()?, + ); + res.insert( + "types".to_string(), + enum_iterator::all::() + .map(|s| Ok((s.to_string(), this.get_kind(&rtxn, s)?.len()))) + .collect::>>()?, + ); + res.insert( + "indexes".to_string(), + this.index_tasks + .iter(&rtxn)? + .map(|res| Ok(res.map(|(name, bitmap)| (name.to_string(), bitmap.len()))?)) + .collect::>>()?, + ); + + Ok(res) + } + + // Return true if there is at least one task that is processing. + pub fn is_task_processing(&self) -> Result { + Ok(!self.inner().processing_tasks.read().processing.is_empty()) + } + + /// Return true iff there is at least one task associated with this index + /// that is processing. + pub fn is_index_processing(&self, index: &str) -> Result { + let this = self.inner(); + let rtxn = this.env.read_txn()?; + let processing_tasks = this.processing_tasks.read().processing.clone(); + let index_tasks = this.index_tasks(&rtxn, index)?; + let nbr_index_processing_tasks = processing_tasks.intersection_len(&index_tasks); + Ok(nbr_index_processing_tasks > 0) + } + + /// Return the task ids matching the query along with the total number of tasks + /// by ignoring the from and limit parameters from the user's point of view. + /// + /// There are two differences between an internal query and a query executed by + /// the user. + /// + /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated + /// with many indexes internally. + /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes. + pub fn get_task_ids_from_authorized_indexes( + &self, + query: &Query, + filters: &meilisearch_auth::AuthFilter, + ) -> Result<(RoaringBitmap, u64)> { + let this = self.inner(); + let rtxn = this.env.read_txn()?; + this.get_task_ids_from_authorized_indexes(&rtxn, query, filters) + } + + /// Return the tasks matching the query from the user's point of view along + /// with the total number of tasks matching the query, ignoring from and limit. + /// + /// There are two differences between an internal query and a query executed by + /// the user. + /// + /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated + /// with many indexes internally. + /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes. + pub fn get_tasks_from_authorized_indexes( + &self, + query: Query, + filters: &meilisearch_auth::AuthFilter, + ) -> Result<(Vec, u64)> { + let this = self.inner(); + let rtxn = this.env.read_txn()?; + + let (tasks, total) = this.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?; + let tasks = this.get_existing_tasks( + &rtxn, + tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), + )?; + + let ProcessingTasks { started_at, processing, .. } = this.processing_tasks.read().clone(); + + let ret = tasks.into_iter(); + if processing.is_empty() { + Ok((ret.collect(), total)) + } else { + Ok(( + ret.map(|task| { + if processing.contains(task.uid) { + Task { status: Status::Processing, started_at: Some(started_at), ..task } + } else { + task + } + }) + .collect(), + total, + )) + } + } + + /// Register a new task in the scheduler. 
+ /// + /// If it fails and data was associated with the task, it tries to delete the associated data. + pub fn register(&self, kind: KindWithContent) -> Result { + self.inner().register(kind) + } + + /// Create a file and register it in the index scheduler. + /// + /// The returned file and uuid can be used to associate + /// some data to a task. The file will be kept until + /// the task has been fully processed. + pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> { + self.inner().create_update_file() + } + + #[cfg(test)] + pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> { + Ok(self.inner().file_store.new_update_with_uuid(uuid)?) + } + + /// The size on disk taken by all the updates files contained in the `IndexScheduler`, in bytes. + pub fn compute_update_file_size(&self) -> Result { + Ok(self.inner().file_store.compute_total_size()?) + } + + /// Delete a file from the index scheduler. + /// + /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method. + pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> { + self.inner().delete_update_file(uuid) + } + + pub fn index_stats(&self, index_uid: &str) -> Result { + let is_indexing = self.is_index_processing(index_uid)?; + let this = self.inner(); + let rtxn = this.read_txn()?; + let index_stats = this.index_mapper.stats_of(&rtxn, index_uid)?; + Ok(IndexStats { is_indexing, inner_stats: index_stats }) + } + + pub fn features(&self) -> Result { + self.inner().features() + } + + /// Blocks the thread until the test handle asks to progress to/through this breakpoint. + /// + /// Two messages are sent through the channel for each breakpoint. + /// The first message is `(b, false)` and the second message is `(b, true)`. + /// + /// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other. + /// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks + /// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough. + /// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`. + /// As soon as we find it, the index scheduler is unblocked but then wait again on the call to + /// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the + /// test asks to progress to the next `(b2, false)`. + #[cfg(test)] + fn breakpoint(&self, b: Breakpoint) { + // We send two messages. The first one will sync with the call + // to `handle.wait_until(b)`. The second one will block until the + // the next call to `handle.wait_until(..)`. + self.test_breakpoint_sdr.send((b, false)).unwrap(); + // This one will only be able to be sent if the test handle stays alive. + // If it fails, then it means that we have exited the test. + // By crashing with `unwrap`, we kill the run loop. + self.test_breakpoint_sdr.send((b, true)).unwrap(); + } +} + +/// This is the internal structure that keeps the indexes alive. +pub struct IndexSchedulerInner { + /// The LMDB environment which the DBs are associated with. + pub(crate) env: Env, + + /// A boolean that can be set to true to stop the currently processing tasks. + pub(crate) must_stop_processing: MustStopProcessing, + + /// The list of tasks currently processing + pub(crate) processing_tasks: Arc>, + + /// The list of files referenced by the tasks + pub(crate) file_store: FileStore, + + // The main database, it contains all the tasks accessible by their Id. 
+ pub(crate) all_tasks: Database, SerdeJson>, + + /// All the tasks ids grouped by their status. + // TODO we should not be able to serialize a `Status::Processing` in this database. + pub(crate) status: Database, RoaringBitmapCodec>, + /// All the tasks ids grouped by their kind. + pub(crate) kind: Database, RoaringBitmapCodec>, + /// Store the tasks associated to an index. + pub(crate) index_tasks: Database, + + /// Store the tasks that were canceled by a task uid + pub(crate) canceled_by: Database, RoaringBitmapCodec>, + + /// Store the task ids of tasks which were enqueued at a specific date + pub(crate) enqueued_at: Database, CboRoaringBitmapCodec>, + + /// Store the task ids of finished tasks which started being processed at a specific date + pub(crate) started_at: Database, CboRoaringBitmapCodec>, + + /// Store the task ids of tasks which finished at a specific date + pub(crate) finished_at: Database, CboRoaringBitmapCodec>, + + /// In charge of creating, opening, storing and returning indexes. + pub(crate) index_mapper: IndexMapper, + + /// In charge of fetching and setting the status of experimental features. + features: features::FeatureData, + + /// Get a signal when a batch needs to be processed. + pub(crate) wake_up: Arc, + + /// Whether auto-batching is enabled or not. + pub(crate) autobatching_enabled: bool, + + /// The max number of tasks allowed before the scheduler starts to delete + /// the finished tasks automatically. + pub(crate) max_number_of_tasks: usize, + + /// The path used to create the dumps. + pub(crate) dumps_path: PathBuf, + + /// The path used to create the snapshots. + pub(crate) snapshots_path: PathBuf, + + /// The path to the folder containing the auth LMDB env. + pub(crate) auth_path: PathBuf, + + /// The path to the version file of Meilisearch. + pub(crate) version_file_path: PathBuf, + + /// The URL to the ZooKeeper cluster + pub(crate) zookeeper: Option>, + + // ================= test + // The next entry is dedicated to the tests. + /// Provide a way to set a breakpoint in multiple part of the scheduler. + /// + /// See [self.breakpoint()](`IndexScheduler::breakpoint`) for an explanation. + #[cfg(test)] + test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, + + /// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler. + /// + /// The first field is the iteration index and the second field identifies a location in the code. + #[cfg(test)] + planned_failures: Vec<(usize, tests::FailureLocation)>, + + /// A counter that is incremented before every call to [`tick`](IndexScheduler::tick) + #[cfg(test)] + run_loop_iteration: Arc>, +} + +impl IndexSchedulerInner { + pub fn read_txn(&self) -> Result { + self.env.read_txn().map_err(|e| e.into()) + } + + /// Perform one iteration of the run loop. + /// + /// 1. See if we need to cleanup the task queue + /// 2. Find the next batch of tasks to be processed. + /// 3. Update the information of these tasks following the start of their processing. + /// 4. Update the in-memory list of processed tasks accordingly. + /// 5. Process the batch: + /// - perform the actions of each batched task + /// - update the information of each batched task following the end + /// of their processing. + /// 6. Reset the in-memory list of processed tasks. + /// + /// Returns the number of processed tasks. 
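// Editor's sketch (simplified stand-alone example): the scoped-thread pattern used by
// `tick()` just below. `std::thread::scope` lets the batch run on a separately named
// thread while borrowing the scheduler by reference, which is why this refactor can
// drop the old `private_clone()` dance. `Scheduler` and `process_batch` here are
// stand-ins for the real types.
use std::thread;

struct Scheduler {
    name: &'static str,
}

impl Scheduler {
    fn process_batch(&self, batch: &[u32]) -> usize {
        println!("{} processing {} tasks", self.name, batch.len());
        batch.len()
    }

    fn tick(&self, batch: Vec<u32>) -> usize {
        // Spawn a scoped, named worker that borrows `self`; joining inside the scope
        // propagates panics, much like the `unwrap()` calls in the real `tick()`.
        thread::scope(|s| {
            thread::Builder::new()
                .name(String::from("batch-operation"))
                .spawn_scoped(s, || self.process_batch(&batch))
                .unwrap()
                .join()
                .unwrap()
        })
    }
}

fn main() {
    let scheduler = Scheduler { name: "index-scheduler" };
    assert_eq!(scheduler.tick(vec![1, 2, 3]), 3);
}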
+ fn tick(&self) -> Result { + #[cfg(test)] + { + *self.run_loop_iteration.write().unwrap() += 1; + self.breakpoint(Breakpoint::Start); + } + + puffin::GlobalProfiler::lock().new_frame(); + + self.cleanup_task_queue()?; + + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; + let batch = + match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { + Some(batch) => batch, + None => return Ok(TickOutcome::WaitForSignal), + }; + let index_uid = batch.index_uid().map(ToOwned::to_owned); + drop(rtxn); + + // 1. store the starting date with the bitmap of processing tasks. + let mut ids = batch.ids(); + ids.sort_unstable(); + let processed_tasks = ids.len(); + let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap(); + let started_at = OffsetDateTime::now_utc(); + + // We reset the must_stop flag to be sure that we don't stop processing tasks + self.must_stop_processing.reset(); + self.processing_tasks.write().start_processing_at(started_at, processing_tasks); + + #[cfg(test)] + self.breakpoint(Breakpoint::BatchCreated); + + // 2. Process the tasks + let res = thread::scope(|s| { + thread::Builder::new() + .name(String::from("batch-operation")) + .spawn_scoped(s, || self.process_batch(batch)) + .unwrap() + .join() + .unwrap() + }); + + #[cfg(test)] + self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; + + let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; + + let finished_at = OffsetDateTime::now_utc(); + match res { + Ok(tasks) => { + #[cfg(test)] + self.breakpoint(Breakpoint::ProcessBatchSucceeded); + + #[allow(unused_variables)] + for (i, mut task) in tasks.into_iter().enumerate() { + task.started_at = Some(started_at); + task.finished_at = Some(finished_at); + + #[cfg(test)] + self.maybe_fail( + tests::FailureLocation::UpdatingTaskAfterProcessBatchSuccess { + task_uid: i as u32, + }, + )?; + + self.update_task(&mut wtxn, &task) + .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; + if let Err(e) = self.delete_persisted_task_data(&task) { + log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid); + } + } + log::info!("A batch of tasks was successfully completed."); + } + // If we have an abortion error we must stop the tick here and re-schedule tasks. + Err(Error::Milli(milli::Error::InternalError( + milli::InternalError::AbortedIndexation, + ))) => { + #[cfg(test)] + self.breakpoint(Breakpoint::AbortedIndexation); + wtxn.abort().map_err(Error::HeedTransaction)?; + + // We make sure that we don't call `stop_processing` on the `processing_tasks`, + // this is because we want to let the next tick call `create_next_batch` and keep + // the `started_at` date times and `processings` of the current processing tasks. + // This date time is used by the task cancelation to store the right `started_at` + // date in the task on disk. + return Ok(TickOutcome::TickAgain(0)); + } + // If an index said it was full, we need to: + // 1. identify which index is full + // 2. close the associated environment + // 3. resize it + // 4. re-schedule tasks + Err(Error::Milli(milli::Error::UserError( + milli::UserError::MaxDatabaseSizeReached, + ))) if index_uid.is_some() => { + // fixme: add index_uid to match to avoid the unwrap + let index_uid = index_uid.unwrap(); + // fixme: handle error more gracefully? 
not sure when this could happen + self.index_mapper.resize_index(&wtxn, &index_uid)?; + wtxn.abort().map_err(Error::HeedTransaction)?; + + return Ok(TickOutcome::TickAgain(0)); + } + // In case of a failure we must get back and patch all the tasks with the error. + Err(err) => { + #[cfg(test)] + self.breakpoint(Breakpoint::ProcessBatchFailed); + let error: ResponseError = err.into(); + for id in ids { + let mut task = self + .get_task(&wtxn, id) + .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? + .ok_or(Error::CorruptedTaskQueue)?; + task.started_at = Some(started_at); + task.finished_at = Some(finished_at); + task.status = Status::Failed; + task.error = Some(error.clone()); + task.details = task.details.map(|d| d.to_failed()); + + #[cfg(test)] + self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?; + + if let Err(e) = self.delete_persisted_task_data(&task) { + log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid); + } + self.update_task(&mut wtxn, &task) + .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; + } + } + } + + self.processing_tasks.write().stop_processing(); + + #[cfg(test)] + self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; + + wtxn.commit().map_err(Error::HeedTransaction)?; + + #[cfg(test)] + self.breakpoint(Breakpoint::AfterProcessing); + + Ok(TickOutcome::TickAgain(processed_tasks)) + } + + /// Register a task to cleanup the task queue if needed + fn cleanup_task_queue(&self) -> Result<()> { + let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; + + let nb_tasks = self.all_task_ids(&rtxn)?.len(); + // if we have less than 1M tasks everything is fine + if nb_tasks < self.max_number_of_tasks as u64 { + return Ok(()); + } + + let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default() + | self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default() + | self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default(); + + let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000)); + + // /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete + // the deletion tasks we enqueued ourselves. + if to_delete.len() < 2 { + log::warn!("The task queue is almost full, but no task can be deleted yet."); + // the only thing we can do is hope that the user tasks are going to finish + return Ok(()); + } + + log::info!( + "The task queue is almost full. Deleting the oldest {} finished tasks.", + to_delete.len() + ); + + // it's safe to unwrap here because we checked the len above + let newest_task_id = to_delete.iter().last().unwrap(); + let last_task_to_delete = + self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?; + drop(rtxn); + + // increase time by one nanosecond so that the enqueuedAt of the last task to delete is also lower than that date. + let delete_before = last_task_to_delete.enqueued_at + Duration::from_nanos(1); + + self.register(KindWithContent::TaskDeletion { + query: format!( + "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled", + delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, + ), + tasks: to_delete, + })?; + + Ok(()) + } + + pub fn indexer_config(&self) -> &IndexerConfig { + &self.index_mapper.indexer_config + } + + /// Register a new task in the scheduler. + /// + /// If it fails and data was associated with the task, it tries to delete the associated data. 
+ pub fn register(&self, kind: KindWithContent) -> Result { + let id = match &self.zookeeper { + Some(zookeeper) => { + // Reserve uniq ID on zookeeper. And give it to the spawn blocking. + match zookeeper.create( + "/tasks/task-", + vec![], + Acl::open_unsafe().clone(), + CreateMode::PersistentSequential, + ) { + Ok(path) => path.rsplit_once('-').map(|(_, id)| id.parse::().unwrap()), + Err(e) => panic!("{e}"), + } + } + None => None, + }; + + let mut wtxn = self.env.write_txn()?; + + // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task + if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) + && (self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64 > 50 + { + return Err(Error::NoSpaceLeftInTaskQueue); + } + + // Retrieve the id generated by zookeeper or generate a local id. + let id = match id { + Some(id) => id as u32, + None => self.next_task_id(&wtxn)?, + }; + + let mut task = Task { + uid: id, + enqueued_at: OffsetDateTime::now_utc(), + started_at: None, + finished_at: None, + error: None, + canceled_by: None, + details: kind.default_details(), + status: Status::Enqueued, + kind: kind.clone(), + }; + // For deletion and cancelation tasks, we want to make extra sure that they + // don't attempt to delete/cancel tasks that are newer than themselves. + filter_out_references_to_newer_tasks(&mut task); + // If the register task is an index swap task, verify that it is well-formed + // (that it does not contain duplicate indexes). + check_index_swap_validity(&task)?; + + if self.zookeeper.is_none() { + self.register_raw_task(&mut wtxn, &task)?; + + if let Err(e) = wtxn.commit() { + self.delete_persisted_task_data(&task)?; + return Err(e.into()); + } + } + + // If the registered task is a task cancelation + // we inform the processing tasks to stop (if necessary). + if let KindWithContent::TaskCancelation { tasks, .. } = kind { + let tasks_to_cancel = RoaringBitmap::from_iter(tasks); + if self.processing_tasks.read().must_cancel_processing_tasks(&tasks_to_cancel) { + self.must_stop_processing.must_stop(); + } + } + + if self.zookeeper.is_none() { + // notify the scheduler loop to execute a new tick + self.wake_up.signal(); + } + + // TODO: send task to ZK in raw json. + if let Some(zookeeper) = &self.zookeeper { + // TODO: ugly unwrap + zookeeper + .set_data( + &format!("/tasks/task-{:0>10?}", id), + serde_json::to_vec_pretty(&task).unwrap(), + None, + ) + .unwrap(); + } + + Ok(task) + } + + /// Register a new task coming from a dump in the scheduler. + /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. + pub fn register_dumped_task(&self) -> Result { + Dump::new(self) + } + + /// Create a file and register it in the index scheduler. + /// + /// The returned file and uuid can be used to associate + /// some data to a task. The file will be kept until + /// the task has been fully processed. + pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> { + Ok(self.file_store.new_update()?) + } + + /// Delete a file from the index scheduler. + /// + /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method. + pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> { + Ok(self.file_store.delete(uuid)?) 
+ } + + pub fn put_runtime_features(&self, features: RuntimeTogglableFeatures) -> Result<()> { + let wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; + self.features.put_runtime_features(wtxn, features)?; + Ok(()) + } + + pub fn features(&self) -> Result { + let rtxn = self.read_txn()?; + self.features.features(rtxn) } /// Return the task ids matched by the given query from the index scheduler's point of view. pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result { let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks, .. - } = self.processing_tasks.read().unwrap().clone(); + } = self.processing_tasks.read().clone(); let mut tasks = self.all_task_ids(rtxn)?; @@ -1074,53 +1604,6 @@ impl IndexScheduler { Ok(tasks) } - /// The returned structure contains: - /// 1. The name of the property being observed can be `statuses`, `types`, or `indexes`. - /// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example. - /// 3. The number of times the properties appeared. - pub fn get_stats(&self) -> Result>> { - let rtxn = self.read_txn()?; - - let mut res = BTreeMap::new(); - - res.insert( - "statuses".to_string(), - enum_iterator::all::() - .map(|s| Ok((s.to_string(), self.get_status(&rtxn, s)?.len()))) - .collect::>>()?, - ); - res.insert( - "types".to_string(), - enum_iterator::all::() - .map(|s| Ok((s.to_string(), self.get_kind(&rtxn, s)?.len()))) - .collect::>>()?, - ); - res.insert( - "indexes".to_string(), - self.index_tasks - .iter(&rtxn)? - .map(|res| Ok(res.map(|(name, bitmap)| (name.to_string(), bitmap.len()))?)) - .collect::>>()?, - ); - - Ok(res) - } - - // Return true if there is at least one task that is processing. - pub fn is_task_processing(&self) -> Result { - Ok(!self.processing_tasks.read().unwrap().processing.is_empty()) - } - - /// Return true iff there is at least one task associated with this index - /// that is processing. - pub fn is_index_processing(&self, index: &str) -> Result { - let rtxn = self.env.read_txn()?; - let processing_tasks = self.processing_tasks.read().unwrap().processing.clone(); - let index_tasks = self.index_tasks(&rtxn, index)?; - let nbr_index_processing_tasks = processing_tasks.intersection_len(&index_tasks); - Ok(nbr_index_processing_tasks > 0) - } - /// Return the task ids matching the query along with the total number of tasks /// by ignoring the from and limit parameters from the user's point of view. /// @@ -1167,142 +1650,6 @@ impl IndexScheduler { Ok((tasks, total_tasks.len())) } - /// Return the tasks matching the query from the user's point of view along - /// with the total number of tasks matching the query, ignoring from and limit. - /// - /// There are two differences between an internal query and a query executed by - /// the user. - /// - /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated - /// with many indexes internally. - /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes. 
- pub fn get_tasks_from_authorized_indexes( - &self, - query: Query, - filters: &meilisearch_auth::AuthFilter, - ) -> Result<(Vec, u64)> { - let rtxn = self.env.read_txn()?; - - let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?; - let tasks = self.get_existing_tasks( - &rtxn, - tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), - )?; - - let ProcessingTasks { started_at, processing, .. } = - self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); - - let ret = tasks.into_iter(); - if processing.is_empty() { - Ok((ret.collect(), total)) - } else { - Ok(( - ret.map(|task| { - if processing.contains(task.uid) { - Task { status: Status::Processing, started_at: Some(started_at), ..task } - } else { - task - } - }) - .collect(), - total, - )) - } - } - - /// Register a new task in the scheduler. - /// - /// If it fails and data was associated with the task, it tries to delete the associated data. - pub fn register(&self, kind: KindWithContent) -> Result { - let id = match &self.zookeeper { - Some(zookeeper) => { - // Reserve uniq ID on zookeeper. And give it to the spawn blocking. - match zookeeper.create( - "/tasks/task-", - vec![], - Acl::open_unsafe().clone(), - CreateMode::PersistentSequential, - ) { - Ok(path) => path.rsplit_once('-').map(|(_, id)| id.parse::().unwrap()), - Err(e) => panic!("{e}"), - } - } - None => None, - }; - - let this = self.private_clone(); - let mut wtxn = this.env.write_txn()?; - - // if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task - if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty()) - && (this.env.non_free_pages_size()? * 100) / this.env.map_size()? as u64 > 50 - { - return Err(Error::NoSpaceLeftInTaskQueue); - } - - // Retrieve the id generated by zookeeper or generate a local id. - let id = match id { - Some(id) => id as u32, - None => this.next_task_id(&wtxn)?, - }; - - let mut task = Task { - uid: id, - enqueued_at: OffsetDateTime::now_utc(), - started_at: None, - finished_at: None, - error: None, - canceled_by: None, - details: kind.default_details(), - status: Status::Enqueued, - kind: kind.clone(), - }; - // For deletion and cancelation tasks, we want to make extra sure that they - // don't attempt to delete/cancel tasks that are newer than themselves. - filter_out_references_to_newer_tasks(&mut task); - // If the register task is an index swap task, verify that it is well-formed - // (that it does not contain duplicate indexes). - check_index_swap_validity(&task)?; - - if self.zookeeper.is_none() { - this.register_raw_task(&mut wtxn, &task)?; - - if let Err(e) = wtxn.commit() { - this.delete_persisted_task_data(&task)?; - return Err(e.into()); - } - } - - // If the registered task is a task cancelation - // we inform the processing tasks to stop (if necessary). - if let KindWithContent::TaskCancelation { tasks, .. } = kind { - let tasks_to_cancel = RoaringBitmap::from_iter(tasks); - if this.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel) - { - this.must_stop_processing.must_stop(); - } - } - - if self.zookeeper.is_none() { - // notify the scheduler loop to execute a new tick - this.wake_up.signal(); - } - - // TODO: send task to ZK in raw json. 
- if let Some(zookeeper) = &self.zookeeper { - // TODO: ugly unwrap - zookeeper - .set_data( - &format!("/tasks/task-{:0>10?}", id), - serde_json::to_vec_pretty(&task).unwrap(), - None, - ) - .unwrap(); - } - - Ok(task) - } - pub fn register_raw_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { self.all_tasks.put(wtxn, &BEU32::new(task.uid), &task)?; @@ -1323,10 +1670,11 @@ impl IndexScheduler { utils::insert_task_datetime(wtxn, self.enqueued_at, task.enqueued_at, task.uid) } - /// Register a new task coming from a dump in the scheduler. - /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. - pub fn register_dumped_task(&mut self) -> Result { - Dump::new(self) + pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> { + match task.content_uuid() { + Some(content_file) => self.delete_update_file(content_file), + None => Ok(()), + } } /// Create a new index without any associated task. @@ -1339,295 +1687,10 @@ impl IndexScheduler { let index = self.index_mapper.create_index(wtxn, name, date)?; Ok(index) } - - /// Create a file and register it in the index scheduler. - /// - /// The returned file and uuid can be used to associate - /// some data to a task. The file will be kept until - /// the task has been fully processed. - pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> { - Ok(self.file_store.new_update()?) - } - - #[cfg(test)] - pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> { - Ok(self.file_store.new_update_with_uuid(uuid)?) - } - - /// The size on disk taken by all the updates files contained in the `IndexScheduler`, in bytes. - pub fn compute_update_file_size(&self) -> Result { - Ok(self.file_store.compute_total_size()?) - } - - /// Delete a file from the index scheduler. - /// - /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method. - pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> { - Ok(self.file_store.delete(uuid)?) - } - - /// Perform one iteration of the run loop. - /// - /// 1. See if we need to cleanup the task queue - /// 2. Find the next batch of tasks to be processed. - /// 3. Update the information of these tasks following the start of their processing. - /// 4. Update the in-memory list of processed tasks accordingly. - /// 5. Process the batch: - /// - perform the actions of each batched task - /// - update the information of each batched task following the end - /// of their processing. - /// 6. Reset the in-memory list of processed tasks. - /// - /// Returns the number of processed tasks. - fn tick(&self) -> Result { - #[cfg(test)] - { - *self.run_loop_iteration.write().unwrap() += 1; - self.breakpoint(Breakpoint::Start); - } - - puffin::GlobalProfiler::lock().new_frame(); - - self.cleanup_task_queue()?; - - let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; - let batch = - match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { - Some(batch) => batch, - None => return Ok(TickOutcome::WaitForSignal), - }; - let index_uid = batch.index_uid().map(ToOwned::to_owned); - drop(rtxn); - - // 1. store the starting date with the bitmap of processing tasks. 
- let mut ids = batch.ids(); - ids.sort_unstable(); - let processed_tasks = ids.len(); - let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap(); - let started_at = OffsetDateTime::now_utc(); - - // We reset the must_stop flag to be sure that we don't stop processing tasks - self.must_stop_processing.reset(); - self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks); - - #[cfg(test)] - self.breakpoint(Breakpoint::BatchCreated); - - // 2. Process the tasks - let res = { - let cloned_index_scheduler = self.private_clone(); - let handle = thread::Builder::new() - .name(String::from("batch-operation")) - .spawn(move || cloned_index_scheduler.process_batch(batch)) - .unwrap(); - handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) - }; - - #[cfg(test)] - self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; - - let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; - - let finished_at = OffsetDateTime::now_utc(); - match res { - Ok(tasks) => { - #[cfg(test)] - self.breakpoint(Breakpoint::ProcessBatchSucceeded); - - #[allow(unused_variables)] - for (i, mut task) in tasks.into_iter().enumerate() { - task.started_at = Some(started_at); - task.finished_at = Some(finished_at); - - #[cfg(test)] - self.maybe_fail( - tests::FailureLocation::UpdatingTaskAfterProcessBatchSuccess { - task_uid: i as u32, - }, - )?; - - self.update_task(&mut wtxn, &task) - .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; - if let Err(e) = self.delete_persisted_task_data(&task) { - log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid); - } - } - log::info!("A batch of tasks was successfully completed."); - } - // If we have an abortion error we must stop the tick here and re-schedule tasks. - Err(Error::Milli(milli::Error::InternalError( - milli::InternalError::AbortedIndexation, - ))) => { - #[cfg(test)] - self.breakpoint(Breakpoint::AbortedIndexation); - wtxn.abort().map_err(Error::HeedTransaction)?; - - // We make sure that we don't call `stop_processing` on the `processing_tasks`, - // this is because we want to let the next tick call `create_next_batch` and keep - // the `started_at` date times and `processings` of the current processing tasks. - // This date time is used by the task cancelation to store the right `started_at` - // date in the task on disk. - return Ok(TickOutcome::TickAgain(0)); - } - // If an index said it was full, we need to: - // 1. identify which index is full - // 2. close the associated environment - // 3. resize it - // 4. re-schedule tasks - Err(Error::Milli(milli::Error::UserError( - milli::UserError::MaxDatabaseSizeReached, - ))) if index_uid.is_some() => { - // fixme: add index_uid to match to avoid the unwrap - let index_uid = index_uid.unwrap(); - // fixme: handle error more gracefully? not sure when this could happen - self.index_mapper.resize_index(&wtxn, &index_uid)?; - wtxn.abort().map_err(Error::HeedTransaction)?; - - return Ok(TickOutcome::TickAgain(0)); - } - // In case of a failure we must get back and patch all the tasks with the error. - Err(err) => { - #[cfg(test)] - self.breakpoint(Breakpoint::ProcessBatchFailed); - let error: ResponseError = err.into(); - for id in ids { - let mut task = self - .get_task(&wtxn, id) - .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? 
- .ok_or(Error::CorruptedTaskQueue)?; - task.started_at = Some(started_at); - task.finished_at = Some(finished_at); - task.status = Status::Failed; - task.error = Some(error.clone()); - task.details = task.details.map(|d| d.to_failed()); - - #[cfg(test)] - self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?; - - if let Err(e) = self.delete_persisted_task_data(&task) { - log::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid); - } - self.update_task(&mut wtxn, &task) - .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; - } - } - } - - self.processing_tasks.write().unwrap().stop_processing(); - - #[cfg(test)] - self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; - - wtxn.commit().map_err(Error::HeedTransaction)?; - - #[cfg(test)] - self.breakpoint(Breakpoint::AfterProcessing); - - Ok(TickOutcome::TickAgain(processed_tasks)) - } - - /// Register a task to cleanup the task queue if needed - fn cleanup_task_queue(&self) -> Result<()> { - let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; - - let nb_tasks = self.all_task_ids(&rtxn)?.len(); - // if we have less than 1M tasks everything is fine - if nb_tasks < self.max_number_of_tasks as u64 { - return Ok(()); - } - - let finished = self.status.get(&rtxn, &Status::Succeeded)?.unwrap_or_default() - | self.status.get(&rtxn, &Status::Failed)?.unwrap_or_default() - | self.status.get(&rtxn, &Status::Canceled)?.unwrap_or_default(); - - let to_delete = RoaringBitmap::from_iter(finished.into_iter().rev().take(100_000)); - - // /!\ the len must be at least 2 or else we might enter an infinite loop where we only delete - // the deletion tasks we enqueued ourselves. - if to_delete.len() < 2 { - log::warn!("The task queue is almost full, but no task can be deleted yet."); - // the only thing we can do is hope that the user tasks are going to finish - return Ok(()); - } - - log::info!( - "The task queue is almost full. Deleting the oldest {} finished tasks.", - to_delete.len() - ); - - // it's safe to unwrap here because we checked the len above - let newest_task_id = to_delete.iter().last().unwrap(); - let last_task_to_delete = - self.get_task(&rtxn, newest_task_id)?.ok_or(Error::CorruptedTaskQueue)?; - drop(rtxn); - - // increase time by one nanosecond so that the enqueuedAt of the last task to delete is also lower than that date. 
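// Editor's sketch (stand-alone, assumes the `time` crate with its `formatting`
// feature, which the scheduler already depends on): the "plus one nanosecond" trick
// from the comment above, kept as-is in the relocated `cleanup_task_queue`, shown in
// isolation. Bumping the newest deletable task's `enqueued_at` by one nanosecond makes
// the generated `beforeEnqueuedAt` filter strictly later than that task's own
// timestamp, so the enqueued task-deletion also covers that task instead of stopping
// just short of it.
use std::time::Duration;

use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

fn main() {
    let last_enqueued_at = OffsetDateTime::UNIX_EPOCH; // stand-in for the real task date
    let delete_before = last_enqueued_at + Duration::from_nanos(1);

    let query = format!(
        "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled",
        delete_before.format(&Rfc3339).unwrap(),
    );

    assert!(delete_before > last_enqueued_at);
    println!("{query}"); // prints the RFC 3339 date carrying the extra nanosecond
}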
- let delete_before = last_task_to_delete.enqueued_at + Duration::from_nanos(1); - - self.register(KindWithContent::TaskDeletion { - query: format!( - "?beforeEnqueuedAt={}&statuses=succeeded,failed,canceled", - delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?, - ), - tasks: to_delete, - })?; - - Ok(()) - } - - pub fn index_stats(&self, index_uid: &str) -> Result { - let is_indexing = self.is_index_processing(index_uid)?; - let rtxn = self.read_txn()?; - let index_stats = self.index_mapper.stats_of(&rtxn, index_uid)?; - - Ok(IndexStats { is_indexing, inner_stats: index_stats }) - } - - pub fn features(&self) -> Result { - let rtxn = self.read_txn()?; - self.features.features(rtxn) - } - - pub fn put_runtime_features(&self, features: RuntimeTogglableFeatures) -> Result<()> { - let wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; - self.features.put_runtime_features(wtxn, features)?; - Ok(()) - } - - pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> { - match task.content_uuid() { - Some(content_file) => self.delete_update_file(content_file), - None => Ok(()), - } - } - - /// Blocks the thread until the test handle asks to progress to/through this breakpoint. - /// - /// Two messages are sent through the channel for each breakpoint. - /// The first message is `(b, false)` and the second message is `(b, true)`. - /// - /// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other. - /// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks - /// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough. - /// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`. - /// As soon as we find it, the index scheduler is unblocked but then wait again on the call to - /// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the - /// test asks to progress to the next `(b2, false)`. - #[cfg(test)] - fn breakpoint(&self, b: Breakpoint) { - // We send two messages. The first one will sync with the call - // to `handle.wait_until(b)`. The second one will block until the - // the next call to `handle.wait_until(..)`. - self.test_breakpoint_sdr.send((b, false)).unwrap(); - // This one will only be able to be sent if the test handle stays alive. - // If it fails, then it means that we have exited the test. - // By crashing with `unwrap`, we kill the run loop. - self.test_breakpoint_sdr.send((b, true)).unwrap(); - } } pub struct Dump<'a> { - index_scheduler: &'a IndexScheduler, + index_scheduler: &'a IndexSchedulerInner, wtxn: RwTxn<'a, 'a>, indexes: HashMap, @@ -1636,7 +1699,7 @@ pub struct Dump<'a> { } impl<'a> Dump<'a> { - pub(crate) fn new(index_scheduler: &'a mut IndexScheduler) -> Result { + pub(crate) fn new(index_scheduler: &'a IndexSchedulerInner) -> Result { // While loading a dump no one should be able to access the scheduler thus I can block everything. 
let wtxn = index_scheduler.env.write_txn()?; diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 3971d9116..894c2b3de 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -10,9 +10,9 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status use roaring::{MultiOps, RoaringBitmap}; use time::OffsetDateTime; -use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128}; +use crate::{Error, IndexSchedulerInner, Result, Task, TaskId, BEI128}; -impl IndexScheduler { +impl IndexSchedulerInner { pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result { enum_iterator::all().map(|s| self.get_status(rtxn, s)).union() } diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index fb9b0bb66..bb2ff0db1 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -280,6 +280,7 @@ fn import_dump( auth: &mut AuthController, ) -> Result<(), anyhow::Error> { let reader = File::open(dump_path)?; + let index_scheduler = index_scheduler.inner(); let mut dump_reader = dump::DumpReader::open(reader)?; if let Some(date) = dump_reader.date() { diff --git a/meilisearch/src/routes/features.rs b/meilisearch/src/routes/features.rs index a2822b4d4..0c5d7f58b 100644 --- a/meilisearch/src/routes/features.rs +++ b/meilisearch/src/routes/features.rs @@ -78,6 +78,6 @@ async fn patch_features( }), Some(&req), ); - index_scheduler.put_runtime_features(new_features)?; + index_scheduler.inner().put_runtime_features(new_features)?; Ok(HttpResponse::Ok().json(new_features)) } diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs index 1c2da6cef..aaaa86dfc 100644 --- a/meilisearch/src/routes/tasks.rs +++ b/meilisearch/src/routes/tasks.rs @@ -324,11 +324,8 @@ async fn cancel_tasks( let query = params.into_query(); - let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes( - &index_scheduler.read_txn()?, - &query, - index_scheduler.filters(), - )?; + let (tasks, _) = + index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?; let task_cancelation = KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; @@ -369,11 +366,8 @@ async fn delete_tasks( ); let query = params.into_query(); - let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes( - &index_scheduler.read_txn()?, - &query, - index_scheduler.filters(), - )?; + let (tasks, _) = + index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?; let task_deletion = KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
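// Editor's closing sketch (pure string handling, no live ZooKeeper needed): the
// task-node naming convention used throughout this patch. `register()` reserves a
// `PersistentSequential` node under `/tasks/task-`, parses the zero-padded sequence
// suffix back into an integer task uid, and later addresses the node again with
// `format!("/tasks/task-{:0>10?}", id)`. The helpers below are hypothetical, and the
// concrete integer type is illustrative, but the formatting and parsing mirror the
// calls that appear in the patch.

/// Extract the task uid from a sequential node path such as "/tasks/task-0000000042".
fn uid_from_node(path: &str) -> Option<u32> {
    path.rsplit_once('-').and_then(|(_, id)| id.parse::<u32>().ok())
}

/// Rebuild the node path for a task uid, zero-padded to ten digits like the patch does.
fn node_from_uid(uid: u32) -> String {
    format!("/tasks/task-{:0>10?}", uid)
}

fn main() {
    let created = "/tasks/task-0000000042"; // shape of the path ZooKeeper returns from `create`
    let uid = uid_from_node(created).unwrap();
    assert_eq!(uid, 42);
    assert_eq!(node_from_uid(uid), created);
    println!("task uid {uid} <-> node {}", node_from_uid(uid));
}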