From 2ae0806773352d0a99c9e25a01d0ec3aed100992 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 10 Oct 2022 19:57:47 +0200 Subject: [PATCH] rewrite the update file API --- dump/src/lib.rs | 9 ++++++--- dump/src/reader/v6.rs | 2 +- dump/src/writer.rs | 41 +++++++++++++++++++++++++++++++++-------- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/dump/src/lib.rs b/dump/src/lib.rs index 84b0412de..fe69abb2f 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -236,12 +236,15 @@ pub(crate) mod test { // ========== pushing the task queue let tasks = create_test_tasks(); - /* let mut task_queue = dump.create_tasks_queue().unwrap(); for (task, update_file) in &tasks { - task_queue.push_task(task, update_file.map(|c| c)).unwrap(); + let mut update = task_queue.push_task(task).unwrap(); + if let Some(update_file) = update_file { + for u in update_file { + update.push_document(u).unwrap(); + } + } } - */ // ========== pushing the api keys let api_keys = create_test_api_keys(); diff --git a/dump/src/reader/v6.rs b/dump/src/reader/v6.rs index cdd6d3f77..685b1d3eb 100644 --- a/dump/src/reader/v6.rs +++ b/dump/src/reader/v6.rs @@ -118,7 +118,7 @@ impl V6Reader { .path() .join("tasks") .join("update_files") - .join(task.uid.to_string()); + .join(format!("{}.jsonl", task.uid.to_string())); if update_file_path.exists() { Ok(( diff --git a/dump/src/writer.rs b/dump/src/writer.rs index 01bd30abc..824e403c6 100644 --- a/dump/src/writer.rs +++ b/dump/src/writer.rs @@ -1,6 +1,6 @@ use std::{ fs::{self, File}, - io::{Read, Write}, + io::{BufReader, BufWriter, Read, Write}, path::PathBuf, }; @@ -13,7 +13,7 @@ use tempfile::TempDir; use time::OffsetDateTime; use uuid::Uuid; -use crate::{IndexMetadata, Metadata, Result, CURRENT_DUMP_VERSION}; +use crate::{reader::Document, IndexMetadata, Metadata, Result, CURRENT_DUMP_VERSION}; pub struct DumpWriter { dir: TempDir, @@ -105,16 +105,41 @@ impl TaskWriter { /// Pushes tasks in the dump. /// If the tasks has an associated `update_file` it'll use the `task_id` as its name. - pub fn push_task(&mut self, task: &TaskView, update_file: Option) -> Result<()> { + pub fn push_task(&mut self, task: &TaskView) -> Result { // TODO: this could be removed the day we implements `Deserialize` on the Duration. let mut task = task.clone(); task.duration = None; self.queue.write_all(&serde_json::to_vec(&task)?)?; self.queue.write_all(b"\n")?; - if let Some(mut update_file) = update_file { - let mut file = File::create(&self.update_files.join(task.uid.to_string()))?; - std::io::copy(&mut update_file, &mut file)?; + + Ok(UpdateFile::new( + self.update_files + .join(format!("{}.jsonl", task.uid.to_string())), + )) + } +} + +pub struct UpdateFile { + path: PathBuf, + writer: Option>, +} + +impl UpdateFile { + pub(crate) fn new(path: PathBuf) -> UpdateFile { + UpdateFile { path, writer: None } + } + + pub fn push_document(&mut self, document: &Document) -> Result<()> { + if let Some(writer) = self.writer.as_mut() { + writer.write_all(&serde_json::to_vec(document)?)?; + writer.write_all(b"\n")?; + writer.flush()?; + } else { + dbg!(&self.path); + let file = File::create(&self.path).unwrap(); + self.writer = Some(BufWriter::new(file)); + self.push_document(document)?; } Ok(()) } @@ -253,7 +278,7 @@ pub(crate) mod test { │ │ └---- metadata.json ├---- tasks/ │ ├---- update_files/ - │ │ └---- 1 + │ │ └---- 1.jsonl │ └---- queue.jsonl ├---- keys.jsonl ├---- metadata.json @@ -310,7 +335,7 @@ pub(crate) mod test { assert_eq!(serde_json::from_str::(task).unwrap(), expected.0); if let Some(expected_update) = expected.1 { - let path = dump_path.join(format!("tasks/update_files/{}", expected.0.uid)); + let path = dump_path.join(format!("tasks/update_files/{}.jsonl", expected.0.uid)); println!("trying to open {}", path.display()); let update = fs::read_to_string(path).unwrap(); let documents: Vec = update