diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 664eb2924..00dc6e8b1 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -35,6 +35,7 @@ pub type TaskId = u32; use std::collections::{BTreeMap, HashMap}; use std::fs::File; use std::ops::{Bound, RangeBounds}; +use std::os::fd::AsRawFd; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; @@ -58,7 +59,7 @@ use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use roaring::RoaringBitmap; use s3::Bucket; use synchronoise::SignalEvent; -use tempfile::NamedTempFile; +use tempfile::{NamedTempFile, TempDir}; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; @@ -430,15 +431,17 @@ impl IndexScheduler { tempfile::TempDir::new_in(&base_path).unwrap(); let src = format!("{snapshot_dir}.indexes"); - for result in std::fs::read_dir(&src).unwrap() { - let entry = result.unwrap(); - let uuid = entry - .file_name() - .as_os_str() - .to_str() - .unwrap() - .to_string(); - log::info!("\tDownloading the index {}", uuid.to_string()); + let uuids = + s3.list(src.clone(), None).unwrap().into_iter().flat_map( + |lbr| { + lbr.contents.into_iter().map(|o| { + let mut iter = o.key.rsplit('.'); + iter.nth(1).unwrap().to_string() + }) + }, + ); + for uuid in uuids { + log::info!("\tDownloading the index {}", uuid); std::fs::create_dir_all( indexes_files.path().join(&uuid).with_extension(""), ) @@ -642,7 +645,12 @@ impl IndexScheduler { ); let mut version_file_path = File::open(&inner.version_file_path).unwrap(); - s3.put_object_stream(&mut version_file_path, dst).unwrap(); + s3.put_object_stream(&mut version_file_path, dst) + .or_else(|e| match e { + s3::error::S3Error::Http(404, _) => Ok(200), + e => Err(e), + }) + .unwrap(); version_file_path.sync_data().unwrap(); drop(version_file_path); @@ -651,13 +659,21 @@ impl IndexScheduler { let env = inner.env.clone(); let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}"); - let mut temp = NamedTempFile::new().unwrap(); - env.copy_to_path(temp.path(), heed::CompactionOption::Enabled) + let temp = TempDir::new().unwrap(); + let mut file = env + .copy_to_path( + temp.path().join("data.mdb"), + heed::CompactionOption::Enabled, + ) .unwrap(); s3.put_object_stream( - &mut temp, + &mut file, format!("{snapshot_dir}.tasks.mdb"), ) + .or_else(|e| match e { + s3::error::S3Error::Http(404, _) => Ok(200), + e => Err(e), + }) .unwrap(); temp.close().unwrap(); @@ -670,11 +686,18 @@ impl IndexScheduler { log::info!(" Snapshotting index {name}"); let dst = dst.clone(); let index = inner.index_mapper.index(&rtxn, &name).unwrap(); - let mut temp = NamedTempFile::new().unwrap(); - index - .copy_to_path(temp.path(), heed::CompactionOption::Enabled) + let temp = TempDir::new().unwrap(); + let mut file = index + .copy_to_path( + temp.path().join("data.mdb"), + heed::CompactionOption::Enabled, + ) .unwrap(); - s3.put_object_stream(&mut temp, format!("{dst}.{uuid}.mdb")) + s3.put_object_stream(&mut file, format!("{dst}.{uuid}.mdb")) + .or_else(|e| match e { + s3::error::S3Error::Http(404, _) => Ok(200), + e => Err(e), + }) .unwrap(); temp.close().unwrap(); }