dump updates

This commit is contained in:
Marin Postma 2021-05-25 16:33:09 +02:00
parent 464639aa0f
commit 3593ebb8aa
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
8 changed files with 252 additions and 169 deletions

View File

@ -178,6 +178,7 @@ impl Index {
let indexing_callback =
|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step);
let gzipped = false;
let addition = match content {
Some(content) if gzipped => {

View File

@ -194,9 +194,10 @@ where
Ok(())
}
async fn handle_dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> {
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
println!("starting dump");
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
Ok(())

View File

@ -71,7 +71,7 @@ where
receiver.await.expect("update actor killed.")
}
async fn dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> {
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Dump { uuids, path, ret };
let _ = self.sender.send(msg).await;

View File

@ -32,7 +32,7 @@ pub enum UpdateMsg<D> {
ret: oneshot::Sender<Result<()>>,
},
Dump {
uuids: HashSet<(String, Uuid)>,
uuids: HashSet<Uuid>,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},

View File

@ -1,7 +1,7 @@
mod actor;
mod handle_impl;
mod message;
mod update_store;
mod store;
use std::{collections::HashSet, path::PathBuf};
@ -16,7 +16,7 @@ use actor::UpdateActor;
use message::UpdateMsg;
pub use handle_impl::UpdateActorHandleImpl;
pub use update_store::{UpdateStore, UpdateStoreInfo};
pub use store::{UpdateStore, UpdateStoreInfo};
pub type Result<T> = std::result::Result<T, UpdateError>;
type PayloadData<D> = std::result::Result<D, PayloadError>;
@ -62,7 +62,7 @@ pub trait UpdateActorHandle {
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuid: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn dump(&self, uuid: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()>;
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn get_info(&self) -> Result<UpdateStoreInfo>;
async fn update(
&self,

View File

@ -0,0 +1,86 @@
use std::{borrow::Cow, convert::TryInto, mem::size_of};
use heed::{BytesDecode, BytesEncode};
use uuid::Uuid;
/// Heed codec that turns a [`NextIdKey`] into its raw key bytes.
pub struct NextIdCodec;

/// Key accepted by [`NextIdCodec`].
pub enum NextIdKey {
    /// Encoded as the fixed byte string `__global__`.
    Global,
    /// Encoded as the 16 raw bytes of the wrapped uuid.
    Index(Uuid),
}

impl<'a> BytesEncode<'a> for NextIdCodec {
    type EItem = NextIdKey;

    /// Borrows the key bytes; no allocation is performed and encoding never fails.
    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let raw: &'a [u8] = match item {
            NextIdKey::Global => b"__global__",
            NextIdKey::Index(uuid) => uuid.as_bytes(),
        };
        Some(Cow::Borrowed(raw))
    }
}
/// Heed codec for `(global_id, index uuid, update_id)` keys.
pub struct PendingKeyCodec;

impl<'a> BytesEncode<'a> for PendingKeyCodec {
    type EItem = (u64, Uuid, u64);

    /// Lays the key out as: big-endian global id, raw uuid bytes, big-endian update id.
    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let (global_id, uuid, update_id) = item;
        let key = [
            &global_id.to_be_bytes()[..],
            &uuid.as_bytes()[..],
            &update_id.to_be_bytes()[..],
        ]
        .concat();
        Some(Cow::Owned(key))
    }
}
impl<'a> BytesDecode<'a> for PendingKeyCodec {
    type DItem = (u64, Uuid, u64);

    /// Reads back a key made of a big-endian `u64`, a raw uuid and a big-endian `u64`.
    ///
    /// Returns `None` when the input is too short, or when what follows the uuid is not
    /// exactly the size of a `u64`.
    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
        const ID_LEN: usize = size_of::<u64>();
        const UUID_LEN: usize = size_of::<Uuid>();

        let global_part = bytes.get(..ID_LEN)?;
        let global_id = u64::from_be_bytes(global_part.try_into().ok()?);

        let uuid_part = bytes.get(ID_LEN..ID_LEN + UUID_LEN)?;
        let uuid = Uuid::from_bytes(uuid_part.try_into().ok()?);

        // The tail is open-ended on purpose: the array conversion rejects any length
        // other than that of a `u64`.
        let update_part = bytes.get(ID_LEN + UUID_LEN..)?;
        let update_id = u64::from_be_bytes(update_part.try_into().ok()?);

        Some((global_id, uuid, update_id))
    }
}
/// Heed codec for `(index uuid, update_id)` keys.
pub struct UpdateKeyCodec;

impl<'a> BytesEncode<'a> for UpdateKeyCodec {
    type EItem = (Uuid, u64);

    /// Lays the key out as the raw uuid bytes followed by the big-endian update id.
    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let (uuid, update_id) = item;
        let key = [&uuid.as_bytes()[..], &update_id.to_be_bytes()[..]].concat();
        Some(Cow::Owned(key))
    }
}
impl<'a> BytesDecode<'a> for UpdateKeyCodec {
    type DItem = (Uuid, u64);

