diff --git a/Cargo.lock b/Cargo.lock index 701156b46..a7cdd16e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2547,6 +2547,7 @@ dependencies = [ "bytes", "cargo_toml", "clap 4.0.32", + "cluster", "crossbeam-channel", "deserr", "dump", diff --git a/cluster/src/leader.rs b/cluster/src/leader.rs index 8159de4f3..377c3a786 100644 --- a/cluster/src/leader.rs +++ b/cluster/src/leader.rs @@ -6,10 +6,12 @@ use bus::{Bus, BusReader}; use crossbeam::channel::{unbounded, Receiver, Sender}; use ductile::{ChannelReceiver, ChannelSender, ChannelServer}; use log::info; +use meilisearch_types::tasks::Task; use crate::batch::Batch; use crate::{Consistency, FollowerMsg, LeaderMsg}; +#[derive(Clone)] pub struct Leader { task_ready_to_commit: Receiver, broadcast_to_follower: Sender, @@ -149,4 +151,10 @@ impl Leader { self.batch_id += 1; } + + pub fn register_new_task(&mut self, task: Task, update_file: Option>) { + self.broadcast_to_follower + .send(LeaderMsg::RegisterNewTask { task, update_file }) + .expect("Main thread is dead"); + } } diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 625ebc4ac..c464aa065 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -1,8 +1,10 @@ use std::net::ToSocketAddrs; use batch::Batch; +use crossbeam::channel::{unbounded, Receiver, Sender}; use ductile::{connect_channel, ChannelReceiver, ChannelSender}; -use meilisearch_types::tasks::KindWithContent; +use log::info; +use meilisearch_types::tasks::{KindWithContent, Task}; use serde::{Deserialize, Serialize}; pub mod batch; @@ -24,6 +26,8 @@ pub enum LeaderMsg { StartBatch { id: u32, batch: Batch }, // Tell the follower to commit the update asap Commit(u32), + // Tell the follower to commit the update asap + RegisterNewTask { task: Task, update_file: Option> }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -42,40 +46,87 @@ pub enum Consistency { All, } +#[derive(Clone)] pub struct Follower { sender: ChannelSender, - receiver: ChannelReceiver, + + get_batch: Receiver<(u32, Batch)>, + must_commit: Receiver, + register_new_task: Receiver<(Task, Option>)>, + batch_id: u32, } impl Follower { pub fn join(leader: impl ToSocketAddrs) -> Follower { let (sender, receiver) = connect_channel(leader).unwrap(); - Follower { sender, receiver, batch_id: 0 } + + info!("Connection to the leader established"); + + let (get_batch_sender, get_batch_receiver) = unbounded(); + let (must_commit_sender, must_commit_receiver) = unbounded(); + let (register_task_sender, register_task_receiver) = unbounded(); + + std::thread::spawn(move || { + Self::router(receiver, get_batch_sender, must_commit_sender, register_task_sender); + }); + + Follower { + sender, + get_batch: get_batch_receiver, + must_commit: must_commit_receiver, + register_new_task: register_task_receiver, + batch_id: 0, + } + } + + fn router( + receiver: ChannelReceiver, + get_batch: Sender<(u32, Batch)>, + must_commit: Sender, + register_new_task: Sender<(Task, Option>)>, + ) { + loop { + match receiver.recv().expect("Lost connection to the leader") { + LeaderMsg::StartBatch { id, batch } => { + info!("Starting to process a new batch"); + get_batch.send((id, batch)).expect("Lost connection to the main thread") + } + LeaderMsg::Commit(id) => { + info!("Must commit"); + must_commit.send(id).expect("Lost connection to the main thread") + } + LeaderMsg::RegisterNewTask { task, update_file } => { + info!("Registered a new task"); + register_new_task + .send((task, update_file)) + .expect("Lost connection to the main thread") + } + } + } } pub fn get_new_batch(&mut self) -> Batch { - loop { - match self.receiver.recv() { - Ok(LeaderMsg::StartBatch { id, batch }) if id == self.batch_id => { - self.batch_id = id; - break batch; - } - Err(_) => log::error!("lost connection to the leader"), - _ => (), - } - } + let (id, batch) = self.get_batch.recv().expect("Lost connection to the leader"); + self.batch_id = id; + batch } pub fn ready_to_commit(&mut self) { self.sender.send(FollowerMsg::ReadyToCommit(self.batch_id)).unwrap(); loop { - match self.receiver.recv() { - Ok(LeaderMsg::Commit(id)) if id == self.batch_id => break, - Err(_) => panic!("lost connection to the leader"), - _ => (), + let id = self.must_commit.recv().expect("Lost connection to the leader"); + #[allow(clippy::comparison_chain)] + if id == self.batch_id { + break; + } else if id > self.batch_id { + panic!("We missed a batch"); } } } + + pub fn get_new_task(&mut self) -> (Task, Option>) { + self.register_new_task.recv().unwrap() + } } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index b1d69de4a..e55654e1f 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1430,18 +1430,20 @@ impl IndexScheduler { Ok(match batch { CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => { Batch::TaskCancelation { - task: self.get_existing_tasks(rtxn, Some(task))?[0], + task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(), previous_started_at, previous_processing_tasks, } } CBatch::TaskDeletion(task) => { - Batch::TaskDeletion(self.get_existing_tasks(rtxn, Some(task))?[0]) + Batch::TaskDeletion(self.get_existing_tasks(rtxn, Some(task))?[0].clone()) } CBatch::SnapshotCreation(tasks) => { Batch::SnapshotCreation(self.get_existing_tasks(rtxn, tasks)?) } - CBatch::Dump(task) => Batch::Dump(self.get_existing_tasks(rtxn, Some(task))?[0]), + CBatch::Dump(task) => { + Batch::Dump(self.get_existing_tasks(rtxn, Some(task))?[0].clone()) + } CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation { op: self.get_index_op_from_cluster_index_op(rtxn, op)?, must_create_index, @@ -1449,12 +1451,12 @@ impl IndexScheduler { CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation { index_uid, primary_key, - task: self.get_existing_tasks(rtxn, Some(task))?[0], + task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(), }, CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate { index_uid, primary_key, - task: self.get_existing_tasks(rtxn, Some(task))?[0], + task: self.get_existing_tasks(rtxn, Some(task))?[0].clone(), }, CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => { Batch::IndexDeletion { @@ -1464,7 +1466,7 @@ impl IndexScheduler { } } CBatch::IndexSwap { task } => { - Batch::IndexSwap { task: self.get_existing_tasks(rtxn, Some(task))?[0] } + Batch::IndexSwap { task: self.get_existing_tasks(rtxn, Some(task))?[0].clone() } } }) } @@ -1559,10 +1561,20 @@ impl From for cluster::batch::Batch { Batch::IndexOperation { op, must_create_index } => { CBatch::IndexOperation { op: op.into(), must_create_index } } - Batch::IndexCreation { index_uid, primary_key, task } => todo!(), - Batch::IndexUpdate { index_uid, primary_key, task } => todo!(), - Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => todo!(), - Batch::IndexSwap { task } => todo!(), + Batch::IndexCreation { index_uid, primary_key, task } => { + CBatch::IndexCreation { index_uid, primary_key, task: task.uid } + } + Batch::IndexUpdate { index_uid, primary_key, task } => { + CBatch::IndexUpdate { index_uid, primary_key, task: task.uid } + } + Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => { + CBatch::IndexDeletion { + index_uid, + tasks: tasks.into_iter().map(|task| task.uid).collect(), + index_has_been_created, + } + } + Batch::IndexSwap { task } => CBatch::IndexSwap { task: task.uid }, } } } diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index dcc348c98..9323ace17 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -33,6 +33,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { snapshots_path: _, auth_path: _, version_file_path: _, + cluster: _, test_breakpoint_sdr: _, planned_failures: _, run_loop_iteration: _, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 0ef7195ca..a37a1f8c9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -31,6 +31,7 @@ mod uuid_codec; pub type Result = std::result::Result; pub type TaskId = u32; +use std::io::Write; use std::ops::{Bound, RangeBounds}; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; @@ -39,7 +40,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use batch::Batch; -use cluster::{Consistency, Follower, Leader}; +use cluster::{Follower, Leader}; use dump::{KindDump, TaskDump, UpdateFile}; pub use error::Error; use file_store::FileStore; @@ -52,6 +53,7 @@ use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::RoaringBitmap; +use serde::Deserialize; use synchronoise::SignalEvent; use time::OffsetDateTime; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; @@ -326,9 +328,28 @@ pub struct IndexScheduler { run_loop_iteration: Arc>, } -enum Cluster { - Leader(RwLock), - Follower(RwLock), +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +pub enum ClusterMode { + Leader, + Follower, +} + +impl std::str::FromStr for ClusterMode { + type Err = (); + + fn from_str(s: &str) -> std::result::Result { + match s { + "leader" => Ok(ClusterMode::Leader), + "follower" => Ok(ClusterMode::Follower), + _ => Err(()), + } + } +} + +#[derive(Clone)] +pub enum Cluster { + Leader(Arc>), + Follower(Arc>), } impl IndexScheduler { @@ -368,6 +389,7 @@ impl IndexScheduler { /// Create an index scheduler and start its run loop. pub fn new( options: IndexSchedulerOptions, + cluster: Option, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, ) -> Result { @@ -427,7 +449,7 @@ impl IndexScheduler { snapshots_path: options.snapshots_path, auth_path: options.auth_path, version_file_path: options.version_file_path, - cluster: None, + cluster, #[cfg(test)] test_breakpoint_sdr, @@ -520,6 +542,16 @@ impl IndexScheduler { /// only once per index scheduler. fn run(&self) { let run = self.private_clone(); + + // if we're a follower we starts a thread to register the tasks coming from the leader + if let Some(Cluster::Follower(follower)) = self.cluster.clone() { + let this = self.private_clone(); + std::thread::spawn(move || loop { + let (task, content) = follower.write().unwrap().get_new_task(); + this.register_raw_task(task, content); + }); + } + std::thread::Builder::new() .name(String::from("scheduler")) .spawn(move || { @@ -877,6 +909,16 @@ impl IndexScheduler { return Err(e.into()); } + if let Some(Cluster::Leader(leader)) = &self.cluster { + let update_file = if let Some(uuid) = task.content_uuid() { + let path = self.file_store.get_update_path(uuid); + Some(std::fs::read(path).unwrap()) + } else { + None + }; + leader.write().unwrap().register_new_task(task.clone(), update_file); + } + // If the registered task is a task cancelation // we inform the processing tasks to stop (if necessary). if let KindWithContent::TaskCancelation { tasks, .. } = kind { @@ -1006,6 +1048,44 @@ impl IndexScheduler { Ok(task) } + /// /!\ should only be used when you're a follower in cluster mode + pub fn register_raw_task(&self, task: Task, content_file: Option>) { + if let Some(content) = content_file { + let uuid = task.content_uuid().expect("bad task"); + let (_, mut file) = self.file_store.new_update_with_uuid(uuid.as_u128()).unwrap(); + file.write_all(&content).unwrap(); + file.persist().unwrap(); + } + + let mut wtxn = self.env.write_txn().unwrap(); + + self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task).unwrap(); + + for index in task.indexes() { + self.update_index(&mut wtxn, index, |bitmap| { + bitmap.insert(task.uid); + }) + .unwrap(); + } + + self.update_status(&mut wtxn, task.status, |bitmap| { + bitmap.insert(task.uid); + }) + .unwrap(); + + self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| { + (bitmap.insert(task.uid)); + }) + .unwrap(); + + utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid) + .unwrap(); + + wtxn.commit().unwrap(); + + self.wake_up.signal(); + } + /// Create a new index without any associated task. pub fn create_raw_index( &self, @@ -1335,7 +1415,7 @@ mod tests { autobatching_enabled, }; - let index_scheduler = Self::new(options, sender, planned_failures).unwrap(); + let index_scheduler = Self::new(options, None, sender, planned_failures).unwrap(); // To be 100% consistent between all test we're going to start the scheduler right now // and ensure it's in the expected starting state. diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml index 809975ec7..11daa64a7 100644 --- a/meilisearch/Cargo.toml +++ b/meilisearch/Cargo.toml @@ -24,6 +24,7 @@ bstr = "1.0.1" byte-unit = { version = "4.0.14", default-features = false, features = ["std", "serde"] } bytes = "1.2.1" clap = { version = "4.0.9", features = ["derive", "env"] } +cluster = { path = "../cluster" } crossbeam-channel = "0.5.6" deserr = "0.5.0" dump = { path = "../dump" } diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 13c236983..a8c15a698 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -12,8 +12,9 @@ pub mod search; use std::fs::File; use std::io::{BufReader, BufWriter}; +use std::net::ToSocketAddrs; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::Duration; @@ -25,10 +26,11 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest}; use analytics::Analytics; use anyhow::bail; +use cluster::{Follower, Leader}; use error::PayloadError; use extractors::payload::PayloadConfig; use http::header::CONTENT_TYPE; -use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; +use index_scheduler::{Cluster, IndexScheduler, IndexSchedulerOptions}; use log::error; use meilisearch_auth::AuthController; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; @@ -220,22 +222,53 @@ fn open_or_create_database_unchecked( // we don't want to create anything in the data.ms yet, thus we // wrap our two builders in a closure that'll be executed later. let auth_controller = AuthController::new(&opt.db_path, &opt.master_key); + + let cluster = if let Some(ref cluster) = opt.cluster_configuration.experimental_enable_ha { + match cluster.as_str() { + "leader" => { + let mut addr = opt.http_addr.to_socket_addrs().unwrap().next().unwrap(); + addr.set_port(6666); + Some(Cluster::Leader(Arc::new(RwLock::new(Leader::new(addr))))) + } + "follower" => { + let mut addr = opt + .cluster_configuration + .leader + .as_ref() + .expect("Can't be a follower without a leader") + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + addr.set_port(6666); + Some(Cluster::Follower(Arc::new(RwLock::new(Follower::join(addr))))) + } + _ => panic!("Available values for the cluster mode are leader and follower"), + } + } else { + None + }; + let index_scheduler_builder = || -> anyhow::Result<_> { - Ok(IndexScheduler::new(IndexSchedulerOptions { - version_file_path: opt.db_path.join(VERSION_FILE_NAME), - auth_path: opt.db_path.join("auth"), - tasks_path: opt.db_path.join("tasks"), - update_file_path: opt.db_path.join("update_files"), - indexes_path: opt.db_path.join("indexes"), - snapshots_path: opt.snapshot_dir.clone(), - dumps_path: opt.dump_dir.clone(), - task_db_size: opt.max_task_db_size.get_bytes() as usize, - index_base_map_size: opt.max_index_size.get_bytes() as usize, - indexer_config: (&opt.indexer_options).try_into()?, - autobatching_enabled: true, - index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, - index_count: DEFAULT_INDEX_COUNT, - })?) + Ok(IndexScheduler::new( + IndexSchedulerOptions { + version_file_path: opt.db_path.join(VERSION_FILE_NAME), + auth_path: opt.db_path.join("auth"), + tasks_path: opt.db_path.join("tasks"), + update_file_path: opt.db_path.join("update_files"), + indexes_path: opt.db_path.join("indexes"), + snapshots_path: opt.snapshot_dir.clone(), + dumps_path: opt.dump_dir.clone(), + task_db_size: opt.max_task_db_size.get_bytes() as usize, + index_base_map_size: opt.max_index_size.get_bytes() as usize, + indexer_config: (&opt.indexer_options).try_into()?, + autobatching_enabled: true, + index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() + as usize, + index_count: DEFAULT_INDEX_COUNT, + }, + cluster, + )?) }; match ( diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 0c6457e7a..8cda6d72d 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -12,6 +12,7 @@ use std::{env, fmt, fs}; use byte_unit::{Byte, ByteError}; use clap::Parser; +use index_scheduler::ClusterMode; use meilisearch_types::milli::update::IndexerConfig; use rustls::server::{ AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, ServerSessionMemoryCache, @@ -297,6 +298,10 @@ pub struct Opt { #[clap(flatten)] pub indexer_options: IndexerOpts, + #[serde(flatten)] + #[clap(flatten)] + pub cluster_configuration: ClusterOpts, + /// Set the path to a configuration file that should be used to setup the engine. /// Format must be TOML. #[clap(long)] @@ -385,6 +390,7 @@ impl Opt { #[cfg(all(not(debug_assertions), feature = "analytics"))] no_analytics, experimental_enable_metrics: enable_metrics_route, + cluster_configuration, } = self; export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr); @@ -518,6 +524,17 @@ impl IndexerOpts { } } +#[derive(Debug, Default, Clone, Parser, Deserialize)] +pub struct ClusterOpts { + #[clap(long)] + #[serde(default)] + pub experimental_enable_ha: Option, + + #[clap(long)] + #[serde(default)] + pub leader: Option, +} + impl TryFrom<&IndexerOpts> for IndexerConfig { type Error = anyhow::Error;