From 6bcc30295028076efe09fc7f78e5daccc09ada1d Mon Sep 17 00:00:00 2001 From: mpostma Date: Fri, 26 Feb 2021 17:14:11 +0100 Subject: [PATCH] receive update --- src/data/updates.rs | 22 +- .../actor_index_controller/index_actor.rs | 19 +- .../actor_index_controller/mod.rs | 19 +- .../actor_index_controller/update_actor.rs | 91 +++- .../actor_index_controller/uuid_resolver.rs | 37 +- .../local_index_controller/update_store.rs | 407 ------------------ src/index_controller/mod.rs | 6 +- src/routes/document.rs | 39 +- 8 files changed, 181 insertions(+), 459 deletions(-) delete mode 100644 src/index_controller/local_index_controller/update_store.rs diff --git a/src/data/updates.rs b/src/data/updates.rs index 5cdaf7db1..a559d812a 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -1,29 +1,33 @@ -use std::ops::Deref; - //use async_compression::tokio_02::write::GzipEncoder; //use futures_util::stream::StreamExt; -//use milli::update::{IndexDocumentsMethod, UpdateFormat}; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; //use tokio::io::AsyncWriteExt; use actix_web::web::Payload; +use tokio::fs::File; +use tokio::io::{AsyncWriteExt, AsyncSeekExt}; +use futures::prelude::stream::StreamExt; use crate::index_controller::UpdateStatus; use crate::index_controller::{Settings, IndexMetadata}; use super::Data; impl Data { - pub async fn add_documents( + pub async fn add_documents( &self, index: impl AsRef + Send + Sync + 'static, method: IndexDocumentsMethod, format: UpdateFormat, - stream: Payload, + mut stream: Payload, primary_key: Option, ) -> anyhow::Result - where - B: Deref, - E: std::error::Error + Send + Sync + 'static, { - let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, stream, primary_key).await?; + let file = tempfile::tempfile_in(".")?; + let mut file = File::from_std(file); + while let Some(Ok(bytes)) = stream.next().await { + file.write(bytes.as_ref()).await; + } + file.seek(std::io::SeekFrom::Start(0)).await?; + let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, file, primary_key).await?; Ok(update_status) } diff --git a/src/index_controller/actor_index_controller/index_actor.rs b/src/index_controller/actor_index_controller/index_actor.rs index 6de0f2b77..40f8de279 100644 --- a/src/index_controller/actor_index_controller/index_actor.rs +++ b/src/index_controller/actor_index_controller/index_actor.rs @@ -9,13 +9,15 @@ use milli::Index; use std::collections::hash_map::Entry; use std::fs::create_dir_all; use heed::EnvOpenOptions; -use crate::index_controller::IndexMetadata; +use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}, UpdateResult as UResult}; pub type Result = std::result::Result; type AsyncMap = Arc>>; +type UpdateResult = std::result::Result, Failed>; enum IndexMsg { CreateIndex { uuid: Uuid, primary_key: Option, ret: oneshot::Sender> }, + Update { meta: Processing, data: std::fs::File, ret: oneshot::Sender}, } struct IndexActor { @@ -45,6 +47,7 @@ impl IndexActor { loop { match self.inbox.recv().await { Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await, + Some(IndexMsg::Update { ret, meta, data }) => self.handle_update().await, None => break, } } @@ -54,6 +57,10 @@ impl IndexActor { let result = self.store.create_index(uuid, primary_key).await; let _ = ret.send(result); } + + async fn handle_update(&self) { + println!("processing update!!!"); + } } #[derive(Clone)] @@ -77,6 +84,13 @@ impl IndexActorHandle { let _ = self.sender.send(msg).await; receiver.await.expect("IndexActor has been killed") } + + pub async fn update(&self, meta: Processing, data: std::fs::File) -> UpdateResult { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Update { ret, meta, data }; + let _ = self.sender.send(msg).await; + receiver.await.expect("IndexActor has been killed") + } } struct MapIndexStore { @@ -103,8 +117,6 @@ impl IndexStore for MapIndexStore { let db_path = self.root.join(format!("index-{}", meta.uuid)); - - println!("before blocking"); let index: Result = tokio::task::spawn_blocking(move || { create_dir_all(&db_path).expect("can't create db"); let mut options = EnvOpenOptions::new(); @@ -113,7 +125,6 @@ impl IndexStore for MapIndexStore { .map_err(|e| IndexError::Error(e))?; Ok(index) }).await.expect("thread died"); - println!("after blocking"); self.index_store.write().await.insert(meta.uuid.clone(), index?); diff --git a/src/index_controller/actor_index_controller/mod.rs b/src/index_controller/actor_index_controller/mod.rs index bfcac7a3f..621bbb1d6 100644 --- a/src/index_controller/actor_index_controller/mod.rs +++ b/src/index_controller/actor_index_controller/mod.rs @@ -1,24 +1,28 @@ mod index_actor; mod update_actor; mod uuid_resolver; +mod update_store; -use tokio::fs::File; use tokio::sync::oneshot; use super::IndexController; use uuid::Uuid; use super::IndexMetadata; +use tokio::fs::File; +use super::UpdateMeta; pub struct ActorIndexController { uuid_resolver: uuid_resolver::UuidResolverHandle, - index_actor: index_actor::IndexActorHandle, + index_handle: index_actor::IndexActorHandle, + update_handle: update_actor::UpdateActorHandle, } impl ActorIndexController { pub fn new() -> Self { let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); let index_actor = index_actor::IndexActorHandle::new(); - Self { uuid_resolver, index_actor } + let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone()); + Self { uuid_resolver, index_handle: index_actor, update_handle } } } @@ -31,7 +35,7 @@ enum IndexControllerMsg { Shutdown, } -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] impl IndexController for ActorIndexController { async fn add_documents( &self, @@ -41,7 +45,10 @@ impl IndexController for ActorIndexController { data: File, primary_key: Option, ) -> anyhow::Result { - todo!() + let uuid = self.uuid_resolver.get_or_create(index).await?; + let meta = UpdateMeta::DocumentsAddition { method, format, primary_key }; + let status = self.update_handle.update(meta, Some(data), uuid).await?; + Ok(status) } fn clear_documents(&self, index: String) -> anyhow::Result { @@ -59,7 +66,7 @@ impl IndexController for ActorIndexController { async fn create_index(&self, index_settings: super::IndexSettings) -> anyhow::Result { let super::IndexSettings { name, primary_key } = index_settings; let uuid = self.uuid_resolver.create(name.unwrap()).await?; - let index_meta = self.index_actor.create_index(uuid, primary_key).await?; + let index_meta = self.index_handle.create_index(uuid, primary_key).await?; Ok(index_meta) } diff --git a/src/index_controller/actor_index_controller/update_actor.rs b/src/index_controller/actor_index_controller/update_actor.rs index 9fd3cc39f..a0bf011c5 100644 --- a/src/index_controller/actor_index_controller/update_actor.rs +++ b/src/index_controller/actor_index_controller/update_actor.rs @@ -1,16 +1,99 @@ use super::index_actor::IndexActorHandle; use uuid::Uuid; use tokio::sync::{mpsc, oneshot}; +use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult}; +use thiserror::Error; +use tokio::io::AsyncReadExt; +use log::info; +use tokio::fs::File; +use std::path::PathBuf; +use std::fs::create_dir_all; +use std::sync::Arc; + +pub type Result = std::result::Result; +type UpdateStore = super::update_store::UpdateStore; + +#[derive(Debug, Error)] +pub enum UpdateError {} enum UpdateMsg { CreateIndex{ uuid: Uuid, - ret: oneshot::Sender>, + ret: oneshot::Sender>, + }, + Update { + uuid: Uuid, + meta: UpdateMeta, + payload: Option, + ret: oneshot::Sender> } } -struct UpdateActor { - update_store: S, +struct UpdateActor { + store: Arc, inbox: mpsc::Receiver, - index_actor: IndexActorHandle, + index_handle: IndexActorHandle, +} + +impl UpdateActor { + fn new(store: Arc, inbox: mpsc::Receiver, index_handle: IndexActorHandle) -> Self { + Self { store, inbox, index_handle } + } + + async fn run(mut self) { + + info!("started update actor."); + + loop { + match self.inbox.recv().await { + Some(UpdateMsg::Update { uuid, meta, payload, ret }) => self.handle_update(uuid, meta, payload, ret).await, + Some(_) => {} + None => {} + } + } + } + + async fn handle_update(&self, _uuid: Uuid, meta: UpdateMeta, payload: Option, ret: oneshot::Sender>) { + let mut buf = Vec::new(); + let mut payload = payload.unwrap(); + payload.read_to_end(&mut buf).await.unwrap(); + let result = self.store.register_update(meta, &buf).unwrap(); + let _ = ret.send(Ok(UpdateStatus::Pending(result))); + } +} + +#[derive(Clone)] +pub struct UpdateActorHandle { + sender: mpsc::Sender, +} + +impl UpdateActorHandle { + pub fn new(index_handle: IndexActorHandle) -> Self { + let (sender, receiver) = mpsc::channel(100); + let mut options = heed::EnvOpenOptions::new(); + options.map_size(4096 * 100_000); + let mut path = PathBuf::new(); + path.push("data.ms"); + path.push("updates"); + create_dir_all(&path).unwrap(); + let index_handle_clone = index_handle.clone(); + let store = UpdateStore::open(options, &path, move |meta, file| { + futures::executor::block_on(index_handle_clone.update(meta, file)) + }).unwrap(); + let actor = UpdateActor::new(store, receiver, index_handle); + tokio::task::spawn_local(actor.run()); + Self { sender } + } + + pub async fn update(&self, meta: UpdateMeta, payload: Option, uuid: Uuid) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Update { + uuid, + payload, + meta, + ret, + }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } } diff --git a/src/index_controller/actor_index_controller/uuid_resolver.rs b/src/index_controller/actor_index_controller/uuid_resolver.rs index d5756d05e..b75c0402c 100644 --- a/src/index_controller/actor_index_controller/uuid_resolver.rs +++ b/src/index_controller/actor_index_controller/uuid_resolver.rs @@ -14,6 +14,10 @@ enum UuidResolveMsg { name: String, ret: oneshot::Sender>>, }, + GetOrCreate { + name: String, + ret: oneshot::Sender>, + }, Create { name: String, ret: oneshot::Sender>, @@ -41,15 +45,21 @@ impl UuidResolverActor { loop { match self.inbox.recv().await { Some(Create { name, ret }) => self.handle_create(name, ret).await, - Some(_) => (), - // all senders have ned dropped, need to quit. + Some(GetOrCreate { name, ret }) => self.handle_get_or_create(name, ret).await, + Some(_) => {} + // all senders have been dropped, need to quit. None => break, } } } async fn handle_create(&self, name: String, ret: oneshot::Sender>) { - let result = self.store.create_uuid(name).await; + let result = self.store.create_uuid(name, true).await; + let _ = ret.send(result); + } + + async fn handle_get_or_create(&self, name: String, ret: oneshot::Sender>) { + let result = self.store.create_uuid(name, false).await; let _ = ret.send(result); } } @@ -75,6 +85,13 @@ impl UuidResolverHandle { Ok(receiver.await.expect("Uuid resolver actor has been killed")?) } + pub async fn get_or_create(&self, name: String) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::GetOrCreate { name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + } + pub async fn create(&self, name: String) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Create { name, ret }; @@ -91,7 +108,9 @@ pub enum UuidError { #[async_trait::async_trait] trait UuidStore { - async fn create_uuid(&self, name: String) -> Result; + // Create a new entry for `name`. Return an error if `err` and the entry already exists, return + // the uuid otherwise. + async fn create_uuid(&self, name: String, err: bool) -> Result; async fn get_uuid(&self, name: String) -> Result>; } @@ -99,9 +118,15 @@ struct MapUuidStore(Arc>>); #[async_trait::async_trait] impl UuidStore for MapUuidStore { - async fn create_uuid(&self, name: String) -> Result { + async fn create_uuid(&self, name: String, err: bool) -> Result { match self.0.write().await.entry(name) { - Entry::Occupied(_) => Err(UuidError::NameAlreadyExist), + Entry::Occupied(entry) => { + if err { + Err(UuidError::NameAlreadyExist) + } else { + Ok(entry.get().clone()) + } + }, Entry::Vacant(entry) => { let uuid = Uuid::new_v4(); let uuid = entry.insert(uuid); diff --git a/src/index_controller/local_index_controller/update_store.rs b/src/index_controller/local_index_controller/update_store.rs deleted file mode 100644 index b025ff090..000000000 --- a/src/index_controller/local_index_controller/update_store.rs +++ /dev/null @@ -1,407 +0,0 @@ -use std::path::Path; -use std::sync::{Arc, RwLock}; - -use crossbeam_channel::Sender; -use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; -use heed::{EnvOpenOptions, Env, Database}; -use serde::{Serialize, Deserialize}; - -use crate::index_controller::updates::*; - -type BEU64 = heed::zerocopy::U64; - -#[derive(Clone)] -pub struct UpdateStore { - env: Env, - pending_meta: Database, SerdeJson>>, - pending: Database, ByteSlice>, - processed_meta: Database, SerdeJson>>, - failed_meta: Database, SerdeJson>>, - aborted_meta: Database, SerdeJson>>, - processing: Arc>>>, - notification_sender: Sender<()>, -} - -pub trait HandleUpdate { - fn handle_update(&mut self, meta: Processing, content: &[u8]) -> Result, Failed>; -} - -impl UpdateStore -where - M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone, - N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, - E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, -{ - pub fn open( - mut options: EnvOpenOptions, - path: P, - mut update_handler: U, - ) -> heed::Result> - where - P: AsRef, - U: HandleUpdate + Send + 'static, - { - options.max_dbs(5); - - let env = options.open(path)?; - let pending_meta = env.create_database(Some("pending-meta"))?; - let pending = env.create_database(Some("pending"))?; - let processed_meta = env.create_database(Some("processed-meta"))?; - let aborted_meta = env.create_database(Some("aborted-meta"))?; - let failed_meta = env.create_database(Some("failed-meta"))?; - let processing = Arc::new(RwLock::new(None)); - - let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); - // Send a first notification to trigger the process. - let _ = notification_sender.send(()); - - let update_store = Arc::new(UpdateStore { - env, - pending, - pending_meta, - processed_meta, - aborted_meta, - notification_sender, - failed_meta, - processing, - }); - - // We need a weak reference so we can take ownership on the arc later when we - // want to close the index. - let update_store_weak = Arc::downgrade(&update_store); - std::thread::spawn(move || { - // Block and wait for something to process. - 'outer: for _ in notification_receiver { - loop { - match update_store_weak.upgrade() { - Some(update_store) => { - match update_store.process_pending_update(&mut update_handler) { - Ok(Some(_)) => (), - Ok(None) => break, - Err(e) => eprintln!("error while processing update: {}", e), - } - } - // the ownership on the arc has been taken, we need to exit. - None => break 'outer, - } - } - } - }); - - Ok(update_store) - } - - pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { - self.env.prepare_for_closing() - } - - /// Returns the new biggest id to use to store the new update. - fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { - let last_pending = self.pending_meta - .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); - - let last_processed = self.processed_meta - .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); - - let last_aborted = self.aborted_meta - .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); - - let last_update_id = [last_pending, last_processed, last_aborted] - .iter() - .copied() - .flatten() - .max(); - - match last_update_id { - Some(last_id) => Ok(last_id + 1), - None => Ok(0), - } - } - - /// Registers the update content in the pending store and the meta - /// into the pending-meta store. Returns the new unique update id. - pub fn register_update( - &self, - meta: M, - content: &[u8] - ) -> heed::Result> { - let mut wtxn = self.env.write_txn()?; - - // We ask the update store to give us a new update id, this is safe, - // no other update can have the same id because we use a write txn before - // asking for the id and registering it so other update registering - // will be forced to wait for a new write txn. - let update_id = self.new_update_id(&wtxn)?; - let update_key = BEU64::new(update_id); - - let meta = Pending::new(meta, update_id); - self.pending_meta.put(&mut wtxn, &update_key, &meta)?; - self.pending.put(&mut wtxn, &update_key, content)?; - - wtxn.commit()?; - - if let Err(e) = self.notification_sender.try_send(()) { - assert!(!e.is_disconnected(), "update notification channel is disconnected"); - } - Ok(meta) - } - /// Executes the user provided function on the next pending update (the one with the lowest id). - /// This is asynchronous as it let the user process the update with a read-only txn and - /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, handler: &mut U) -> heed::Result> - where - U: HandleUpdate + Send + 'static, - { - // Create a read transaction to be able to retrieve the pending update in order. - let rtxn = self.env.read_txn()?; - let first_meta = self.pending_meta.first(&rtxn)?; - - // If there is a pending update we process and only keep - // a reader while processing it, not a writer. - match first_meta { - Some((first_id, pending)) => { - let first_content = self.pending - .get(&rtxn, &first_id)? - .expect("associated update content"); - - // we change the state of the update from pending to processing before we pass it - // to the update handler. Processing store is non persistent to be able recover - // from a failure - let processing = pending.processing(); - self.processing - .write() - .unwrap() - .replace(processing.clone()); - // Process the pending update using the provided user function. - let result = handler.handle_update(processing, first_content); - drop(rtxn); - - // Once the pending update have been successfully processed - // we must remove the content from the pending and processing stores and - // write the *new* meta to the processed-meta store and commit. - let mut wtxn = self.env.write_txn()?; - self.processing - .write() - .unwrap() - .take(); - self.pending_meta.delete(&mut wtxn, &first_id)?; - self.pending.delete(&mut wtxn, &first_id)?; - match result { - Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?, - Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?, - } - wtxn.commit()?; - - Ok(Some(())) - }, - None => Ok(None) - } - } - - /// Execute the user defined function with the meta-store iterators, the first - /// iterator is the *processed* meta one, the second the *aborted* meta one - /// and, the last is the *pending* meta one. - pub fn iter_metas(&self, mut f: F) -> heed::Result - where - F: for<'a> FnMut( - Option>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - ) -> heed::Result, - { - let rtxn = self.env.read_txn()?; - - // We get the pending, processed and aborted meta iterators. - let processed_iter = self.processed_meta.iter(&rtxn)?; - let aborted_iter = self.aborted_meta.iter(&rtxn)?; - let pending_iter = self.pending_meta.iter(&rtxn)?; - let processing = self.processing.read().unwrap().clone(); - let failed_iter = self.failed_meta.iter(&rtxn)?; - - // We execute the user defined function with both iterators. - (f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter) - } - - /// Returns the update associated meta or `None` if the update doesn't exist. - pub fn meta(&self, update_id: u64) -> heed::Result>> { - let rtxn = self.env.read_txn()?; - let key = BEU64::new(update_id); - - if let Some(ref meta) = *self.processing.read().unwrap() { - if meta.id() == update_id { - return Ok(Some(UpdateStatus::Processing(meta.clone()))); - } - } - - if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Pending(meta))); - } - - if let Some(meta) = self.processed_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Processed(meta))); - } - - if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Aborted(meta))); - } - - if let Some(meta) = self.failed_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Failed(meta))); - } - - Ok(None) - } - - /// Aborts an update, an aborted update content is deleted and - /// the meta of it is moved into the aborted updates database. - /// - /// Trying to abort an update that is currently being processed, an update - /// that as already been processed or which doesn't actually exist, will - /// return `None`. - #[allow(dead_code)] - pub fn abort_update(&self, update_id: u64) -> heed::Result>> { - let mut wtxn = self.env.write_txn()?; - let key = BEU64::new(update_id); - - // We cannot abort an update that is currently being processed. - if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) { - return Ok(None); - } - - let pending = match self.pending_meta.get(&wtxn, &key)? { - Some(meta) => meta, - None => return Ok(None), - }; - - let aborted = pending.abort(); - - self.aborted_meta.put(&mut wtxn, &key, &aborted)?; - self.pending_meta.delete(&mut wtxn, &key)?; - self.pending.delete(&mut wtxn, &key)?; - - wtxn.commit()?; - - Ok(Some(aborted)) - } - - /// Aborts all the pending updates, and not the one being currently processed. - /// Returns the update metas and ids that were successfully aborted. - #[allow(dead_code)] - pub fn abort_pendings(&self) -> heed::Result)>> { - let mut wtxn = self.env.write_txn()?; - let mut aborted_updates = Vec::new(); - - // We skip the first pending update as it is currently being processed. - for result in self.pending_meta.iter(&wtxn)?.skip(1) { - let (key, pending) = result?; - let id = key.get(); - aborted_updates.push((id, pending.abort())); - } - - for (id, aborted) in &aborted_updates { - let key = BEU64::new(*id); - self.aborted_meta.put(&mut wtxn, &key, &aborted)?; - self.pending_meta.delete(&mut wtxn, &key)?; - self.pending.delete(&mut wtxn, &key)?; - } - - wtxn.commit()?; - - Ok(aborted_updates) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::thread; - use std::time::{Duration, Instant}; - - impl HandleUpdate for F - where F: FnMut(Processing, &[u8]) -> Result, Failed> + Send + 'static { - fn handle_update(&mut self, meta: Processing, content: &[u8]) -> Result, Failed> { - self(meta, content) - } - } - - #[test] - fn simple() { - let dir = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir, |meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { - let new_meta = meta.meta().to_string() + " processed"; - let processed = meta.process(new_meta); - Ok(processed) - }).unwrap(); - - let meta = String::from("kiki"); - let update = update_store.register_update(meta, &[]).unwrap(); - thread::sleep(Duration::from_millis(100)); - let meta = update_store.meta(update.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "kiki processed"); - } else { - panic!() - } - } - - #[test] - #[ignore] - fn long_running_update() { - let dir = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir, |meta: Processing, _content:&_| -> Result<_, Failed<_, ()>> { - thread::sleep(Duration::from_millis(400)); - let new_meta = meta.meta().to_string() + "processed"; - let processed = meta.process(new_meta); - Ok(processed) - }).unwrap(); - - let before_register = Instant::now(); - - let meta = String::from("kiki"); - let update_kiki = update_store.register_update(meta, &[]).unwrap(); - assert!(before_register.elapsed() < Duration::from_millis(200)); - - let meta = String::from("coco"); - let update_coco = update_store.register_update(meta, &[]).unwrap(); - assert!(before_register.elapsed() < Duration::from_millis(200)); - - let meta = String::from("cucu"); - let update_cucu = update_store.register_update(meta, &[]).unwrap(); - assert!(before_register.elapsed() < Duration::from_millis(200)); - - thread::sleep(Duration::from_millis(400 * 3 + 100)); - - let meta = update_store.meta(update_kiki.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "kiki processed"); - } else { - panic!() - } - - let meta = update_store.meta(update_coco.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "coco processed"); - } else { - panic!() - } - - let meta = update_store.meta(update_cucu.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "cucu processed"); - } else { - panic!() - } - } -} diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index ed085456b..dc6cc3863 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -12,7 +12,7 @@ use milli::Index; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; use uuid::Uuid; -use actix_web::web::Payload; +use tokio::fs::File; pub use updates::{Processed, Processing, Failed}; @@ -113,7 +113,7 @@ pub struct IndexSettings { /// be provided. This allows the implementer to define the behaviour of write accesses to the /// indices, and abstract the scheduling of the updates. The implementer must be able to provide an /// instance of `IndexStore` -#[async_trait::async_trait] +#[async_trait::async_trait(?Send)] pub trait IndexController { /* @@ -131,7 +131,7 @@ pub trait IndexController { index: String, method: IndexDocumentsMethod, format: UpdateFormat, - data: Payload, + data: File, primary_key: Option, ) -> anyhow::Result; diff --git a/src/routes/document.rs b/src/routes/document.rs index 2cb87074e..00d037359 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -3,7 +3,7 @@ use actix_web::{delete, get, post, put}; use actix_web::{web, HttpResponse}; use indexmap::IndexMap; use log::error; -//use milli::update::{IndexDocumentsMethod, UpdateFormat}; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::Deserialize; use serde_json::Value; @@ -142,26 +142,25 @@ async fn add_documents_json( params: web::Query, body: Payload, ) -> Result { - todo!() - //let addition_result = data - //.add_documents( - //path.into_inner().index_uid, - //IndexDocumentsMethod::ReplaceDocuments, - //UpdateFormat::Json, - //body, - //params.primary_key.clone(), - //).await; + let addition_result = data + .add_documents( + path.into_inner().index_uid, + IndexDocumentsMethod::ReplaceDocuments, + UpdateFormat::Json, + body, + params.primary_key.clone(), + ).await; - //match addition_result { - //Ok(update) => { - //let value = serde_json::to_string(&update).unwrap(); - //let response = HttpResponse::Ok().body(value); - //Ok(response) - //} - //Err(e) => { - //Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) - //} - //} + match addition_result { + Ok(update) => { + let value = serde_json::to_string(&update).unwrap(); + let response = HttpResponse::Ok().body(value); + Ok(response) + } + Err(e) => { + Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) + } + } }