Start working on the batch import in the dump

This commit is contained in:
Tamo 2025-01-20 12:44:21 +01:00
parent a5c44b4d79
commit ad0765ffa4
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
3 changed files with 137 additions and 3 deletions

View File

@ -4,6 +4,7 @@ use std::path::PathBuf;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::batch_view::BatchView;
use meilisearch_types::features::RuntimeTogglableFeatures;
use meilisearch_types::keys::Key;
use meilisearch_types::settings::{Checked, Settings};
@ -54,6 +55,10 @@ impl DumpWriter {
TaskWriter::new(self.dir.path().join("tasks"))
}
/// Creates a [`BatchWriter`] that dumps the batches queue into the
/// `batches/` subdirectory of the dump.
pub fn create_batches_queue(&self) -> Result<BatchWriter> {
    let batches_dir = self.dir.path().join("batches");
    BatchWriter::new(batches_dir)
}
pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
Ok(std::fs::write(
self.dir.path().join("experimental-features.json"),
@ -156,6 +161,31 @@ impl UpdateFile {
}
}
/// Writes the batches queue of a dump as newline-delimited JSON
/// (`batches/queue.jsonl`).
pub struct BatchWriter {
    queue: BufWriter<File>,
}

impl BatchWriter {
    /// Creates the `batches/` directory of the dump and opens its
    /// `queue.jsonl` file for writing.
    pub(crate) fn new(path: PathBuf) -> Result<Self> {
        std::fs::create_dir(&path)?;
        let queue = File::create(path.join("queue.jsonl"))?;
        Ok(BatchWriter { queue: BufWriter::new(queue) })
    }

    /// Pushes batches in the dump.
    /// The batches don't contain any private information, thus we don't need
    /// a special type like we do with the tasks.
    pub fn push_batch(&mut self, batch: &BatchView) -> Result<()> {
        // Serialize straight into the buffered writer instead of going
        // through `serde_json::to_vec`, avoiding an intermediate `Vec<u8>`
        // allocation per batch; the emitted bytes are identical.
        serde_json::to_writer(&mut self.queue, batch)?;
        self.queue.write_all(b"\n")?;
        Ok(())
    }

    /// Consumes the writer and flushes the buffer, surfacing any buffered
    /// I/O error instead of letting `Drop` swallow it.
    pub fn flush(mut self) -> Result<()> {
        self.queue.flush()?;
        Ok(())
    }
}
pub struct IndexWriter {
documents: BufWriter<File>,
settings: File,

View File

@ -17,7 +17,7 @@ tasks individually, but should be much faster since we are only performing
one indexing operation.
*/
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
@ -27,6 +27,7 @@ use std::sync::atomic::Ordering;
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use dump::IndexMetadata;
use meilisearch_types::batch_view::BatchView;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
@ -790,7 +791,8 @@ impl IndexScheduler {
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
// 2. dump the queue
// 2.1. dump the tasks
progress.update_progress(DumpCreationProgress::DumpTheTasks);
let mut dump_tasks = dump.create_tasks_queue()?;
@ -821,7 +823,7 @@ impl IndexScheduler {
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
// 2.1.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if self.must_stop_processing.get() {
return Err(Error::AbortedTask);
@ -851,6 +853,39 @@ impl IndexScheduler {
}
dump_tasks.flush()?;
// 2.2. dump the batches
let mut dump_batches = dump.create_batches_queue()?;
let (atomic, update_batch_progress) =
AtomicBatchStep::new(self.all_batches.len(&rtxn)? as u32);
progress.update_progress(update_batch_progress);
for ret in self.all_batches.iter(&rtxn)? {
if self.must_stop_processing.get() {
return Err(Error::AbortedTask);
}
let (_, mut batch) = ret?;
// In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if task.batch_uid == Some(batch.uid) {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fails, everything will be set appropriately in the end.
batch.progress = None;
let mut statuses = BTreeMap::new();
statuses.insert(Status::Succeeded, 1);
batch.stats.status = statuses;
batch.started_at = started_at;
batch.finished_at = Some(finished_at);
}
dump_batches.push_batch(&BatchView::from_batch(&batch))?;
atomic.fetch_add(1, Ordering::Relaxed);
}
dump_batches.flush()?;
// 3. Dump the indexes
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;

View File

@ -49,6 +49,7 @@ pub use features::RoFeatures;
use file_store::FileStore;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use meilisearch_types::batch_view::BatchView;
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::error::ResponseError;
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
@ -1994,6 +1995,8 @@ pub struct Dump<'a> {
indexes: HashMap<String, RoaringBitmap>,
statuses: HashMap<Status, RoaringBitmap>,
kinds: HashMap<Kind, RoaringBitmap>,
batch_to_tasks_mapping: HashMap<TaskId, RoaringBitmap>,
}
impl<'a> Dump<'a> {
@ -2007,6 +2010,7 @@ impl<'a> Dump<'a> {
indexes: HashMap::new(),
statuses: HashMap::new(),
kinds: HashMap::new(),
batch_to_tasks_mapping: HashMap::new(),
})
}
@ -2158,9 +2162,74 @@ impl<'a> Dump<'a> {
self.statuses.entry(task.status).or_default().insert(task.uid);
self.kinds.entry(task.kind.as_kind()).or_default().insert(task.uid);
if let Some(batch_uid) = task.batch_uid {
self.batch_to_tasks_mapping.entry(batch_uid).or_default().insert(task.uid);
}
Ok(task)
}
/// Register a new batch coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_batch(&mut self, batch: BatchView) -> Result<()> {
    // Rebuild the internal `Batch` from the dumped view.
    let batch = Batch {
        uid: batch.uid,
        // Batch cannot be processing while we import it
        progress: None,
        details: batch.details,
        stats: batch.stats,
        started_at: batch.started_at,
        finished_at: batch.finished_at,
    };
    // Persist the batch itself in the `all_batches` database, keyed by uid.
    self.index_scheduler.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?;
    // Record the batch uid in the per-index bitmap for every index it touched.
    // NOTE(review): `batch.indexes()` is a project method not visible here —
    // presumably it yields the names of the indexes affected by the batch; confirm.
    for index in batch.indexes() {
        match self.indexes.get_mut(index) {
            Some(bitmap) => {
                bitmap.insert(batch.uid);
            }
            None => {
                let mut bitmap = RoaringBitmap::new();
                bitmap.insert(batch.uid);
                self.indexes.insert(index.to_string(), bitmap);
            }
        };
    }
    // NOTE(review): `batch.enqueued_at` is not one of the fields set in the
    // `Batch` struct literal above (uid/progress/details/stats/started_at/
    // finished_at), so as written this does not compile — verify against the
    // real `Batch` definition. Also note these datetime databases are shared
    // with tasks (`insert_task_datetime`); confirm mixing batch uids in is intended.
    utils::insert_task_datetime(
        &mut self.wtxn,
        self.index_scheduler.enqueued_at,
        batch.enqueued_at,
        batch.uid,
    )?;
    // we can't override the started_at & finished_at, so we must only set them if the batch is finished and won't change
    // NOTE(review): `batch.status` is likewise not a field of the `Batch`
    // built above — this looks copied from the task version
    // (`register_dumped_task`); the status probably has to be derived from
    // `batch.stats.status` instead. Verify.
    if matches!(batch.status, Status::Succeeded | Status::Failed | Status::Canceled) {
        if let Some(started_at) = batch.started_at {
            utils::insert_task_datetime(
                &mut self.wtxn,
                self.index_scheduler.started_at,
                started_at,
                batch.uid,
            )?;
        }
        if let Some(finished_at) = batch.finished_at {
            utils::insert_task_datetime(
                &mut self.wtxn,
                self.index_scheduler.finished_at,
                finished_at,
                batch.uid,
            )?;
        }
    }
    // NOTE(review): `batch.status` and `batch.kind` are not fields of `Batch`
    // as constructed here; also `statuses`/`kinds` are the task bitmaps keyed
    // by task uids — inserting batch uids here is suspicious. Verify.
    self.statuses.entry(batch.status).or_default().insert(batch.uid);
    self.kinds.entry(batch.kind.as_kind()).or_default().insert(batch.uid);
    // NOTE(review): returns `batch` but the signature declares `Result<()>` —
    // type mismatch; should be `Ok(())`.
    Ok(batch)
}
/// Commit all the changes and exit the importing dump state
pub fn finish(mut self) -> Result<()> {
for (index, bitmap) in self.indexes {