Introduce the zk-tasks folder

This commit is contained in:
Kerollmops 2023-09-04 18:24:34 +02:00
parent 7d85753573
commit 41697c4d65
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -33,6 +33,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
@ -359,6 +360,9 @@ impl IndexScheduler {
let latch = match self.zookeeper {
Some(ref zookeeper) => {
let zk_tasks = format!("{}/zk-tasks", env!("HOME"));
std::fs::create_dir_all(&zk_tasks).unwrap();
let id = Uuid::new_v4().to_string();
let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
let wake_up = self.wake_up.clone();
@ -517,11 +521,19 @@ impl IndexScheduler {
for path in children {
log::info!(" Importing {}", path);
match zookeeper.get_data(&format!("/tasks/{}", &path), false) {
Ok((task, _stat)) => {
if task.is_empty() {
log::info!(" Task {} was empty, skipping.", path);
Ok((data, _stat)) => {
if data != b"ok" {
log::info!(" Task {} was not \"ok\", skipping.", path);
} else {
let task = serde_json::from_slice(&task).unwrap();
let id = path
.rsplit_once('-')
.map(|(_, id)| id.parse::<u32>().unwrap())
.unwrap();
let task_path = Path::new(std::env!("HOME"))
.join("zk-tasks")
.join(format!("{:0>10}", id));
let file = File::open(task_path).unwrap();
let task = serde_json::from_reader(file).unwrap();
inner.register_raw_task(&mut wtxn, &task).unwrap();
// we received a new tasks, we must wake up
self.wake_up.signal();
@ -550,12 +562,22 @@ impl IndexScheduler {
// Add raw task content in local DB
log::info!("Received a new task from the cluster at {}", path);
let inner = this.inner();
let mut wtxn = inner.env.write_txn().unwrap();
let (data, _stat) = zookeeperc.get_data(&path, false).unwrap();
let task = serde_json::from_slice(data.as_slice()).unwrap();
if data == b"ok" {
let mut wtxn = inner.env.write_txn().unwrap();
let id = path
.rsplit_once('-')
.map(|(_, id)| id.parse::<u32>().unwrap())
.unwrap();
let path = Path::new(env!("HOME"))
.join("zk-tasks")
.join(format!("{:0>10?}", id));
let file = File::open(path).unwrap();
let task = serde_json::from_reader(file).unwrap();
inner.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
}
}
WatchedEventType::None
| WatchedEventType::NodeCreated
| WatchedEventType::NodeDeleted => (),
@ -1454,14 +1476,13 @@ impl IndexSchedulerInner {
// TODO: send task to ZK in raw json.
if let Some(zookeeper) = &self.zookeeper {
// TODO: ugly unwrap
zookeeper
.set_data(
&format!("/tasks/task-{:0>10?}", id),
std::fs::write(
Path::new(std::env!("HOME")).join("zk-tasks").join(format!("{:0>10?}", id)),
serde_json::to_vec_pretty(&task).unwrap(),
None,
)
.unwrap();
)?;
// TODO: ugly unwrap
zookeeper.set_data(&format!("/tasks/task-{:0>10?}", id), b"ok".to_vec(), None).unwrap();
}
Ok(task)