diff --git a/src/data/mod.rs b/src/data/mod.rs index 79572fcf7..76effde58 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use sha2::Digest; use crate::index_controller::{IndexMetadata, IndexSettings}; -use crate::index_controller::actor_index_controller::IndexController; +use crate::index_controller::IndexController; use crate::index::Settings; use crate::option::Opt; diff --git a/src/index_controller/actor_index_controller/index_actor.rs b/src/index_controller/index_actor.rs similarity index 100% rename from src/index_controller/actor_index_controller/index_actor.rs rename to src/index_controller/index_actor.rs diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 16f884137..54199c1d7 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -1,6 +1,17 @@ -pub mod actor_index_controller; mod updates; +mod index_actor; +mod update_actor; +mod uuid_resolver; +mod update_store; +mod update_handler; +use std::path::Path; + +use tokio::sync::{mpsc, oneshot}; +use futures::stream::StreamExt; +use actix_web::web::Payload; +use crate::index::{SearchResult, SearchQuery}; +use actix_web::web::Bytes; use chrono::{DateTime, Utc}; use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Serialize, Deserialize}; @@ -34,10 +45,117 @@ pub enum UpdateMeta { Facets(Facets), } - - #[derive(Clone, Debug)] pub struct IndexSettings { pub name: Option, pub primary_key: Option, } + + +pub struct IndexController { + uuid_resolver: uuid_resolver::UuidResolverHandle, + index_handle: index_actor::IndexActorHandle, + update_handle: update_actor::UpdateActorHandle, +} + +enum IndexControllerMsg { + CreateIndex { + uuid: Uuid, + primary_key: Option, + ret: oneshot::Sender>, + }, + Shutdown, +} + +impl IndexController { + pub fn new(path: impl AsRef) -> Self { + let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); + let index_actor = index_actor::IndexActorHandle::new(&path); + let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path); + Self { uuid_resolver, index_handle: index_actor, update_handle } + } + + pub async fn add_documents( + &self, + index: String, + method: milli::update::IndexDocumentsMethod, + format: milli::update::UpdateFormat, + mut payload: Payload, + primary_key: Option, + ) -> anyhow::Result { + let uuid = self.uuid_resolver.get_or_create(index).await?; + let meta = UpdateMeta::DocumentsAddition { method, format, primary_key }; + let (sender, receiver) = mpsc::channel(10); + + // It is necessary to spawn a local task to senf the payload to the update handle to + // prevent dead_locking between the update_handle::update that waits for the update to be + // registered and the update_actor that waits for the the payload to be sent to it. + tokio::task::spawn_local(async move { + while let Some(bytes) = payload.next().await { + match bytes { + Ok(bytes) => { sender.send(Ok(bytes)).await; }, + Err(e) => { + let error: Box = Box::new(e); + sender.send(Err(error)).await; }, + } + } + }); + + // This must be done *AFTER* spawning the task. + let status = self.update_handle.update(meta, receiver, uuid).await?; + Ok(status) + } + + fn clear_documents(&self, index: String) -> anyhow::Result { + todo!() + } + + fn delete_documents(&self, index: String, document_ids: Vec) -> anyhow::Result { + todo!() + } + + fn update_settings(&self, index_uid: String, settings: Settings) -> anyhow::Result { + todo!() + } + + pub async fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result { + let IndexSettings { name, primary_key } = index_settings; + let uuid = self.uuid_resolver.create(name.unwrap()).await?; + let index_meta = self.index_handle.create_index(uuid, primary_key).await?; + Ok(index_meta) + } + + fn delete_index(&self, index_uid: String) -> anyhow::Result<()> { + todo!() + } + + fn swap_indices(&self, index1_uid: String, index2_uid: String) -> anyhow::Result<()> { + todo!() + } + + pub fn index(&self, name: String) -> anyhow::Result>> { + todo!() + } + + fn update_status(&self, index: String, id: u64) -> anyhow::Result> { + todo!() + } + + fn all_update_status(&self, index: String) -> anyhow::Result> { + todo!() + } + + pub fn list_indexes(&self) -> anyhow::Result> { + todo!() + } + + fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result { + todo!() + } + + pub async fn search(&self, name: String, query: SearchQuery) -> anyhow::Result { + let uuid = self.uuid_resolver.resolve(name).await.unwrap().unwrap(); + let result = self.index_handle.search(uuid, query).await?; + Ok(result) + } +} diff --git a/src/index_controller/actor_index_controller/update_actor.rs b/src/index_controller/update_actor.rs similarity index 100% rename from src/index_controller/actor_index_controller/update_actor.rs rename to src/index_controller/update_actor.rs diff --git a/src/index_controller/actor_index_controller/update_handler.rs b/src/index_controller/update_handler.rs similarity index 100% rename from src/index_controller/actor_index_controller/update_handler.rs rename to src/index_controller/update_handler.rs diff --git a/src/index_controller/actor_index_controller/update_store.rs b/src/index_controller/update_store.rs similarity index 100% rename from src/index_controller/actor_index_controller/update_store.rs rename to src/index_controller/update_store.rs diff --git a/src/index_controller/actor_index_controller/uuid_resolver.rs b/src/index_controller/uuid_resolver.rs similarity index 100% rename from src/index_controller/actor_index_controller/uuid_resolver.rs rename to src/index_controller/uuid_resolver.rs