mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-22 19:13:10 +08:00
Merge pull request #5224 from meilisearch/fix-facet-distribution
Fix facet distribution
This commit is contained in:
commit
be2717edbd
@ -14,11 +14,11 @@ arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/ar
|
|||||||
clap = { version = "4.5.9", features = ["derive"] }
|
clap = { version = "4.5.9", features = ["derive"] }
|
||||||
dump = { path = "../dump" }
|
dump = { path = "../dump" }
|
||||||
file-store = { path = "../file-store" }
|
file-store = { path = "../file-store" }
|
||||||
indexmap = {version = "2.7.0", features = ["serde"]}
|
indexmap = { version = "2.7.0", features = ["serde"] }
|
||||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||||
meilisearch-types = { path = "../meilisearch-types" }
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
serde = { version = "1.0.209", features = ["derive"] }
|
serde = { version = "1.0.209", features = ["derive"] }
|
||||||
serde_json = {version = "1.0.133", features = ["preserve_order"]}
|
serde_json = { version = "1.0.133", features = ["preserve_order"] }
|
||||||
tempfile = "3.14.0"
|
tempfile = "3.14.0"
|
||||||
time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
|
time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
|
||||||
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
||||||
|
@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
|
|||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use meilisearch_types::versioning::create_version_file;
|
use meilisearch_types::versioning::create_version_file;
|
||||||
use v1_10::v1_9_to_v1_10;
|
use v1_10::v1_9_to_v1_10;
|
||||||
use v1_12::v1_11_to_v1_12;
|
use v1_12::{v1_11_to_v1_12, v1_12_to_v1_12_3};
|
||||||
|
|
||||||
use crate::upgrade::v1_11::v1_10_to_v1_11;
|
use crate::upgrade::v1_11::v1_10_to_v1_11;
|
||||||
|
|
||||||
@ -21,9 +21,15 @@ pub struct OfflineUpgrade {
|
|||||||
impl OfflineUpgrade {
|
impl OfflineUpgrade {
|
||||||
pub fn upgrade(self) -> anyhow::Result<()> {
|
pub fn upgrade(self) -> anyhow::Result<()> {
|
||||||
let upgrade_list = [
|
let upgrade_list = [
|
||||||
(v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"),
|
(
|
||||||
|
v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
|
||||||
|
"1",
|
||||||
|
"10",
|
||||||
|
"0",
|
||||||
|
),
|
||||||
(v1_10_to_v1_11, "1", "11", "0"),
|
(v1_10_to_v1_11, "1", "11", "0"),
|
||||||
(v1_11_to_v1_12, "1", "12", "0"),
|
(v1_11_to_v1_12, "1", "12", "0"),
|
||||||
|
(v1_12_to_v1_12_3, "1", "12", "3"),
|
||||||
];
|
];
|
||||||
|
|
||||||
let (current_major, current_minor, current_patch) = &self.current_version;
|
let (current_major, current_minor, current_patch) = &self.current_version;
|
||||||
@ -36,6 +42,7 @@ impl OfflineUpgrade {
|
|||||||
("1", "9", _) => 0,
|
("1", "9", _) => 0,
|
||||||
("1", "10", _) => 1,
|
("1", "10", _) => 1,
|
||||||
("1", "11", _) => 2,
|
("1", "11", _) => 2,
|
||||||
|
("1", "12", x) if x == "0" || x == "1" || x == "2" => 3,
|
||||||
_ => {
|
_ => {
|
||||||
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
|
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
|
||||||
}
|
}
|
||||||
@ -46,7 +53,8 @@ impl OfflineUpgrade {
|
|||||||
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
|
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
|
||||||
("1", "10", _) => 0,
|
("1", "10", _) => 0,
|
||||||
("1", "11", _) => 1,
|
("1", "11", _) => 1,
|
||||||
("1", "12", _) => 2,
|
("1", "12", x) if x == "0" || x == "1" || x == "2" => 2,
|
||||||
|
("1", "12", "3") => 3,
|
||||||
(major, _, _) if major.starts_with('v') => {
|
(major, _, _) if major.starts_with('v') => {
|
||||||
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
|
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
|
||||||
}
|
}
|
||||||
@ -60,7 +68,7 @@ impl OfflineUpgrade {
|
|||||||
#[allow(clippy::needless_range_loop)]
|
#[allow(clippy::needless_range_loop)]
|
||||||
for index in start_at..=ends_at {
|
for index in start_at..=ends_at {
|
||||||
let (func, major, minor, patch) = upgrade_list[index];
|
let (func, major, minor, patch) = upgrade_list[index];
|
||||||
(func)(&self.db_path)?;
|
(func)(&self.db_path, current_major, current_minor, current_patch)?;
|
||||||
println!("Done");
|
println!("Done");
|
||||||
// We're writing the version file just in case an issue arise _while_ upgrading.
|
// We're writing the version file just in case an issue arise _while_ upgrading.
|
||||||
// We don't want the DB to fail in an unknown state.
|
// We don't want the DB to fail in an unknown state.
|
||||||
|
@ -151,7 +151,12 @@ fn date_round_trip(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn v1_9_to_v1_10(db_path: &Path) -> anyhow::Result<()> {
|
pub fn v1_9_to_v1_10(
|
||||||
|
db_path: &Path,
|
||||||
|
_origin_major: &str,
|
||||||
|
_origin_minor: &str,
|
||||||
|
_origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
println!("Upgrading from v1.9.0 to v1.10.0");
|
println!("Upgrading from v1.9.0 to v1.10.0");
|
||||||
// 2 changes here
|
// 2 changes here
|
||||||
|
|
||||||
|
@ -14,7 +14,12 @@ use meilisearch_types::milli::index::db_name;
|
|||||||
use crate::uuid_codec::UuidCodec;
|
use crate::uuid_codec::UuidCodec;
|
||||||
use crate::{try_opening_database, try_opening_poly_database};
|
use crate::{try_opening_database, try_opening_poly_database};
|
||||||
|
|
||||||
pub fn v1_10_to_v1_11(db_path: &Path) -> anyhow::Result<()> {
|
pub fn v1_10_to_v1_11(
|
||||||
|
db_path: &Path,
|
||||||
|
_origin_major: &str,
|
||||||
|
_origin_minor: &str,
|
||||||
|
_origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
println!("Upgrading from v1.10.0 to v1.11.0");
|
println!("Upgrading from v1.10.0 to v1.11.0");
|
||||||
|
|
||||||
let index_scheduler_path = db_path.join("tasks");
|
let index_scheduler_path = db_path.join("tasks");
|
||||||
|
@ -1,17 +1,34 @@
|
|||||||
//! The breaking changes that happened between the v1.11 and the v1.12 are:
|
//! The breaking changes that happened between the v1.11 and the v1.12 are:
|
||||||
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
|
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
|
||||||
|
|
||||||
|
use std::borrow::Cow;
|
||||||
use std::io::BufWriter;
|
use std::io::BufWriter;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use file_store::FileStore;
|
use file_store::FileStore;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use meilisearch_types::milli::documents::DocumentsBatchReader;
|
use meilisearch_types::milli::documents::DocumentsBatchReader;
|
||||||
|
use meilisearch_types::milli::heed::types::{SerdeJson, Str};
|
||||||
|
use meilisearch_types::milli::heed::{Database, EnvOpenOptions, RoTxn, RwTxn};
|
||||||
|
use meilisearch_types::milli::progress::Step;
|
||||||
|
use meilisearch_types::milli::{FieldDistribution, Index};
|
||||||
|
use serde::Serialize;
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
|
use time::OffsetDateTime;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
|
use crate::try_opening_database;
|
||||||
|
use crate::uuid_codec::UuidCodec;
|
||||||
|
|
||||||
|
pub fn v1_11_to_v1_12(
|
||||||
|
db_path: &Path,
|
||||||
|
_origin_major: &str,
|
||||||
|
_origin_minor: &str,
|
||||||
|
_origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
println!("Upgrading from v1.11.0 to v1.12.0");
|
println!("Upgrading from v1.11.0 to v1.12.0");
|
||||||
|
|
||||||
convert_update_files(db_path)?;
|
convert_update_files(db_path)?;
|
||||||
@ -19,6 +36,23 @@ pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn v1_12_to_v1_12_3(
|
||||||
|
db_path: &Path,
|
||||||
|
origin_major: &str,
|
||||||
|
origin_minor: &str,
|
||||||
|
origin_patch: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
println!("Upgrading from v1.12.{{0, 1, 2}} to v1.12.3");
|
||||||
|
|
||||||
|
if origin_minor == "12" {
|
||||||
|
rebuild_field_distribution(db_path)?;
|
||||||
|
} else {
|
||||||
|
println!("Not rebuilding field distribution as it wasn't corrupted coming from v{origin_major}.{origin_minor}.{origin_patch}");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Convert the update files from OBKV to ndjson format.
|
/// Convert the update files from OBKV to ndjson format.
|
||||||
///
|
///
|
||||||
/// 1) List all the update files using the file store.
|
/// 1) List all the update files using the file store.
|
||||||
@ -77,3 +111,188 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3
|
||||||
|
fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
||||||
|
let index_scheduler_path = db_path.join("tasks");
|
||||||
|
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
|
||||||
|
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
|
||||||
|
|
||||||
|
let mut sched_wtxn = env.write_txn()?;
|
||||||
|
|
||||||
|
let index_mapping: Database<Str, UuidCodec> =
|
||||||
|
try_opening_database(&env, &sched_wtxn, "index-mapping")?;
|
||||||
|
let stats_db: Database<UuidCodec, SerdeJson<IndexStats>> =
|
||||||
|
try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
|
||||||
|
format!("While trying to open {:?}", index_scheduler_path.display())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let index_count =
|
||||||
|
index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
|
||||||
|
|
||||||
|
// FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
|
||||||
|
// 1. immutably for the iteration
|
||||||
|
// 2. mutably for updating index stats
|
||||||
|
let indexes: Vec<_> = index_mapping
|
||||||
|
.iter(&sched_wtxn)?
|
||||||
|
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let progress = meilisearch_types::milli::progress::Progress::default();
|
||||||
|
let finished = AtomicBool::new(false);
|
||||||
|
|
||||||
|
std::thread::scope(|scope| {
|
||||||
|
let display_progress = std::thread::Builder::new()
|
||||||
|
.name("display_progress".into())
|
||||||
|
.spawn_scoped(scope, || {
|
||||||
|
while !finished.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||||
|
let view = progress.as_progress_view();
|
||||||
|
let Ok(view) = serde_json::to_string(&view) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
println!("{view}");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
for (index_index, result) in indexes.into_iter().enumerate() {
|
||||||
|
let (uid, uuid) = result?;
|
||||||
|
progress.update_progress(VariableNameStep::new(
|
||||||
|
&uid,
|
||||||
|
index_index as u32,
|
||||||
|
index_count as u32,
|
||||||
|
));
|
||||||
|
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"[{}/{index_count}]Updating index `{uid}` at `{}`",
|
||||||
|
index_index + 1,
|
||||||
|
index_path.display()
|
||||||
|
);
|
||||||
|
|
||||||
|
println!("\t- Rebuilding field distribution");
|
||||||
|
|
||||||
|
let index = meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path)
|
||||||
|
.with_context(|| {
|
||||||
|
format!("while opening index {uid} at '{}'", index_path.display())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mut index_txn = index.write_txn()?;
|
||||||
|
|
||||||
|
meilisearch_types::milli::update::new::reindex::field_distribution(
|
||||||
|
&index,
|
||||||
|
&mut index_txn,
|
||||||
|
&progress,
|
||||||
|
)
|
||||||
|
.context("while rebuilding field distribution")?;
|
||||||
|
|
||||||
|
let stats = IndexStats::new(&index, &index_txn)
|
||||||
|
.with_context(|| format!("computing stats for index `{uid}`"))?;
|
||||||
|
store_stats_of(stats_db, uuid, &mut sched_wtxn, &uid, &stats)?;
|
||||||
|
|
||||||
|
index_txn.commit().context("while committing the write txn for the updated index")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
|
||||||
|
|
||||||
|
finished.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
|
if let Err(panic) = display_progress.join() {
|
||||||
|
let msg = match panic.downcast_ref::<&'static str>() {
|
||||||
|
Some(s) => *s,
|
||||||
|
None => match panic.downcast_ref::<String>() {
|
||||||
|
Some(s) => &s[..],
|
||||||
|
None => "Box<dyn Any>",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
eprintln!("WARN: the display thread panicked with {msg}");
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Upgrading database succeeded");
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct VariableNameStep {
|
||||||
|
name: String,
|
||||||
|
current: u32,
|
||||||
|
total: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VariableNameStep {
|
||||||
|
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
||||||
|
Self { name: name.into(), current, total }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Step for VariableNameStep {
|
||||||
|
fn name(&self) -> Cow<'static, str> {
|
||||||
|
self.name.clone().into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn current(&self) -> u32 {
|
||||||
|
self.current
|
||||||
|
}
|
||||||
|
|
||||||
|
fn total(&self) -> u32 {
|
||||||
|
self.total
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn store_stats_of(
|
||||||
|
stats_db: Database<UuidCodec, SerdeJson<IndexStats>>,
|
||||||
|
index_uuid: Uuid,
|
||||||
|
sched_wtxn: &mut RwTxn,
|
||||||
|
index_uid: &str,
|
||||||
|
stats: &IndexStats,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
stats_db
|
||||||
|
.put(sched_wtxn, &index_uuid, stats)
|
||||||
|
.with_context(|| format!("storing stats for index `{index_uid}`"))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The statistics that can be computed from an `Index` object.
|
||||||
|
#[derive(Serialize, Debug)]
|
||||||
|
pub struct IndexStats {
|
||||||
|
/// Number of documents in the index.
|
||||||
|
pub number_of_documents: u64,
|
||||||
|
/// Size taken up by the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
|
||||||
|
/// are not returned to the disk after a deletion, this number is typically larger than
|
||||||
|
/// `used_database_size` that only includes the size of the used pages.
|
||||||
|
pub database_size: u64,
|
||||||
|
/// Size taken by the used pages of the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
|
||||||
|
/// this value is typically smaller than `database_size`.
|
||||||
|
pub used_database_size: u64,
|
||||||
|
/// Association of every field name with the number of times it occurs in the documents.
|
||||||
|
pub field_distribution: FieldDistribution,
|
||||||
|
/// Creation date of the index.
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub created_at: OffsetDateTime,
|
||||||
|
/// Date of the last update of the index.
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub updated_at: OffsetDateTime,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IndexStats {
|
||||||
|
/// Compute the stats of an index
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
///
|
||||||
|
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
|
||||||
|
pub fn new(index: &Index, rtxn: &RoTxn) -> meilisearch_types::milli::Result<Self> {
|
||||||
|
Ok(IndexStats {
|
||||||
|
number_of_documents: index.number_of_documents(rtxn)?,
|
||||||
|
database_size: index.on_disk_size()?,
|
||||||
|
used_database_size: index.used_size()?,
|
||||||
|
field_distribution: index.field_distribution(rtxn)?,
|
||||||
|
created_at: index.created_at(rtxn)?,
|
||||||
|
updated_at: index.updated_at(rtxn)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -3334,6 +3334,44 @@ mod tests {
|
|||||||
rtxn.commit().unwrap();
|
rtxn.commit().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Partially updating documents with `UpdateDocuments` must leave the field
/// distribution untouched: every field is still present in both documents.
#[test]
fn incremental_update_without_changing_facet_distribution() {
    let mut index = TempIndex::new();
    index
        .add_documents(documents!([
            {"id": 0, "some_field": "aaa", "other_field": "aaa" },
            {"id": 1, "some_field": "bbb", "other_field": "bbb" },
        ]))
        .unwrap();

    // Baseline: both documents carry all three fields.
    {
        let rtxn = index.read_txn().unwrap();
        let distribution = index.field_distribution(&rtxn).unwrap();
        for field in ["id", "some_field", "other_field"] {
            assert_eq!(Some(&2), distribution.get(field), "unexpected count for `{field}`");
        }
    }

    index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;

    // Each update only mentions a subset of the fields of its document.
    index
        .add_documents(documents!([
            {"id": 0, "other_field": "bbb" },
            {"id": 1, "some_field": "ccc" },
        ]))
        .unwrap();

    // The counts must be the same as before the partial update.
    {
        let rtxn = index.read_txn().unwrap();
        let distribution = index.field_distribution(&rtxn).unwrap();
        for field in ["id", "some_field", "other_field"] {
            assert_eq!(Some(&2), distribution.get(field), "unexpected count for `{field}`");
        }
    }
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn delete_words_exact_attributes() {
|
fn delete_words_exact_attributes() {
|
||||||
let index = TempIndex::new();
|
let index = TempIndex::new();
|
||||||
|
@ -89,7 +89,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
|
|||||||
.or_default();
|
.or_default();
|
||||||
*entry -= 1;
|
*entry -= 1;
|
||||||
}
|
}
|
||||||
let content = update.updated();
|
let content =
|
||||||
|
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||||
let geo_iter =
|
let geo_iter =
|
||||||
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
for res in content.iter_top_level_fields().chain(geo_iter) {
|
for res in content.iter_top_level_fields().chain(geo_iter) {
|
||||||
|
@ -16,6 +16,7 @@ pub mod indexer;
|
|||||||
mod merger;
|
mod merger;
|
||||||
mod parallel_iterator_ext;
|
mod parallel_iterator_ext;
|
||||||
mod ref_cell_ext;
|
mod ref_cell_ext;
|
||||||
|
pub mod reindex;
|
||||||
pub(crate) mod steps;
|
pub(crate) mod steps;
|
||||||
pub(crate) mod thread_local;
|
pub(crate) mod thread_local;
|
||||||
pub mod vector_document;
|
pub mod vector_document;
|
||||||
|
38
crates/milli/src/update/new/reindex.rs
Normal file
38
crates/milli/src/update/new/reindex.rs
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
use heed::RwTxn;
|
||||||
|
|
||||||
|
use super::document::{Document, DocumentFromDb};
|
||||||
|
use crate::progress::{self, AtomicSubStep, Progress};
|
||||||
|
use crate::{FieldDistribution, Index, Result};
|
||||||
|
|
||||||
|
pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progress) -> Result<()> {
|
||||||
|
let mut distribution = FieldDistribution::new();
|
||||||
|
|
||||||
|
let document_count = index.number_of_documents(wtxn)?;
|
||||||
|
let field_id_map = index.fields_ids_map(wtxn)?;
|
||||||
|
|
||||||
|
let (update_document_count, sub_step) =
|
||||||
|
AtomicSubStep::<progress::Document>::new(document_count as u32);
|
||||||
|
progress.update_progress(sub_step);
|
||||||
|
|
||||||
|
let docids = index.documents_ids(wtxn)?;
|
||||||
|
|
||||||
|
for docid in docids {
|
||||||
|
update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
|
let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
|
for res in document.iter_top_level_fields().chain(geo_iter) {
|
||||||
|
let (field_name, _) = res?;
|
||||||
|
if let Some(count) = distribution.get_mut(field_name) {
|
||||||
|
*count += 1;
|
||||||
|
} else {
|
||||||
|
distribution.insert(field_name.to_owned(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
index.put_field_distribution(wtxn, &distribution)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user