mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 17:11:15 +08:00
Add dump support
This commit is contained in:
parent
5a83cecb0f
commit
13e9b4c2e5
@ -412,6 +412,8 @@ pub(crate) mod test {
|
||||
}
|
||||
keys.flush().unwrap();
|
||||
|
||||
// ========== TODO: create features here
|
||||
|
||||
// create the dump
|
||||
let mut file = tempfile::tempfile().unwrap();
|
||||
dump.persist_to(&mut file).unwrap();
|
||||
|
@ -191,6 +191,10 @@ impl CompatV5ToV6 {
|
||||
})
|
||||
})))
|
||||
}
|
||||
|
||||
pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub enum CompatIndexV5ToV6 {
|
||||
|
@ -107,6 +107,13 @@ impl DumpReader {
|
||||
DumpReader::Compat(compat) => compat.keys(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> {
|
||||
match self {
|
||||
DumpReader::Current(current) => Ok(current.features()),
|
||||
DumpReader::Compat(compat) => compat.features(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<V6Reader> for DumpReader {
|
||||
@ -189,6 +196,8 @@ pub(crate) mod test {
|
||||
|
||||
use super::*;
|
||||
|
||||
// TODO: add `features` to tests
|
||||
|
||||
#[test]
|
||||
fn import_dump_v5() {
|
||||
let dump = File::open("tests/assets/v5.dump").unwrap();
|
||||
|
@ -2,6 +2,7 @@ use std::fs::{self, File};
|
||||
use std::io::{BufRead, BufReader, ErrorKind};
|
||||
use std::path::Path;
|
||||
|
||||
use log::debug;
|
||||
pub use meilisearch_types::milli;
|
||||
use tempfile::TempDir;
|
||||
use time::OffsetDateTime;
|
||||
@ -18,6 +19,7 @@ pub type Unchecked = meilisearch_types::settings::Unchecked;
|
||||
|
||||
pub type Task = crate::TaskDump;
|
||||
pub type Key = meilisearch_types::keys::Key;
|
||||
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
|
||||
|
||||
// ===== Other types to clarify the code of the compat module
|
||||
// everything related to the tasks
|
||||
@ -47,6 +49,7 @@ pub struct V6Reader {
|
||||
metadata: Metadata,
|
||||
tasks: BufReader<File>,
|
||||
keys: BufReader<File>,
|
||||
features: Option<RuntimeTogglableFeatures>,
|
||||
}
|
||||
|
||||
impl V6Reader {
|
||||
@ -58,11 +61,29 @@ impl V6Reader {
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let feature_file = match fs::read(dump.path().join("experimental-features.json")) {
|
||||
Ok(feature_file) => Some(feature_file),
|
||||
Err(error) => match error.kind() {
|
||||
// Allows the file to be missing, this will only result in all experimental features disabled.
|
||||
ErrorKind::NotFound => {
|
||||
debug!("`experimental-features.json` not found in dump");
|
||||
None
|
||||
}
|
||||
_ => return Err(error.into()),
|
||||
},
|
||||
};
|
||||
let features = if let Some(feature_file) = feature_file {
|
||||
Some(serde_json::from_reader(&*feature_file)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(V6Reader {
|
||||
metadata: serde_json::from_reader(&*meta_file)?,
|
||||
instance_uid,
|
||||
tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
|
||||
keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
|
||||
features,
|
||||
dump,
|
||||
})
|
||||
}
|
||||
@ -129,6 +150,10 @@ impl V6Reader {
|
||||
(&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn features(&self) -> Option<RuntimeTogglableFeatures> {
|
||||
self.features
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UpdateFile {
|
||||
|
@ -4,6 +4,7 @@ use std::path::PathBuf;
|
||||
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use meilisearch_types::features::RuntimeTogglableFeatures;
|
||||
use meilisearch_types::keys::Key;
|
||||
use meilisearch_types::settings::{Checked, Settings};
|
||||
use serde_json::{Map, Value};
|
||||
@ -53,6 +54,13 @@ impl DumpWriter {
|
||||
TaskWriter::new(self.dir.path().join("tasks"))
|
||||
}
|
||||
|
||||
pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
|
||||
Ok(std::fs::write(
|
||||
self.dir.path().join("experimental-features.json"),
|
||||
serde_json::to_string(&features)?,
|
||||
)?)
|
||||
}
|
||||
|
||||
pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
|
||||
let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
|
||||
let mut tar_encoder = tar::Builder::new(gz_encoder);
|
||||
|
@ -839,6 +839,10 @@ impl IndexScheduler {
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
// 4. Dump experimental feature settings
|
||||
let features = self.features()?.runtime_features();
|
||||
dump.create_experimental_features(features)?;
|
||||
|
||||
let dump_uid = started_at.format(format_description!(
|
||||
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
|
||||
)).unwrap();
|
||||
|
@ -309,12 +309,16 @@ fn import_dump(
|
||||
keys.push(key);
|
||||
}
|
||||
|
||||
// 3. Import the runtime features.
|
||||
let features = dump_reader.features()?.unwrap_or_default();
|
||||
index_scheduler.put_runtime_features(features)?;
|
||||
|
||||
let indexer_config = index_scheduler.indexer_config();
|
||||
|
||||
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
||||
// try to process tasks while we're trying to import the indexes.
|
||||
|
||||
// 3. Import the indexes.
|
||||
// 4. Import the indexes.
|
||||
for index_reader in dump_reader.indexes()? {
|
||||
let mut index_reader = index_reader?;
|
||||
let metadata = index_reader.metadata();
|
||||
@ -326,19 +330,19 @@ fn import_dump(
|
||||
let mut wtxn = index.write_txn()?;
|
||||
|
||||
let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
|
||||
// 3.1 Import the primary key if there is one.
|
||||
// 4.1 Import the primary key if there is one.
|
||||
if let Some(ref primary_key) = metadata.primary_key {
|
||||
builder.set_primary_key(primary_key.to_string());
|
||||
}
|
||||
|
||||
// 3.2 Import the settings.
|
||||
// 4.2 Import the settings.
|
||||
log::info!("Importing the settings.");
|
||||
let settings = index_reader.settings()?;
|
||||
apply_settings_to_builder(&settings, &mut builder);
|
||||
builder.execute(|indexing_step| log::debug!("update: {:?}", indexing_step), || false)?;
|
||||
|
||||
// 3.3 Import the documents.
|
||||
// 3.3.1 We need to recreate the grenad+obkv format accepted by the index.
|
||||
// 4.3 Import the documents.
|
||||
// 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
|
||||
log::info!("Importing the documents.");
|
||||
let file = tempfile::tempfile()?;
|
||||
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file));
|
||||
@ -349,7 +353,7 @@ fn import_dump(
|
||||
// This flush the content of the batch builder.
|
||||
let file = builder.into_inner()?.into_inner()?;
|
||||
|
||||
// 3.3.2 We feed it to the milli index.
|
||||
// 4.3.2 We feed it to the milli index.
|
||||
let reader = BufReader::new(file);
|
||||
let reader = DocumentsBatchReader::from_reader(reader)?;
|
||||
|
||||
@ -374,7 +378,7 @@ fn import_dump(
|
||||
|
||||
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
|
||||
|
||||
// 4. Import the tasks.
|
||||
// 5. Import the tasks.
|
||||
for ret in dump_reader.tasks()? {
|
||||
let (task, file) = ret?;
|
||||
index_scheduler_dump.register_dumped_task(task, file)?;
|
||||
|
Loading…
Reference in New Issue
Block a user