fix dump v1

Marin Postma 2021-05-31 10:42:31 +02:00
parent 33c6c4f0ee
commit bc5a5e37ea
4 changed files with 201 additions and 154 deletions


@@ -1,137 +1,178 @@
use std::path::Path;
use std::{
collections::{BTreeMap, BTreeSet},
fs::File,
marker::PhantomData,
path::Path,
sync::Arc,
};
use heed::EnvOpenOptions;
use log::{error, info, warn};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::index_controller::IndexMetadata;
use crate::{index::deserialize_some, index_controller::uuid_resolver::HeedUuidStore};
use crate::{
index::{Index, Unchecked},
index_controller::{self, IndexMetadata},
};
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
db_version: String,
indexes: Vec<IndexMetadata>,
}
impl MetadataV1 {
pub fn load_dump(self, _src: impl AsRef<Path>, _dst: impl AsRef<Path>) -> anyhow::Result<()> {
todo!("implement load v1")
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
) -> anyhow::Result<()> {
info!(
"Loading dump, dump database version: {}, dump version: V1",
self.db_version
);
dbg!("here");
let uuid_store = HeedUuidStore::new(&dst)?;
dbg!("here");
for index in self.indexes {
let uuid = Uuid::new_v4();
uuid_store.insert(index.uid.clone(), uuid)?;
let src = src.as_ref().join(index.uid);
load_index(&src, &dst, uuid, index.meta.primary_key.as_deref(), size)?;
}
Ok(())
}
}
//These are the settings used in the last version of meilisearch when exporting dumps in V1
//#[derive(Default, Clone, Serialize, Deserialize, Debug)]
//#[serde(rename_all = "camelCase", deny_unknown_fields)]
//struct Settings {
//#[serde(default, deserialize_with = "deserialize_some")]
//pub ranking_rules: Option<Option<Vec<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub distinct_attribute: Option<Option<String>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub searchable_attributes: Option<Option<Vec<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub displayed_attributes: Option<Option<BTreeSet<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub stop_words: Option<Option<BTreeSet<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub attributes_for_faceting: Option<Option<Vec<String>>>,
//}
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct Settings {
#[serde(default, deserialize_with = "deserialize_some")]
pub ranking_rules: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub distinct_attribute: Option<Option<String>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub displayed_attributes: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub stop_words: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub attributes_for_faceting: Option<Option<Vec<String>>>,
}
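For context, the `Option<Option<T>>` fields above, combined with `deserialize_some`, let the loader distinguish a key that is absent from the V1 `settings.json` (outer `None`) from one explicitly set to `null` (`Some(None)`). A minimal sketch of that helper, assuming the one in `crate::index` follows the usual serde pattern:

use serde::{Deserialize, Deserializer};

// Any present value, including `null`, is wrapped in `Some`; a missing key
// falls back to `None` through `#[serde(default)]`.
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    Deserialize::deserialize(deserializer).map(Some)
}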
///// we need to **always** be able to convert the old settings to the settings currently being used
//impl From<Settings> for index_controller::Settings<Unchecked> {
//fn from(settings: Settings) -> Self {
//if settings.synonyms.flatten().is_some() {
//error!("`synonyms` are not yet implemented and thus will be ignored");
//}
//Self {
//distinct_attribute: settings.distinct_attribute,
//// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
//displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
//searchable_attributes: settings.searchable_attributes,
//// we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
//// representing the name of the faceted field + the type of the field. Since the type
//// was not known in the V1 of the dump we are just going to assume everything is a
//// String
//attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
//// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
//ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
//match criterion.as_str() {
//"words" | "typo" | "proximity" | "attribute" => Some(criterion),
//s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
//"wordsPosition" => {
//warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPositon` will be ignored");
//Some(String::from("words"))
//}
//"exactness" => {
//error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
//None
//}
//s => {
//error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
//None
//}
//}
//}).collect())),
//// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
//stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
//_kind: PhantomData,
//}
//}
//}
impl std::ops::Deref for Settings {
type Target = Option<Option<BTreeSet<String>>>;
///// Extract Settings from `settings.json` file present at provided `dir_path`
//fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
//let path = dir_path.join("settings.json");
//let file = File::open(path)?;
//let reader = std::io::BufReader::new(file);
//let metadata = serde_json::from_reader(reader)?;
fn deref(&self) -> &Self::Target {
&self.stop_words
}
}
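The `Deref` impl above makes a `Settings` value dereference straight to its `stop_words` field. A hypothetical call site, just to illustrate what the impl allows (not code from the repository):

// `&**settings` goes through `Deref` to the `Option<Option<BTreeSet<String>>>`
fn stop_word_count(settings: &Settings) -> usize {
    match &**settings {
        Some(Some(words)) => words.len(),
        _ => 0,
    }
}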
//Ok(metadata)
//}
fn load_index(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
uuid: Uuid,
primary_key: Option<&str>,
size: usize,
) -> anyhow::Result<()> {
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
//pub fn import_dump(
//size: usize,
//uuid: Uuid,
//dump_path: &Path,
//db_path: &Path,
//primary_key: Option<&str>,
//) -> anyhow::Result<()> {
//let index_path = db_path.join(&format!("indexes/index-{}", uuid));
//info!("Importing a dump from an old version of meilisearch with dump version 1");
std::fs::create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
//std::fs::create_dir_all(&index_path)?;
//let mut options = EnvOpenOptions::new();
//options.map_size(size);
//let index = milli::Index::new(options, index_path)?;
//let index = Index(Arc::new(index));
// extract `settings.json` file and import content
let settings = import_settings(&src)?;
let settings: index_controller::Settings<Unchecked> = settings.into();
let update_builder = UpdateBuilder::new(0);
index.update_settings(&settings.check(), update_builder)?;
//// extract `settings.json` file and import content
//let settings = import_settings(&dump_path)?;
//let settings: index_controller::Settings<Unchecked> = settings.into();
//let update_builder = UpdateBuilder::new(0);
//index.update_settings(&settings.check(), update_builder)?;
let update_builder = UpdateBuilder::new(0);
let file = File::open(&src.as_ref().join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
//let update_builder = UpdateBuilder::new(1);
//let file = File::open(&dump_path.join("documents.jsonl"))?;
//let reader = std::io::BufReader::new(file);
index.update_documents(
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
Some(reader),
update_builder,
primary_key,
)?;
//// TODO: TAMO: waiting for milli. We should use the result
//let _ = index.update_documents(
//UpdateFormat::JsonStream,
//IndexDocumentsMethod::ReplaceDocuments,
//Some(reader),
//update_builder,
//primary_key,
//);
// the last step: we extract the original milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
.unwrap()
.prepare_for_closing()
.wait();
//// the last step: we extract the original milli::Index and close it
//Arc::try_unwrap(index.0)
//.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
//.unwrap()
//.prepare_for_closing()
//.wait();
// Ignore updates in v1.
//// at this point we should handle the import of the updates, but since the update logic is not handled in
//// meilisearch we are just going to ignore this part
Ok(())
}
//Ok(())
//}
/// we need to **always** be able to convert the old settings to the settings currently being used
impl From<Settings> for index_controller::Settings<Unchecked> {
fn from(settings: Settings) -> Self {
if settings.synonyms.flatten().is_some() {
error!("`synonyms` are not yet implemented and thus will be ignored");
}
Self {
distinct_attribute: settings.distinct_attribute,
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
searchable_attributes: settings.searchable_attributes,
// we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
// representing the name of the faceted field + the type of the field. Since the type
// was not known in the V1 of the dump we are just going to assume everything is a
// String
attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
match criterion.as_str() {
"words" | "typo" | "proximity" | "attribute" => Some(criterion),
s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
"wordsPosition" => {
warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPositon` will be ignored");
Some(String::from("words"))
}
"exactness" => {
error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
None
}
s => {
error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
None
}
}
}).collect())),
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
_kind: PhantomData,
}
}
}
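Read on its own, the per-criterion mapping in the `ranking_rules` conversion above amounts to the following standalone function (the name `remap_criterion` is illustrative, not part of the crate):

fn remap_criterion(criterion: String) -> Option<String> {
    match criterion.as_str() {
        // criteria that still exist are kept unchanged
        "words" | "typo" | "proximity" | "attribute" => Some(criterion),
        // custom rules such as `asc(price)` or `desc(rank)` are kept unchanged
        s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
        // `wordsPosition` was folded into `words`
        "wordsPosition" => Some(String::from("words")),
        // `exactness` and any unknown criterion are dropped from the dump
        _ => None,
    }
}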
/// Extract Settings from the `settings.json` file present at the provided `dir_path`
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
let path = dir_path.as_ref().join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}


@@ -1,13 +1,13 @@
use std::path::Path;
use anyhow::Context;
use chrono::{DateTime, Utc};
use log::{info, warn};
use log::info;
use serde::{Deserialize, Serialize};
use crate::{index::Index, index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}, option::IndexerOpts};
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV2 {
db_version: String,
index_db_size: u64,
@@ -29,6 +29,7 @@ impl MetadataV2 {
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
// TODO: use these variables to test if loading the index is possible.
_index_db_size: u64,
_update_db_size: u64,
indexing_options: &IndexerOpts,
@@ -37,37 +38,21 @@ impl MetadataV2 {
"Loading dump from {}, dump database version: {}, dump version: V2",
self.dump_date, self.db_version
);
// get dir in which to load the db:
let dst_dir = dst
.as_ref()
.parent()
.with_context(|| format!("Invalid db path: {}", dst.as_ref().display()))?;
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
info!("Loading index database.");
HeedUuidStore::load_dump(src.as_ref(), &tmp_dst)?;
HeedUuidStore::load_dump(src.as_ref(), &dst)?;
info!("Loading updates.");
UpdateStore::load_dump(&src, &tmp_dst, self.update_db_size)?;
UpdateStore::load_dump(&src, &dst, self.update_db_size)?;
info!("Loading indexes");
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
for index in indexes {
let index = index?;
Index::load_dump(&index.path(), &tmp_dst, self.index_db_size, indexing_options)?;
Index::load_dump(&index.path(), &dst, self.index_db_size, indexing_options)?;
}
// Persist and atomically rename the db
let persisted_dump = tmp_dst.into_path();
if dst.as_ref().exists() {
warn!("Overwriting database at {}", dst.as_ref().display());
std::fs::remove_dir_all(&dst)?;
}
std::fs::rename(&persisted_dump, &dst)?;
Ok(())
}
}


@@ -2,11 +2,12 @@ use std::fs::File;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use log::{error, info};
use log::{error, info, warn};
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use anyhow::Context;
use loaders::v1::MetadataV1;
use loaders::v2::MetadataV2;
@@ -53,22 +54,16 @@ pub trait DumpActorHandle {
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "dump_version")]
#[serde(tag = "dumpVersion")]
pub enum Metadata {
V1 {
#[serde(flatten)]
meta: MetadataV1,
},
V2 {
#[serde(flatten)]
meta: MetadataV2,
},
V1(MetadataV1),
V2(MetadataV2),
}
impl Metadata {
pub fn new_v2(index_db_size: u64, update_db_size: u64) -> Self {
let meta = MetadataV2::new(index_db_size, update_db_size);
Self::V2 { meta }
Self::V2(meta)
}
}
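With the `#[serde(flatten)]` wrappers replaced by plain newtype variants under `#[serde(tag = "dumpVersion")]`, the metadata file keeps its fields next to the tag instead of nesting them under a `meta` object. A self-contained sketch with a simplified field set (not the crate's real structs):

use serde::Deserialize;

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct V2Meta {
    db_version: String,
    index_db_size: u64,
    update_db_size: u64,
}

#[derive(Deserialize, Debug)]
#[serde(tag = "dumpVersion")]
enum Meta {
    V2(V2Meta),
}

fn main() -> serde_json::Result<()> {
    // the tag sits alongside the variant's own fields in one flat object
    let raw = r#"{"dumpVersion":"V2","dbVersion":"0.21.0","indexDbSize":104857600,"updateDbSize":104857600}"#;
    println!("{:?}", serde_json::from_str::<Meta>(raw)?);
    Ok(())
}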
@@ -135,16 +130,31 @@ pub fn load_dump(
let mut meta_file = File::open(&meta_path)?;
let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
let dst_dir = dst_path
.as_ref()
.parent()
.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?;
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
match meta {
Metadata::V1 { meta } => meta.load_dump(&tmp_src_path, dst_path)?,
Metadata::V2 { meta } => meta.load_dump(
Metadata::V1(meta) => meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size as usize)?,
Metadata::V2(meta) => meta.load_dump(
&tmp_src_path,
dst_path.as_ref(),
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?,
}
// Persist and atomically rename the db
let persisted_dump = tmp_dst.into_path();
if dst_path.as_ref().exists() {
warn!("Overwriting database at {}", dst_path.as_ref().display());
std::fs::remove_dir_all(&dst_path)?;
}
std::fs::rename(&persisted_dump, &dst_path)?;
Ok(())
}
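The persist-and-rename steps above follow a common pattern: build the new database inside a temporary directory that shares a filesystem with the destination, then swap it into place so a failed load never leaves a half-written `db_path` behind. A generic sketch of that pattern (the helper name and signature are illustrative, not part of the codebase):

use std::path::Path;

fn persist_atomically(
    dst: &Path,
    build: impl FnOnce(&Path) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
    // the tempdir must live next to `dst` so the final rename stays on one filesystem
    let parent = dst.parent().expect("destination path must have a parent");
    let tmp = tempfile::tempdir_in(parent)?;
    build(tmp.path())?;
    // keep the directory past the guard's drop, then move it into place
    let persisted = tmp.into_path();
    if dst.exists() {
        std::fs::remove_dir_all(dst)?;
    }
    std::fs::rename(&persisted, dst)?;
    Ok(())
}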


@@ -1,13 +1,16 @@
use std::{collections::HashSet, io::{BufReader, BufRead, Write}};
use std::fs::{create_dir_all, File};
use std::path::{Path, PathBuf};
use std::{
collections::HashSet,
io::{BufRead, BufReader, Write},
};
use heed::{
types::{ByteSlice, Str},
CompactionOption, Database, Env, EnvOpenOptions,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use serde::{Serialize, Deserialize};
use super::{Result, UuidResolverError, UUID_STORE_SIZE};
use crate::helpers::EnvSizer;
@@ -45,7 +48,14 @@ impl HeedUuidStore {
let mut options = EnvOpenOptions::new();
options.map_size(UUID_STORE_SIZE); // 1GB
let env = options.open(path)?;
let db = env.create_database(None)?; Ok(Self { env, db }) } pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> { let env = self.env.clone(); let db = self.db; let mut txn = env.write_txn()?;
let db = env.create_database(None)?;
Ok(Self { env, db })
}
pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
@@ -62,7 +72,10 @@ impl HeedUuidStore {
Ok(uuid)
}
}
} pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { let env = self.env.clone(); let db = self.db;
}
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
@@ -149,9 +162,7 @@ impl HeedUuidStore {
let uid = uid.to_string();
let uuid = Uuid::from_slice(uuid)?;
let entry = DumpEntry {
uuid, uid
};
let entry = DumpEntry { uuid, uid };
serde_json::to_writer(&mut dump_file, &entry)?;
dump_file.write(b"\n").unwrap();