From 951a5b5832de732c76d616ea5341f0a764ce9eb7 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 9 Jan 2023 09:35:37 +0100 Subject: [PATCH] Add IndexMapper::resize_index fn --- index-scheduler/src/index_mapper.rs | 63 +++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 1562a918d..7a9591254 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -186,6 +186,69 @@ impl IndexMapper { Ok(self.index_mapping.get(rtxn, name)?.is_some()) } + /// Resizes the maximum size of the specified index to the double of its current maximum size. + /// + /// This operation involves closing the underlying environment and so can take a long time to complete. + /// + /// # Panics + /// + /// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the + /// in memory hash map. + pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> { + // fixme: factor to a function? + let uuid = self + .index_mapping + .get(rtxn, name)? + .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; + + // We remove the index from the in-memory index map. + let mut lock = self.index_map.write().unwrap(); + // signal that will be sent when the resize operation completes + let resize_operation = Arc::new(SignalEvent::manual(false)); + let index = match lock.insert(uuid, BeingResized(resize_operation)) { + Some(Available(index)) => index, + Some(previous_status) => { + lock.insert(uuid, previous_status); + panic!( + "Attempting to resize index {name} that is already being resized or deleted." + ) + } + None => { + panic!("Could not find the status of index {name} in the in-memory index mapper.") + } + }; + + drop(lock); + + let current_size = index.map_size()?; + let new_size = current_size * 2; + let closing_event = index.prepare_for_closing(); + + log::debug!("Waiting for index {name} to close"); + + if !closing_event.wait_timeout(std::time::Duration::from_secs(600)) { + // fail after 10 minutes waiting + panic!("Could not resize index {name} (unable to close it)"); + } + + log::info!("Resized index {name} from {current_size} to {new_size} bytes"); + + let index_path = self.base_path.join(uuid.to_string()); + let index = self.create_or_open_index(&index_path, None, new_size)?; + + // Add back the resized index + let mut lock = self.index_map.write().unwrap(); + let Some(BeingResized(resize_operation)) = lock.insert(uuid, Available(index)) else { + panic!("Index state for index {name} was modified while it was being resized") + }; + + // drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up. + drop(lock); + resize_operation.signal(); + + Ok(()) + } + /// Return an index, may open it if it wasn't already opened. pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result { let uuid = self