mirror of https://github.com/meilisearch/meilisearch.git
Use slashes instead of dots as the S3 path separators
parent f544cfa444
commit f37fdceb15
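The change is mechanical: every object key that was assembled with `.` as the separator now uses `/`, so snapshots, per-index databases, and task payloads live under prefix-like keys instead of flat dotted names. The hunks below touch every place such a key is built. A minimal sketch of the resulting key shapes (plain `format!` calls; the ids and the uuid are made up for illustration):

// Illustrative only: shows the key shapes before and after this commit.
fn main() {
    let snapshot_id = "0000000001"; // hypothetical snapshot id
    let uuid = "410e5d0f-69d2-4b34-a4f6-1a171ed923be"; // hypothetical index uuid
    let task_id: u32 = 42; // hypothetical task id

    // Before: dot-separated keys, one flat namespace.
    let old_tasks_mdb = format!("snapshots.{snapshot_id}.tasks.mdb");
    let old_index_mdb = format!("snapshots.{snapshot_id}.indexes.{uuid}.mdb");
    let old_task = format!("tasks.{task_id:0>10}");

    // After: slash-separated keys, so S3 clients see a directory-like hierarchy.
    let new_tasks_mdb = format!("snapshots/{snapshot_id}/tasks.mdb");
    let new_index_mdb = format!("snapshots/{snapshot_id}/indexes/{uuid}.mdb");
    let new_task = format!("tasks/{task_id:0>10}");

    println!("{old_tasks_mdb} -> {new_tasks_mdb}");
    println!("{old_index_mdb} -> {new_index_mdb}");
    println!("{old_task} -> {new_task}");
}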
@@ -406,7 +406,7 @@ impl IndexScheduler {
 log::info!("Importing snapshot {}", path);
 let snapshot_id =
 path.strip_prefix("/snapshots/snapshot-").unwrap();
-let snapshot_dir = format!("snapshots.{}", snapshot_id);
+let snapshot_dir = format!("snapshots/{}", snapshot_id);

 let inner = this.inner();
 let s3 = inner.options.s3.as_ref().unwrap();
@@ -424,7 +424,7 @@ impl IndexScheduler {
 tempfile::NamedTempFile::new_in(inner.env.path()).unwrap();

 log::info!("Downloading the index scheduler database.");
-let tasks_snapshot = format!("{snapshot_dir}.tasks.mdb");
+let tasks_snapshot = format!("{snapshot_dir}/tasks.mdb");
 let status = s3
 .get_object_to_writer(tasks_snapshot, &mut tasks_file)
 .unwrap();
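This hunk builds the key for the index scheduler's `tasks.mdb` and streams it into a local temp file. A rough sketch of that download step, assuming `s3` is a rust-s3 `Bucket` built with the crate's blocking feature (which the diff's direct `.unwrap()` calls suggest); the paths and error handling are illustrative, not the real meilisearch code:

// Sketch only: `Bucket`, `NamedTempFile`, and `get_object_to_writer` are used
// as in the diff; everything else (arguments, error handling) is illustrative.
use std::io::Write;

fn download_tasks_snapshot(
    s3: &s3::Bucket,
    snapshot_dir: &str,
    scratch_dir: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>> {
    // Download the remote LMDB file into a temp file next to the local env.
    let mut tasks_file = tempfile::NamedTempFile::new_in(scratch_dir)?;
    let key = format!("{snapshot_dir}/tasks.mdb"); // e.g. "snapshots/0000000001/tasks.mdb"
    let status = s3.get_object_to_writer(key, &mut tasks_file)?;
    // The diff keeps the returned HTTP status around; 200 is the success case.
    debug_assert_eq!(status, 200);
    tasks_file.flush()?;
    Ok(())
}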
@@ -434,7 +434,7 @@ impl IndexScheduler {
 let indexes_files =
 tempfile::TempDir::new_in(&base_path).unwrap();

-let src = format!("{snapshot_dir}.indexes");
+let src = format!("{snapshot_dir}/indexes");
 let uuids =
 s3.list(src.clone(), None).unwrap().into_iter().flat_map(
 |lbr| {
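With slash separators, the per-index snapshots all live under a `.../indexes/` prefix, so one `s3.list` call over that prefix enumerates them like a directory. A hedged sketch of recovering the index uuids from the listed keys; the `contents`/`key` fields follow rust-s3's list result types (the diff's `flat_map(|lbr| ...)` iterates the same structures), and the key parsing is illustrative:

// Sketch only: the list-result field names and the key parsing are assumptions
// layered on top of the `s3.list(src, None)` call shown in the diff.
fn list_index_uuids(
    s3: &s3::Bucket,
    snapshot_dir: &str,
) -> Result<Vec<String>, s3::error::S3Error> {
    let src = format!("{snapshot_dir}/indexes"); // e.g. "snapshots/0000000001/indexes"
    let uuids = s3
        .list(src, None)?
        .into_iter()
        .flat_map(|lbr| lbr.contents)
        .filter_map(|object| {
            // Keys look like "snapshots/<id>/indexes/<uuid>.mdb"; keep the uuid part.
            let file_name = object.key.rsplit('/').next()?;
            file_name.strip_suffix(".mdb").map(str::to_owned)
        })
        .collect();
    Ok(uuids)
}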
@@ -457,7 +457,7 @@ impl IndexScheduler {
 .join("data.mdb");
 let mut file = File::create(path).unwrap();
 s3.get_object_to_writer(
-format!("{src}.{uuid}.mdb"),
+format!("{src}/{uuid}.mdb"),
 &mut file,
 )
 .unwrap();
@@ -538,7 +538,7 @@ impl IndexScheduler {
 .unwrap();
 let s3 = inner.options.s3.as_ref().unwrap();
 let task =
-s3.get_object(format!("tasks.{id:0>10}")).unwrap();
+s3.get_object(format!("/tasks/{id:0>10}")).unwrap();

 let task = serde_json::from_slice(task.as_slice()).unwrap();
 inner.register_raw_task(&mut wtxn, &task).unwrap();
@@ -577,7 +577,7 @@ impl IndexScheduler {
 .map(|(_, id)| id.parse::<u32>().unwrap())
 .unwrap();
 let s3 = inner.options.s3.as_ref().unwrap();
-let task = s3.get_object(format!("tasks.{id:0>10}")).unwrap();
+let task = s3.get_object(format!("tasks/{id:0>10}")).unwrap();
 let task = serde_json::from_slice(task.as_slice()).unwrap();
 inner.register_raw_task(&mut wtxn, &task).unwrap();
 wtxn.commit().unwrap();
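On the read side, a registered task is fetched by its zero-padded id and decoded from JSON before being re-registered locally. A small sketch of that fetch-and-decode step; the generic `T` stands in for meilisearch's task type, and the error handling is illustrative:

// Sketch only: mirrors the diff's read path (`get_object` + `serde_json::from_slice`).
use serde::de::DeserializeOwned;

fn fetch_task<T: DeserializeOwned>(
    s3: &s3::Bucket,
    id: u32,
) -> Result<T, Box<dyn std::error::Error>> {
    // Zero-padding keeps lexicographic listing order equal to numeric task order.
    let response = s3.get_object(format!("tasks/{id:0>10}"))?;
    let task = serde_json::from_slice(response.as_slice())?;
    Ok(task)
}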
@@ -638,13 +638,13 @@ impl IndexScheduler {
 .unwrap();

 let zk_snapshots = format!("snapshots");
-let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}");
+let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}");

 let s3 = inner.options.s3.as_ref().unwrap();

 // 1. Snapshot the version file.
 let dst = format!(
-"{snapshot_dir}.{}",
+"{snapshot_dir}/{}",
 meilisearch_types::VERSION_FILE_NAME
 );
 let mut version_file_path =
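Both the snapshot id and the task ids are zero-padded to ten characters, so the slash-separated keys stay in numeric order when listed lexicographically. A tiny illustration of what these keys expand to; the ids are made up and "VERSION" stands in for `meilisearch_types::VERSION_FILE_NAME`:

// Illustrative only: shows what the zero-padded, slash-separated snapshot keys
// expand to. Ids and the "VERSION" literal are placeholders.
fn main() {
    let snapshot_id: u64 = 12;
    let zk_snapshots = "snapshots";
    // `{snapshot_id:0>10?}` zero-fills the Debug output to ten characters.
    let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}");
    assert_eq!(snapshot_dir, "snapshots/0000000012");

    let version_key = format!("{snapshot_dir}/{}", "VERSION");
    let tasks_key = format!("{snapshot_dir}/tasks.mdb");
    assert_eq!(version_key, "snapshots/0000000012/VERSION");
    assert_eq!(tasks_key, "snapshots/0000000012/tasks.mdb");
}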
@@ -661,7 +661,7 @@ impl IndexScheduler {
 // 2. Snapshot the index-scheduler LMDB env
 log::info!("Snapshotting the tasks");
 let env = inner.env.clone();
-let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}");
+let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}");

 let temp = TempDir::new().unwrap();
 let mut file = env
@@ -672,7 +672,7 @@ impl IndexScheduler {
 .unwrap();
 s3.put_object_stream(
 &mut file,
-format!("{snapshot_dir}.tasks.mdb"),
+format!("{snapshot_dir}/tasks.mdb"),
 )
 .or_else(|e| match e {
 s3::error::S3Error::Http(404, _) => Ok(200),
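The upload calls tolerate a 404 from `put_object_stream` by mapping it to a success status rather than propagating the error. A sketch of that `.or_else` pattern in isolation; the bucket, file, and key are placeholders:

// Sketch only: reproduces the diff's "treat a 404 as success" pattern around an
// upload; everything except the calls shown in the diff is illustrative.
use std::fs::File;

fn upload_ignoring_404(
    s3: &s3::Bucket,
    mut file: File,
    key: String,
) -> Result<(), s3::error::S3Error> {
    s3.put_object_stream(&mut file, key)
        .or_else(|e| match e {
            // The prototype maps a 404 to a 200-style status instead of failing.
            s3::error::S3Error::Http(404, _) => Ok(200),
            e => Err(e),
        })?;
    Ok(())
}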
@@ -683,7 +683,7 @@ impl IndexScheduler {

 // 3. Snapshot every indexes
 log::info!("Snapshotting the indexes");
-let dst = format!("{snapshot_dir}.indexes");
+let dst = format!("{snapshot_dir}/indexes");

 for ret in inner.index_mapper.index_mapping.iter(&rtxn).unwrap() {
 let (name, uuid) = ret.unwrap();
@@ -697,7 +697,7 @@ impl IndexScheduler {
 heed::CompactionOption::Enabled,
 )
 .unwrap();
-s3.put_object_stream(&mut file, format!("{dst}.{uuid}.mdb"))
+s3.put_object_stream(&mut file, format!("{dst}/{uuid}.mdb"))
 .or_else(|e| match e {
 s3::error::S3Error::Http(404, _) => Ok(200),
 e => Err(e),
@@ -726,7 +726,7 @@ impl IndexScheduler {
 let _ = zookeeper // we don't want to crash if we can't delete an update file.
 .delete(&node, None)
 .unwrap();
-s3.delete_object(format!("tasks.{:0>10?}", task_id as u32))
+s3.delete_object(format!("/tasks/{:0>10?}", task_id as u32))
 .unwrap();
 // TODO: Delete the update files associated with the deleted tasks
 if let Some(content_uuid) = inner
@@ -1526,7 +1526,7 @@ impl IndexSchedulerInner {
 // TODO: send task to ZK in raw json.
 if let Some(zookeeper) = &self.zookeeper {
 let s3 = self.options.s3.as_ref().unwrap();
-s3.put_object(format!("tasks.{id:0>10?}"), &serde_json::to_vec_pretty(&task).unwrap())
+s3.put_object(format!("tasks/{id:0>10?}"), &serde_json::to_vec_pretty(&task).unwrap())
 .unwrap();

 // TODO: ugly unwrap
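On the write side, a task is serialized to pretty-printed JSON and stored under the same zero-padded, slash-separated key the readers use. A hedged sketch of that publish step, with a generic serializable task standing in for the real task struct:

// Sketch only: mirrors the diff's write path (`to_vec_pretty` + `put_object`).
use serde::Serialize;

fn publish_task<T: Serialize>(
    s3: &s3::Bucket,
    id: u32,
    task: &T,
) -> Result<(), Box<dyn std::error::Error>> {
    let body = serde_json::to_vec_pretty(task)?;
    // Same zero-padded, slash-separated key the readers use, e.g. "tasks/0000000042".
    s3.put_object(format!("tasks/{id:0>10?}"), &body)?;
    Ok(())
}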