Keep the ZK flow when enqueuing tasks

This commit is contained in:
Clément Renault 2023-08-30 17:15:15 +02:00
parent c488a4a351
commit 2d1434da81
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -609,271 +609,288 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
run.breakpoint(Breakpoint::Init); run.breakpoint(Breakpoint::Init);
if let Some(zookeeper) = self.zookeeper.clone() { let latch = match self.zookeeper.clone() {
let id = Uuid::new_v4().to_string(); Some(zookeeper) => {
let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string()); let id = Uuid::new_v4().to_string();
latch.start().unwrap(); let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
latch.start().unwrap();
// Join the potential leaders list. // Join the potential leaders list.
// The lowest in the list is the leader. And if we're not the leader // The lowest in the list is the leader. And if we're not the leader
// we watch the node right before us to be notified if he dies. // we watch the node right before us to be notified if he dies.
// See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
let latchc = latch.clone(); let latchc = latch.clone();
let this = self.private_clone(); let this = self.private_clone();
zookeeper zookeeper
.add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| { .add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| {
if !latchc.has_leadership() { if !latchc.has_leadership() {
let WatchedEvent { event_type, path, keeper_state: _ } = event;
match event_type {
WatchedEventType::NodeCreated => {
let path = path.unwrap();
log::info!("The snapshot {} is in preparation", path);
}
WatchedEventType::NodeDataChanged => {
let path = path.unwrap();
log::info!("Importing snapshot {}", path);
let snapshot_id =
path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir = PathBuf::from(format!(
"{}/zk-snapshots/{}",
env!("HOME"),
snapshot_id
));
// 1. TODO: Ensure the snapshot version file is the same as our version.
// 2. Download all the databases
let tasks_file =
tempfile::NamedTempFile::new_in(this.env.path()).unwrap();
log::info!("Downloading the index scheduler database.");
let tasks_snapshot = snapshot_dir.join("tasks.mdb");
std::fs::copy(tasks_snapshot, tasks_file).unwrap();
log::info!("Downloading the indexes databases");
let indexes_files =
tempfile::TempDir::new_in(&this.index_mapper.base_path)
.unwrap();
let mut indexes = Vec::new();
let dst = snapshot_dir.join("indexes");
for result in std::fs::read_dir(&dst).unwrap() {
let entry = result.unwrap();
let uuid = entry
.file_name()
.as_os_str()
.to_str()
.unwrap()
.to_string();
log::info!("\tDownloading the index {}", uuid.to_string());
std::fs::copy(
dst.join(&uuid),
indexes_files.path().join(&uuid),
)
.unwrap();
indexes.push(uuid);
}
// 3. Lock the index-mapper and close all the env
// TODO: continue here
// run.env.close();
// 4. Move all the databases
// 5. Unlock the index-mapper
// 2. Download and import the index-scheduler database
// 3. Snapshot every indexes
}
otherwise => panic!("{otherwise:?}"),
}
}
})
.unwrap();
match zookeeper.create(
"/tasks",
vec![],
Acl::open_unsafe().clone(),
CreateMode::Persistent,
) {
Ok(_) => (),
Err(ZkError::NodeExists) => {
log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");
let children = zookeeper
.get_children("/tasks", false)
.expect("Internal, the /tasks directory was deleted during execution."); // TODO change me
log::info!("Importing {} tasks", children.len());
for path in children {
log::info!(" Importing {}", path);
match zookeeper.get_data(&format!("/tasks/{}", &path), false) {
Ok((task, _stat)) => {
if task.is_empty() {
log::info!(" Task {} was empty, skipping.", path);
} else {
let task = serde_json::from_slice(&task).unwrap();
let mut wtxn = self.env.write_txn().unwrap();
self.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
// we received a new tasks, we must wake up
self.wake_up.signal();
}
}
Err(e) => panic!("{e}"),
}
// else the file was deleted while we were inserting the key. We ignore it.
// TODO: What happens if someone updates the files before we have the time
// to setup the watcher
}
}
Err(e) => panic!("{e}"),
}
// TODO: fix unwrap by returning a clear error.
let this = self.private_clone();
zookeeper
.add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| {
let WatchedEvent { event_type, path, keeper_state: _ } = event; let WatchedEvent { event_type, path, keeper_state: _ } = event;
match event_type { match event_type {
WatchedEventType::NodeCreated => {
let path = path.unwrap();
log::info!("The snapshot {} is in preparation", path);
}
WatchedEventType::NodeDataChanged => { WatchedEventType::NodeDataChanged => {
let path = path.unwrap(); let path = path.unwrap();
log::info!("Importing snapshot {}", path); // Add raw task content in local DB
let snapshot_id = log::info!("Received a new task from the cluster at {}", path);
path.strip_prefix("/snapshots/snapshot-").unwrap(); let (data, _stat) = this
let snapshot_dir = PathBuf::from(format!( .zookeeper
"{}/zk-snapshots/{}", .as_ref()
env!("HOME"), .unwrap()
snapshot_id .get_data(&path, false)
));
// 1. TODO: Ensure the snapshot version file is the same as our version.
// 2. Download all the databases
let tasks_file =
tempfile::NamedTempFile::new_in(this.env.path()).unwrap();
log::info!("Downloading the index scheduler database.");
let tasks_snapshot = snapshot_dir.join("tasks.mdb");
std::fs::copy(tasks_snapshot, tasks_file).unwrap();
log::info!("Downloading the indexes databases");
let indexes_files =
tempfile::TempDir::new_in(&this.index_mapper.base_path)
.unwrap();
let mut indexes = Vec::new();
let dst = snapshot_dir.join("indexes");
for result in std::fs::read_dir(&dst).unwrap() {
let entry = result.unwrap();
let uuid =
entry.file_name().as_os_str().to_str().unwrap().to_string();
log::info!("\tDownloading the index {}", uuid.to_string());
std::fs::copy(
dst.join(&uuid),
indexes_files.path().join(&uuid),
)
.unwrap(); .unwrap();
indexes.push(uuid); let task = serde_json::from_slice(data.as_slice()).unwrap();
} let mut wtxn = this.env.write_txn().unwrap();
this.register_raw_task(&mut wtxn, &task).unwrap();
// 3. Lock the index-mapper and close all the env wtxn.commit().unwrap();
// TODO: continue here
// run.env.close();
// 4. Move all the databases
// 5. Unlock the index-mapper
// 2. Download and import the index-scheduler database
// 3. Snapshot every indexes
} }
otherwise => panic!("{otherwise:?}"), WatchedEventType::None
| WatchedEventType::NodeCreated
| WatchedEventType::NodeDeleted => (),
WatchedEventType::NodeChildrenChanged
| WatchedEventType::DataWatchRemoved
| WatchedEventType::ChildWatchRemoved => panic!("{event_type:?}"),
} }
}
})
.unwrap();
match zookeeper.create( this.wake_up.signal();
"/tasks", })
vec![], .unwrap();
Acl::open_unsafe().clone(),
CreateMode::Persistent,
) {
Ok(_) => (),
Err(ZkError::NodeExists) => {
log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");
let children = zookeeper Some(latch)
.get_children("/tasks", false)
.expect("Internal, the /tasks directory was deleted during execution."); // TODO change me
log::info!("Importing {} tasks", children.len());
for path in children {
log::info!(" Importing {}", path);
match zookeeper.get_data(&format!("/tasks/{}", &path), false) {
Ok((task, _stat)) => {
if task.is_empty() {
log::info!(" Task {} was empty, skipping.", path);
} else {
let task = serde_json::from_slice(&task).unwrap();
let mut wtxn = self.env.write_txn().unwrap();
self.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
// we received a new tasks, we must wake up
self.wake_up.signal();
}
}
Err(e) => panic!("{e}"),
}
// else the file was deleted while we were inserting the key. We ignore it.
// TODO: What happens if someone updates the files before we have the time
// to setup the watcher
}
}
Err(e) => panic!("{e}"),
} }
None => None,
// TODO: fix unwrap by returning a clear error. };
let this = self.private_clone();
zookeeper
.add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| {
let WatchedEvent { event_type, path, keeper_state: _ } = event;
match event_type {
WatchedEventType::NodeDataChanged => {
let path = path.unwrap();
// Add raw task content in local DB
log::info!("Received a new task from the cluster at {}", path);
let (data, _stat) =
this.zookeeper.as_ref().unwrap().get_data(&path, false).unwrap();
let task = serde_json::from_slice(data.as_slice()).unwrap();
let mut wtxn = this.env.write_txn().unwrap();
this.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
}
WatchedEventType::None
| WatchedEventType::NodeCreated
| WatchedEventType::NodeDeleted => (),
WatchedEventType::NodeChildrenChanged
| WatchedEventType::DataWatchRemoved
| WatchedEventType::ChildWatchRemoved => panic!("{event_type:?}"),
}
this.wake_up.signal();
})
.unwrap();
}
let this = self.private_clone(); let this = self.private_clone();
std::thread::spawn(move || { std::thread::spawn(move || {
loop { loop {
// we're either a leader or not running in a cluster, // we're either a leader or not running in a cluster,
// either way we should wait until we receive a task. // either way we should wait until we receive a task.
let wake_up = this.wake_up.clone(); this.wake_up.wait();
let _ = wake_up.wait();
match this.tick() { // TODO watch the /election node and send a signal once it changes (be careful about atomics ordering)
Ok(TickOutcome::TickAgain(n)) => { if latch.as_ref().map_or(true, |latch| latch.has_leadership()) {
// We must tick again. match this.tick() {
this.wake_up.signal(); Ok(TickOutcome::TickAgain(n)) => {
// We must tick again.
this.wake_up.signal();
// if we're in a cluster that means we're the leader // if we're in a cluster that means we're the leader
// and should share a snapshot of what we've done. // and should share a snapshot of what we've done.
if let Some(ref zookeeper) = this.zookeeper { if let Some(ref zookeeper) = this.zookeeper {
// if nothing was processed we have nothing to do. // if nothing was processed we have nothing to do.
if n == 0 { if n == 0 {
continue; continue;
} }
let snapshot_path = zookeeper let snapshot_path = zookeeper
.create( .create(
"/snapshots/snapshot-", "/snapshots/snapshot-",
vec![], vec![],
Acl::open_unsafe().clone(), Acl::open_unsafe().clone(),
CreateMode::PersistentSequential, CreateMode::PersistentSequential,
)
.unwrap();
let snapshot_id = snapshot_path
.rsplit_once('-')
.map(|(_, id)| id.parse::<u32>().unwrap())
.unwrap();
let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
std::fs::create_dir_all(&zk_snapshots).unwrap();
let snapshot_dir =
PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}"));
std::fs::create_dir(&snapshot_dir).unwrap();
// 1. Snapshot the version file.
let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
std::fs::copy(&this.version_file_path, dst).unwrap();
// 2. Snapshot the index-scheduler LMDB env
log::info!("Snapshotting the tasks");
let env = this.env.clone();
env.copy_to_path(
snapshot_dir.join("tasks.mdb"),
heed::CompactionOption::Enabled,
) )
.unwrap(); .unwrap();
let snapshot_id = snapshot_path // 3. Snapshot every indexes
.rsplit_once('-') log::info!("Snapshotting the indexes");
.map(|(_, id)| id.parse::<u32>().unwrap()) let dst = snapshot_dir.join("indexes");
.unwrap(); std::fs::create_dir_all(&dst).unwrap();
let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
std::fs::create_dir_all(&zk_snapshots).unwrap();
let snapshot_dir =
PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}"));
std::fs::create_dir(&snapshot_dir).unwrap();
// 1. Snapshot the version file.
let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
std::fs::copy(&this.version_file_path, dst).unwrap();
// 2. Snapshot the index-scheduler LMDB env
log::info!("Snapshotting the tasks");
let env = this.env.clone();
env.copy_to_path(
snapshot_dir.join("tasks.mdb"),
heed::CompactionOption::Enabled,
)
.unwrap();
// 3. Snapshot every indexes
log::info!("Snapshotting the indexes");
let dst = snapshot_dir.join("indexes");
std::fs::create_dir_all(&dst).unwrap();
let this = this.private_clone();
let rtxn = this.env.read_txn().unwrap();
let indexes = this
.index_mapper
.index_mapping
.iter(&rtxn)
.unwrap()
.map(|ret| ret.unwrap())
.map(|(name, uuid)| (name.to_string(), uuid))
.collect::<Vec<_>>();
for (name, uuid) in indexes {
log::info!(" Snapshotting index {name}");
let this = this.private_clone(); let this = this.private_clone();
let dst = dst.clone();
let rtxn = this.env.read_txn().unwrap(); let rtxn = this.env.read_txn().unwrap();
let index = this.index_mapper.index(&rtxn, &name).unwrap(); let indexes = this
index .index_mapper
.copy_to_path( .index_mapping
dst.join(format!("{uuid}.mdb")), .iter(&rtxn)
heed::CompactionOption::Enabled, .unwrap()
) .map(|ret| ret.unwrap())
.unwrap(); .map(|(name, uuid)| (name.to_string(), uuid))
} .collect::<Vec<_>>();
// we must notify everyone that we dropped a new snapshot on the s3 for (name, uuid) in indexes {
let _stat = zookeeper.set_data( log::info!(" Snapshotting index {name}");
&format!("/snapshots/snapshot-{:0>10?}", snapshot_id), let this = this.private_clone();
vec![], let dst = dst.clone();
None, let rtxn = this.env.read_txn().unwrap();
); let index = this.index_mapper.index(&rtxn, &name).unwrap();
log::info!("Notified everyone about the new snapshot {snapshot_id}"); index
.copy_to_path(
dst.join(format!("{uuid}.mdb")),
heed::CompactionOption::Enabled,
)
.unwrap();
}
// We can now delete all the tasks that has been processed // we must notify everyone that we dropped a new snapshot on the s3
let processed = this let _stat = zookeeper.set_data(
.processing_tasks &format!("/snapshots/snapshot-{:0>10?}", snapshot_id),
.read() vec![],
.unwrap() None,
.processed_previously() );
.clone(); // we don't want to hold the mutex log::info!(
log::info!("Deleting {} processed tasks", processed.len()); "Notified everyone about the new snapshot {snapshot_id}"
for task in processed { );
let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32));
let _ = zookeeper // we don't want to crash if we can't delete an update file. // We can now delete all the tasks that has been processed
.delete(&node, None) let processed = this
.unwrap(); .processing_tasks
// TODO: Delete the update files associated with the deleted tasks .read()
.unwrap()
.processed_previously()
.clone(); // we don't want to hold the mutex
log::info!("Deleting {} processed tasks", processed.len());
for task in processed {
let node = dbg!(format!("/tasks/task-{:0>10?}", task as i32));
let _ = zookeeper // we don't want to crash if we can't delete an update file.
.delete(&node, None)
.unwrap();
// TODO: Delete the update files associated with the deleted tasks
}
} }
} }
} Ok(TickOutcome::WaitForSignal) => (),
Ok(TickOutcome::WaitForSignal) => (), Err(e) => {
Err(e) => { log::error!("{}", e);
log::error!("{}", e); // Wait one second when an irrecoverable error occurs.
// Wait one second when an irrecoverable error occurs. if !e.is_recoverable() {
if !e.is_recoverable() { std::thread::sleep(Duration::from_secs(1));
std::thread::sleep(Duration::from_secs(1)); }
} }
} }
} }
@ -1264,11 +1281,13 @@ impl IndexScheduler {
// (that it does not contain duplicate indexes). // (that it does not contain duplicate indexes).
check_index_swap_validity(&task)?; check_index_swap_validity(&task)?;
this.register_raw_task(&mut wtxn, &task)?; if self.zookeeper.is_none() {
this.register_raw_task(&mut wtxn, &task)?;
if let Err(e) = wtxn.commit() { if let Err(e) = wtxn.commit() {
this.delete_persisted_task_data(&task)?; this.delete_persisted_task_data(&task)?;
return Err(e.into()); return Err(e.into());
}
} }
// If the registered task is a task cancelation // If the registered task is a task cancelation
@ -1281,8 +1300,10 @@ impl IndexScheduler {
} }
} }
// notify the scheduler loop to execute a new tick if self.zookeeper.is_none() {
this.wake_up.signal(); // notify the scheduler loop to execute a new tick
this.wake_up.signal();
}
// TODO: send task to ZK in raw json. // TODO: send task to ZK in raw json.
if let Some(zookeeper) = &self.zookeeper { if let Some(zookeeper) = &self.zookeeper {