mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 10:07:40 +08:00
Merge #4220
4220: Bring back changes from v1.5.0 into main r=dureuill a=Kerollmops This will bring the fixes from v1.5.0 into main. By [following this guide](https://github.com/meilisearch/engine-team/blob/main/resources/meilisearch-release.md#after-the-release) I decided to create a temporary branch to fix the git conflicts and merge into main afterward. Co-authored-by: curquiza <curquiza@users.noreply.github.com> Co-authored-by: Vivek Kumar <vivek.26@outlook.com> Co-authored-by: Louis Dureuil <louis.dureuil@gmail.com> Co-authored-by: meili-bors[bot] <89034592+meili-bors[bot]@users.noreply.github.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Clément Renault <clement@meilisearch.com> Co-authored-by: Louis Dureuil <louis.dureuil@xinra.net> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
b366acdae6
880
Cargo.lock
generated
880
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -2,6 +2,7 @@
|
||||
resolver = "2"
|
||||
members = [
|
||||
"meilisearch",
|
||||
"meilitool",
|
||||
"meilisearch-types",
|
||||
"meilisearch-auth",
|
||||
"meili-snap",
|
||||
@ -18,7 +19,7 @@ members = [
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "1.4.1"
|
||||
version = "1.5.0"
|
||||
authors = ["Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>"]
|
||||
description = "Meilisearch HTTP server"
|
||||
homepage = "https://meilisearch.com"
|
||||
|
11
Dockerfile
11
Dockerfile
@ -3,7 +3,7 @@ FROM rust:alpine3.16 AS compiler
|
||||
|
||||
RUN apk add -q --update-cache --no-cache build-base openssl-dev
|
||||
|
||||
WORKDIR /meilisearch
|
||||
WORKDIR /
|
||||
|
||||
ARG COMMIT_SHA
|
||||
ARG COMMIT_DATE
|
||||
@ -17,7 +17,7 @@ RUN set -eux; \
|
||||
if [ "$apkArch" = "aarch64" ]; then \
|
||||
export JEMALLOC_SYS_WITH_LG_PAGE=16; \
|
||||
fi && \
|
||||
cargo build --release
|
||||
cargo build --release -p meilisearch -p meilitool
|
||||
|
||||
# Run
|
||||
FROM alpine:3.16
|
||||
@ -28,9 +28,10 @@ ENV MEILI_SERVER_PROVIDER docker
|
||||
RUN apk update --quiet \
|
||||
&& apk add -q --no-cache libgcc tini curl
|
||||
|
||||
# add meilisearch to the `/bin` so you can run it from anywhere and it's easy
|
||||
# to find.
|
||||
COPY --from=compiler /meilisearch/target/release/meilisearch /bin/meilisearch
|
||||
# add meilisearch and meilitool to the `/bin` so you can run it from anywhere
|
||||
# and it's easy to find.
|
||||
COPY --from=compiler /target/release/meilisearch /bin/meilisearch
|
||||
COPY --from=compiler /target/release/meilitool /bin/meilitool
|
||||
# To stay compatible with the older version of the container (pre v0.27.0) we're
|
||||
# going to symlink the meilisearch binary in the path to `/meilisearch`
|
||||
RUN ln -s /bin/meilisearch /meilisearch
|
||||
|
@ -933,6 +933,10 @@ impl IndexScheduler {
|
||||
self.index_mapper.index(&rtxn, &index_uid)?
|
||||
};
|
||||
|
||||
// the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick
|
||||
*self.currently_updating_index.write().unwrap() =
|
||||
Some((index_uid.clone(), index.clone()));
|
||||
|
||||
let mut index_wtxn = index.write_txn()?;
|
||||
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
||||
index_wtxn.commit()?;
|
||||
|
@ -39,6 +39,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||
test_breakpoint_sdr: _,
|
||||
planned_failures: _,
|
||||
run_loop_iteration: _,
|
||||
currently_updating_index: _,
|
||||
} = scheduler;
|
||||
|
||||
let rtxn = env.read_txn().unwrap();
|
||||
|
@ -27,7 +27,7 @@ mod index_mapper;
|
||||
mod insta_snapshot;
|
||||
mod lru;
|
||||
mod utils;
|
||||
mod uuid_codec;
|
||||
pub mod uuid_codec;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
pub type TaskId = u32;
|
||||
@ -331,6 +331,10 @@ pub struct IndexScheduler {
|
||||
/// The path to the version file of Meilisearch.
|
||||
pub(crate) version_file_path: PathBuf,
|
||||
|
||||
/// A few types of long running batches of tasks that act on a single index set this field
|
||||
/// so that a handle to the index is available from other threads (search) in an optimized manner.
|
||||
currently_updating_index: Arc<RwLock<Option<(String, Index)>>>,
|
||||
|
||||
// ================= test
|
||||
// The next entry is dedicated to the tests.
|
||||
/// Provide a way to set a breakpoint in multiple part of the scheduler.
|
||||
@ -374,6 +378,7 @@ impl IndexScheduler {
|
||||
dumps_path: self.dumps_path.clone(),
|
||||
auth_path: self.auth_path.clone(),
|
||||
version_file_path: self.version_file_path.clone(),
|
||||
currently_updating_index: self.currently_updating_index.clone(),
|
||||
#[cfg(test)]
|
||||
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
||||
#[cfg(test)]
|
||||
@ -470,6 +475,7 @@ impl IndexScheduler {
|
||||
snapshots_path: options.snapshots_path,
|
||||
auth_path: options.auth_path,
|
||||
version_file_path: options.version_file_path,
|
||||
currently_updating_index: Arc::new(RwLock::new(None)),
|
||||
|
||||
#[cfg(test)]
|
||||
test_breakpoint_sdr,
|
||||
@ -652,6 +658,13 @@ impl IndexScheduler {
|
||||
/// If you need to fetch information from or perform an action on all indexes,
|
||||
/// see the `try_for_each_index` function.
|
||||
pub fn index(&self, name: &str) -> Result<Index> {
|
||||
if let Some((current_name, current_index)) =
|
||||
self.currently_updating_index.read().unwrap().as_ref()
|
||||
{
|
||||
if current_name == name {
|
||||
return Ok(current_index.clone());
|
||||
}
|
||||
}
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.index(&rtxn, name)
|
||||
}
|
||||
@ -1133,6 +1146,9 @@ impl IndexScheduler {
|
||||
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
||||
};
|
||||
|
||||
// Reset the currently updating index to relinquish the index handle
|
||||
*self.currently_updating_index.write().unwrap() = None;
|
||||
|
||||
#[cfg(test)]
|
||||
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
||||
|
||||
|
@ -50,6 +50,7 @@ hebrew = ["milli/hebrew"]
|
||||
japanese = ["milli/japanese"]
|
||||
# thai specialized tokenization
|
||||
thai = ["milli/thai"]
|
||||
|
||||
# allow greek specialized tokenization
|
||||
greek = ["milli/greek"]
|
||||
# allow khmer specialized tokenization
|
||||
khmer = ["milli/khmer"]
|
||||
|
@ -150,6 +150,7 @@ hebrew = ["meilisearch-types/hebrew"]
|
||||
japanese = ["meilisearch-types/japanese"]
|
||||
thai = ["meilisearch-types/thai"]
|
||||
greek = ["meilisearch-types/greek"]
|
||||
khmer = ["meilisearch-types/khmer"]
|
||||
|
||||
[package.metadata.mini-dashboard]
|
||||
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.11/build.zip"
|
||||
|
@ -5,9 +5,11 @@ pub mod service;
|
||||
|
||||
use std::fmt::{self, Display};
|
||||
|
||||
#[allow(unused)]
|
||||
pub use index::{GetAllDocumentsOptions, GetDocumentOptions};
|
||||
use meili_snap::json_string;
|
||||
use serde::{Deserialize, Serialize};
|
||||
#[allow(unused)]
|
||||
pub use server::{default_settings, Server};
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
|
||||
|
@ -6,21 +6,109 @@ use crate::json;
|
||||
|
||||
pub(self) static DOCUMENTS: Lazy<Value> = Lazy::new(|| {
|
||||
json!([
|
||||
{"productId": 1, "shopId": 1},
|
||||
{"productId": 2, "shopId": 1},
|
||||
{"productId": 3, "shopId": 2},
|
||||
{"productId": 4, "shopId": 2},
|
||||
{"productId": 5, "shopId": 3},
|
||||
{"productId": 6, "shopId": 3},
|
||||
{"productId": 7, "shopId": 4},
|
||||
{"productId": 8, "shopId": 4},
|
||||
{"productId": 9, "shopId": 5},
|
||||
{"productId": 10, "shopId": 5}
|
||||
{
|
||||
"id": 1,
|
||||
"description": "Leather Jacket",
|
||||
"brand": "Lee Jeans",
|
||||
"product_id": "123456",
|
||||
"color": "Brown"
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"description": "Leather Jacket",
|
||||
"brand": "Lee Jeans",
|
||||
"product_id": "123456",
|
||||
"color": "Black"
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"description": "Leather Jacket",
|
||||
"brand": "Lee Jeans",
|
||||
"product_id": "123456",
|
||||
"color": "Blue"
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"description": "T-Shirt",
|
||||
"brand": "Nike",
|
||||
"product_id": "789012",
|
||||
"color": "Red"
|
||||
},
|
||||
{
|
||||
"id": 5,
|
||||
"description": "T-Shirt",
|
||||
"brand": "Nike",
|
||||
"product_id": "789012",
|
||||
"color": "Blue"
|
||||
},
|
||||
{
|
||||
"id": 6,
|
||||
"description": "Running Shoes",
|
||||
"brand": "Adidas",
|
||||
"product_id": "456789",
|
||||
"color": "Black"
|
||||
},
|
||||
{
|
||||
"id": 7,
|
||||
"description": "Running Shoes",
|
||||
"brand": "Adidas",
|
||||
"product_id": "456789",
|
||||
"color": "White"
|
||||
},
|
||||
{
|
||||
"id": 8,
|
||||
"description": "Hoodie",
|
||||
"brand": "Puma",
|
||||
"product_id": "987654",
|
||||
"color": "Gray"
|
||||
},
|
||||
{
|
||||
"id": 9,
|
||||
"description": "Sweater",
|
||||
"brand": "Gap",
|
||||
"product_id": "234567",
|
||||
"color": "Green"
|
||||
},
|
||||
{
|
||||
"id": 10,
|
||||
"description": "Sweater",
|
||||
"brand": "Gap",
|
||||
"product_id": "234567",
|
||||
"color": "Red"
|
||||
},
|
||||
{
|
||||
"id": 11,
|
||||
"description": "Sweater",
|
||||
"brand": "Gap",
|
||||
"product_id": "234567",
|
||||
"color": "Blue"
|
||||
},
|
||||
{
|
||||
"id": 12,
|
||||
"description": "Jeans",
|
||||
"brand": "Levi's",
|
||||
"product_id": "345678",
|
||||
"color": "Indigo"
|
||||
},
|
||||
{
|
||||
"id": 13,
|
||||
"description": "Jeans",
|
||||
"brand": "Levi's",
|
||||
"product_id": "345678",
|
||||
"color": "Black"
|
||||
},
|
||||
{
|
||||
"id": 14,
|
||||
"description": "Jeans",
|
||||
"brand": "Levi's",
|
||||
"product_id": "345678",
|
||||
"color": "Stone Wash"
|
||||
}
|
||||
])
|
||||
});
|
||||
|
||||
pub(self) static DOCUMENT_PRIMARY_KEY: &str = "productId";
|
||||
pub(self) static DOCUMENT_DISTINCT_KEY: &str = "shopId";
|
||||
pub(self) static DOCUMENT_PRIMARY_KEY: &str = "id";
|
||||
pub(self) static DOCUMENT_DISTINCT_KEY: &str = "product_id";
|
||||
|
||||
/// testing: https://github.com/meilisearch/meilisearch/issues/4078
|
||||
#[actix_rt::test]
|
||||
@ -33,31 +121,121 @@ async fn distinct_search_with_offset_no_ranking() {
|
||||
index.update_distinct_attribute(json!(DOCUMENT_DISTINCT_KEY)).await;
|
||||
index.wait_task(1).await;
|
||||
|
||||
fn get_hits(Value(response): Value) -> Vec<i64> {
|
||||
fn get_hits(response: &Value) -> Vec<&str> {
|
||||
let hits_array = response["hits"].as_array().unwrap();
|
||||
hits_array.iter().map(|h| h[DOCUMENT_DISTINCT_KEY].as_i64().unwrap()).collect::<Vec<_>>()
|
||||
hits_array.iter().map(|h| h[DOCUMENT_DISTINCT_KEY].as_str().unwrap()).collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
let (response, code) = index.search_post(json!({"limit": 2, "offset": 0})).await;
|
||||
let hits = get_hits(response);
|
||||
let (response, code) = index.search_post(json!({"offset": 0, "limit": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"2");
|
||||
snapshot!(format!("{:?}", hits), @"[1, 2]");
|
||||
snapshot!(format!("{:?}", hits), @r#"["123456", "789012"]"#);
|
||||
snapshot!(response["estimatedTotalHits"] , @"11");
|
||||
|
||||
let (response, code) = index.search_post(json!({"limit": 2, "offset": 2})).await;
|
||||
let hits = get_hits(response);
|
||||
let (response, code) = index.search_post(json!({"offset": 2, "limit": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"2");
|
||||
snapshot!(format!("{:?}", hits), @"[3, 4]");
|
||||
snapshot!(format!("{:?}", hits), @r#"["456789", "987654"]"#);
|
||||
snapshot!(response["estimatedTotalHits"], @"10");
|
||||
|
||||
let (response, code) = index.search_post(json!({"limit": 10, "offset": 4})).await;
|
||||
let hits = get_hits(response);
|
||||
let (response, code) = index.search_post(json!({"offset": 4, "limit": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"2");
|
||||
snapshot!(format!("{:?}", hits), @r#"["234567", "345678"]"#);
|
||||
snapshot!(response["estimatedTotalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"offset": 5, "limit": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"1");
|
||||
snapshot!(format!("{:?}", hits), @"[5]");
|
||||
snapshot!(format!("{:?}", hits), @r#"["345678"]"#);
|
||||
snapshot!(response["estimatedTotalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"limit": 10, "offset": 5})).await;
|
||||
let hits = get_hits(response);
|
||||
let (response, code) = index.search_post(json!({"offset": 6, "limit": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"0");
|
||||
snapshot!(format!("{:?}", hits), @r#"[]"#);
|
||||
snapshot!(response["estimatedTotalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"offset": 7, "limit": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"0");
|
||||
snapshot!(format!("{:?}", hits), @r#"[]"#);
|
||||
snapshot!(response["estimatedTotalHits"], @"6");
|
||||
}
|
||||
|
||||
/// testing: https://github.com/meilisearch/meilisearch/issues/4130
|
||||
#[actix_rt::test]
|
||||
async fn distinct_search_with_pagination_no_ranking() {
|
||||
let server = Server::new().await;
|
||||
let index = server.index("test");
|
||||
|
||||
let documents = DOCUMENTS.clone();
|
||||
index.add_documents(documents, Some(DOCUMENT_PRIMARY_KEY)).await;
|
||||
index.update_distinct_attribute(json!(DOCUMENT_DISTINCT_KEY)).await;
|
||||
index.wait_task(1).await;
|
||||
|
||||
fn get_hits(response: &Value) -> Vec<&str> {
|
||||
let hits_array = response["hits"].as_array().unwrap();
|
||||
hits_array.iter().map(|h| h[DOCUMENT_DISTINCT_KEY].as_str().unwrap()).collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
let (response, code) = index.search_post(json!({"page": 0, "hitsPerPage": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"0");
|
||||
snapshot!(format!("{:?}", hits), @r#"[]"#);
|
||||
snapshot!(response["page"], @"0");
|
||||
snapshot!(response["totalPages"], @"3");
|
||||
snapshot!(response["totalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"page": 1, "hitsPerPage": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"2");
|
||||
snapshot!(format!("{:?}", hits), @r#"["123456", "789012"]"#);
|
||||
snapshot!(response["page"], @"1");
|
||||
snapshot!(response["totalPages"], @"3");
|
||||
snapshot!(response["totalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"page": 2, "hitsPerPage": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"2");
|
||||
snapshot!(format!("{:?}", hits), @r#"["456789", "987654"]"#);
|
||||
snapshot!(response["page"], @"2");
|
||||
snapshot!(response["totalPages"], @"3");
|
||||
snapshot!(response["totalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"page": 3, "hitsPerPage": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"2");
|
||||
snapshot!(format!("{:?}", hits), @r#"["234567", "345678"]"#);
|
||||
snapshot!(response["page"], @"3");
|
||||
snapshot!(response["totalPages"], @"3");
|
||||
snapshot!(response["totalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"page": 4, "hitsPerPage": 2})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"0");
|
||||
snapshot!(format!("{:?}", hits), @r#"[]"#);
|
||||
snapshot!(response["page"], @"4");
|
||||
snapshot!(response["totalPages"], @"3");
|
||||
snapshot!(response["totalHits"], @"6");
|
||||
|
||||
let (response, code) = index.search_post(json!({"page": 2, "hitsPerPage": 3})).await;
|
||||
let hits = get_hits(&response);
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(hits.len(), @"3");
|
||||
snapshot!(format!("{:?}", hits), @r#"["987654", "234567", "345678"]"#);
|
||||
snapshot!(response["page"], @"2");
|
||||
snapshot!(response["totalPages"], @"2");
|
||||
snapshot!(response["totalHits"], @"6");
|
||||
}
|
||||
|
19
meilitool/Cargo.toml
Normal file
19
meilitool/Cargo.toml
Normal file
@ -0,0 +1,19 @@
|
||||
[package]
|
||||
name = "meilitool"
|
||||
description = "A CLI to edit a Meilisearch database from the command line"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
homepage.workspace = true
|
||||
readme.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.75"
|
||||
clap = { version = "4.2.1", features = ["derive"] }
|
||||
dump = { path = "../dump" }
|
||||
file-store = { path = "../file-store" }
|
||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||
meilisearch-types = { path = "../meilisearch-types" }
|
||||
time = { version = "0.3.30", features = ["formatting"] }
|
||||
uuid = { version = "1.5.0", features = ["v4"], default-features = false }
|
312
meilitool/src/main.rs
Normal file
312
meilitool/src/main.rs
Normal file
@ -0,0 +1,312 @@
|
||||
use std::fs::{read_dir, read_to_string, remove_file, File};
|
||||
use std::io::BufWriter;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::{Parser, Subcommand};
|
||||
use dump::{DumpWriter, IndexMetadata};
|
||||
use file_store::FileStore;
|
||||
use meilisearch_auth::AuthController;
|
||||
use meilisearch_types::heed::types::{OwnedType, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, PolyDatabase, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
||||
use meilisearch_types::milli::{obkv_to_json, BEU32};
|
||||
use meilisearch_types::tasks::{Status, Task};
|
||||
use meilisearch_types::versioning::check_version_file;
|
||||
use meilisearch_types::Index;
|
||||
use time::macros::format_description;
|
||||
use time::OffsetDateTime;
|
||||
use uuid_codec::UuidCodec;
|
||||
|
||||
mod uuid_codec;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
struct Cli {
|
||||
/// The database path where the Meilisearch is running.
|
||||
#[arg(long, default_value = "data.ms/")]
|
||||
db_path: PathBuf,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum Command {
|
||||
/// Clears the task queue and make it empty.
|
||||
///
|
||||
/// This command can be safely executed even if Meilisearch is running and processing tasks.
|
||||
/// Once the task queue is empty you can restart Meilisearch and no more tasks must be visible,
|
||||
/// even the ones that were processing. However, it's highly possible that you see the processing
|
||||
/// tasks in the queue again with an associated internal error message.
|
||||
ClearTaskQueue,
|
||||
|
||||
/// Exports a dump from the Meilisearch database.
|
||||
///
|
||||
/// Make sure to run this command when Meilisearch is not running or running but not processing tasks.
|
||||
/// If tasks are being processed while a dump is being exported there are chances for the dump to be
|
||||
/// malformed with missing tasks.
|
||||
///
|
||||
/// TODO Verify this claim or make sure it cannot happen and we can export dumps
|
||||
/// without caring about killing Meilisearch first!
|
||||
ExportADump {
|
||||
/// The directory in which the dump will be created.
|
||||
#[arg(long, default_value = "dumps/")]
|
||||
dump_dir: PathBuf,
|
||||
|
||||
/// Skip dumping the enqueued or processing tasks.
|
||||
///
|
||||
/// Can be useful when there are a lot of them and it is not particularly useful
|
||||
/// to keep them. Note that only the enqueued tasks takes up space so skipping
|
||||
/// the processed ones is not particularly interesting.
|
||||
#[arg(long)]
|
||||
skip_enqueued_tasks: bool,
|
||||
},
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let Cli { db_path, command } = Cli::parse();
|
||||
|
||||
check_version_file(&db_path).context("While checking the version file")?;
|
||||
|
||||
match command {
|
||||
Command::ClearTaskQueue => clear_task_queue(db_path),
|
||||
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
||||
export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Clears the task queue located at `db_path`.
|
||||
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
|
||||
let path = db_path.join("tasks");
|
||||
let env = EnvOpenOptions::new()
|
||||
.max_dbs(100)
|
||||
.open(&path)
|
||||
.with_context(|| format!("While trying to open {:?}", path.display()))?;
|
||||
|
||||
eprintln!("Deleting tasks from the database...");
|
||||
|
||||
let mut wtxn = env.write_txn()?;
|
||||
let all_tasks = try_opening_poly_database(&env, &wtxn, "all-tasks")?;
|
||||
let total = all_tasks.len(&wtxn)?;
|
||||
let status = try_opening_poly_database(&env, &wtxn, "status")?;
|
||||
let kind = try_opening_poly_database(&env, &wtxn, "kind")?;
|
||||
let index_tasks = try_opening_poly_database(&env, &wtxn, "index-tasks")?;
|
||||
let canceled_by = try_opening_poly_database(&env, &wtxn, "canceled_by")?;
|
||||
let enqueued_at = try_opening_poly_database(&env, &wtxn, "enqueued-at")?;
|
||||
let started_at = try_opening_poly_database(&env, &wtxn, "started-at")?;
|
||||
let finished_at = try_opening_poly_database(&env, &wtxn, "finished-at")?;
|
||||
|
||||
try_clearing_poly_database(&mut wtxn, all_tasks, "all-tasks")?;
|
||||
try_clearing_poly_database(&mut wtxn, status, "status")?;
|
||||
try_clearing_poly_database(&mut wtxn, kind, "kind")?;
|
||||
try_clearing_poly_database(&mut wtxn, index_tasks, "index-tasks")?;
|
||||
try_clearing_poly_database(&mut wtxn, canceled_by, "canceled_by")?;
|
||||
try_clearing_poly_database(&mut wtxn, enqueued_at, "enqueued-at")?;
|
||||
try_clearing_poly_database(&mut wtxn, started_at, "started-at")?;
|
||||
try_clearing_poly_database(&mut wtxn, finished_at, "finished-at")?;
|
||||
|
||||
wtxn.commit().context("While committing the transaction")?;
|
||||
|
||||
eprintln!("Successfully deleted {total} tasks from the tasks database!");
|
||||
eprintln!("Deleting the content files from disk...");
|
||||
|
||||
let mut count = 0usize;
|
||||
let update_files = db_path.join("update_files");
|
||||
let entries = read_dir(&update_files).with_context(|| {
|
||||
format!("While trying to read the content of {:?}", update_files.display())
|
||||
})?;
|
||||
for result in entries {
|
||||
match result {
|
||||
Ok(ent) => match remove_file(ent.path()) {
|
||||
Ok(_) => count += 1,
|
||||
Err(e) => eprintln!("Error while deleting {:?}: {}", ent.path().display(), e),
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error while reading a file in {:?}: {}", update_files.display(), e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eprintln!("Sucessfully deleted {count} content files from disk!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_opening_database<KC: 'static, DC: 'static>(
|
||||
env: &Env,
|
||||
rtxn: &RoTxn,
|
||||
db_name: &str,
|
||||
) -> anyhow::Result<Database<KC, DC>> {
|
||||
env.open_database(rtxn, Some(db_name))
|
||||
.with_context(|| format!("While opening the {db_name:?} database"))?
|
||||
.with_context(|| format!("Missing the {db_name:?} database"))
|
||||
}
|
||||
|
||||
fn try_opening_poly_database(
|
||||
env: &Env,
|
||||
rtxn: &RoTxn,
|
||||
db_name: &str,
|
||||
) -> anyhow::Result<PolyDatabase> {
|
||||
env.open_poly_database(rtxn, Some(db_name))
|
||||
.with_context(|| format!("While opening the {db_name:?} poly database"))?
|
||||
.with_context(|| format!("Missing the {db_name:?} poly database"))
|
||||
}
|
||||
|
||||
fn try_clearing_poly_database(
|
||||
wtxn: &mut RwTxn,
|
||||
database: PolyDatabase,
|
||||
db_name: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
database.clear(wtxn).with_context(|| format!("While clearing the {db_name:?} database"))
|
||||
}
|
||||
|
||||
/// Exports a dump into the dump directory.
|
||||
fn export_a_dump(
|
||||
db_path: PathBuf,
|
||||
dump_dir: PathBuf,
|
||||
skip_enqueued_tasks: bool,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let started_at = OffsetDateTime::now_utc();
|
||||
|
||||
// 1. Extracts the instance UID from disk
|
||||
let instance_uid_path = db_path.join("instance-uid");
|
||||
let instance_uid = match read_to_string(&instance_uid_path) {
|
||||
Ok(content) => match content.trim().parse() {
|
||||
Ok(uuid) => Some(uuid),
|
||||
Err(e) => {
|
||||
eprintln!("Impossible to parse instance-uid: {e}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Impossible to read {}: {}", instance_uid_path.display(), e);
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let dump = DumpWriter::new(instance_uid).context("While creating a new dump")?;
|
||||
let file_store =
|
||||
FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?;
|
||||
|
||||
let index_scheduler_path = db_path.join("tasks");
|
||||
let env = EnvOpenOptions::new()
|
||||
.max_dbs(100)
|
||||
.open(&index_scheduler_path)
|
||||
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
|
||||
|
||||
eprintln!("Dumping the keys...");
|
||||
|
||||
// 2. dump the keys
|
||||
let auth_store = AuthController::new(&db_path, &None)
|
||||
.with_context(|| format!("While opening the auth store at {}", db_path.display()))?;
|
||||
let mut dump_keys = dump.create_keys()?;
|
||||
let mut count = 0;
|
||||
for key in auth_store.list_keys()? {
|
||||
dump_keys.push_key(&key)?;
|
||||
count += 1;
|
||||
}
|
||||
dump_keys.flush()?;
|
||||
|
||||
eprintln!("Successfully dumped {count} keys!");
|
||||
|
||||
let rtxn = env.read_txn()?;
|
||||
let all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>> =
|
||||
try_opening_database(&env, &rtxn, "all-tasks")?;
|
||||
let index_mapping: Database<Str, UuidCodec> =
|
||||
try_opening_database(&env, &rtxn, "index-mapping")?;
|
||||
|
||||
if skip_enqueued_tasks {
|
||||
eprintln!("Skip dumping the enqueued tasks...");
|
||||
} else {
|
||||
eprintln!("Dumping the enqueued tasks...");
|
||||
|
||||
// 3. dump the tasks
|
||||
let mut dump_tasks = dump.create_tasks_queue()?;
|
||||
let mut count = 0;
|
||||
for ret in all_tasks.iter(&rtxn)? {
|
||||
let (_, t) = ret?;
|
||||
let status = t.status;
|
||||
let content_file = t.content_uuid();
|
||||
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
|
||||
|
||||
// 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
|
||||
if let Some(content_file_uuid) = content_file {
|
||||
if status == Status::Enqueued {
|
||||
let content_file = file_store.get_update(content_file_uuid)?;
|
||||
|
||||
let reader =
|
||||
DocumentsBatchReader::from_reader(content_file).with_context(|| {
|
||||
format!("While reading content file {:?}", content_file_uuid)
|
||||
})?;
|
||||
|
||||
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
|
||||
while let Some(doc) = cursor.next_document().with_context(|| {
|
||||
format!("While iterating on content file {:?}", content_file_uuid)
|
||||
})? {
|
||||
dump_content_file
|
||||
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
|
||||
}
|
||||
dump_content_file.flush()?;
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
dump_tasks.flush()?;
|
||||
|
||||
eprintln!("Successfully dumped {count} enqueued tasks!");
|
||||
}
|
||||
|
||||
eprintln!("Dumping the indexes...");
|
||||
|
||||
// 4. Dump the indexes
|
||||
let mut count = 0;
|
||||
for result in index_mapping.iter(&rtxn)? {
|
||||
let (uid, uuid) = result?;
|
||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||
let index = Index::new(EnvOpenOptions::new(), &index_path).with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
|
||||
let rtxn = index.read_txn()?;
|
||||
let metadata = IndexMetadata {
|
||||
uid: uid.to_owned(),
|
||||
primary_key: index.primary_key(&rtxn)?.map(String::from),
|
||||
created_at: index.created_at(&rtxn)?,
|
||||
updated_at: index.updated_at(&rtxn)?,
|
||||
};
|
||||
let mut index_dumper = dump.create_index(uid, &metadata)?;
|
||||
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn)?;
|
||||
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||
|
||||
// 4.1. Dump the documents
|
||||
for ret in index.all_documents(&rtxn)? {
|
||||
let (_id, doc) = ret?;
|
||||
let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
|
||||
index_dumper.push_document(&document)?;
|
||||
}
|
||||
|
||||
// 4.2. Dump the settings
|
||||
let settings = meilisearch_types::settings::settings(&index, &rtxn)?;
|
||||
index_dumper.settings(&settings)?;
|
||||
count += 1;
|
||||
}
|
||||
|
||||
eprintln!("Successfully dumped {count} indexes!");
|
||||
// We will not dump experimental feature settings
|
||||
eprintln!("The tool is not dumping experimental features, please set them by hand afterward");
|
||||
|
||||
let dump_uid = started_at.format(format_description!(
|
||||
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
|
||||
)).unwrap();
|
||||
|
||||
let path = dump_dir.join(format!("{}.dump", dump_uid));
|
||||
let file = File::create(&path)?;
|
||||
dump.persist_to(BufWriter::new(file))?;
|
||||
|
||||
eprintln!("Dump exported at path {:?}", path.display());
|
||||
|
||||
Ok(())
|
||||
}
|
24
meilitool/src/uuid_codec.rs
Normal file
24
meilitool/src/uuid_codec.rs
Normal file
@ -0,0 +1,24 @@
|
||||
use std::borrow::Cow;
|
||||
use std::convert::TryInto;
|
||||
|
||||
use meilisearch_types::heed::{BytesDecode, BytesEncode};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// A heed codec for value of struct Uuid.
|
||||
pub struct UuidCodec;
|
||||
|
||||
impl<'a> BytesDecode<'a> for UuidCodec {
|
||||
type DItem = Uuid;
|
||||
|
||||
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
|
||||
bytes.try_into().ok().map(Uuid::from_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl BytesEncode<'_> for UuidCodec {
|
||||
type EItem = Uuid;
|
||||
|
||||
fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
|
||||
Some(Cow::Borrowed(item.as_bytes()))
|
||||
}
|
||||
}
|
@ -17,7 +17,7 @@ bincode = "1.3.3"
|
||||
bstr = "1.4.0"
|
||||
bytemuck = { version = "1.13.1", features = ["extern_crate_alloc"] }
|
||||
byteorder = "1.4.3"
|
||||
charabia = { version = "0.8.3", default-features = false }
|
||||
charabia = { version = "0.8.5", default-features = false }
|
||||
concat-arrays = "0.1.2"
|
||||
crossbeam-channel = "0.5.8"
|
||||
deserr = { version = "0.6.0", features = ["actix-web"]}
|
||||
@ -83,7 +83,7 @@ meili-snap = { path = "../meili-snap" }
|
||||
rand = { version = "0.8.5", features = ["small_rng"] }
|
||||
|
||||
[features]
|
||||
all-tokenizations = ["charabia/chinese", "charabia/hebrew", "charabia/japanese", "charabia/thai", "charabia/korean", "charabia/greek"]
|
||||
all-tokenizations = ["charabia/chinese", "charabia/hebrew", "charabia/japanese", "charabia/thai", "charabia/korean", "charabia/greek", "charabia/khmer"]
|
||||
|
||||
# Use POSIX semaphores instead of SysV semaphores in LMDB
|
||||
# For more information on this feature, see heed's Cargo.toml
|
||||
@ -107,3 +107,6 @@ thai = ["charabia/thai"]
|
||||
|
||||
# allow greek specialized tokenization
|
||||
greek = ["charabia/greek"]
|
||||
|
||||
# allow khmer specialized tokenization
|
||||
khmer = ["charabia/khmer"]
|
||||
|
@ -3,7 +3,7 @@ use std::fmt::{Debug, Display};
|
||||
use std::ops::Bound::{self, Excluded, Included};
|
||||
|
||||
use either::Either;
|
||||
pub use filter_parser::{Condition, Error as FPError, FilterCondition, Span, Token};
|
||||
pub use filter_parser::{Condition, Error as FPError, FilterCondition, Token};
|
||||
use roaring::RoaringBitmap;
|
||||
use serde_json::Value;
|
||||
|
||||
|
@ -11,7 +11,7 @@ use once_cell::sync::Lazy;
|
||||
use roaring::bitmap::RoaringBitmap;
|
||||
|
||||
pub use self::facet::{FacetDistribution, Filter, OrderBy, DEFAULT_VALUES_PER_FACET};
|
||||
pub use self::new::matches::{FormatOptions, MatchBounds, Matcher, MatcherBuilder, MatchingWords};
|
||||
pub use self::new::matches::{FormatOptions, MatchBounds, MatcherBuilder, MatchingWords};
|
||||
use self::new::PartialSearchResult;
|
||||
use crate::error::UserError;
|
||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupValue};
|
||||
|
@ -46,9 +46,8 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
|
||||
if let Some(distinct_fid) = distinct_fid {
|
||||
let mut excluded = RoaringBitmap::new();
|
||||
let mut results = vec![];
|
||||
let mut skip = 0;
|
||||
for docid in universe.iter() {
|
||||
if results.len() >= length {
|
||||
if results.len() >= from + length {
|
||||
break;
|
||||
}
|
||||
if excluded.contains(docid) {
|
||||
@ -56,16 +55,19 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
|
||||
}
|
||||
|
||||
distinct_single_docid(ctx.index, ctx.txn, distinct_fid, docid, &mut excluded)?;
|
||||
skip += 1;
|
||||
if skip <= from {
|
||||
continue;
|
||||
}
|
||||
|
||||
results.push(docid);
|
||||
}
|
||||
|
||||
let mut all_candidates = universe - excluded;
|
||||
all_candidates.extend(results.iter().copied());
|
||||
// drain the results of the skipped elements
|
||||
// this **must** be done **after** writing the entire results in `all_candidates` to ensure
|
||||
// e.g. estimatedTotalHits is correct.
|
||||
if results.len() >= from {
|
||||
results.drain(..from);
|
||||
} else {
|
||||
results.clear();
|
||||
}
|
||||
|
||||
return Ok(BucketSortOutput {
|
||||
scores: vec![Default::default(); results.len()],
|
||||
|
@ -434,7 +434,18 @@ pub fn execute_search(
|
||||
let mut search = Search::default();
|
||||
let docids = match ctx.index.vector_hnsw(ctx.txn)? {
|
||||
Some(hnsw) => {
|
||||
if let Some(expected_size) = hnsw.iter().map(|(_, point)| point.len()).next() {
|
||||
if vector.len() != expected_size {
|
||||
return Err(UserError::InvalidVectorDimensions {
|
||||
expected: expected_size,
|
||||
found: vector.len(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
}
|
||||
|
||||
let vector = NDotProductPoint::new(vector.clone());
|
||||
|
||||
let neighbors = hnsw.search(&vector, &mut search);
|
||||
|
||||
let mut docids = Vec::new();
|
||||
|
@ -29,7 +29,7 @@ use std::hash::Hash;
|
||||
pub use cheapest_paths::PathVisitor;
|
||||
pub use condition_docids_cache::ConditionDocIdsCache;
|
||||
pub use dead_ends_cache::DeadEndsCache;
|
||||
pub use exactness::{ExactnessCondition, ExactnessGraph};
|
||||
pub use exactness::ExactnessGraph;
|
||||
pub use fid::{FidCondition, FidGraph};
|
||||
pub use position::{PositionCondition, PositionGraph};
|
||||
pub use proximity::{ProximityCondition, ProximityGraph};
|
||||
|
@ -202,7 +202,7 @@ test_distinct!(
|
||||
EXTERNAL_DOCUMENTS_IDS.len(),
|
||||
1,
|
||||
vec![],
|
||||
2
|
||||
3
|
||||
);
|
||||
test_distinct!(
|
||||
// testing: https://github.com/meilisearch/meilisearch/issues/4078
|
||||
@ -212,7 +212,7 @@ test_distinct!(
|
||||
1,
|
||||
2,
|
||||
vec![],
|
||||
1
|
||||
3
|
||||
);
|
||||
test_distinct!(
|
||||
// testing: https://github.com/meilisearch/meilisearch/issues/4078
|
||||
@ -222,7 +222,7 @@ test_distinct!(
|
||||
EXTERNAL_DOCUMENTS_IDS.len(),
|
||||
2,
|
||||
vec![],
|
||||
5
|
||||
7
|
||||
);
|
||||
test_distinct!(
|
||||
// testing: https://github.com/meilisearch/meilisearch/issues/4078
|
||||
@ -232,5 +232,5 @@ test_distinct!(
|
||||
2,
|
||||
4,
|
||||
vec![],
|
||||
3
|
||||
7
|
||||
);
|
||||
|
Loading…
Reference in New Issue
Block a user