mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
WIP: dump
This commit is contained in:
parent
0275b36fb0
commit
0f94ef8abc
@ -55,10 +55,10 @@ impl ApiKeys {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Data {
|
impl Data {
|
||||||
pub async fn new(options: Opt) -> anyhow::Result<Data> {
|
pub fn new(options: Opt) -> anyhow::Result<Data> {
|
||||||
let path = options.db_path.clone();
|
let path = options.db_path.clone();
|
||||||
|
|
||||||
let index_controller = IndexController::new(&path, &options).await?;
|
let index_controller = IndexController::new(&path, &options)?;
|
||||||
|
|
||||||
let mut api_keys = ApiKeys {
|
let mut api_keys = ApiKeys {
|
||||||
master: options.clone().master_key,
|
master: options.clone().master_key,
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
mod v1;
|
mod v1;
|
||||||
mod v2;
|
mod v2;
|
||||||
|
|
||||||
use std::{fs::File, path::{Path, PathBuf}, sync::Arc};
|
use std::{fs::File, path::{Path}, sync::Arc};
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
use heed::EnvOpenOptions;
|
use heed::EnvOpenOptions;
|
||||||
@ -10,12 +10,10 @@ use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
use super::update_actor::UpdateActorHandle;
|
|
||||||
use super::uuid_resolver::UuidResolverHandle;
|
|
||||||
use super::IndexMetadata;
|
use super::IndexMetadata;
|
||||||
use crate::index::Index;
|
use crate::index::Index;
|
||||||
use crate::index_controller::uuid_resolver;
|
use crate::index_controller::uuid_resolver;
|
||||||
use crate::{helpers::compression, index::Settings};
|
use crate::helpers::compression;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||||
enum DumpVersion {
|
enum DumpVersion {
|
||||||
@ -24,7 +22,7 @@ enum DumpVersion {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DumpVersion {
|
impl DumpVersion {
|
||||||
const CURRENT: Self = Self::V2;
|
// const CURRENT: Self = Self::V2;
|
||||||
|
|
||||||
/// Select the good importation function from the `DumpVersion` of metadata
|
/// Select the good importation function from the `DumpVersion` of metadata
|
||||||
pub fn import_index(self, size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
|
pub fn import_index(self, size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
|
||||||
@ -37,23 +35,25 @@ impl DumpVersion {
|
|||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct DumpMetadata {
|
pub struct Metadata {
|
||||||
indexes: Vec<IndexMetadata>,
|
indexes: Vec<IndexMetadata>,
|
||||||
db_version: String,
|
db_version: String,
|
||||||
dump_version: DumpVersion,
|
dump_version: DumpVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DumpMetadata {
|
impl Metadata {
|
||||||
/// Create a DumpMetadata with the current dump version of meilisearch.
|
/*
|
||||||
|
/// Create a Metadata with the current dump version of meilisearch.
|
||||||
pub fn new(indexes: Vec<IndexMetadata>, db_version: String) -> Self {
|
pub fn new(indexes: Vec<IndexMetadata>, db_version: String) -> Self {
|
||||||
DumpMetadata {
|
Metadata {
|
||||||
indexes,
|
indexes,
|
||||||
db_version,
|
db_version,
|
||||||
dump_version: DumpVersion::CURRENT,
|
dump_version: DumpVersion::CURRENT,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
/// Extract DumpMetadata from `metadata.json` file present at provided `dir_path`
|
/// Extract Metadata from `metadata.json` file present at provided `dir_path`
|
||||||
fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
|
fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
|
||||||
let path = dir_path.join("metadata.json");
|
let path = dir_path.join("metadata.json");
|
||||||
let file = File::open(path)?;
|
let file = File::open(path)?;
|
||||||
@ -63,7 +63,8 @@ impl DumpMetadata {
|
|||||||
Ok(metadata)
|
Ok(metadata)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write DumpMetadata in `metadata.json` file at provided `dir_path`
|
/*
|
||||||
|
/// Write Metadata in `metadata.json` file at provided `dir_path`
|
||||||
fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> {
|
fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> {
|
||||||
let path = dir_path.join("metadata.json");
|
let path = dir_path.join("metadata.json");
|
||||||
let file = File::create(path)?;
|
let file = File::create(path)?;
|
||||||
@ -72,8 +73,10 @@ impl DumpMetadata {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
pub struct DumpService<U, R> {
|
pub struct DumpService<U, R> {
|
||||||
uuid_resolver_handle: R,
|
uuid_resolver_handle: R,
|
||||||
update_handle: U,
|
update_handle: U,
|
||||||
@ -148,7 +151,9 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
/// Write Settings in `settings.json` file at provided `dir_path`
|
/// Write Settings in `settings.json` file at provided `dir_path`
|
||||||
fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> {
|
fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> {
|
||||||
let path = dir_path.join("settings.json");
|
let path = dir_path.join("settings.json");
|
||||||
@ -158,6 +163,7 @@ fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()>
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
pub fn load_dump(
|
pub fn load_dump(
|
||||||
db_path: impl AsRef<Path>,
|
db_path: impl AsRef<Path>,
|
||||||
@ -170,12 +176,12 @@ pub fn load_dump(
|
|||||||
let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?;
|
let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?;
|
||||||
|
|
||||||
// extract the dump in a temporary directory
|
// extract the dump in a temporary directory
|
||||||
let tmp_dir = TempDir::new()?;
|
let tmp_dir = TempDir::new_in(db_path)?;
|
||||||
let tmp_dir_path = tmp_dir.path();
|
let tmp_dir_path = tmp_dir.path();
|
||||||
compression::from_tar_gz(dump_path, tmp_dir_path)?;
|
compression::from_tar_gz(dump_path, tmp_dir_path)?;
|
||||||
|
|
||||||
// read dump metadata
|
// read dump metadata
|
||||||
let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
|
let metadata = Metadata::from_path(&tmp_dir_path)?;
|
||||||
|
|
||||||
// remove indexes which have same `uuid` than indexes to import and create empty indexes
|
// remove indexes which have same `uuid` than indexes to import and create empty indexes
|
||||||
let existing_index_uids = uuid_resolver.list()?;
|
let existing_index_uids = uuid_resolver.list()?;
|
||||||
@ -207,7 +213,7 @@ pub fn load_dump(
|
|||||||
// this cannot fail since we created all the missing uuid in the previous loop
|
// this cannot fail since we created all the missing uuid in the previous loop
|
||||||
let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
|
let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
|
||||||
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
|
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
|
||||||
let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db
|
// let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db
|
||||||
|
|
||||||
info!("Importing dump from {} into {}...", dump_path.display(), index_path.display());
|
info!("Importing dump from {} into {}...", dump_path.display(), index_path.display());
|
||||||
metadata.dump_version.import_index(size, &dump_path, &index_path).unwrap();
|
metadata.dump_version.import_index(size, &dump_path, &index_path).unwrap();
|
||||||
|
@ -84,19 +84,13 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow:
|
|||||||
std::fs::create_dir_all(&index_path)?;
|
std::fs::create_dir_all(&index_path)?;
|
||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(size);
|
options.map_size(size);
|
||||||
let index = milli::Index::new(options, index_path)?;
|
let index = milli::Index::new(options.clone(), index_path)?;
|
||||||
let index = Index(Arc::new(index));
|
let index = Index(Arc::new(index));
|
||||||
|
|
||||||
// extract `settings.json` file and import content
|
// extract `settings.json` file and import content
|
||||||
let settings = import_settings(&dump_path)?;
|
let settings = import_settings(&dump_path)?;
|
||||||
dbg!(&settings);
|
dbg!(&settings);
|
||||||
let mut settings: index_controller::Settings = settings.into();
|
let settings: index_controller::Settings = settings.into();
|
||||||
if settings.displayed_attributes.as_ref().map_or(false, |o| o.as_ref().map_or(false, |v| v.contains(&String::from("*")))) {
|
|
||||||
settings.displayed_attributes = None;
|
|
||||||
}
|
|
||||||
if settings.searchable_attributes.as_ref().map_or(false, |o| o.as_ref().map_or(false, |v| v.contains(&String::from("*")))) {
|
|
||||||
settings.searchable_attributes = None;
|
|
||||||
}
|
|
||||||
let update_builder = UpdateBuilder::new(0);
|
let update_builder = UpdateBuilder::new(0);
|
||||||
index.update_settings(&settings, update_builder)?;
|
index.update_settings(&settings, update_builder)?;
|
||||||
|
|
||||||
@ -112,6 +106,9 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow:
|
|||||||
None,
|
None,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// at this point we should handle the updates, but since the update logic is not handled in
|
||||||
|
// meilisearch we are just going to ignore this part
|
||||||
|
|
||||||
// the last step: we extract the original milli::Index and close it
|
// the last step: we extract the original milli::Index and close it
|
||||||
Arc::try_unwrap(index.0)
|
Arc::try_unwrap(index.0)
|
||||||
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
|
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
|
||||||
|
@ -178,6 +178,10 @@ mod test {
|
|||||||
self.as_ref().snapshot(uuid, path).await
|
self.as_ref().snapshot(uuid, path).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
|
||||||
|
self.as_ref().dump(uuid, path).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
|
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
|
||||||
self.as_ref().get_index_stats(uuid).await
|
self.as_ref().get_index_stats(uuid).await
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ pub struct Stats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl IndexController {
|
impl IndexController {
|
||||||
pub async fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
|
pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
|
||||||
let index_size = options.max_mdb_size.get_bytes() as usize;
|
let index_size = options.max_mdb_size.get_bytes() as usize;
|
||||||
let update_store_size = options.max_udb_size.get_bytes() as usize;
|
let update_store_size = options.max_udb_size.get_bytes() as usize;
|
||||||
|
|
||||||
|
@ -91,7 +91,6 @@ where
|
|||||||
meta: UpdateMeta,
|
meta: UpdateMeta,
|
||||||
mut payload: mpsc::Receiver<PayloadData<D>>,
|
mut payload: mpsc::Receiver<PayloadData<D>>,
|
||||||
) -> Result<UpdateStatus> {
|
) -> Result<UpdateStatus> {
|
||||||
|
|
||||||
let file_path = match meta {
|
let file_path = match meta {
|
||||||
UpdateMeta::DocumentsAddition { .. }
|
UpdateMeta::DocumentsAddition { .. }
|
||||||
| UpdateMeta::DeleteDocuments => {
|
| UpdateMeta::DeleteDocuments => {
|
||||||
|
@ -13,9 +13,8 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
|
|||||||
|
|
||||||
use actor::UpdateActor;
|
use actor::UpdateActor;
|
||||||
use message::UpdateMsg;
|
use message::UpdateMsg;
|
||||||
use update_store::UpdateStore;
|
|
||||||
pub use update_store::UpdateStoreInfo;
|
|
||||||
|
|
||||||
|
pub use update_store::{UpdateStore, UpdateStoreInfo};
|
||||||
pub use handle_impl::UpdateActorHandleImpl;
|
pub use handle_impl::UpdateActorHandleImpl;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, UpdateError>;
|
pub type Result<T> = std::result::Result<T, UpdateError>;
|
||||||
|
@ -177,11 +177,7 @@ pub struct UpdateStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl UpdateStore {
|
impl UpdateStore {
|
||||||
pub fn open(
|
pub fn create(mut options: EnvOpenOptions, path: impl AsRef<Path>) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
|
||||||
mut options: EnvOpenOptions,
|
|
||||||
path: impl AsRef<Path>,
|
|
||||||
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
|
|
||||||
) -> anyhow::Result<Arc<Self>> {
|
|
||||||
options.max_dbs(5);
|
options.max_dbs(5);
|
||||||
|
|
||||||
let env = options.open(path)?;
|
let env = options.open(path)?;
|
||||||
@ -189,21 +185,30 @@ impl UpdateStore {
|
|||||||
let next_update_id = env.create_database(Some("next-update-id"))?;
|
let next_update_id = env.create_database(Some("next-update-id"))?;
|
||||||
let updates = env.create_database(Some("updates"))?;
|
let updates = env.create_database(Some("updates"))?;
|
||||||
|
|
||||||
let (notification_sender, mut notification_receiver) = mpsc::channel(10);
|
let state = Arc::new(StateLock::from_state(State::Idle));
|
||||||
|
|
||||||
|
let (notification_sender, notification_receiver) = mpsc::channel(10);
|
||||||
// Send a first notification to trigger the process.
|
// Send a first notification to trigger the process.
|
||||||
let _ = notification_sender.send(());
|
let _ = notification_sender.send(());
|
||||||
|
|
||||||
let state = Arc::new(StateLock::from_state(State::Idle));
|
Ok((Self { env, pending_queue, next_update_id, updates, state, notification_sender }, notification_receiver))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn open(
|
||||||
|
options: EnvOpenOptions,
|
||||||
|
path: impl AsRef<Path>,
|
||||||
|
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
|
||||||
|
) -> anyhow::Result<Arc<Self>> {
|
||||||
|
let (update_store, mut notification_receiver) = Self::create(options, path)?;
|
||||||
|
let update_store = Arc::new(update_store);
|
||||||
|
|
||||||
// Init update loop to perform any pending updates at launch.
|
// Init update loop to perform any pending updates at launch.
|
||||||
// Since we just launched the update store, and we still own the receiving end of the
|
// Since we just launched the update store, and we still own the receiving end of the
|
||||||
// channel, this call is guaranteed to succeed.
|
// channel, this call is guaranteed to succeed.
|
||||||
notification_sender
|
update_store.notification_sender
|
||||||
.try_send(())
|
.try_send(())
|
||||||
.expect("Failed to init update store");
|
.expect("Failed to init update store");
|
||||||
|
|
||||||
let update_store = Arc::new(UpdateStore { env, pending_queue, next_update_id, updates, state, notification_sender });
|
|
||||||
|
|
||||||
// We need a weak reference so we can take ownership on the arc later when we
|
// We need a weak reference so we can take ownership on the arc later when we
|
||||||
// want to close the index.
|
// want to close the index.
|
||||||
let update_store_weak = Arc::downgrade(&update_store);
|
let update_store_weak = Arc::downgrade(&update_store);
|
||||||
@ -283,6 +288,18 @@ impl UpdateStore {
|
|||||||
Ok(meta)
|
Ok(meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Push already processed updates in the UpdateStore. This is useful for the dumps
|
||||||
|
pub fn register_already_processed_update (
|
||||||
|
&self,
|
||||||
|
result: UpdateStatus,
|
||||||
|
index_uuid: Uuid,
|
||||||
|
) -> heed::Result<()> {
|
||||||
|
let mut wtxn = self.env.write_txn()?;
|
||||||
|
let (_global_id, update_id) = self.next_update_id(&mut wtxn, index_uuid)?;
|
||||||
|
self.updates.remap_key_type::<UpdateKeyCodec>().put(&mut wtxn, &(index_uuid, update_id), &result)?;
|
||||||
|
wtxn.commit()
|
||||||
|
}
|
||||||
|
|
||||||
/// Executes the user provided function on the next pending update (the one with the lowest id).
|
/// Executes the user provided function on the next pending update (the one with the lowest id).
|
||||||
/// This is asynchronous as it let the user process the update with a read-only txn and
|
/// This is asynchronous as it let the user process the update with a read-only txn and
|
||||||
/// only writing the result meta to the processed-meta store *after* it has been processed.
|
/// only writing the result meta to the processed-meta store *after* it has been processed.
|
||||||
|
@ -54,7 +54,7 @@ async fn main() -> Result<(), MainError> {
|
|||||||
//snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?;
|
//snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
let data = Data::new(opt.clone()).await?;
|
let data = Data::new(opt.clone())?;
|
||||||
|
|
||||||
//if !opt.no_analytics {
|
//if !opt.no_analytics {
|
||||||
//let analytics_data = data.clone();
|
//let analytics_data = data.clone();
|
||||||
|
@ -19,7 +19,7 @@ async fn get_settings() {
|
|||||||
assert_eq!(settings.keys().len(), 6);
|
assert_eq!(settings.keys().len(), 6);
|
||||||
assert_eq!(settings["displayedAttributes"], json!(["*"]));
|
assert_eq!(settings["displayedAttributes"], json!(["*"]));
|
||||||
assert_eq!(settings["searchableAttributes"], json!(["*"]));
|
assert_eq!(settings["searchableAttributes"], json!(["*"]));
|
||||||
assert_eq!(settings["attributesForFaceting"], json!(null));
|
assert_eq!(settings["attributesForFaceting"], json!({}));
|
||||||
assert_eq!(settings["distinctAttribute"], json!(null));
|
assert_eq!(settings["distinctAttribute"], json!(null));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
settings["rankingRules"],
|
settings["rankingRules"],
|
||||||
@ -82,7 +82,9 @@ async fn reset_all_settings() {
|
|||||||
assert_eq!(response["searchableAttributes"], json!(["bar"]));
|
assert_eq!(response["searchableAttributes"], json!(["bar"]));
|
||||||
assert_eq!(response["stopWords"], json!(["the"]));
|
assert_eq!(response["stopWords"], json!(["the"]));
|
||||||
|
|
||||||
|
eprintln!("BEFORE");
|
||||||
index.delete_settings().await;
|
index.delete_settings().await;
|
||||||
|
eprintln!("AFTER");
|
||||||
index.wait_update_id(1).await;
|
index.wait_update_id(1).await;
|
||||||
|
|
||||||
let (response, code) = index.settings().await;
|
let (response, code) = index.settings().await;
|
||||||
|
Loading…
Reference in New Issue
Block a user