flush the dump-writer only once everything has been inserted

This commit is contained in:
Tamo 2022-10-17 17:04:52 +02:00 committed by Clément Renault
parent 78ce29f461
commit 83f3c5ec57
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 32 additions and 7 deletions

View File

@@ -382,6 +382,7 @@ pub(crate) mod test {
for document in &documents { for document in &documents {
index.push_document(document).unwrap(); index.push_document(document).unwrap();
} }
index.flush().unwrap();
index.settings(&settings).unwrap(); index.settings(&settings).unwrap();
// ========== pushing the task queue // ========== pushing the task queue
@@ -396,6 +397,7 @@ pub(crate) mod test {
} }
} }
} }
task_queue.flush().unwrap();
// ========== pushing the api keys // ========== pushing the api keys
let api_keys = create_test_api_keys(); let api_keys = create_test_api_keys();
@@ -404,6 +406,7 @@ pub(crate) mod test {
for key in &api_keys { for key in &api_keys {
keys.push_key(key).unwrap(); keys.push_key(key).unwrap();
} }
keys.flush().unwrap();
// create the dump // create the dump
let mut file = tempfile::tempfile().unwrap(); let mut file = tempfile::tempfile().unwrap();

View File

@@ -87,6 +87,11 @@ impl KeyWriter {
self.keys.write_all(b"\n")?; self.keys.write_all(b"\n")?;
Ok(()) Ok(())
} }
pub fn flush(mut self) -> Result<()> {
self.keys.flush()?;
Ok(())
}
} }
pub struct TaskWriter { pub struct TaskWriter {
@@ -113,12 +118,16 @@ impl TaskWriter {
pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> { pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> {
self.queue.write_all(&serde_json::to_vec(task)?)?; self.queue.write_all(&serde_json::to_vec(task)?)?;
self.queue.write_all(b"\n")?; self.queue.write_all(b"\n")?;
self.queue.flush()?;
Ok(UpdateFile::new( Ok(UpdateFile::new(
self.update_files.join(format!("{}.jsonl", task.uid)), self.update_files.join(format!("{}.jsonl", task.uid)),
)) ))
} }
pub fn flush(mut self) -> Result<()> {
self.queue.flush()?;
Ok(())
}
} }
pub struct UpdateFile { pub struct UpdateFile {
@@ -135,7 +144,6 @@ impl UpdateFile {
if let Some(writer) = self.writer.as_mut() { if let Some(writer) = self.writer.as_mut() {
writer.write_all(&serde_json::to_vec(document)?)?; writer.write_all(&serde_json::to_vec(document)?)?;
writer.write_all(b"\n")?; writer.write_all(b"\n")?;
writer.flush()?;
} else { } else {
let file = File::create(&self.path).unwrap(); let file = File::create(&self.path).unwrap();
self.writer = Some(BufWriter::new(file)); self.writer = Some(BufWriter::new(file));
@@ -143,6 +151,13 @@ impl UpdateFile {
} }
Ok(()) Ok(())
} }
pub fn flush(self) -> Result<()> {
if let Some(mut writer) = self.writer {
writer.flush()?;
}
Ok(())
}
} }
pub struct IndexWriter { pub struct IndexWriter {
@@ -167,8 +182,12 @@ impl IndexWriter {
} }
pub fn push_document(&mut self, document: &Map<String, Value>) -> Result<()> { pub fn push_document(&mut self, document: &Map<String, Value>) -> Result<()> {
self.documents.write_all(&serde_json::to_vec(document)?)?; serde_json::to_writer(&mut self.documents, document)?;
self.documents.write_all(b"\n")?; self.documents.write_all(b"\n")?;
Ok(())
}
pub fn flush(&mut self) -> Result<()> {
self.documents.flush()?; self.documents.flush()?;
Ok(()) Ok(())
} }

View File

@@ -499,17 +499,18 @@ impl IndexScheduler {
unreachable!(); unreachable!();
}; };
let dump = dump::DumpWriter::new(instance_uid.clone())?; let dump = dump::DumpWriter::new(instance_uid.clone())?;
let mut dump_keys = dump.create_keys()?;
// 1. dump the keys // 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys { for key in keys {
dump_keys.push_key(key)?; dump_keys.push_key(key)?;
} }
dump_keys.flush()?;
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
// 2. dump the tasks // 2. dump the tasks
let mut tasks = dump.create_tasks_queue()?; let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? { for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?; let (_, mut t) = ret?;
let status = t.status; let status = t.status;
@@ -527,7 +528,7 @@ impl IndexScheduler {
t.started_at = Some(started_at); t.started_at = Some(started_at);
t.finished_at = Some(finished_at); t.finished_at = Some(finished_at);
} }
let mut dump_content_file = tasks.push_task(&t.into())?; let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file { if let Some(content_file) = content_file {
@@ -548,9 +549,11 @@ impl IndexScheduler {
&documents_batch_index, &documents_batch_index,
)?)?; )?)?;
} }
dump_content_file.flush()?;
} }
} }
} }
dump_tasks.flush()?;
// TODO: maybe `self.indexes` could use this rtxn instead of creating its own // TODO: maybe `self.indexes` could use this rtxn instead of creating its own
drop(rtxn); drop(rtxn);
@@ -585,8 +588,8 @@ impl IndexScheduler {
let file = File::create(path)?; let file = File::create(path)?;
dump.persist_to(BufWriter::new(file))?; dump.persist_to(BufWriter::new(file))?;
// if we reached this step we can tell the scheduler we succeeded to dump ourselves.
task.status = Status::Succeeded; task.status = Status::Succeeded;
Ok(vec![task]) Ok(vec![task])
} }
Batch::IndexOperation(operation) => { Batch::IndexOperation(operation) => {