From 03aca2e452517c44cb1dc1e1e0c5d4a649e845da Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 14 Sep 2022 12:49:26 +0200 Subject: [PATCH] move the index mapping logic in another structure --- index-scheduler/src/index_mapper.rs | 45 ++++++++++++++++++++++------- index-scheduler/src/lib.rs | 21 ++++++-------- 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 1a5339f06..b314ee861 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -1,23 +1,48 @@ use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; +use std::sync::RwLock; use index::Index; +use milli::heed::types::SerdeBincode; +use milli::heed::types::Str; +use milli::heed::Database; use milli::heed::RoTxn; use milli::heed::RwTxn; +use milli::update::IndexerConfig; use uuid::Uuid; use crate::Error; -use crate::IndexScheduler; use crate::Result; -impl IndexScheduler { +#[derive(Clone)] +pub struct IndexMapper { + // Keep track of the opened indexes and is used + // mainly by the index resolver. + index_map: Arc>>, + + // Map an index name with an index uuid currently available on disk. + index_mapping: Database>, + + base_path: PathBuf, + index_size: usize, + indexer_config: Arc, +} + +impl IndexMapper { + /// Get or create the index. pub fn create_index(&self, rwtxn: &mut RwTxn, name: &str) -> Result { - let index = match self.index_txn(rwtxn, name) { + let index = match self.index(rwtxn, name) { Ok(index) => index, Err(Error::IndexNotFound(_)) => { let uuid = Uuid::new_v4(); - // TODO: TAMO: take the arguments from somewhere - Index::open(uuid.to_string(), name.to_string(), 100000, Arc::default())? + Index::open( + self.base_path.join(uuid.to_string()), + name.to_string(), + self.index_size, + self.indexer_config.clone(), + )? 
} error => return error, }; @@ -25,7 +50,8 @@ impl IndexScheduler { Ok(index) } - pub fn index_txn(&self, rtxn: &RoTxn, name: &str) -> Result { + /// Return an index, may open it if it wasn't already opened. + pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result { let uuid = self .index_mapping .get(&rtxn, name)? @@ -46,12 +72,11 @@ impl IndexScheduler { // the entry method. match index_map.entry(uuid) { Entry::Vacant(entry) => { - // TODO: TAMO: get the args from somewhere. let index = Index::open( - uuid.to_string(), + self.base_path.join(uuid.to_string()), name.to_string(), - 100_000_000, - Arc::default(), + self.index_size, + self.indexer_config.clone(), )?; entry.insert(index.clone()); index diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 695ceae06..9a056ec82 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -9,6 +9,7 @@ use batch::Batch; pub use error::Error; use file_store::FileStore; use index::Index; +use index_mapper::IndexMapper; pub use task::Task; use task::{Kind, KindWithContent, Status}; use time::OffsetDateTime; @@ -21,7 +22,7 @@ use std::{collections::HashMap, sync::RwLock}; use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str}; use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; -use milli::update::IndexDocumentsMethod; +use milli::update::{IndexDocumentsMethod, IndexerConfig}; use milli::{RoaringBitmapCodec, BEU32}; use roaring::RoaringBitmap; use serde::Deserialize; @@ -50,10 +51,6 @@ pub struct Query { /// 2. Schedule the tasks. #[derive(Clone)] pub struct IndexScheduler { - // Keep track of the opened indexes and is used - // mainly by the index resolver. - index_map: Arc>>, - /// The list of tasks currently processing. processing_tasks: Arc>, @@ -65,16 +62,16 @@ pub struct IndexScheduler { // The main database, it contains all the tasks accessible by their Id. all_tasks: Database, SerdeBincode>, - // All the tasks ids grouped by their status. 
+ /// All the tasks ids grouped by their status. status: Database, RoaringBitmapCodec>, - // All the tasks ids grouped by their kind. + /// All the tasks ids grouped by their kind. kind: Database, RoaringBitmapCodec>, - - // Map an index name with an index uuid currentl available on disk. - index_mapping: Database>, - // Store the tasks associated to an index. + /// Store the tasks associated to an index. index_tasks: Database, + /// In charge of creating and returning indexes. + index_mapper: IndexMapper, + // set to true when there is work to do. wake_up: Arc, } @@ -85,7 +82,7 @@ impl IndexScheduler { /// `IndexNotFound` error. pub fn index(&self, name: &str) -> Result { let rtxn = self.env.read_txn()?; - self.index_txn(&rtxn, name) + self.index_mapper.index(&rtxn, name) } /// Returns the tasks corresponding to the query.