use options max db sizes

This commit is contained in:
mpostma 2021-03-13 10:09:10 +01:00
parent 49b74b587a
commit 99c89cf2ba
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 27 additions and 18 deletions

View File

@ -60,7 +60,9 @@ impl Data {
let path = options.db_path.clone(); let path = options.db_path.clone();
create_dir_all(&path)?; create_dir_all(&path)?;
let index_controller = IndexController::new(&path)?; let index_size = options.max_mdb_size.get_bytes() as usize;
let update_store_size = options.max_udb_size.get_bytes() as usize;
let index_controller = IndexController::new(&path, index_size, update_store_size)?;
let mut api_keys = ApiKeys { let mut api_keys = ApiKeys {
master: options.clone().master_key, master: options.clone().master_key,

View File

@ -401,11 +401,11 @@ pub struct IndexActorHandle {
} }
impl IndexActorHandle { impl IndexActorHandle {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> { pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
let (read_sender, read_receiver) = mpsc::channel(100); let (read_sender, read_receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100); let (write_sender, write_receiver) = mpsc::channel(100);
let store = HeedIndexStore::new(path)?; let store = HeedIndexStore::new(path, index_size)?;
let actor = IndexActor::new(read_receiver, write_receiver, store)?; let actor = IndexActor::new(read_receiver, write_receiver, store)?;
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
Ok(Self { Ok(Self {
@ -416,8 +416,7 @@ impl IndexActorHandle {
pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> { pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex { let msg = IndexMsg::CreateIndex { ret,
ret,
uuid, uuid,
primary_key, primary_key,
}; };
@ -515,15 +514,17 @@ impl IndexActorHandle {
struct HeedIndexStore { struct HeedIndexStore {
index_store: AsyncMap<Uuid, Index>, index_store: AsyncMap<Uuid, Index>,
path: PathBuf, path: PathBuf,
index_size: usize,
} }
impl HeedIndexStore { impl HeedIndexStore {
fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> { fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
let path = path.as_ref().join("indexes/"); let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new())); let index_store = Arc::new(RwLock::new(HashMap::new()));
Ok(Self { Ok(Self {
index_store, index_store,
path, path,
index_size,
}) })
} }
} }
@ -536,8 +537,9 @@ impl IndexStore for HeedIndexStore {
return Err(IndexError::IndexAlreadyExists); return Err(IndexError::IndexAlreadyExists);
} }
let index_size = self.index_size;
let index = spawn_blocking(move || -> Result<Index> { let index = spawn_blocking(move || -> Result<Index> {
let index = open_index(&path, 4096 * 100_000)?; let index = open_index(&path, index_size)?;
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?; let mut txn = index.write_txn()?;
index.put_primary_key(&mut txn, &primary_key)?; index.put_primary_key(&mut txn, &primary_key)?;
@ -565,7 +567,8 @@ impl IndexStore for HeedIndexStore {
return Ok(None); return Ok(None);
} }
let index = spawn_blocking(|| open_index(path, 4096 * 100_000)) let index_size = self.index_size;
let index = spawn_blocking(move || open_index(path, index_size))
.await .await
.map_err(|e| IndexError::Error(e.into()))??; .map_err(|e| IndexError::Error(e.into()))??;
self.index_store self.index_store

View File

@ -59,10 +59,10 @@ pub struct IndexController {
} }
impl IndexController { impl IndexController {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> { pub fn new(path: impl AsRef<Path>, index_size: usize, update_store_size: usize) -> anyhow::Result<Self> {
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?;
let index_actor = index_actor::IndexActorHandle::new(&path)?; let index_actor = index_actor::IndexActorHandle::new(&path, index_size)?;
let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path)?; let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path, update_store_size)?;
Ok(Self { uuid_resolver, index_handle: index_actor, update_handle }) Ok(Self { uuid_resolver, index_handle: index_actor, update_handle })
} }

View File

@ -211,10 +211,10 @@ impl<D> UpdateActorHandle<D>
where where
D: AsRef<[u8]> + Sized + 'static + Sync + Send, D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{ {
pub fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> anyhow::Result<Self> { pub fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>, update_store_size: usize) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned().join("updates"); let path = path.as_ref().to_owned().join("updates");
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let store = MapUpdateStoreStore::new(index_handle, &path); let store = MapUpdateStoreStore::new(index_handle, &path, update_store_size);
let actor = UpdateActor::new(store, receiver, path)?; let actor = UpdateActor::new(store, receiver, path)?;
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
@ -272,16 +272,18 @@ struct MapUpdateStoreStore {
db: Arc<RwLock<HashMap<Uuid, Arc<UpdateStore>>>>, db: Arc<RwLock<HashMap<Uuid, Arc<UpdateStore>>>>,
index_handle: IndexActorHandle, index_handle: IndexActorHandle,
path: PathBuf, path: PathBuf,
update_store_size: usize,
} }
impl MapUpdateStoreStore { impl MapUpdateStoreStore {
fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self { fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>, update_store_size: usize) -> Self {
let db = Arc::new(RwLock::new(HashMap::new())); let db = Arc::new(RwLock::new(HashMap::new()));
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
Self { Self {
db, db,
index_handle, index_handle,
path, path,
update_store_size,
} }
} }
} }
@ -292,7 +294,8 @@ impl UpdateStoreStore for MapUpdateStoreStore {
match self.db.write().await.entry(uuid) { match self.db.write().await.entry(uuid) {
Entry::Vacant(e) => { Entry::Vacant(e) => {
let mut options = heed::EnvOpenOptions::new(); let mut options = heed::EnvOpenOptions::new();
options.map_size(4096 * 100_000); let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let path = self.path.clone().join(format!("updates-{}", e.key())); let path = self.path.clone().join(format!("updates-{}", e.key()));
create_dir_all(&path).unwrap(); create_dir_all(&path).unwrap();
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
@ -324,7 +327,8 @@ impl UpdateStoreStore for MapUpdateStoreStore {
// We can safely load the index // We can safely load the index
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
let mut options = heed::EnvOpenOptions::new(); let mut options = heed::EnvOpenOptions::new();
options.map_size(4096 * 100_000); let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let store = UpdateStore::open(options, &path, move |meta, file| { let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file)) futures::executor::block_on(index_handle.update(meta, file))
}) })

View File

@ -35,14 +35,14 @@ async fn update_settings_unknown_field() {
async fn test_partial_update() { async fn test_partial_update() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (response, _code) = index.update_settings(json!({"displayedAttributes": ["foo"]})).await; let (_response, _code) = index.update_settings(json!({"displayedAttributes": ["foo"]})).await;
index.wait_update_id(0).await; index.wait_update_id(0).await;
let (response, code) = index.settings().await; let (response, code) = index.settings().await;
assert_eq!(code, 200); assert_eq!(code, 200);
assert_eq!(response["displayedAttributes"],json!(["foo"])); assert_eq!(response["displayedAttributes"],json!(["foo"]));
assert_eq!(response["searchableAttributes"],json!(["*"])); assert_eq!(response["searchableAttributes"],json!(["*"]));
let (response, _) = index.update_settings(json!({"searchableAttributes": ["bar"]})).await; let (_response, _) = index.update_settings(json!({"searchableAttributes": ["bar"]})).await;
index.wait_update_id(1).await; index.wait_update_id(1).await;
let (response, code) = index.settings().await; let (response, code) = index.settings().await;