mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
import the update_file_store in the index-scheduler
This commit is contained in:
parent
2afb381f95
commit
76597fc382
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -1738,6 +1738,7 @@ dependencies = [
|
|||||||
"csv",
|
"csv",
|
||||||
"derivative",
|
"derivative",
|
||||||
"either",
|
"either",
|
||||||
|
"file-store",
|
||||||
"fst",
|
"fst",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
|
@ -11,6 +11,7 @@ bincode = "1.3.3"
|
|||||||
csv = "1.1.6"
|
csv = "1.1.6"
|
||||||
derivative = "2.2.0"
|
derivative = "2.2.0"
|
||||||
either = { version = "1.6.1", features = ["serde"] }
|
either = { version = "1.6.1", features = ["serde"] }
|
||||||
|
file-store = { path = "../file-store" }
|
||||||
fst = "0.4.7"
|
fst = "0.4.7"
|
||||||
indexmap = { version = "1.8.0", features = ["serde-1"] }
|
indexmap = { version = "1.8.0", features = ["serde-1"] }
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
|
@ -4,8 +4,6 @@ use meilisearch_types::error::{Code, ErrorCode};
|
|||||||
use meilisearch_types::internal_error;
|
use meilisearch_types::internal_error;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::update_file_store;
|
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, IndexError>;
|
pub type Result<T> = std::result::Result<T, IndexError>;
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
@ -25,23 +23,10 @@ internal_error!(
|
|||||||
milli::heed::Error,
|
milli::heed::Error,
|
||||||
fst::Error,
|
fst::Error,
|
||||||
serde_json::Error,
|
serde_json::Error,
|
||||||
update_file_store::UpdateFileStoreError,
|
file_store::Error,
|
||||||
milli::documents::Error
|
milli::documents::Error
|
||||||
);
|
);
|
||||||
|
|
||||||
/*
|
|
||||||
impl ErrorCode for IndexError {
|
|
||||||
fn error_code(&self) -> Code {
|
|
||||||
match self {
|
|
||||||
IndexError::Internal(_) => Code::Internal,
|
|
||||||
IndexError::DocumentNotFound(_) => Code::DocumentNotFound,
|
|
||||||
IndexError::Facet(e) => e.error_code(),
|
|
||||||
IndexError::Milli(e) => MilliError(e).error_code(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
impl From<milli::UserError> for IndexError {
|
impl From<milli::UserError> for IndexError {
|
||||||
fn from(error: milli::UserError) -> IndexError {
|
fn from(error: milli::UserError) -> IndexError {
|
||||||
IndexError::Milli(error.into())
|
IndexError::Milli(error.into())
|
||||||
|
@ -13,7 +13,7 @@ use uuid::Uuid;
|
|||||||
|
|
||||||
use super::error::{IndexError, Result};
|
use super::error::{IndexError, Result};
|
||||||
use super::index::{Index, IndexMeta};
|
use super::index::{Index, IndexMeta};
|
||||||
use crate::update_file_store::UpdateFileStore;
|
use file_store::UpdateFileStore;
|
||||||
|
|
||||||
fn serialize_with_wildcard<S>(
|
fn serialize_with_wildcard<S>(
|
||||||
field: &Setting<Vec<String>>,
|
field: &Setting<Vec<String>>,
|
||||||
|
@ -4,16 +4,15 @@ mod document_formats;
|
|||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod index;
|
pub mod index;
|
||||||
pub mod task;
|
pub mod task;
|
||||||
mod update_file_store;
|
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
use batch::Batch;
|
use batch::Batch;
|
||||||
pub use error::Error;
|
pub use error::Error;
|
||||||
|
use file_store::UpdateFileStore;
|
||||||
use index::Index;
|
use index::Index;
|
||||||
pub use task::Task;
|
pub use task::Task;
|
||||||
use task::{Kind, KindWithContent, Status};
|
use task::{Kind, KindWithContent, Status};
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use update_file_store::UpdateFileStore;
|
|
||||||
|
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
@ -1,258 +0,0 @@
|
|||||||
use std::fs::{create_dir_all, File};
|
|
||||||
use std::io::{self, BufReader, BufWriter, Write};
|
|
||||||
use std::ops::{Deref, DerefMut};
|
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
|
|
||||||
use milli::documents::DocumentsBatchReader;
|
|
||||||
use serde_json::Map;
|
|
||||||
use tempfile::{NamedTempFile, PersistError};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
#[cfg(not(test))]
|
|
||||||
pub use store::UpdateFileStore;
|
|
||||||
#[cfg(test)]
|
|
||||||
pub use test::MockUpdateFileStore as UpdateFileStore;
|
|
||||||
|
|
||||||
const UPDATE_FILES_PATH: &str = "updates/updates_files";
|
|
||||||
|
|
||||||
use crate::document_formats::read_ndjson;
|
|
||||||
|
|
||||||
pub struct UpdateFile {
|
|
||||||
path: PathBuf,
|
|
||||||
file: NamedTempFile,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
#[error("Error while persisting update to disk: {0}")]
|
|
||||||
pub struct UpdateFileStoreError(Box<dyn std::error::Error + Sync + Send + 'static>);
|
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, UpdateFileStoreError>;
|
|
||||||
|
|
||||||
macro_rules! into_update_store_error {
|
|
||||||
($($other:path),*) => {
|
|
||||||
$(
|
|
||||||
impl From<$other> for UpdateFileStoreError {
|
|
||||||
fn from(other: $other) -> Self {
|
|
||||||
Self(Box::new(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)*
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
into_update_store_error!(
|
|
||||||
PersistError,
|
|
||||||
io::Error,
|
|
||||||
serde_json::Error,
|
|
||||||
milli::documents::Error,
|
|
||||||
milli::documents::DocumentsBatchCursorError
|
|
||||||
);
|
|
||||||
|
|
||||||
impl UpdateFile {
|
|
||||||
pub fn persist(self) -> Result<()> {
|
|
||||||
self.file.persist(&self.path)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Deref for UpdateFile {
|
|
||||||
type Target = NamedTempFile;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.file
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DerefMut for UpdateFile {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.file
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mod store {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct UpdateFileStore {
|
|
||||||
path: PathBuf,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UpdateFileStore {
|
|
||||||
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
|
|
||||||
let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH);
|
|
||||||
let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH);
|
|
||||||
|
|
||||||
// No update files to load
|
|
||||||
if !src_update_files_path.exists() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
create_dir_all(&dst_update_files_path)?;
|
|
||||||
|
|
||||||
let entries = std::fs::read_dir(src_update_files_path)?;
|
|
||||||
|
|
||||||
for entry in entries {
|
|
||||||
let entry = entry?;
|
|
||||||
let update_file = BufReader::new(File::open(entry.path())?);
|
|
||||||
let file_uuid = entry.file_name();
|
|
||||||
let file_uuid = file_uuid
|
|
||||||
.to_str()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
|
|
||||||
let dst_path = dst_update_files_path.join(file_uuid);
|
|
||||||
let dst_file = BufWriter::new(File::create(dst_path)?);
|
|
||||||
read_ndjson(update_file, dst_file)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
|
|
||||||
let path = path.as_ref().join(UPDATE_FILES_PATH);
|
|
||||||
std::fs::create_dir_all(&path)?;
|
|
||||||
Ok(Self { path })
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new temporary update file.
|
|
||||||
/// A call to `persist` is needed to persist the file in the database.
|
|
||||||
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
|
|
||||||
let file = NamedTempFile::new_in(&self.path)?;
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
let path = self.path.join(uuid.to_string());
|
|
||||||
let update_file = UpdateFile { file, path };
|
|
||||||
|
|
||||||
Ok((uuid, update_file))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the file corresponding to the requested uuid.
|
|
||||||
pub fn get_update(&self, uuid: Uuid) -> Result<File> {
|
|
||||||
let path = self.path.join(uuid.to_string());
|
|
||||||
let file = File::open(path)?;
|
|
||||||
Ok(file)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Copies the content of the update file pointed to by `uuid` to the `dst` directory.
|
|
||||||
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
|
|
||||||
let src = self.path.join(uuid.to_string());
|
|
||||||
let mut dst = dst.as_ref().join(UPDATE_FILES_PATH);
|
|
||||||
std::fs::create_dir_all(&dst)?;
|
|
||||||
dst.push(uuid.to_string());
|
|
||||||
std::fs::copy(src, dst)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Peforms a dump of the given update file uuid into the provided dump path.
|
|
||||||
pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
|
|
||||||
let uuid_string = uuid.to_string();
|
|
||||||
let update_file_path = self.path.join(&uuid_string);
|
|
||||||
let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH);
|
|
||||||
std::fs::create_dir_all(&dst)?;
|
|
||||||
dst.push(&uuid_string);
|
|
||||||
|
|
||||||
let update_file = File::open(update_file_path)?;
|
|
||||||
let mut dst_file = NamedTempFile::new_in(&dump_path)?;
|
|
||||||
let (mut document_cursor, index) =
|
|
||||||
DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index();
|
|
||||||
|
|
||||||
let mut document_buffer = Map::new();
|
|
||||||
// TODO: we need to find a way to do this more efficiently. (create a custom serializer
|
|
||||||
// for jsonl for example...)
|
|
||||||
while let Some(document) = document_cursor.next_document()? {
|
|
||||||
for (field_id, content) in document.iter() {
|
|
||||||
if let Some(field_name) = index.name(field_id) {
|
|
||||||
let content = serde_json::from_slice(content)?;
|
|
||||||
document_buffer.insert(field_name.to_string(), content);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
serde_json::to_writer(&mut dst_file, &document_buffer)?;
|
|
||||||
dst_file.write_all(b"\n")?;
|
|
||||||
document_buffer.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
dst_file.persist(dst)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
|
|
||||||
Ok(self.get_update(uuid)?.metadata()?.len())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn delete(&self, uuid: Uuid) -> Result<()> {
|
|
||||||
let path = self.path.join(uuid.to_string());
|
|
||||||
std::fs::remove_file(path)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use nelson::Mocker;
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub enum MockUpdateFileStore {
|
|
||||||
Real(store::UpdateFileStore),
|
|
||||||
Mock(Arc<Mocker>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MockUpdateFileStore {
|
|
||||||
pub fn mock(mocker: Mocker) -> Self {
|
|
||||||
Self::Mock(Arc::new(mocker))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
|
|
||||||
store::UpdateFileStore::load_dump(src, dst)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
|
|
||||||
store::UpdateFileStore::new(path).map(Self::Real)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
|
|
||||||
match self {
|
|
||||||
MockUpdateFileStore::Real(s) => s.new_update(),
|
|
||||||
MockUpdateFileStore::Mock(_) => todo!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_update(&self, uuid: Uuid) -> Result<File> {
|
|
||||||
match self {
|
|
||||||
MockUpdateFileStore::Real(s) => s.get_update(uuid),
|
|
||||||
MockUpdateFileStore::Mock(_) => todo!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
|
|
||||||
match self {
|
|
||||||
MockUpdateFileStore::Real(s) => s.snapshot(uuid, dst),
|
|
||||||
MockUpdateFileStore::Mock(_) => todo!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
|
|
||||||
match self {
|
|
||||||
MockUpdateFileStore::Real(s) => s.dump(uuid, dump_path),
|
|
||||||
MockUpdateFileStore::Mock(_) => todo!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
|
|
||||||
match self {
|
|
||||||
MockUpdateFileStore::Real(s) => s.get_size(uuid),
|
|
||||||
MockUpdateFileStore::Mock(_) => todo!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn delete(&self, uuid: Uuid) -> Result<()> {
|
|
||||||
match self {
|
|
||||||
MockUpdateFileStore::Real(s) => s.delete(uuid),
|
|
||||||
MockUpdateFileStore::Mock(mocker) => unsafe { mocker.get("delete").call(uuid) },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user