    /// Reads back a key made of a raw uuid followed by a big-endian `u64`.
    ///
    /// Returns `None` when the input is shorter than a uuid, or when what follows the uuid
    /// is not exactly the size of a `u64`.
    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
        const UUID_LEN: usize = size_of::<Uuid>();

        let uuid_part = bytes.get(..UUID_LEN)?;
        let update_part = bytes.get(UUID_LEN..)?;

        let uuid = Uuid::from_bytes(uuid_part.try_into().ok()?);
        let update_id = u64::from_be_bytes(update_part.try_into().ok()?);

        Some((uuid, update_id))
    }
}

View File

@ -0,0 +1,146 @@
use std::{
collections::HashSet,
fs::{copy, create_dir_all, File},
io::Write,
path::{Path, PathBuf},
};
use anyhow::Context;
use heed::RoTxn;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{State, codec::UpdateKeyCodec};
use super::UpdateStore;
use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus};
/// One line of the `data.jsonl` dump file: an update together with the uuid of the index
/// it belongs to.
#[derive(Serialize, Deserialize)]
struct UpdateEntry {
/// Uuid of the index the update belongs to.
uuid: Uuid,
/// The update itself, converted to an `UpdateStatus` before serialization.
update: UpdateStatus,
}
impl UpdateStore {
    /// Dumps the updates of the indexes in `uuids`, then the indexes themselves, under `path`.
    ///
    /// The updates are written to `<path>/updates` (see [`Self::dump_updates`]); each index is
    /// then asked to dump itself into `path` through `handle`.
    ///
    /// The store state is `State::Dumping` for the duration of the call and is set back to
    /// `State::Idle` before returning, whether the dump succeeded or not.
    ///
    /// # Errors
    ///
    /// Returns the first error met while opening the transaction, creating directories,
    /// writing the update entries or dumping an index.
    pub fn dump(
        &self,
        uuids: &HashSet<Uuid>,
        path: PathBuf,
        handle: impl IndexActorHandle,
    ) -> anyhow::Result<()> {
        let state_lock = self.state.write();
        state_lock.swap(State::Dumping);

        // The fallible part runs in a closure so that an early `?` return cannot skip the
        // state reset below and leave the store reporting `Dumping` forever.
        let result = (|| -> anyhow::Result<()> {
            // txn must *always* be acquired after state lock, or it will deadlock.
            let txn = self.env.write_txn()?;

            let dump_path = path.join("updates");
            create_dir_all(&dump_path)?;

            self.dump_updates(&txn, uuids, &dump_path)?;

            let fut = dump_indexes(uuids, handle, &path);
            tokio::runtime::Handle::current().block_on(fut)?;

            Ok(())
        })();

        state_lock.swap(State::Idle);

        result
    }

