2021-04-22 10:14:29 +02:00
use std ::collections ::HashSet ;
2021-03-23 11:00:50 +01:00
use std ::io ::SeekFrom ;
use std ::path ::{ Path , PathBuf } ;
2021-06-09 16:19:45 +02:00
use std ::sync ::atomic ::AtomicBool ;
2021-04-13 17:14:02 +02:00
use std ::sync ::Arc ;
2021-03-23 11:00:50 +01:00
2021-06-02 17:45:28 +02:00
use async_stream ::stream ;
use futures ::StreamExt ;
2021-06-29 15:25:18 +02:00
use log ::trace ;
2021-07-29 16:07:04 +02:00
use serdeval ::* ;
2021-03-23 11:00:50 +01:00
use tokio ::fs ;
2021-04-22 10:14:29 +02:00
use tokio ::io ::AsyncWriteExt ;
2021-03-24 11:29:11 +01:00
use tokio ::sync ::mpsc ;
use uuid ::Uuid ;
2021-03-23 11:00:50 +01:00
2021-06-14 21:26:35 +02:00
use super ::error ::{ Result , UpdateActorError } ;
2021-06-15 17:39:07 +02:00
use super ::{ PayloadData , UpdateMsg , UpdateStore , UpdateStoreInfo } ;
2021-05-25 09:46:11 +02:00
use crate ::index_controller ::index_actor ::IndexActorHandle ;
2021-04-13 17:14:02 +02:00
use crate ::index_controller ::{ UpdateMeta , UpdateStatus } ;
2021-03-23 11:00:50 +01:00
2021-04-13 17:14:02 +02:00
pub struct UpdateActor < D , I > {
2021-03-23 11:00:50 +01:00
path : PathBuf ,
2021-04-13 17:14:02 +02:00
store : Arc < UpdateStore > ,
2021-06-02 17:45:28 +02:00
inbox : Option < mpsc ::Receiver < UpdateMsg < D > > > ,
2021-03-23 11:00:50 +01:00
index_handle : I ,
2021-06-09 16:19:45 +02:00
must_exit : Arc < AtomicBool > ,
2021-03-23 11:00:50 +01:00
}
2021-04-13 17:14:02 +02:00
impl < D , I > UpdateActor < D , I >
2021-03-23 11:00:50 +01:00
where
D : AsRef < [ u8 ] > + Sized + 'static ,
2021-03-23 16:19:01 +01:00
I : IndexActorHandle + Clone + Send + Sync + 'static ,
2021-03-23 11:00:50 +01:00
{
pub fn new (
2021-04-13 17:14:02 +02:00
update_db_size : usize ,
2021-03-23 11:00:50 +01:00
inbox : mpsc ::Receiver < UpdateMsg < D > > ,
path : impl AsRef < Path > ,
index_handle : I ,
2021-06-15 17:39:07 +02:00
) -> anyhow ::Result < Self > {
2021-04-22 10:14:29 +02:00
let path = path . as_ref ( ) . join ( " updates " ) ;
2021-04-13 17:14:02 +02:00
std ::fs ::create_dir_all ( & path ) ? ;
let mut options = heed ::EnvOpenOptions ::new ( ) ;
options . map_size ( update_db_size ) ;
2021-06-09 16:19:45 +02:00
let must_exit = Arc ::new ( AtomicBool ::new ( false ) ) ;
let store = UpdateStore ::open ( options , & path , index_handle . clone ( ) , must_exit . clone ( ) ) ? ;
2021-03-23 11:00:50 +01:00
std ::fs ::create_dir_all ( path . join ( " update_files " ) ) ? ;
2021-06-02 17:45:28 +02:00
let inbox = Some ( inbox ) ;
2021-05-25 09:46:11 +02:00
Ok ( Self {
path ,
store ,
inbox ,
index_handle ,
2021-06-09 16:19:45 +02:00
must_exit ,
2021-05-25 09:46:11 +02:00
} )
2021-03-23 11:00:50 +01:00
}
pub async fn run ( mut self ) {
use UpdateMsg ::* ;
2021-06-23 10:41:55 +02:00
trace! ( " Started update actor. " ) ;
2021-03-23 11:00:50 +01:00
2021-06-02 17:45:28 +02:00
let mut inbox = self
. inbox
. take ( )
. expect ( " A receiver should be present by now. " ) ;
2021-06-09 16:19:45 +02:00
2021-06-02 17:45:28 +02:00
let must_exit = self . must_exit . clone ( ) ;
let stream = stream! {
loop {
let msg = inbox . recv ( ) . await ;
2021-06-09 16:19:45 +02:00
2021-06-02 17:45:28 +02:00
if must_exit . load ( std ::sync ::atomic ::Ordering ::Relaxed ) {
break ;
2021-03-23 11:00:50 +01:00
}
2021-06-02 17:45:28 +02:00
match msg {
Some ( msg ) = > yield msg ,
None = > break ,
2021-04-09 15:41:24 +03:00
}
2021-03-23 11:00:50 +01:00
}
2021-06-02 17:45:28 +02:00
} ;
stream
. for_each_concurrent ( Some ( 10 ) , | msg | async {
match msg {
Update {
uuid ,
meta ,
data ,
ret ,
} = > {
let _ = ret . send ( self . handle_update ( uuid , meta , data ) . await ) ;
}
ListUpdates { uuid , ret } = > {
let _ = ret . send ( self . handle_list_updates ( uuid ) . await ) ;
}
GetUpdate { uuid , ret , id } = > {
let _ = ret . send ( self . handle_get_update ( uuid , id ) . await ) ;
}
Delete { uuid , ret } = > {
let _ = ret . send ( self . handle_delete ( uuid ) . await ) ;
}
Snapshot { uuids , path , ret } = > {
let _ = ret . send ( self . handle_snapshot ( uuids , path ) . await ) ;
}
GetInfo { ret } = > {
let _ = ret . send ( self . handle_get_info ( ) . await ) ;
}
Dump { uuids , path , ret } = > {
let _ = ret . send ( self . handle_dump ( uuids , path ) . await ) ;
}
}
} )
. await ;
2021-03-23 11:00:50 +01:00
}
async fn handle_update (
& self ,
uuid : Uuid ,
meta : UpdateMeta ,
2021-06-23 14:45:26 +02:00
payload : mpsc ::Receiver < PayloadData < D > > ,
2021-03-23 11:00:50 +01:00
) -> Result < UpdateStatus > {
2021-04-22 10:14:29 +02:00
let file_path = match meta {
2021-06-09 17:10:10 +02:00
UpdateMeta ::DocumentsAddition { .. } = > {
2021-04-22 10:14:29 +02:00
let update_file_id = uuid ::Uuid ::new_v4 ( ) ;
let path = self
. path
. join ( format! ( " update_files/update_ {} " , update_file_id ) ) ;
let mut file = fs ::OpenOptions ::new ( )
. read ( true )
. write ( true )
. create ( true )
. open ( & path )
2021-05-25 09:46:11 +02:00
. await ? ;
2021-04-22 10:14:29 +02:00
2021-06-23 14:48:33 +02:00
async fn write_to_file < D > (
file : & mut fs ::File ,
mut payload : mpsc ::Receiver < PayloadData < D > > ,
) -> Result < usize >
2021-06-23 14:45:26 +02:00
where
D : AsRef < [ u8 ] > + Sized + 'static ,
{
let mut file_len = 0 ;
2021-06-23 16:34:07 +02:00
2021-06-23 14:45:26 +02:00
while let Some ( bytes ) = payload . recv ( ) . await {
let bytes = bytes ? ;
file_len + = bytes . as_ref ( ) . len ( ) ;
file . write_all ( bytes . as_ref ( ) ) . await ? ;
}
2021-06-23 16:34:07 +02:00
file . flush ( ) . await ? ;
2021-06-23 14:45:26 +02:00
Ok ( file_len )
2021-04-22 10:14:29 +02:00
}
2021-06-23 14:45:26 +02:00
let file_len = write_to_file ( & mut file , payload ) . await ;
match file_len {
Ok ( len ) if len > 0 = > {
let file = file . into_std ( ) . await ;
Some ( ( file , update_file_id ) )
}
Err ( e ) = > {
fs ::remove_file ( & path ) . await ? ;
2021-06-23 14:48:33 +02:00
return Err ( e ) ;
2021-06-23 14:45:26 +02:00
}
_ = > {
fs ::remove_file ( & path ) . await ? ;
None
}
2021-03-23 11:00:50 +01:00
}
}
2021-05-25 09:46:11 +02:00
_ = > None ,
2021-04-22 10:14:29 +02:00
} ;
2021-03-23 11:00:50 +01:00
2021-04-13 17:14:02 +02:00
let update_store = self . store . clone ( ) ;
2021-03-23 11:00:50 +01:00
tokio ::task ::spawn_blocking ( move | | {
2021-07-29 16:07:04 +02:00
use std ::io ::{ BufReader , Seek } ;
2021-03-23 11:00:50 +01:00
// If the payload is empty, ignore the check.
2021-05-29 00:08:17 +02:00
let update_uuid = if let Some ( ( mut file , uuid ) ) = file_path {
2021-04-22 10:14:29 +02:00
// set the file back to the beginning
2021-05-25 09:46:11 +02:00
file . seek ( SeekFrom ::Start ( 0 ) ) ? ;
2021-03-23 11:00:50 +01:00
// Check that the json payload is valid:
let reader = BufReader ::new ( & mut file ) ;
2021-07-29 16:07:04 +02:00
// Validate that the payload is in the correct format.
let _ : Seq < Map < Str , Any > > = serde_json ::from_reader ( reader )
. map_err ( | e | UpdateActorError ::InvalidPayload ( Box ::new ( e ) ) ) ? ;
2021-03-23 11:00:50 +01:00
2021-05-29 00:08:17 +02:00
Some ( uuid )
2021-04-22 10:14:29 +02:00
} else {
None
} ;
2021-03-23 11:00:50 +01:00
// The payload is valid, we can register it to the update store.
2021-05-25 09:46:11 +02:00
let status = update_store
2021-05-29 00:08:17 +02:00
. register_update ( meta , update_uuid , uuid )
2021-05-25 09:46:11 +02:00
. map ( UpdateStatus ::Enqueued ) ? ;
Ok ( status )
2021-03-23 11:00:50 +01:00
} )
2021-05-25 09:46:11 +02:00
. await ?
2021-03-23 11:00:50 +01:00
}
async fn handle_list_updates ( & self , uuid : Uuid ) -> Result < Vec < UpdateStatus > > {
2021-04-13 17:14:02 +02:00
let update_store = self . store . clone ( ) ;
2021-03-23 11:00:50 +01:00
tokio ::task ::spawn_blocking ( move | | {
2021-05-25 09:46:11 +02:00
let result = update_store . list ( uuid ) ? ;
2021-03-23 11:00:50 +01:00
Ok ( result )
} )
2021-05-25 09:46:11 +02:00
. await ?
2021-03-23 11:00:50 +01:00
}
async fn handle_get_update ( & self , uuid : Uuid , id : u64 ) -> Result < UpdateStatus > {
2021-04-13 17:14:02 +02:00
let store = self . store . clone ( ) ;
2021-06-09 17:10:10 +02:00
tokio ::task ::spawn_blocking ( move | | {
2021-06-15 17:39:07 +02:00
let result = store
. meta ( uuid , id ) ?
. ok_or ( UpdateActorError ::UnexistingUpdate ( id ) ) ? ;
2021-06-09 17:10:10 +02:00
Ok ( result )
} )
. await ?
2021-03-23 11:00:50 +01:00
}
async fn handle_delete ( & self , uuid : Uuid ) -> Result < ( ) > {
2021-05-10 20:24:14 +02:00
let store = self . store . clone ( ) ;
2021-04-28 16:43:49 +02:00
2021-05-25 09:46:11 +02:00
tokio ::task ::spawn_blocking ( move | | store . delete_all ( uuid ) ) . await ? ? ;
2021-04-28 16:43:49 +02:00
Ok ( ( ) )
}
2021-03-23 11:00:50 +01:00
2021-05-10 20:24:14 +02:00
async fn handle_snapshot ( & self , uuids : HashSet < Uuid > , path : PathBuf ) -> Result < ( ) > {
let index_handle = self . index_handle . clone ( ) ;
let update_store = self . store . clone ( ) ;
2021-04-28 16:43:49 +02:00
2021-05-25 09:46:11 +02:00
tokio ::task ::spawn_blocking ( move | | update_store . snapshot ( & uuids , & path , index_handle ) )
. await ? ? ;
2021-03-23 11:00:50 +01:00
Ok ( ( ) )
}
2021-05-25 16:33:09 +02:00
async fn handle_dump ( & self , uuids : HashSet < Uuid > , path : PathBuf ) -> Result < ( ) > {
2021-03-23 11:00:50 +01:00
let index_handle = self . index_handle . clone ( ) ;
2021-04-13 17:14:02 +02:00
let update_store = self . store . clone ( ) ;
2021-05-31 16:40:59 +02:00
2021-06-14 21:26:35 +02:00
tokio ::task ::spawn_blocking ( move | | -> Result < ( ) > {
2021-05-05 19:06:07 +02:00
update_store . dump ( & uuids , path . to_path_buf ( ) , index_handle ) ? ;
Ok ( ( ) )
2021-04-13 17:14:02 +02:00
} )
2021-05-25 09:46:11 +02:00
. await ? ? ;
2021-03-23 11:00:50 +01:00
Ok ( ( ) )
}
2021-04-09 15:41:24 +03:00
2021-04-14 18:55:04 +02:00
async fn handle_get_info ( & self ) -> Result < UpdateStoreInfo > {
2021-04-14 17:53:12 +02:00
let update_store = self . store . clone ( ) ;
2021-06-14 21:26:35 +02:00
let info = tokio ::task ::spawn_blocking ( move | | -> Result < UpdateStoreInfo > {
2021-04-22 10:14:29 +02:00
let info = update_store . get_info ( ) ? ;
2021-04-14 18:55:04 +02:00
Ok ( info )
2021-04-14 17:53:12 +02:00
} )
2021-05-25 09:46:11 +02:00
. await ? ? ;
2021-04-09 15:41:24 +03:00
2021-04-14 18:55:04 +02:00
Ok ( info )
2021-04-09 15:41:24 +03:00
}
2021-03-23 11:00:50 +01:00
}