diff --git a/Cargo.lock b/Cargo.lock index 6cd41aae7..ef637ed78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,12 +428,54 @@ dependencies = [ "critical-section", ] +[[package]] +name = "attohttpc" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fcf00bc6d5abb29b5f97e3c61a90b6d3caa12f3faf897d4a3e3607c050a35a7" +dependencies = [ + "http", + "log", + "rustls 0.20.8", + "serde", + "serde_json", + "url", + "webpki", + "webpki-roots 0.22.6", +] + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-creds" +version = "0.34.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3776743bb68d4ad02ba30ba8f64373f1be4e082fe47651767171ce75bb2f6cf5" +dependencies = [ + "attohttpc", + "dirs", + "log", + "quick-xml", + "rust-ini", + "serde", + "thiserror", + "time", + "url", +] + +[[package]] +name = "aws-region" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "056557a61427d0e5ba29dd931031c8ffed4ee7a550e7cd55692a9d8deb0a9dba" +dependencies = [ + "thiserror", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -1188,6 +1230,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "1.0.2" @@ -1198,6 +1249,17 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi 0.3.9", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -1209,6 +1271,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "dlv-list" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" + [[package]] name = "dump" version = "1.3.0" @@ -1943,6 +2011,7 @@ dependencies = [ "parking_lot", "puffin", "roaring", + "rust-s3", "serde", "serde_json", "synchronoise", @@ -2548,6 +2617,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "maybe-async" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f1b8c13cb1f814b634a96b2c725449fe7ed464a7b8781de8688be5ffbd3f305" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "md5" version = "0.7.0" @@ -2625,6 +2705,7 @@ dependencies = [ "rayon", "regex", "reqwest", + "rust-s3", "rustls 0.20.8", "rustls-pemfile", "segment", @@ -3013,6 +3094,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" +dependencies = [ + "dlv-list", + "hashbrown 0.12.3", +] + [[package]] name = "page_size" version = "0.4.2" @@ -3370,6 +3461,16 @@ dependencies = [ "puffin", ] +[[package]] +name = "quick-xml" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.30" @@ -3577,6 +3678,45 @@ dependencies = [ "smallvec", ] +[[package]] +name = "rust-ini" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df" +dependencies = [ + "cfg-if 1.0.0", + "ordered-multimap", +] + +[[package]] +name = "rust-s3" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b2ac5ff6acfbe74226fa701b5ef793aaa054055c13ebb7060ad36942956e027" +dependencies = [ + "async-trait", + "attohttpc", + "aws-creds", + "aws-region", + "base64 0.13.1", + "bytes 1.4.0", + "cfg-if 1.0.0", + "hex", + "hmac", + "http", + "log", + "maybe-async", + "md5", + "percent-encoding", + "quick-xml", + "serde", + "serde_derive", + "sha2", + "thiserror", + "time", + "url", +] + [[package]] name = "rustc-demangle" version = "0.1.23" diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index a327b4386..41626d63d 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -34,6 +34,7 @@ uuid = { version = "1.3.1", features = ["serde", "v4"] } tokio = { version = "1.27.0", features = ["full"] } zookeeper = "0.8.0" parking_lot = "0.12.1" +rust-s3 = { version = "0.33.0", default-features = false, features = ["sync-rustls-tls"] } [dev-dependencies] big_s = "1.0.2" diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 91483d390..664eb2924 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -56,7 +56,9 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use roaring::RoaringBitmap; +use s3::Bucket; use synchronoise::SignalEvent; +use tempfile::NamedTempFile; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; @@ -277,6 +279,8 @@ pub struct IndexSchedulerOptions { pub instance_features: InstanceTogglableFeatures, /// zookeeper client pub zookeeper: Option>, + /// S3 bucket + pub s3: Option, } /// Structure which holds meilisearch's indexes and schedules the tasks @@ -305,6 +309,7 @@ impl IndexScheduler { // initialize the directories we need to process batches. if let Some(zookeeper) = &inner.zookeeper { + // Create all the required directories in zookeeper match zookeeper.create( "/election", vec![], @@ -360,9 +365,6 @@ impl IndexScheduler { let latch = match self.zookeeper { Some(ref zookeeper) => { - let zk_tasks = format!("{}/zk-tasks", env!("HOME")); - std::fs::create_dir_all(&zk_tasks).unwrap(); - let id = Uuid::new_v4().to_string(); let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string()); let wake_up = self.wake_up.clone(); @@ -401,13 +403,10 @@ impl IndexScheduler { log::info!("Importing snapshot {}", path); let snapshot_id = path.strip_prefix("/snapshots/snapshot-").unwrap(); - let snapshot_dir = PathBuf::from(format!( - "{}/zk-snapshots/{}", - env!("HOME"), - snapshot_id - )); + let snapshot_dir = format!("snapshots.{}", snapshot_id); let inner = this.inner(); + let s3 = inner.options.s3.as_ref().unwrap(); // 1. TODO: Ensure the snapshot version file is the same as our version. @@ -418,19 +417,19 @@ impl IndexScheduler { path }; - let tasks_file = + let mut tasks_file = tempfile::NamedTempFile::new_in(inner.env.path()).unwrap(); log::info!("Downloading the index scheduler database."); - let tasks_snapshot = snapshot_dir.join("tasks.mdb"); - std::fs::copy(&tasks_snapshot, &tasks_file).unwrap(); + let tasks_snapshot = format!("{snapshot_dir}/tasks.mdb"); + s3.get_object_to_writer(tasks_snapshot, &mut tasks_file) + .unwrap(); log::info!("Downloading the indexes databases"); let indexes_files = tempfile::TempDir::new_in(&base_path).unwrap(); - let mut indexes = Vec::new(); - let src = snapshot_dir.join("indexes"); + let src = format!("{snapshot_dir}.indexes"); for result in std::fs::read_dir(&src).unwrap() { let entry = result.unwrap(); let uuid = entry @@ -444,16 +443,17 @@ impl IndexScheduler { indexes_files.path().join(&uuid).with_extension(""), ) .unwrap(); - std::fs::copy( - src.join(&uuid).with_extension("mdb"), - indexes_files - .path() - .join(&uuid) - .with_extension("") - .join("data.mdb"), + let path = indexes_files + .path() + .join(&uuid) + .with_extension("") + .join("data.mdb"); + let mut file = File::create(path).unwrap(); + s3.get_object_to_writer( + format!("{src}.{uuid}.mdb"), + &mut file, ) .unwrap(); - indexes.push(uuid); } // 3. Lock the index-mapper and close all the env @@ -529,11 +529,11 @@ impl IndexScheduler { .rsplit_once('-') .map(|(_, id)| id.parse::().unwrap()) .unwrap(); - let task_path = Path::new(std::env!("HOME")) - .join("zk-tasks") - .join(format!("{:0>10}", id)); - let file = File::open(task_path).unwrap(); - let task = serde_json::from_reader(file).unwrap(); + let s3 = inner.options.s3.as_ref().unwrap(); + let task = + s3.get_object(format!("tasks.{id:0>10}")).unwrap(); + + let task = serde_json::from_slice(task.as_slice()).unwrap(); inner.register_raw_task(&mut wtxn, &task).unwrap(); // we received a new tasks, we must wake up self.wake_up.signal(); @@ -569,11 +569,9 @@ impl IndexScheduler { .rsplit_once('-') .map(|(_, id)| id.parse::().unwrap()) .unwrap(); - let path = Path::new(env!("HOME")) - .join("zk-tasks") - .join(format!("{:0>10?}", id)); - let file = File::open(path).unwrap(); - let task = serde_json::from_reader(file).unwrap(); + let s3 = inner.options.s3.as_ref().unwrap(); + let task = s3.get_object(format!("tasks.{id:0>10}")).unwrap(); + let task = serde_json::from_slice(task.as_slice()).unwrap(); inner.register_raw_task(&mut wtxn, &task).unwrap(); wtxn.commit().unwrap(); } @@ -632,49 +630,53 @@ impl IndexScheduler { .map(|(_, id)| id.parse::().unwrap()) .unwrap(); - let zk_snapshots = format!("{}/zk-snapshots", env!("HOME")); - std::fs::create_dir_all(&zk_snapshots).unwrap(); - let snapshot_dir = - PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}")); - std::fs::create_dir(&snapshot_dir).unwrap(); + let zk_snapshots = format!("snapshots"); + let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}"); + + let s3 = inner.options.s3.as_ref().unwrap(); // 1. Snapshot the version file. - let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME); - std::fs::copy(&inner.version_file_path, dst).unwrap(); + let dst = format!( + "{snapshot_dir}.{}", + meilisearch_types::VERSION_FILE_NAME + ); + let mut version_file_path = + File::open(&inner.version_file_path).unwrap(); + s3.put_object_stream(&mut version_file_path, dst).unwrap(); + version_file_path.sync_data().unwrap(); + drop(version_file_path); // 2. Snapshot the index-scheduler LMDB env log::info!("Snapshotting the tasks"); let env = inner.env.clone(); - env.copy_to_path( - snapshot_dir.join("tasks.mdb"), - heed::CompactionOption::Enabled, + let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}"); + + let mut temp = NamedTempFile::new().unwrap(); + env.copy_to_path(temp.path(), heed::CompactionOption::Enabled) + .unwrap(); + s3.put_object_stream( + &mut temp, + format!("{snapshot_dir}.tasks.mdb"), ) .unwrap(); + temp.close().unwrap(); // 3. Snapshot every indexes log::info!("Snapshotting the indexes"); - let dst = snapshot_dir.join("indexes"); - std::fs::create_dir_all(&dst).unwrap(); + let dst = format!("{snapshot_dir}.indexes"); - let indexes = inner - .index_mapper - .index_mapping - .iter(&rtxn) - .unwrap() - .map(|ret| ret.unwrap()) - .map(|(name, uuid)| (name.to_string(), uuid)) - .collect::>(); - - for (name, uuid) in indexes { + for ret in inner.index_mapper.index_mapping.iter(&rtxn).unwrap() { + let (name, uuid) = ret.unwrap(); log::info!(" Snapshotting index {name}"); let dst = dst.clone(); let index = inner.index_mapper.index(&rtxn, &name).unwrap(); + let mut temp = NamedTempFile::new().unwrap(); index - .copy_to_path( - dst.join(format!("{uuid}.mdb")), - heed::CompactionOption::Enabled, - ) + .copy_to_path(temp.path(), heed::CompactionOption::Enabled) .unwrap(); + s3.put_object_stream(&mut temp, format!("{dst}.{uuid}.mdb")) + .unwrap(); + temp.close().unwrap(); } // we must notify everyone that we dropped a new snapshot on the s3 @@ -1476,10 +1478,9 @@ impl IndexSchedulerInner { // TODO: send task to ZK in raw json. if let Some(zookeeper) = &self.zookeeper { - std::fs::write( - Path::new(std::env!("HOME")).join("zk-tasks").join(format!("{:0>10?}", id)), - serde_json::to_vec_pretty(&task).unwrap(), - )?; + let s3 = self.options.s3.as_ref().unwrap(); + s3.put_object(format!("tasks.{id:0>10?}"), &serde_json::to_vec_pretty(&task).unwrap()) + .unwrap(); // TODO: ugly unwrap zookeeper.set_data(&format!("/tasks/task-{:0>10?}", id), b"ok".to_vec(), None).unwrap(); diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml index 9ac09934e..5ee9dae83 100644 --- a/meilisearch/Cargo.toml +++ b/meilisearch/Cargo.toml @@ -80,6 +80,7 @@ reqwest = { version = "0.11.16", features = [ ], default-features = false } rustls = "0.20.8" rustls-pemfile = "1.0.2" +rust-s3 = { version = "0.33.0", default-features = false, features = ["sync-rustls-tls"] } segment = { version = "0.2.2", optional = true } serde = { version = "1.0.160", features = ["derive"] } serde_json = { version = "1.0.95", features = ["preserve_order"] } diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index c9fd4344a..b7e1be43a 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -39,6 +39,8 @@ use meilisearch_types::versioning::{check_version_file, create_version_file}; use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; pub use option::Opt; use option::ScheduleSnapshot; +use s3::creds::Credentials; +use s3::{Bucket, Region}; use zookeeper::ZooKeeper; use crate::error::MeilisearchHttpError; @@ -242,6 +244,15 @@ fn open_or_create_database_unchecked( index_count: DEFAULT_INDEX_COUNT, instance_features, zookeeper: zookeeper.clone(), + s3: opt.s3_url.as_ref().map(|url| { + Bucket::new( + "test-rust-s3", + Region::Custom { region: "eu-central-1".to_owned(), endpoint: url.clone() }, + Credentials::default().unwrap(), + ) + .unwrap() + .with_path_style() + }), })) .map_err(anyhow::Error::from); diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index eea2637c3..4e9d8ee9e 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -29,6 +29,7 @@ const MEILI_HTTP_ADDR: &str = "MEILI_HTTP_ADDR"; const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY"; const MEILI_ENV: &str = "MEILI_ENV"; const MEILI_ZK_URL: &str = "MEILI_ZK_URL"; +const MEILI_S3_URL: &str = "MEILI_S3_URL"; #[cfg(all(not(debug_assertions), feature = "analytics"))] const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS"; const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT"; @@ -160,6 +161,10 @@ pub struct Opt { #[clap(long, env = MEILI_ZK_URL)] pub zk_url: Option, + /// Sets the HTTP address and port used to communicate with the S3 bucket. + #[clap(long, env = MEILI_S3_URL)] + pub s3_url: Option, + /// Deactivates Meilisearch's built-in telemetry when provided. /// /// Meilisearch automatically collects data from all instances that do not opt out using this flag. @@ -375,6 +380,7 @@ impl Opt { master_key, env, zk_url, + s3_url, max_index_size: _, max_task_db_size: _, http_payload_size_limit, @@ -411,6 +417,9 @@ impl Opt { if let Some(zk_url) = zk_url { export_to_env_if_not_present(MEILI_ZK_URL, zk_url); } + if let Some(s3_url) = s3_url { + export_to_env_if_not_present(MEILI_S3_URL, s3_url); + } #[cfg(all(not(debug_assertions), feature = "analytics"))] { export_to_env_if_not_present(MEILI_NO_ANALYTICS, no_analytics.to_string());