    /// Writes every update of the indexes in `uuids` to `<path>/data.jsonl`, one JSON
    /// [`UpdateEntry`] per line, pending updates first and the others after.
    ///
    /// The content files of pending updates are copied into `<path>/update_files`.
    fn dump_updates(
        &self,
        txn: &RoTxn,
        uuids: &HashSet<Uuid>,
        path: impl AsRef<Path>,
    ) -> anyhow::Result<()> {
        let dump_data_path = path.as_ref().join("data.jsonl");
        // Entries are serialized in many small writes: buffer them.
        let mut dump_data_file = std::io::BufWriter::new(File::create(dump_data_path)?);

        let update_files_path = path.as_ref().join("update_files");
        create_dir_all(&update_files_path)?;

        self.dump_pending(txn, uuids, &mut dump_data_file, &update_files_path)?;
        self.dump_completed(txn, uuids, &mut dump_data_file)?;

        // Flush explicitly: dropping the buffer would silently swallow a write error.
        dump_data_file.flush()?;

        Ok(())
    }

    /// Appends to `file` the pending updates that belong to one of `uuids`.
    ///
    /// When an update has a content file, that file is copied into `update_files_path` and
    /// the entry only records its file name.
    fn dump_pending(
        &self,
        txn: &RoTxn,
        uuids: &HashSet<Uuid>,
        file: &mut impl Write,
        update_files_path: impl AsRef<Path>,
    ) -> anyhow::Result<()> {
        let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();

        for pending in pendings {
            let ((_, uuid, _), data) = pending?;
            if uuids.contains(&uuid) {
                let mut update = data.decode()?;

                if let Some(content) = update.content.take() {
                    update.content = Some(dump_update_file(content, &update_files_path)?);
                }

                let update_json = UpdateEntry {
                    uuid,
                    update: update.into(),
                };

                serde_json::to_writer(&mut *file, &update_json)?;
                // `write_all`, not `write`: a plain `write` may report that nothing was written.
                file.write_all(b"\n")?;
            }
        }

        Ok(())
    }

    /// Appends to `file` the entries of the `updates` database that belong to one of `uuids`.
    fn dump_completed(
        &self,
        txn: &RoTxn,
        uuids: &HashSet<Uuid>,
        file: &mut impl Write,
    ) -> anyhow::Result<()> {
        let updates = self
            .updates
            .iter(txn)?
            .remap_key_type::<UpdateKeyCodec>()
            .lazily_decode_data();

        for update in updates {
            let ((uuid, _), data) = update?;
            if uuids.contains(&uuid) {
                let update = data.decode()?.into();

                let update_json = UpdateEntry { uuid, update };

                serde_json::to_writer(&mut *file, &update_json)?;
                file.write_all(b"\n")?;
            }
        }

        Ok(())
    }
}
/// Asks `handle` to dump each index of `uuids` into `path`, one index after the other.
///
/// Stops at, and returns, the first error.
async fn dump_indexes(
    uuids: &HashSet<Uuid>,
    handle: impl IndexActorHandle,
    path: impl AsRef<Path>,
) -> anyhow::Result<()> {
    let target = path.as_ref();
    for &uuid in uuids {
        handle.dump(uuid, target.to_owned()).await?;
    }

    Ok(())
}
/// Copies the file at `file_path` into the `dump_path` directory, keeping its file name.
///
/// Returns that file name alone, without any directory component.
///
/// # Errors
///
/// Fails when `file_path` has no final file name component, or when the copy fails.
fn dump_update_file(
    file_path: impl AsRef<Path>,
    dump_path: impl AsRef<Path>,
) -> anyhow::Result<PathBuf> {
    let source = file_path.as_ref();
    let filename = PathBuf::from(source.file_name().context("invalid update file name")?);

    let destination = dump_path.as_ref().join(&filename);
    copy(source, destination)?;

    Ok(filename)
}

View File

