From 41c4c553096f7bf5afd6a9df88e70dccd1555f78 Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Tue, 30 Apr 2024 20:08:13 +0200
Subject: [PATCH] Start implementation of a new writer

---
 dump/src/lib.rs        |   1 +
 dump/src/new_writer.rs | 251 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 252 insertions(+)
 create mode 100644 dump/src/new_writer.rs

diff --git a/dump/src/lib.rs b/dump/src/lib.rs
index 42cb0e444..aa5da573a 100644
--- a/dump/src/lib.rs
+++ b/dump/src/lib.rs
@@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 
 mod error;
+mod new_writer;
 mod reader;
 mod writer;
 
diff --git a/dump/src/new_writer.rs b/dump/src/new_writer.rs
new file mode 100644
index 000000000..e4e7c310a
--- /dev/null
+++ b/dump/src/new_writer.rs
@@ -0,0 +1,251 @@
+use std::fs::File;
+use std::io::{Read, Seek, Write};
+use std::path::Path;
+use std::result::Result as StdResult;
+
+use flate2::write::GzEncoder;
+use flate2::Compression;
+use meilisearch_types::milli::documents::{
+    obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, DocumentsBatchReader,
+};
+use tar::{Builder as TarBuilder, Header};
+use time::OffsetDateTime;
+use uuid::Uuid;
+
+use crate::{Key, Metadata, Result, TaskId, CURRENT_DUMP_VERSION};
+
+pub struct DumpWriter<W: Write> {
+    tar: TarBuilder<GzEncoder<W>>,
+}
+
+impl<W: Write> DumpWriter<W> {
+    pub fn new(instance_uuid: Option<Uuid>, writer: W) -> Result<Self> {
+        // TODO: should we use a BufWriter?
+        let gz_encoder = GzEncoder::new(writer, Compression::default());
+        let mut tar = TarBuilder::new(gz_encoder);
+
+        let mut header = Header::new_gnu();
+
+        // Append metadata into metadata.json.
+        let metadata = Metadata {
+            dump_version: CURRENT_DUMP_VERSION,
+            db_version: env!("CARGO_PKG_VERSION").to_string(),
+            dump_date: OffsetDateTime::now_utc(),
+        };
+        let data = serde_json::to_string(&metadata).unwrap();
+        header.set_size(data.len() as u64);
+        tar.append_data(&mut header, "metadata.json", data.as_bytes()).unwrap();
+
+        // Append instance uid into instance_uid.uuid.
+        if let Some(instance_uuid) = instance_uuid {
+            let data = instance_uuid.as_hyphenated().to_string();
+            header.set_size(data.len() as u64);
+            tar.append_data(&mut header, "instance_uid.uuid", data.as_bytes()).unwrap();
+        }
+
+        Ok(Self { tar })
+    }
+
+    pub fn dump_keys(&mut self, keys: &[Key]) -> Result<()> {
+        let mut buffer = Vec::new();
+        for key in keys {
+            serde_json::to_writer(&mut buffer, key)?;
+            buffer.push(b'\n');
+        }
+        let mut header = Header::new_gnu();
+        header.set_path("keys.jsonl")?;
+        header.set_size(buffer.len() as u64);
+
+        self.tar.append(&mut header, buffer.as_slice())?;
+        Ok(())
+    }
+
+    pub fn create_tasks(&mut self) -> Result<FileWriter<'_, W>> {
+        FileWriter::new(&mut self.tar, "tasks/queue.jsonl")
+    }
+
+    pub fn dump_update_file<R: Read + Seek>(
+        &mut self,
+        task_uid: TaskId,
+        update_file: DocumentsBatchReader<R>,
+    ) -> Result<()> {
+        let path = format!("tasks/update_files/{}.jsonl", task_uid);
+        let mut fw = FileWriter::new(&mut self.tar, path)?;
+        let mut serializer = UpdateFileSerializer::new(update_file);
+        // First pass serializes every document to compute the tar entry size,
+        // then the cursor is rewound and everything is serialized again to
+        // actually write the data.
+        fw.calculate_len(SerializerIteratorReader::new(&mut serializer))?;
+        serializer.reset();
+        fw.write_data(SerializerIteratorReader::new(&mut serializer))
+    }
+}
+
+trait SerializerIterator {
+    fn next_serialize_into(&mut self, buffer: &mut Vec<u8>) -> StdResult<bool, std::io::Error>;
+}
+
+struct SerializerIteratorReader<'i, I: SerializerIterator> {
+    iterator: &'i mut I,
+    buffer: Vec<u8>,
+}
+
+impl<I: SerializerIterator> Read for SerializerIteratorReader<'_, I> {
+    fn read(&mut self, buf: &mut [u8]) -> StdResult<usize, std::io::Error> {
+        let mut size = 0;
+        loop {
+            // If the inner buffer is empty, fill it with a new document.
+            if self.buffer.is_empty() {
+                if !self.iterator.next_serialize_into(&mut self.buffer)? {
+                    // Nothing more to write, return the written size.
+                    return Ok(size);
+                }
+            }
+
+            let doc_size = self.buffer.len();
+            let remaining_size = buf[size..].len();
+            if remaining_size < doc_size {
+                // If the serialized document exceeds the remaining space in `buf`,
+                // drain the inner buffer to fill that space and keep the rest
+                // for the next call.
+                buf[size..].copy_from_slice(&self.buffer[..remaining_size]);
+                self.buffer.drain(..remaining_size);
+
+                // Then return.
+                return Ok(buf.len());
+            } else {
+                // Otherwise write the whole inner buffer into `buf`, clear it and continue.
+                buf[size..][..doc_size].copy_from_slice(&self.buffer);
+                size += doc_size;
+                self.buffer.clear();
+            }
+        }
+    }
+}
+
+impl<'i, I: SerializerIterator> SerializerIteratorReader<'i, I> {
+    fn new(iterator: &'i mut I) -> Self {
+        Self { iterator, buffer: Vec::new() }
+    }
+}
+
+struct UpdateFileSerializer<R: Read + Seek> {
+    cursor: DocumentsBatchCursor<R>,
+    documents_batch_index: DocumentsBatchIndex,
+}
+
+impl<R: Read + Seek> SerializerIterator for UpdateFileSerializer<R> {
+    fn next_serialize_into(&mut self, buffer: &mut Vec<u8>) -> StdResult<bool, std::io::Error> {
+        // TODO: don't unwrap, original version: `cursor.next_document().map_err(milli::Error::from)?`
+        match self.cursor.next_document().unwrap() {
+            Some(doc) => {
+                // TODO: don't unwrap
+                let json_value = obkv_to_object(&doc, &self.documents_batch_index).unwrap();
+                serde_json::to_writer(&mut *buffer, &json_value)?;
+                buffer.push(b'\n');
+                Ok(true)
+            }
+            None => Ok(false),
+        }
+    }
+}
+
+impl<R: Read + Seek> UpdateFileSerializer<R> {
+    fn new(reader: DocumentsBatchReader<R>) -> Self {
+        let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
+
+        Self { cursor, documents_batch_index }
+    }
+
+    /// Resets the cursor to be able to read from the start again.
+    pub fn reset(&mut self) {
+        self.cursor.reset();
+    }
+}
+
+pub struct FileWriter<'a, W: Write> {
+    header: Header,
+    tar: &'a mut TarBuilder<GzEncoder<W>>,
+    size: Option<u64>,
+}
+
+impl<'a, W: Write> FileWriter<'a, W> {
+    pub(crate) fn new<P: AsRef<Path>>(
+        tar: &'a mut TarBuilder<GzEncoder<W>>,
+        path: P,
+    ) -> Result<Self> {
+        let mut header = Header::new_gnu();
+        header.set_path(path)?;
+        Ok(Self { header, tar, size: None })
+    }
+
+    pub fn calculate_len<R: Read>(&mut self, mut reader: R) -> Result<u64> {
+        let mut calculator = SizeCalculatorWriter::new();
+        std::io::copy(&mut reader, &mut calculator)?;
+        let size = calculator.into_inner();
+        self.size = Some(size);
+
+        Ok(size)
+    }
+
+    pub fn write_data<R: Read>(mut self, reader: R) -> Result<()> {
+        let expected_size =
+            self.size.expect("calculate_len must be called before writing the data.");
+        self.header.set_size(expected_size);
+
+        let mut scr = SizeCalculatorReader::new(reader);
+        self.tar.append(&mut self.header, &mut scr)?;
+        assert_eq!(
+            expected_size,
+            scr.into_inner(),
+            "Provided data size is different from the pre-calculated size."
+        );
+
+        Ok(())
+    }
+}
+
+/// A Write sink that discards the bytes and only counts them.
+struct SizeCalculatorWriter {
+    size: usize,
+}
+
+impl SizeCalculatorWriter {
+    fn new() -> Self {
+        Self { size: 0 }
+    }
+
+    fn into_inner(self) -> u64 {
+        self.size as u64
+    }
+}
+
+impl Write for SizeCalculatorWriter {
+    fn write(&mut self, buf: &[u8]) -> StdResult<usize, std::io::Error> {
+        self.size += buf.len();
+        // Write::write must report the number of bytes consumed by this
+        // call, not the running total.
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> StdResult<(), std::io::Error> {
+        Ok(())
+    }
+}
+
+/// A Read adapter that counts the bytes flowing through it.
+struct SizeCalculatorReader<R: Read> {
+    size: usize,
+    reader: R,
+}
+
+impl<R: Read> SizeCalculatorReader<R> {
+    fn new(reader: R) -> Self {
+        Self { size: 0, reader }
+    }
+
+    fn into_inner(self) -> u64 {
+        self.size as u64
+    }
+}
+
+impl<R: Read> Read for SizeCalculatorReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> StdResult<usize, std::io::Error> {
+        let size = self.reader.read(buf)?;
+        self.size += size;
+
+        Ok(size)
+    }
+}
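
A rough usage sketch of the two-pass FileWriter API follows. This is a hypothetical driver, not part of the patch: it assumes it runs inside the dump crate (mod new_writer is private) and feeds an in-memory payload where real code would stream serialized tasks.

    use std::io::{Cursor, Write};

    use crate::new_writer::DumpWriter;
    use crate::Result;

    fn write_tasks_entry<W: Write>(dump: &mut DumpWriter<W>) -> Result<()> {
        // Made-up payload standing in for serialized task lines.
        let payload = b"{\"uid\":0,\"status\":\"enqueued\"}\n";

        // Tar headers carry the entry size up front, so the data is
        // streamed twice: once to measure, once to emit.
        let mut tasks = dump.create_tasks()?;
        tasks.calculate_len(Cursor::new(&payload[..]))?;
        tasks.write_data(Cursor::new(&payload[..]))?;
        Ok(())
    }

This two-pass shape is also why dump_update_file calls serializer.reset() between calculate_len and write_data: the same SerializerIterator must replay its documents for the second pass, and write_data asserts that both passes produced the same number of bytes.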