This commit is contained in:
Tamo 2025-01-27 18:22:07 +01:00
parent ad0765ffa4
commit e017986dde
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
6 changed files with 69 additions and 56 deletions

View File

@ -1,7 +1,8 @@
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
#![allow(clippy::wrong_self_convention)] #![allow(clippy::wrong_self_convention)]
use meilisearch_types::batches::BatchId; use meilisearch_types::batch_view::BatchView;
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use meilisearch_types::keys::Key; use meilisearch_types::keys::Key;
use meilisearch_types::milli::update::IndexDocumentsMethod; use meilisearch_types::milli::update::IndexDocumentsMethod;
@ -91,6 +92,13 @@ pub struct TaskDump {
pub finished_at: Option<OffsetDateTime>, pub finished_at: Option<OffsetDateTime>,
} }
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BatchDump {
pub original: Batch,
pub tasks: RoaringBitmap,
}
// A `Kind` specific version made for the dump. If modified you may break the dump. // A `Kind` specific version made for the dump. If modified you may break the dump.
#[derive(Debug, PartialEq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]

View File

@ -101,6 +101,16 @@ impl DumpReader {
} }
} }
pub fn batches(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Batch>> + '_>> {
match self {
DumpReader::Current(current) => Ok(current.dumps()),
// There was no batches in the previous version
DumpReader::Compat(_compat) => {
Ok(Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<v6::Batch>> + '_>)
}
}
}
pub fn keys(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Key>> + '_>> { pub fn keys(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Key>> + '_>> {
match self { match self {
DumpReader::Current(current) => Ok(current.keys()), DumpReader::Current(current) => Ok(current.keys()),

View File

@ -18,6 +18,7 @@ pub type Checked = meilisearch_types::settings::Checked;
pub type Unchecked = meilisearch_types::settings::Unchecked; pub type Unchecked = meilisearch_types::settings::Unchecked;
pub type Task = crate::TaskDump; pub type Task = crate::TaskDump;
pub type Batch = crate::BatchDump;
pub type Key = meilisearch_types::keys::Key; pub type Key = meilisearch_types::keys::Key;
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures; pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
@ -48,6 +49,7 @@ pub struct V6Reader {
instance_uid: Option<Uuid>, instance_uid: Option<Uuid>,
metadata: Metadata, metadata: Metadata,
tasks: BufReader<File>, tasks: BufReader<File>,
batches: BufReader<File>,
keys: BufReader<File>, keys: BufReader<File>,
features: Option<RuntimeTogglableFeatures>, features: Option<RuntimeTogglableFeatures>,
} }
@ -82,6 +84,7 @@ impl V6Reader {
metadata: serde_json::from_reader(&*meta_file)?, metadata: serde_json::from_reader(&*meta_file)?,
instance_uid, instance_uid,
tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?), tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
batches: BufReader::new(File::open(dump.path().join("batches").join("queue.jsonl"))?),
keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?), keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
features, features,
dump, dump,
@ -124,7 +127,7 @@ impl V6Reader {
&mut self, &mut self,
) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> { ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
let task: Task = serde_json::from_str(&line?).unwrap(); let task: Task = serde_json::from_str(&line?)?;
let update_file_path = self let update_file_path = self
.dump .dump
@ -145,6 +148,14 @@ impl V6Reader {
})) }))
} }
pub fn dumps(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
Box::new(
(&mut self.batches)
.lines()
.map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
)
}
pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> { pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> {
Box::new( Box::new(
(&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }), (&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),

View File

@ -14,7 +14,7 @@ use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use crate::reader::Document; use crate::reader::Document;
use crate::{IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION}; use crate::{BatchDump, IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION};
pub struct DumpWriter { pub struct DumpWriter {
dir: TempDir, dir: TempDir,
@ -174,7 +174,7 @@ impl BatchWriter {
/// Pushes batches in the dump. /// Pushes batches in the dump.
/// The batches doesn't contains any private information thus we don't need a special type like with the tasks. /// The batches doesn't contains any private information thus we don't need a special type like with the tasks.
pub fn push_batch(&mut self, batch: &BatchView) -> Result<()> { pub fn push_batch(&mut self, batch: &BatchDump) -> Result<()> {
self.queue.write_all(&serde_json::to_vec(batch)?)?; self.queue.write_all(&serde_json::to_vec(batch)?)?;
self.queue.write_all(b"\n")?; self.queue.write_all(b"\n")?;
Ok(()) Ok(())

View File

@ -26,7 +26,7 @@ use std::sync::atomic::Ordering;
use bumpalo::collections::CollectIn; use bumpalo::collections::CollectIn;
use bumpalo::Bump; use bumpalo::Bump;
use dump::IndexMetadata; use dump::{BatchDump, IndexMetadata};
use meilisearch_types::batch_view::BatchView; use meilisearch_types::batch_view::BatchView;
use meilisearch_types::batches::BatchId; use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::heed::{RoTxn, RwTxn};
@ -881,7 +881,12 @@ impl IndexScheduler {
batch.started_at = started_at; batch.started_at = started_at;
batch.finished_at = Some(finished_at); batch.finished_at = Some(finished_at);
} }
dump_batches.push_batch(&BatchView::from_batch(&batch))?; // a missing or empty batch shouldn't exists but if it happens we should just skip it
if let Some(tasks) = self.batch_to_tasks_mapping.get(&rtxn, &batch.uid)? {
if !tasks.is_empty() {
dump_batches.push_batch(&BatchDump { original: batch, tasks })?;
}
}
atomic.fetch_add(1, Ordering::Relaxed); atomic.fetch_add(1, Ordering::Relaxed);
} }
dump_batches.flush()?; dump_batches.flush()?;

View File

@ -43,7 +43,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use dump::{KindDump, TaskDump, UpdateFile}; use dump::{BatchDump, KindDump, TaskDump, UpdateFile};
pub use error::Error; pub use error::Error;
pub use features::RoFeatures; pub use features::RoFeatures;
use file_store::FileStore; use file_store::FileStore;
@ -1996,6 +1996,10 @@ pub struct Dump<'a> {
statuses: HashMap<Status, RoaringBitmap>, statuses: HashMap<Status, RoaringBitmap>,
kinds: HashMap<Kind, RoaringBitmap>, kinds: HashMap<Kind, RoaringBitmap>,
batch_indexes: HashMap<String, RoaringBitmap>,
batch_statuses: HashMap<Status, RoaringBitmap>,
batch_kinds: HashMap<Kind, RoaringBitmap>,
batch_to_tasks_mapping: HashMap<TaskId, RoaringBitmap>, batch_to_tasks_mapping: HashMap<TaskId, RoaringBitmap>,
} }
@ -2010,6 +2014,9 @@ impl<'a> Dump<'a> {
indexes: HashMap::new(), indexes: HashMap::new(),
statuses: HashMap::new(), statuses: HashMap::new(),
kinds: HashMap::new(), kinds: HashMap::new(),
batch_indexes: HashMap::new(),
batch_statuses: HashMap::new(),
batch_kinds: HashMap::new(),
batch_to_tasks_mapping: HashMap::new(), batch_to_tasks_mapping: HashMap::new(),
}) })
} }
@ -2171,63 +2178,35 @@ impl<'a> Dump<'a> {
/// Register a new task coming from a dump in the scheduler. /// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running. /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_batch(&mut self, batch: BatchView) -> Result<()> { pub fn register_dumped_batch(&mut self, dump: BatchDump) -> Result<()> {
let batch = Batch { let BatchDump { original, tasks } = dump;
uid: batch.uid,
// Batch cannot be processing while we import it
progress: None,
details: batch.details,
stats: batch.stats,
started_at: batch.started_at,
finished_at: batch.finished_at,
};
self.index_scheduler.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?; self.index_scheduler.all_batches.put(&mut self.wtxn, &original.uid, &original)?;
self.index_scheduler..put(&mut self.wtxn, &original.uid, &original)?;
for index in batch.indexes() {
match self.indexes.get_mut(index) {
Some(bitmap) => {
bitmap.insert(batch.uid);
}
None => {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(batch.uid);
self.indexes.insert(index.to_string(), bitmap);
}
};
}
utils::insert_task_datetime( utils::insert_task_datetime(
&mut self.wtxn, &mut self.wtxn,
self.index_scheduler.enqueued_at, self.index_scheduler.batch_enqueued_at,
batch.enqueued_at, original.started_at, // TODO: retrieve the enqueued_at from the dump
batch.uid, original.uid,
)?; )?;
// we can't override the started_at & finished_at, so we must only set it if the tasks is finished and won't change utils::insert_task_datetime(
if matches!(batch.status, Status::Succeeded | Status::Failed | Status::Canceled) { &mut self.wtxn,
if let Some(started_at) = batch.started_at { self.index_scheduler.started_at,
utils::insert_task_datetime( original.started_at,
&mut self.wtxn, original.uid,
self.index_scheduler.started_at, )?;
started_at, if let Some(finished_at) = original.finished_at {
batch.uid, utils::insert_task_datetime(
)?; &mut self.wtxn,
} self.index_scheduler.finished_at,
if let Some(finished_at) = batch.finished_at { finished_at,
utils::insert_task_datetime( original.uid,
&mut self.wtxn, )?;
self.index_scheduler.finished_at,
finished_at,
batch.uid,
)?;
}
} }
self.statuses.entry(batch.status).or_default().insert(batch.uid); Ok(())
self.kinds.entry(batch.kind.as_kind()).or_default().insert(batch.uid);
Ok(batch)
} }
/// Commit all the changes and exit the importing dump state /// Commit all the changes and exit the importing dump state