@ -1,23 +1,25 @@
mod dump;
mod codec;
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::fs::{copy, create_dir_all, remove_file, File};
use std::mem::size_of;
use std::path::Path;
use std::sync::Arc;
use std::{borrow::Cow, path::PathBuf};
use anyhow::Context;
use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64;
use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use log::error;
use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use uuid::Uuid;
use codec::*;
use super::UpdateMeta;
use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult};
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
@ -25,13 +27,6 @@ use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, Ind
#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;
/// Heed codec that turns a `NextIdKey` into its raw key bytes.
struct NextIdCodec;
/// Key accepted by `NextIdCodec`.
enum NextIdKey {
/// Encoded as the fixed byte string `__global__`.
Global,
/// Encoded as the 16 raw bytes of the wrapped uuid.
Index(Uuid),
}
pub struct UpdateStoreInfo {
/// Size of the update store in bytes.
pub size: u64,
@ -45,13 +40,13 @@ pub struct StateLock {
data: ArcSwap<State>,
}
struct StateLockGuard<'a> {
pub struct StateLockGuard<'a> {
_lock: MutexGuard<'a, ()>,
state: &'a StateLock,
}
impl StateLockGuard<'_> {
fn swap(&self, state: State) -> Arc<State> {
pub fn swap(&self, state: State) -> Arc<State> {
self.state.data.swap(Arc::new(state))
}
}
@ -63,11 +58,11 @@ impl StateLock {
Self { lock, data }
}
fn read(&self) -> Arc<State> {
pub fn read(&self) -> Arc<State> {
self.data.load().clone()
}
fn write(&self) -> StateLockGuard {
pub fn write(&self) -> StateLockGuard {
let _lock = self.lock.lock();
let state = &self;
StateLockGuard { _lock, state }
@ -82,81 +77,6 @@ pub enum State {
Dumping,
}
impl<'a> BytesEncode<'a> for NextIdCodec {
    type EItem = NextIdKey;

    /// Borrows the key bytes; no allocation is performed and encoding never fails.
    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let raw: &'a [u8] = match item {
            NextIdKey::Global => b"__global__",
            NextIdKey::Index(uuid) => uuid.as_bytes(),
        };
        Some(Cow::Borrowed(raw))
    }
}
/// Heed codec for `(global_id, index uuid, update_id)` keys.
struct PendingKeyCodec;

impl<'a> BytesEncode<'a> for PendingKeyCodec {
    type EItem = (u64, Uuid, u64);

    /// Lays the key out as: big-endian global id, raw uuid bytes, big-endian update id.
    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let (global_id, uuid, update_id) = item;
        let key = [
            &global_id.to_be_bytes()[..],
            &uuid.as_bytes()[..],
            &update_id.to_be_bytes()[..],
        ]
        .concat();
        Some(Cow::Owned(key))
    }
}
impl<'a> BytesDecode<'a> for PendingKeyCodec {
    type DItem = (u64, Uuid, u64);

    /// Reads back a key made of a big-endian `u64`, a raw uuid and a big-endian `u64`.
    ///
    /// Returns `None` when the input is too short, or when what follows the uuid is not
    /// exactly the size of a `u64`.
    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
        const ID_LEN: usize = size_of::<u64>();
        const UUID_LEN: usize = size_of::<Uuid>();

        let global_part = bytes.get(..ID_LEN)?;
        let global_id = u64::from_be_bytes(global_part.try_into().ok()?);

        let uuid_part = bytes.get(ID_LEN..ID_LEN + UUID_LEN)?;
        let uuid = Uuid::from_bytes(uuid_part.try_into().ok()?);

        // The tail is open-ended on purpose: the array conversion rejects any length
        // other than that of a `u64`.
        let update_part = bytes.get(ID_LEN + UUID_LEN..)?;
        let update_id = u64::from_be_bytes(update_part.try_into().ok()?);

        Some((global_id, uuid, update_id))
    }
}
/// Heed codec for `(index uuid, update_id)` keys.
struct UpdateKeyCodec;

impl<'a> BytesEncode<'a> for UpdateKeyCodec {
    type EItem = (Uuid, u64);

    /// Lays the key out as the raw uuid bytes followed by the big-endian update id.
    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let (uuid, update_id) = item;
        let key = [&uuid.as_bytes()[..], &update_id.to_be_bytes()[..]].concat();
        Some(Cow::Owned(key))
    }
}
impl<'a> BytesDecode<'a> for UpdateKeyCodec {
    type DItem = (Uuid, u64);

