diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index f26cc4283..c4bf19856 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -1,8 +1,11 @@ -use std::{collections::{BTreeSet, HashSet}, marker::PhantomData}; +use std::{collections::{BTreeSet, HashSet}, io::Write, marker::PhantomData, path::{Path, PathBuf}}; use std::ops::Deref; use std::sync::Arc; +use std::fs::File; use anyhow::{bail, Context}; +use heed::RoTxn; +use indexmap::IndexMap; use milli::obkv_to_json; use serde_json::{Map, Value}; @@ -38,7 +41,10 @@ where impl Index { pub fn settings(&self) -> anyhow::Result> { let txn = self.read_txn()?; + self.settings_txn(&txn) + } + pub fn settings_txn(&self, txn: &RoTxn) -> anyhow::Result> { let displayed_attributes = self .displayed_fields(&txn)? .map(|fields| fields.into_iter().map(String::from).collect()); @@ -161,4 +167,57 @@ impl Index { displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid)); Ok(displayed_fields_ids) } + + pub fn dump(&self, path: PathBuf) -> anyhow::Result<()> { + // acquire write txn make sure any ongoing write is finnished before we start. + let txn = self.env.write_txn()?; + + self.dump_documents(&txn, &path)?; + self.dump_meta(&txn, &path)?; + + Ok(()) + } + + fn dump_documents(&self, txn: &RoTxn, path: impl AsRef) -> anyhow::Result<()> { + println!("dumping documents"); + let document_file_path = path.as_ref().join("documents.jsonl"); + let mut document_file = File::create(&document_file_path)?; + + let documents = self.all_documents(txn)?; + let fields_ids_map = self.fields_ids_map(txn)?; + + // dump documents + let mut json_map = IndexMap::new(); + for document in documents { + let (_, reader) = document?; + + for (fid, bytes) in reader.iter() { + if let Some(name) = fields_ids_map.name(fid) { + json_map.insert(name, serde_json::from_slice::(bytes)?); + } + } + + serde_json::to_writer(&mut document_file, &json_map)?; + document_file.write(b"\n")?; + + json_map.clear(); + } + + Ok(()) + } + + fn dump_meta(&self, txn: &RoTxn, path: impl AsRef) -> anyhow::Result<()> { + println!("dumping settings"); + let meta_file_path = path.as_ref().join("meta.json"); + let mut meta_file = File::create(&meta_file_path)?; + + let settings = self.settings_txn(txn)?; + let json = serde_json::json!({ + "settings": settings, + }); + + serde_json::to_writer(&mut meta_file, &json)?; + + Ok(()) + } } diff --git a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs index 601c97c01..575119410 100644 --- a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs @@ -1,4 +1,4 @@ -use std::path::{Path}; +use std::path::Path; use actix_web::web::Bytes; use tokio::sync::{mpsc, oneshot}; use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult}; diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 0e2e63468..f6f7cdc28 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -6,7 +6,7 @@ use async_stream::stream; use futures::stream::StreamExt; use heed::CompactionOption; use log::debug; -use tokio::sync::mpsc; +use tokio::{fs, sync::mpsc}; use tokio::task::spawn_blocking; use uuid::Uuid; @@ -126,13 +126,8 @@ impl IndexActor { Snapshot { uuid, path, ret } => { let _ = ret.send(self.handle_snapshot(uuid, path).await); } - Dump { - uid, - uuid, - path, - ret, - } => { - let _ = ret.send(self.handle_dump(&uid, uuid, path).await); + Dump { uuid, path, ret } => { + let _ = ret.send(self.handle_dump(uuid, path).await); } GetStats { uuid, ret } => { let _ = ret.send(self.handle_get_stats(uuid).await); @@ -312,46 +307,17 @@ impl IndexActor { /// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the /// documents and all the settings. - async fn handle_dump(&self, uid: &str, uuid: Uuid, path: PathBuf) -> IndexResult<()> { - use std::io::prelude::*; - use tokio::fs::create_dir_all; + async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; - create_dir_all(&path).await?; + let path = path.join(format!("indexes/index-{}/", uuid)); + fs::create_dir_all(&path).await?; - if let Some(index) = self.store.get(uuid).await? { - let documents_path = path.join(uid).join("documents.jsonl"); - let settings_path = path.join(uid).join("settings.json"); - - spawn_blocking(move || -> anyhow::Result<()> { - // first we dump all the documents - let file = File::create(documents_path)?; - let mut file = std::io::BufWriter::new(file); - - // Get write txn to wait for ongoing write transaction before dump. - let txn = index.write_txn()?; - let fields_ids_map = index.fields_ids_map(&txn)?; - // we want to save **all** the fields in the dump. - let fields_to_dump: Vec = fields_ids_map.iter().map(|(id, _)| id).collect(); - - for document in index.all_documents(&txn)? { - let (_doc_id, document) = document?; - let json = milli::obkv_to_json(&fields_to_dump, &fields_ids_map, document)?; - file.write_all(serde_json::to_string(&json)?.as_bytes())?; - file.write_all(b"\n")?; - } - - // then we dump all the settings - let file = File::create(settings_path)?; - let mut file = std::io::BufWriter::new(file); - let settings = index.settings()?; - - file.write_all(serde_json::to_string(&settings)?.as_bytes())?; - file.write_all(b"\n")?; - - Ok(()) - }) - .await??; - } + tokio::task::spawn_blocking(move || index.dump(path)).await??; Ok(()) } diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 64b63e5f0..26aa189d0 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -136,9 +136,9 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Dump { uid, uuid, path, ret }; + let msg = IndexMsg::Dump { uuid, path, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index 37faa1e31..714a30ecc 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -61,7 +61,6 @@ pub enum IndexMsg { ret: oneshot::Sender>, }, Dump { - uid: String, uuid: Uuid, path: PathBuf, ret: oneshot::Sender>, diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index fd1d59e8f..dbea5151d 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -109,7 +109,7 @@ pub trait IndexActorHandle { index_settings: IndexSettings, ) -> IndexResult; async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; - async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()>; + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; async fn get_index_stats(&self, uuid: Uuid) -> IndexResult; } diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index f91a2740c..d22be0bd4 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -642,7 +642,7 @@ impl UpdateStore { let path = &path; let mut stream = futures::stream::iter(uuids.iter()) - .map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone())) + .map(|(uid, uuid)| handle.dump(*uuid, path.clone())) .buffer_unordered(CONCURRENT_INDEX_MSG / 3); Handle::current().block_on(async { diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index 4fbaa37b4..b497116cb 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -152,8 +152,8 @@ impl HeedUuidStore { let entry = entry?; let uuid = Uuid::from_slice(entry.1)?; uuids.insert(uuid); - serde_json::to_writer(&mut dump_file, &serde_json::json!({ "uid": entry.0, "uuid": uuid }))?; - dump_file.write(b"\n").unwrap(); + serde_json::to_writer(&mut dump_file, &serde_json::json!({ "uid": entry.0, "uuid": uuid + }))?; dump_file.write(b"\n").unwrap(); } Ok(uuids)