[WIP] poc 2pc

Tamo 2021-07-29 18:01:19 +02:00
parent 782acc5a7d
commit 01925af1de
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
7 changed files with 97 additions and 53 deletions

View File

@@ -1,12 +1,13 @@
 use std::fs::File;
+use std::sync::{mpsc, Arc};
 
 use crate::index::Index;
 use milli::update::UpdateBuilder;
 use milli::CompressionType;
 use rayon::ThreadPool;
 
-use crate::index_controller::UpdateMeta;
-use crate::index_controller::{Failed, Processed, Processing};
+use crate::index_controller::{Aborted, Done, Failed, Processed, Processing};
+use crate::index_controller::{UpdateMeta, UpdateResult};
 use crate::option::IndexerOpts;
 
 pub struct UpdateHandler {
@@ -54,15 +55,17 @@ impl UpdateHandler {
     pub fn handle_update(
         &self,
+        channel: mpsc::Sender<(mpsc::Sender<Hello>, Result<Processed, Failed>)>,
         meta: Processing,
         content: Option<File>,
         index: Index,
-    ) -> Result<Processed, Failed> {
+    ) -> Result<Done, Aborted> {
         use UpdateMeta::*;
 
         let update_id = meta.id();
 
         let update_builder = self.update_builder(update_id);
+        let mut wtxn = index.write_txn().unwrap();
 
         let result = match meta.meta() {
             DocumentsAddition {
@@ -70,20 +73,47 @@ impl UpdateHandler {
                 format,
                 primary_key,
             } => index.update_documents(
+                &mut wtxn,
                 *format,
                 *method,
                 content,
                 update_builder,
                 primary_key.as_deref(),
             ),
-            ClearDocuments => index.clear_documents(update_builder),
-            DeleteDocuments { ids } => index.delete_documents(ids, update_builder),
-            Settings(settings) => index.update_settings(&settings.clone().check(), update_builder),
+            ClearDocuments => index.clear_documents(&mut wtxn, update_builder),
+            DeleteDocuments { ids } => index.delete_documents(&mut wtxn, ids, update_builder),
+            Settings(settings) => {
+                index.update_settings(&mut wtxn, &settings.clone().check(), update_builder)
+            }
         };
 
-        match result {
+        let result = match result {
             Ok(result) => Ok(meta.process(result)),
             Err(e) => Err(meta.fail(e.into())),
+        };
+
+        let (sender, receiver) = mpsc::channel();
+        channel.send((sender, result));
+
+        // here we should decide how we want to handle a failure. probably by closing the channel
+        // right: for now I'm just going to panic
+        let meta = result.unwrap();
+
+        match receiver.recv() {
+            Ok(Hello::Abort) => Err(meta.abort()),
+            Ok(Hello::Commit) => wtxn
+                .commit()
+                .map(|ok| meta.commit())
+                .map_err(|e| meta.abort()),
+            Err(e) => panic!("update actor died {}", e),
         }
     }
 }
+
+/// MARIN: I can't find any good name for this and I'm not even sure we need a new enum
+pub enum Hello {
+    Commit,
+    Abort,
+}
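The hunk above is the worker half of the two-phase-commit prototype: the update runs inside a single write transaction, the uncommitted result is sent back over a channel together with a reply sender, and the transaction is only committed or aborted once the other side answers. Below is a minimal, self-contained sketch of that handshake; the names are hypothetical and plain std threads and channels stand in for the actor machinery and the real meilisearch types.

use std::sync::mpsc;
use std::thread;

enum Hello {
    Commit,
    Abort,
}

fn main() {
    // Coordinator-side channel: the worker reports its (uncommitted) result here,
    // together with a sender the coordinator must answer on.
    let (report, inbox) = mpsc::channel::<(mpsc::Sender<Hello>, Result<String, String>)>();

    let worker = thread::spawn(move || {
        // Phase 1: do the work inside the (simulated) write transaction.
        let result: Result<String, String> = Ok("42 documents indexed".into());

        // Report the result and wait for the coordinator's decision.
        let (decision_tx, decision_rx) = mpsc::channel();
        report.send((decision_tx, result)).unwrap();

        // Phase 2: commit or abort depending on the answer.
        match decision_rx.recv() {
            Ok(Hello::Commit) => println!("worker: transaction committed"),
            Ok(Hello::Abort) | Err(_) => println!("worker: transaction aborted"),
        }
    });

    // Coordinator: inspect the reported result, then decide.
    let (decision_tx, result) = inbox.recv().unwrap();
    let decision = if result.is_ok() { Hello::Commit } else { Hello::Abort };
    decision_tx.send(decision).unwrap();

    worker.join().unwrap();
}

The important property is that the worker keeps the write transaction open across the round trip, so the coordinator can veto the update before anything becomes durable.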

View File

@@ -4,6 +4,7 @@ use std::marker::PhantomData;
 use std::num::NonZeroUsize;
 
 use flate2::read::GzDecoder;
+use heed::RwTxn;
 use log::{debug, info, trace};
 use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat};
 use serde::{Deserialize, Serialize, Serializer};
@@ -160,24 +161,17 @@ pub struct Facets {
 }
 
 impl Index {
-    pub fn update_documents(
-        &self,
+    pub fn update_documents<'a>(
+        &'a self,
+        txn: &mut RwTxn<'a, 'a>,
         format: UpdateFormat,
         method: IndexDocumentsMethod,
         content: Option<impl io::Read>,
         update_builder: UpdateBuilder,
         primary_key: Option<&str>,
     ) -> Result<UpdateResult> {
-        let mut txn = self.write_txn()?;
-        let result = self.update_documents_txn(
-            &mut txn,
-            format,
-            method,
-            content,
-            update_builder,
-            primary_key,
-        )?;
-        txn.commit()?;
+        let result =
+            self.update_documents_txn(txn, format, method, content, update_builder, primary_key)?;
 
         Ok(result)
     }
@@ -220,16 +214,14 @@ impl Index {
         Ok(UpdateResult::DocumentsAddition(addition))
     }
 
-    pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<UpdateResult> {
-        // We must use the write transaction of the update here.
-        let mut wtxn = self.write_txn()?;
-        let builder = update_builder.clear_documents(&mut wtxn, self);
+    pub fn clear_documents<'a>(
+        &'a self,
+        wtxn: &mut RwTxn<'a, 'a>,
+        update_builder: UpdateBuilder,
+    ) -> Result<UpdateResult> {
+        let builder = update_builder.clear_documents(wtxn, self);
         let _count = builder.execute()?;
 
-        wtxn.commit()
-            .and(Ok(UpdateResult::Other))
-            .map_err(Into::into)
+        Ok(UpdateResult::Other)
     }
 
     pub fn update_settings_txn<'a, 'b>(
@@ -302,8 +294,9 @@ impl Index {
         Ok(UpdateResult::Other)
     }
 
-    pub fn update_settings(
-        &self,
+    pub fn update_settings<'a>(
+        &'a self,
+        txn: &mut RwTxn<'a, 'a>,
         settings: &Settings<Checked>,
         update_builder: UpdateBuilder,
     ) -> Result<UpdateResult> {
@@ -313,12 +306,12 @@ impl Index {
         Ok(result)
     }
 
-    pub fn delete_documents(
-        &self,
+    pub fn delete_documents<'a>(
+        &'a self,
+        txn: &mut RwTxn<'a, 'a>,
         document_ids: &[String],
         update_builder: UpdateBuilder,
    ) -> Result<UpdateResult> {
-        let mut txn = self.write_txn()?;
         let mut builder = update_builder.delete_documents(&mut txn, self)?;
 
         // We ignore unexisting document ids
@@ -327,9 +320,7 @@ impl Index {
         });
 
         let deleted = builder.execute()?;
-        txn.commit()
-            .and(Ok(UpdateResult::DocumentDeletion { deleted }))
-            .map_err(Into::into)
+        Ok(UpdateResult::DocumentDeletion { deleted })
     }
 }
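All the changes in this file follow one pattern: the Index update methods stop opening and committing their own write transaction and instead borrow one supplied by the caller, so the update handler can run the whole update and decide later whether to commit. A toy illustration of that pattern follows, with made-up Txn/Store types rather than heed or the meilisearch API.

// Toy illustration of the transaction-borrowing refactor (made-up types):
// update operations no longer open/commit their own transaction, they
// borrow one owned by the caller, who commits (or drops) it at the end.
struct Txn {
    staged: Vec<String>,
}

struct Store {
    committed: Vec<String>,
}

impl Store {
    fn write_txn(&self) -> Txn {
        Txn { staged: Vec::new() }
    }

    // Before: this would open a txn, do the write, and commit immediately.
    // After: the caller passes the txn in and keeps control of its lifetime.
    fn add_document(&self, txn: &mut Txn, doc: &str) {
        txn.staged.push(doc.to_string());
    }

    fn commit(&mut self, txn: Txn) {
        self.committed.extend(txn.staged);
    }
}

fn main() {
    let mut store = Store { committed: Vec::new() };
    let mut txn = store.write_txn();

    // Several operations share one transaction...
    store.add_document(&mut txn, "a");
    store.add_document(&mut txn, "b");

    // ...and nothing is visible until the caller commits (dropping `txn`
    // instead would discard everything staged in it).
    store.commit(txn);
    assert_eq!(store.committed, ["a", "b"]);
}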

View File

@@ -7,16 +7,16 @@ use futures::stream::StreamExt;
 use heed::CompactionOption;
 use log::debug;
 use milli::update::UpdateBuilder;
+use tokio::sync::oneshot;
 use tokio::task::spawn_blocking;
 use tokio::{fs, sync::mpsc};
 use uuid::Uuid;
 
+use crate::index::update_handler::Hello;
 use crate::index::{
     update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings,
 };
-use crate::index_controller::{
-    get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing,
-};
+use crate::index_controller::{Aborted, Done, Failed, IndexStats, Processed, Processing, get_arc_ownership_blocking};
 use crate::option::IndexerOpts;
 
 use super::error::{IndexActorError, Result};
@@ -81,11 +81,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             }
             Update {
                 ret,
+                channel,
                 meta,
                 data,
                 uuid,
             } => {
-                let _ = ret.send(self.handle_update(uuid, meta, data).await);
+                let _ = ret.send(self.handle_update(channel, uuid, meta, data).await);
             }
             Search { ret, query, uuid } => {
                 let _ = ret.send(self.handle_search(uuid, query).await);
@@ -163,10 +164,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
     async fn handle_update(
         &self,
+        channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
         uuid: Uuid,
         meta: Processing,
         data: Option<File>,
-    ) -> Result<std::result::Result<Processed, Failed>> {
+    ) -> Result<std::result::Result<Done, Aborted>> {
         debug!("Processing update {}", meta.id());
         let update_handler = self.update_handler.clone();
         let index = match self.store.get(uuid).await? {
@@ -174,7 +176,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             None => self.store.create(uuid, None).await?,
         };
 
-        Ok(spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?)
+        Ok(spawn_blocking(move || update_handler.handle_update(channel, meta, data, index)).await?)
     }
 
     async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {

View File

@@ -4,10 +4,7 @@ use std::path::{Path, PathBuf};
 use tokio::sync::{mpsc, oneshot};
 use uuid::Uuid;
 
-use crate::{
-    index::Checked,
-    index_controller::{IndexSettings, IndexStats, Processing},
-};
+use crate::{index::{Checked, update_handler::Hello}, index_controller::{IndexSettings, IndexStats, Processing}};
 use crate::{
     index::{Document, SearchQuery, SearchResult, Settings},
     index_controller::{Failed, Processed},
@@ -36,13 +33,15 @@ impl IndexActorHandle for IndexActorHandleImpl {
     async fn update(
         &self,
+        channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
         uuid: Uuid,
         meta: Processing,
         data: Option<std::fs::File>,
-    ) -> Result<std::result::Result<Processed, Failed>> {
+    ) -> Result<std::result::Result<(Processed, oneshot::Sender<()>), Failed>> {
         let (ret, receiver) = oneshot::channel();
         let msg = IndexMsg::Update {
             ret,
+            channel,
             meta,
             data,
             uuid,

View File

@@ -4,6 +4,7 @@ use tokio::sync::oneshot;
 use uuid::Uuid;
 
 use super::error::Result as IndexResult;
+use crate::index::update_handler::Hello;
 use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
 use crate::index_controller::{Failed, IndexStats, Processed, Processing};
@@ -18,9 +19,10 @@ pub enum IndexMsg {
     },
     Update {
         uuid: Uuid,
+        channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
         meta: Processing,
         data: Option<std::fs::File>,
-        ret: oneshot::Sender<IndexResult<Result<Processed, Failed>>>,
+        ret: oneshot::Sender<IndexResult<Result<(Processed, oneshot::Sender<()>), Failed>>>,
     },
     Search {
         uuid: Uuid,

View File

@@ -13,6 +13,7 @@ pub use handle_impl::IndexActorHandleImpl;
 use message::IndexMsg;
 use store::{IndexStore, MapIndexStore};
 
+use crate::index::update_handler::Hello;
 use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings};
 use crate::index_controller::{Failed, IndexStats, Processed, Processing};
 use error::Result;
@@ -57,6 +58,7 @@ pub trait IndexActorHandle {
     async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
     async fn update(
         &self,
+        channel: std::sync::mpsc::Sender<(std::sync::mpsc::Sender<Hello>, std::result::Result<Processed, Failed>)>,
         uuid: Uuid,
         meta: Processing,
         data: Option<File>,

View File

@@ -2,6 +2,7 @@ mod codec;
 pub mod dump;
 
 use std::fs::{copy, create_dir_all, remove_file, File};
+use std::io::BufRead;
 use std::path::Path;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
@@ -29,6 +30,7 @@ use codec::*;
 use super::error::Result;
 use super::UpdateMeta;
 use crate::helpers::EnvSizer;
+use crate::index::update_handler::Hello;
 use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
 
 #[allow(clippy::upper_case_acronyms)]
@@ -329,13 +331,29 @@ impl UpdateStore {
             None => None,
         };
 
+        let (sender, receiver) = std::sync::mpsc::channel();
         // Process the pending update using the provided user function.
-        let handle = Handle::current();
-        let result =
-            match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) {
-                Ok(result) => result,
-                Err(e) => Err(processing.fail(e.into())),
+        let handle =
+            tokio::task::spawn(index_handle.update(sender, index_uuid, processing.clone(), file));
+
+        let (sender2, receiver) = std::sync::mpsc::channel();
+        // TODO: we should not panic here
+        let (sender, result) = receiver.recv().unwrap();
+
+        let mut line = String::new();
+        loop {
+            std::io::stdin().lock().read_line(&mut line).unwrap();
+            match line.as_str() {
+                "commit" => {
+                    sender.send((sender2, Hello::Commit));
+                    break;
+                }
+                "abort" => {
+                    sender.send((sender2, Hello::Abort));
+                    break;
+                }
+                _ => (),
             };
+        }
 
         // Once the pending update have been successfully processed
        // we must remove the content from the pending and processing stores and