    /// Reads back a key made of a raw uuid followed by a big-endian `u64`.
    ///
    /// Returns `None` when the input is shorter than a uuid, or when what follows the uuid
    /// is not exactly the size of a `u64`.
    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
        const UUID_LEN: usize = size_of::<Uuid>();

        let uuid_part = bytes.get(..UUID_LEN)?;
        let update_part = bytes.get(UUID_LEN..)?;

        let uuid = Uuid::from_bytes(uuid_part.try_into().ok()?);
        let update_id = u64::from_be_bytes(update_part.try_into().ok()?);

        Some((uuid, update_id))
    }
}
#[derive(Clone)]
pub struct UpdateStore {
pub env: Env,
@ -174,7 +94,7 @@ pub struct UpdateStore {
/// | 16-bytes | 8-bytes |
updates: Database<ByteSlice, SerdeJson<UpdateStatus>>,
/// Indicates the current state of the update store,
state: Arc<StateLock>,
pub state: Arc<StateLock>,
/// Wake up the loop when a new event occurs.
notification_sender: mpsc::Sender<()>,
}
@ -364,6 +284,7 @@ impl UpdateStore {
let processing = pending.processing();
// Acquire the state lock and set the current state to processing.
// txn must *always* be acquired after state lock, or it will deadlock.
let state = self.state.write();
state.swap(State::Processing(index_uuid, processing.clone()));
@ -580,78 +501,6 @@ impl UpdateStore {
Ok(())
}
/// Dumps the updates of the given indexes, then the indexes themselves, under `path`.
///
/// For each `(index_uid, index_uuid)` pair, the pending updates and the entries of the
/// `updates` database for that index are written as JSON lines to
/// `<path>/<index_uid>/updates.jsonl`. The content files of the pending updates are copied
/// into `<path>/update_files`, and every index is finally asked to dump itself through
/// `handle`.
///
/// NOTE(review): `<path>/<index_uid>` is not created here — presumably the caller creates
/// it beforehand; confirm.
/// NOTE(review): the state is swapped to `State::Dumping` and not swapped back in this
/// function — confirm where it is reset.
pub fn dump(
&self,
uuids: &HashSet<(String, Uuid)>,
path: PathBuf,
handle: impl IndexActorHandle,
) -> anyhow::Result<()> {
use std::io::prelude::*;
// Take the state lock and flag the store as dumping before opening the transaction.
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
let txn = self.env.write_txn()?;
for (index_uid, index_uuid) in uuids.iter() {
let file = File::create(path.join(index_uid).join("updates.jsonl"))?;
let mut file = std::io::BufWriter::new(file);
// The whole pending queue is scanned for every index; entries of other indexes are
// filtered out by uuid.
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if &uuid == index_uuid {
let mut update: UpdateStatus = pending.decode()?.into();
// Only the file name of the content path is kept in the dumped entry.
if let Some(path) = update.content_path_mut() {
*path = path.file_name().expect("update path can't be empty").into();
}
serde_json::to_writer(&mut file, &update)?;
file.write_all(b"\n")?;
}
}
// Keys of the `updates` database start with the index uuid, hence the prefix iteration.
let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?;
for entry in updates {
let (_, update) = entry?;
let mut update = update.clone();
if let Some(path) = update.content_path_mut() {
*path = path.file_name().expect("update path can't be empty").into();
}
serde_json::to_writer(&mut file, &update)?;
file.write_all(b"\n")?;
}
}
// Copy the content file of every pending update of the dumped indexes.
let update_files_path = path.join("update_files");
create_dir_all(&update_files_path)?;
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if uuids.iter().any(|(_, id)| id == &uuid) {
if let Some(path) = pending.decode()?.content_path() {
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
}
}
}
// Perform the dump of each index concurrently. Only a third of the capabilities of
// the index actor at a time not to put too much pressure on the index actor
let path = &path;
let mut stream = futures::stream::iter(uuids.iter())
.map(|(uid, uuid)| handle.dump(*uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
// Drive the stream to completion on the current runtime; the first failure is returned.
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(())
})
}
pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
let mut size = self.env.size();