WIP: start updating the zookeeper client => leader election is missing

This commit is contained in:
Tamo 2023-10-31 13:04:32 +01:00
parent 8f04353b7d
commit c573261ac4
7 changed files with 351 additions and 360 deletions

491
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -32,7 +32,7 @@ thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.3.1", features = ["serde", "v4"] } uuid = { version = "1.3.1", features = ["serde", "v4"] }
tokio = { version = "1.27.0", features = ["full"] } tokio = { version = "1.27.0", features = ["full"] }
zookeeper = "0.8.0" zookeeper-client-sync = { path = "../../zookeeper-client-sync" }
parking_lot = "0.12.1" parking_lot = "0.12.1"
strois = "0.0.4" strois = "0.0.4"

View File

@ -63,9 +63,8 @@ use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
use uuid::Uuid; use uuid::Uuid;
use zookeeper::recipes::leader::LeaderLatch; use zookeeper_client_sync::{
use zookeeper::{ Acls, AddWatchMode, CreateMode, Error as ZkError, EventType, WatchedEvent, Zookeeper,
Acl, AddWatchMode, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZooKeeper,
}; };
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
@ -281,7 +280,7 @@ pub struct IndexSchedulerOptions {
/// The experimental features enabled for this instance. /// The experimental features enabled for this instance.
pub instance_features: InstanceTogglableFeatures, pub instance_features: InstanceTogglableFeatures,
/// zookeeper client /// zookeeper client
pub zookeeper: Option<Arc<ZooKeeper>>, pub zookeeper: Option<Arc<Zookeeper>>,
/// S3 bucket /// S3 bucket
pub s3: Option<Arc<Bucket>>, pub s3: Option<Arc<Bucket>>,
} }
@ -291,7 +290,7 @@ pub struct IndexSchedulerOptions {
#[derive(Clone)] #[derive(Clone)]
pub struct IndexScheduler { pub struct IndexScheduler {
inner: Arc<RwLock<Option<IndexSchedulerInner>>>, inner: Arc<RwLock<Option<IndexSchedulerInner>>>,
zookeeper: Option<Arc<ZooKeeper>>, zookeeper: Option<Arc<Zookeeper>>,
pub s3: Option<Arc<Bucket>>, pub s3: Option<Arc<Bucket>>,
wake_up: Arc<SignalEvent>, wake_up: Arc<SignalEvent>,
} }
@ -317,9 +316,8 @@ impl IndexScheduler {
// Create all the required directories in zookeeper // Create all the required directories in zookeeper
match zookeeper.create( match zookeeper.create(
"/election", "/election",
vec![], &vec![],
Acl::open_unsafe().clone(), &CreateMode::Persistent.with_acls(Acls::anyone_all()),
CreateMode::Persistent,
) { ) {
Ok(_) | Err(ZkError::NodeExists) => (), Ok(_) | Err(ZkError::NodeExists) => (),
Err(e) => panic!("{e}"), Err(e) => panic!("{e}"),
@ -327,9 +325,8 @@ impl IndexScheduler {
match zookeeper.create( match zookeeper.create(
"/snapshots", "/snapshots",
vec![], &[],
Acl::open_unsafe().clone(), &CreateMode::Persistent.with_acls(Acls::anyone_all()),
CreateMode::Persistent,
) { ) {
Ok(_) | Err(ZkError::NodeExists) => (), Ok(_) | Err(ZkError::NodeExists) => (),
Err(e) => panic!("{e}"), Err(e) => panic!("{e}"),
@ -395,17 +392,16 @@ impl IndexScheduler {
// See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection // See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
let latchc = latch.clone(); let latchc = latch.clone();
let this = self.clone(); let this = self.clone();
zookeeper let watcher =
.add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| { zookeeper.watch("/snapshots", AddWatchMode::PersistentRecursive).unwrap();
watcher.run_on_change(move |event| {
if !latchc.has_leadership() { if !latchc.has_leadership() {
let WatchedEvent { event_type, path, keeper_state: _ } = event; let WatchedEvent { event_type, path, .. } = event;
match event_type { match event_type {
WatchedEventType::NodeCreated => { EventType::NodeCreated => {
let path = path.unwrap();
log::info!("The snapshot {} is in preparation", path); log::info!("The snapshot {} is in preparation", path);
} }
WatchedEventType::NodeDataChanged => { EventType::NodeDataChanged => {
let path = path.unwrap();
let snapshot_id = let snapshot_id =
path.strip_prefix("/snapshots/snapshot-").unwrap(); path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir = format!("snapshots/{}", snapshot_id); let snapshot_dir = format!("snapshots/{}", snapshot_id);
@ -414,18 +410,17 @@ impl IndexScheduler {
otherwise => panic!("{otherwise:?}"), otherwise => panic!("{otherwise:?}"),
} }
} }
}) });
.unwrap();
{ {
// TODO we must lock the IndexSchedulerInner here to make sure that we don't // TODO we must lock the IndexSchedulerInner here to make sure that we don't
// load this snapshot after the upper watcher find a more recent one. // load this snapshot after the upper watcher find a more recent one.
let mut snapshots = zookeeper.get_children("/snapshots", false).unwrap(); let (mut snapshots, _) = zookeeper.get_children("/snapshots").unwrap();
snapshots.sort_unstable(); snapshots.sort_unstable();
for snapshot_name in dbg!(snapshots).iter().rev() { for snapshot_name in dbg!(snapshots).iter().rev() {
let (_, snapshot_id) = snapshot_name.rsplit_once('-').unwrap(); let (_, snapshot_id) = snapshot_name.rsplit_once('-').unwrap();
let snapshot_path = format!("/snapshots/{snapshot_name}"); let snapshot_path = format!("/snapshots/{snapshot_name}");
match zookeeper.get_data(&snapshot_path, false) { match zookeeper.get_data(&snapshot_path) {
Ok((data, _stat)) => { Ok((data, _stat)) => {
if data == b"ok" { if data == b"ok" {
eprintln!("Loading snapshot {snapshot_path}"); eprintln!("Loading snapshot {snapshot_path}");
@ -442,16 +437,15 @@ impl IndexScheduler {
match zookeeper.create( match zookeeper.create(
"/tasks", "/tasks",
vec![], &[],
Acl::open_unsafe().clone(), &CreateMode::Persistent.with_acls(Acls::anyone_all()),
CreateMode::Persistent,
) { ) {
Ok(_) => (), Ok(_) => (),
Err(ZkError::NodeExists) => { Err(ZkError::NodeExists) => {
log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk."); log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");
let children = zookeeper let (children, _) = zookeeper
.get_children("/tasks", false) .get_children("/tasks")
.expect("Internal, the /tasks directory was deleted during execution."); // TODO change me .expect("Internal, the /tasks directory was deleted during execution."); // TODO change me
log::info!("Importing {} tasks", children.len()); log::info!("Importing {} tasks", children.len());
@ -459,7 +453,7 @@ impl IndexScheduler {
let mut wtxn = inner.env.write_txn().unwrap(); let mut wtxn = inner.env.write_txn().unwrap();
for path in children { for path in children {
log::info!(" Importing {}", path); log::info!(" Importing {}", path);
match zookeeper.get_data(&format!("/tasks/{}", &path), false) { match zookeeper.get_data(&format!("/tasks/{}", &path)) {
Ok((data, _stat)) => { Ok((data, _stat)) => {
if data != b"ok" { if data != b"ok" {
log::info!(" Task {} was not \"ok\", skipping.", path); log::info!(" Task {} was not \"ok\", skipping.", path);
@ -490,16 +484,15 @@ impl IndexScheduler {
// TODO: fix unwrap by returning a clear error. // TODO: fix unwrap by returning a clear error.
let this = self.clone(); let this = self.clone();
let zookeeperc = zookeeper.clone(); let zookeeperc = zookeeper.clone();
zookeeper let watcher = zookeeper.watch("/tasks", AddWatchMode::PersistentRecursive).unwrap();
.add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| { watcher.run_on_change(move |event| {
let WatchedEvent { event_type, path, keeper_state: _ } = event; let WatchedEvent { event_type, path, .. } = event;
match event_type { match event_type {
WatchedEventType::NodeDataChanged => { EventType::NodeDataChanged => {
let path = path.unwrap();
// Add raw task content in local DB // Add raw task content in local DB
log::info!("Received a new task from the cluster at {}", path); log::info!("Received a new task from the cluster at {}", path);
let inner = this.inner(); let inner = this.inner();
let (data, _stat) = zookeeperc.get_data(&path, false).unwrap(); let (data, _stat) = zookeeperc.get_data(&path).unwrap();
if data == b"ok" { if data == b"ok" {
let mut wtxn = inner.env.write_txn().unwrap(); let mut wtxn = inner.env.write_txn().unwrap();
let id = path let id = path
@ -507,23 +500,18 @@ impl IndexScheduler {
.map(|(_, id)| id.parse::<u32>().unwrap()) .map(|(_, id)| id.parse::<u32>().unwrap())
.unwrap(); .unwrap();
let s3 = inner.options.s3.as_ref().unwrap(); let s3 = inner.options.s3.as_ref().unwrap();
let task = let task = s3.get_object_json(format!("tasks/{id:0>10}")).unwrap();
s3.get_object_json(format!("tasks/{id:0>10}")).unwrap();
inner.register_raw_task(&mut wtxn, &task).unwrap(); inner.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
} }
} }
WatchedEventType::None EventType::NodeCreated | EventType::NodeDeleted => (),
| WatchedEventType::NodeCreated EventType::NodeChildrenChanged => panic!("Node children changed"),
| WatchedEventType::NodeDeleted => (), EventType::Session => panic!("Session error"),
WatchedEventType::NodeChildrenChanged
| WatchedEventType::DataWatchRemoved
| WatchedEventType::ChildWatchRemoved => panic!("{event_type:?}"),
} }
this.wake_up.signal(); this.wake_up.signal();
}) });
.unwrap();
Some(latch) Some(latch)
} }
@ -553,20 +541,14 @@ impl IndexScheduler {
} }
let rtxn = inner.env.read_txn().unwrap(); let rtxn = inner.env.read_txn().unwrap();
let snapshot_path = zookeeper let (_, snapshot_id) = zookeeper
.create( .create(
"/snapshots/snapshot-", "/snapshots/snapshot-",
vec![], &[],
Acl::open_unsafe().clone(), &CreateMode::Persistent.with_acls(Acls::anyone_all()),
CreateMode::PersistentSequential,
) )
.unwrap(); .unwrap();
let snapshot_id = snapshot_path
.rsplit_once('-')
.map(|(_, id)| id.parse::<u32>().unwrap())
.unwrap();
let zk_snapshots = format!("snapshots"); let zk_snapshots = format!("snapshots");
let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}"); let snapshot_dir = format!("{zk_snapshots}/{snapshot_id:0>10?}");
@ -621,8 +603,8 @@ impl IndexScheduler {
// we must notify everyone that we dropped a new snapshot on the s3 // we must notify everyone that we dropped a new snapshot on the s3
let _stat = zookeeper.set_data( let _stat = zookeeper.set_data(
&format!("/snapshots/snapshot-{:0>10?}", snapshot_id), &format!("/snapshots/snapshot-{snapshot_id}"),
b"ok".to_vec(), b"ok",
None, None,
); );
log::info!( log::info!(
@ -1018,8 +1000,8 @@ pub struct IndexSchedulerInner {
/// The path to the version file of Meilisearch. /// The path to the version file of Meilisearch.
pub(crate) version_file_path: PathBuf, pub(crate) version_file_path: PathBuf,
/// The URL to the ZooKeeper cluster /// The URL to the Zookeeper cluster
pub(crate) zookeeper: Option<Arc<ZooKeeper>>, pub(crate) zookeeper: Option<Arc<Zookeeper>>,
// ================= test // ================= test
// The next entry is dedicated to the tests. // The next entry is dedicated to the tests.
@ -1435,16 +1417,14 @@ impl IndexSchedulerInner {
/// ///
/// If it fails and data was associated with the task, it tries to delete the associated data. /// If it fails and data was associated with the task, it tries to delete the associated data.
pub fn register(&self, kind: KindWithContent) -> Result<Task> { pub fn register(&self, kind: KindWithContent) -> Result<Task> {
let id = match &self.zookeeper { let zk_id = match &self.zookeeper {
Some(zookeeper) => { Some(zookeeper) => {
// Reserve uniq ID on zookeeper. And give it to the spawn blocking.
match zookeeper.create( match zookeeper.create(
"/tasks/task-", "/tasks/task-",
vec![], &[],
Acl::open_unsafe().clone(), &CreateMode::Persistent.with_acls(Acls::anyone_all()),
CreateMode::PersistentSequential,
) { ) {
Ok(path) => path.rsplit_once('-').map(|(_, id)| id.parse::<u32>().unwrap()), Ok((_, id)) => Some(id),
Err(e) => panic!("{e}"), Err(e) => panic!("{e}"),
} }
} }
@ -1461,8 +1441,11 @@ impl IndexSchedulerInner {
} }
// Retrieve the id generated by zookeeper or generate a local id. // Retrieve the id generated by zookeeper or generate a local id.
let id = match id { let id = match zk_id {
Some(id) => id as u32, Some(id) => id
.into_i64()
.try_into()
.expect("Can't convert zookeeper task id to meilisearch task id"),
None => self.next_task_id(&wtxn)?, None => self.next_task_id(&wtxn)?,
}; };
@ -1509,12 +1492,14 @@ impl IndexSchedulerInner {
// TODO: send task to ZK in raw json. // TODO: send task to ZK in raw json.
if let Some(zookeeper) = &self.zookeeper { if let Some(zookeeper) = &self.zookeeper {
// safe because if we had a zookeeper at the beginning we must have a zk_id
let zk_id = zk_id.unwrap();
let s3 = self.options.s3.as_ref().unwrap(); let s3 = self.options.s3.as_ref().unwrap();
s3.put_object(format!("tasks/{id:0>10?}"), &serde_json::to_vec_pretty(&task).unwrap()) s3.put_object(format!("tasks/{zk_id}"), &serde_json::to_vec_pretty(&task).unwrap())
.unwrap(); .unwrap();
// TODO: ugly unwrap // TODO: ugly unwrap
zookeeper.set_data(&format!("/tasks/task-{:0>10?}", id), b"ok".to_vec(), None).unwrap(); zookeeper.set_data(&format!("/tasks/task-{zk_id}"), b"ok", None).unwrap();
} }
Ok(task) Ok(task)

View File

@ -25,4 +25,4 @@ sha2 = "0.10.6"
thiserror = "1.0.40" thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.3.1", features = ["serde", "v4"] } uuid = { version = "1.3.1", features = ["serde", "v4"] }
zookeeper = "0.8.0" zookeeper-client-sync = { path = "../../zookeeper-client-sync" }

View File

@ -19,7 +19,7 @@ internal_error!(
AuthControllerError: meilisearch_types::milli::heed::Error, AuthControllerError: meilisearch_types::milli::heed::Error,
std::io::Error, std::io::Error,
serde_json::Error, serde_json::Error,
zookeeper::ZkError, zookeeper_client_sync::Error,
std::str::Utf8Error std::str::Utf8Error
); );

View File

@ -16,22 +16,22 @@ pub use store::open_auth_store_env;
use store::{generate_key_as_hexa, HeedAuthStore}; use store::{generate_key_as_hexa, HeedAuthStore};
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use zookeeper::{ use zookeeper_client_sync::{
Acl, AddWatchMode, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZooKeeper, Acls, AddWatchMode, CreateMode, Error as ZkError, EventType, WatchedEvent, Zookeeper,
}; };
#[derive(Clone)] #[derive(Clone)]
pub struct AuthController { pub struct AuthController {
store: Arc<HeedAuthStore>, store: Arc<HeedAuthStore>,
master_key: Option<String>, master_key: Option<String>,
zookeeper: Option<Arc<ZooKeeper>>, zookeeper: Option<Arc<Zookeeper>>,
} }
impl AuthController { impl AuthController {
pub fn new( pub fn new(
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
master_key: &Option<String>, master_key: &Option<String>,
zookeeper: Option<Arc<ZooKeeper>>, zookeeper: Option<Arc<Zookeeper>>,
) -> Result<Self> { ) -> Result<Self> {
let store = HeedAuthStore::new(db_path)?; let store = HeedAuthStore::new(db_path)?;
let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zookeeper }; let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zookeeper };
@ -42,22 +42,21 @@ impl AuthController {
// Zookeeper Event listener loop // Zookeeper Event listener loop
let controller_clone = controller.clone(); let controller_clone = controller.clone();
let zkk = zookeeper.clone(); let zkk = zookeeper.clone();
zookeeper.add_watch("/auth", AddWatchMode::PersistentRecursive, move |event| { let watcher = zookeeper.watch("/auth", AddWatchMode::PersistentRecursive)?;
let WatchedEvent { event_type, path, keeper_state: _ } = dbg!(event); watcher.run_on_change(move |event| {
let WatchedEvent { event_type, path, .. } = dbg!(event);
match event_type { match event_type {
WatchedEventType::NodeDeleted => { EventType::NodeDeleted => {
// TODO: ugly unwraps // TODO: ugly unwraps
let path = path.unwrap();
let uuid = path.strip_prefix("/auth/").unwrap(); let uuid = path.strip_prefix("/auth/").unwrap();
let uuid = Uuid::parse_str(&uuid).unwrap(); let uuid = Uuid::parse_str(&uuid).unwrap();
log::info!("The key {} has been deleted", uuid); log::info!("The key {} has been deleted", uuid);
controller_clone.store.delete_api_key(uuid).unwrap(); controller_clone.store.delete_api_key(uuid).unwrap();
} }
WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => { EventType::NodeCreated | EventType::NodeDataChanged => {
let path = path.unwrap();
if path.strip_prefix("/auth/").map_or(false, |s| !s.is_empty()) { if path.strip_prefix("/auth/").map_or(false, |s| !s.is_empty()) {
let (key, _stat) = zkk.get_data(&path, false).unwrap(); let (key, _stat) = zkk.get_data(&path).unwrap();
let key: Key = serde_json::from_slice(&key).unwrap(); let key: Key = serde_json::from_slice(&key).unwrap();
log::info!("The key {} has been deleted", key.uid); log::info!("The key {} has been deleted", key.uid);
controller_clone.store.put_api_key(key).unwrap(); controller_clone.store.put_api_key(key).unwrap();
@ -65,15 +64,14 @@ impl AuthController {
} }
otherwise => panic!("Got the unexpected `{otherwise:?}` event!"), otherwise => panic!("Got the unexpected `{otherwise:?}` event!"),
} }
})?; });
// TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create // TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create
// for the moment we consider that `create` only returns Error::NodeExists. // for the moment we consider that `create` only returns Error::NodeExists.
match zookeeper.create( match zookeeper.create(
"/auth", "/auth",
vec![], &[],
Acl::open_unsafe().clone(), &CreateMode::Persistent.with_acls(Acls::anyone_all()),
CreateMode::Persistent,
) { ) {
// If the store is empty, we must generate and push the default api-keys. // If the store is empty, we must generate and push the default api-keys.
Ok(_) => generate_default_keys(&controller)?, Ok(_) => generate_default_keys(&controller)?,
@ -83,14 +81,14 @@ impl AuthController {
let store = controller.store.clone(); let store = controller.store.clone();
store.delete_all_keys()?; store.delete_all_keys()?;
let children = zookeeper let (children, _) = zookeeper
.get_children("/auth", false) .get_children("/auth")
.expect("Internal, the auth directory was deleted during execution."); .expect("Internal, the auth directory was deleted during execution.");
log::info!("Importing {} api-keys", children.len()); log::info!("Importing {} api-keys", children.len());
for path in children { for path in children {
log::info!(" Importing {}", path); log::info!(" Importing {}", path);
match zookeeper.get_data(&format!("/auth/{}", &path), false) { match zookeeper.get_data(&format!("/auth/{}", &path)) {
Ok((key, _stat)) => { Ok((key, _stat)) => {
let key = serde_json::from_slice(&key).unwrap(); let key = serde_json::from_slice(&key).unwrap();
let store = controller.store.clone(); let store = controller.store.clone();
@ -104,7 +102,7 @@ impl AuthController {
} }
} }
e @ Err( e @ Err(
ZkError::NoNode | ZkError::NoChildrenForEphemerals | ZkError::InvalidACL, ZkError::NoNode | ZkError::NoChildrenForEphemerals | ZkError::InvalidAcl,
) => unreachable!("{e:?}"), ) => unreachable!("{e:?}"),
Err(e) => panic!("{e}"), Err(e) => panic!("{e}"),
} }
@ -154,9 +152,8 @@ impl AuthController {
Some(zookeeper) => { Some(zookeeper) => {
zookeeper.create( zookeeper.create(
&format!("/auth/{}", key.uid), &format!("/auth/{}", key.uid),
serde_json::to_vec_pretty(&key)?, &serde_json::to_vec_pretty(&key)?,
Acl::open_unsafe().clone(), &CreateMode::Persistent.with_acls(Acls::anyone_all()),
CreateMode::Persistent,
)?; )?;
Ok(key) Ok(key)
@ -182,7 +179,7 @@ impl AuthController {
Some(zookeeper) => { Some(zookeeper) => {
zookeeper.set_data( zookeeper.set_data(
&format!("/auth/{}", key.uid), &format!("/auth/{}", key.uid),
serde_json::to_vec_pretty(&key)?, &serde_json::to_vec_pretty(&key)?,
None, None,
)?; )?;

View File

@ -106,7 +106,7 @@ walkdir = "2.3.3"
yaup = "0.2.1" yaup = "0.2.1"
serde_urlencoded = "0.7.1" serde_urlencoded = "0.7.1"
termcolor = "1.2.0" termcolor = "1.2.0"
zookeeper = "0.8.0" zookeeper-client-sync = { path = "../../zookeeper-client-sync" }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.8.0" actix-rt = "2.8.0"