5094: Implement a bbqueue channel between the extractors and the writer r=dureuill a=Kerollmops

This PR replaces the bounded crossbeam channel used for communication between the extractors and the writer, which only carried allocated entries, with a [BBQueue](https://github.com/jamesmunns/bbqueue)-based system built on single-producer, single-consumer (SPSC) circular (ring) buffer channels.

 - [x] Implement the BBQueue channel system...
 - [x] with a crossbeam channel to wake up the receiver.
 - [x] Manage the BBQueue allocated memory dynamically.
 - [x] Support content that doesn't fit in the bbqueues.

Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-12-03 08:00:55 +00:00 committed by GitHub
commit 054622bd16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1286 additions and 758 deletions

39
Cargo.lock generated
View File

@ -489,6 +489,11 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bbqueue"
version = "0.5.1"
source = "git+https://github.com/kerollmops/bbqueue#cbb87cc707b5af415ef203bdaf2443e06ba0d6d4"
[[package]] [[package]]
name = "benchmarks" name = "benchmarks"
version = "1.12.0" version = "1.12.0"
@ -1246,19 +1251,6 @@ dependencies = [
"itertools 0.10.5", "itertools 0.10.5",
] ]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.13" version = "0.5.13"
@ -1918,6 +1910,15 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "flume"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
dependencies = [
"spin",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -2616,7 +2617,7 @@ dependencies = [
"big_s", "big_s",
"bincode", "bincode",
"bumpalo", "bumpalo",
"crossbeam", "crossbeam-channel",
"csv", "csv",
"derive_builder 0.20.0", "derive_builder 0.20.0",
"dump", "dump",
@ -3611,6 +3612,7 @@ version = "1.12.0"
dependencies = [ dependencies = [
"allocator-api2", "allocator-api2",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bbqueue",
"big_s", "big_s",
"bimap", "bimap",
"bincode", "bincode",
@ -3630,6 +3632,7 @@ dependencies = [
"enum-iterator", "enum-iterator",
"filter-parser", "filter-parser",
"flatten-serde-json", "flatten-serde-json",
"flume",
"fst", "fst",
"fxhash", "fxhash",
"geoutils", "geoutils",
@ -4743,8 +4746,9 @@ dependencies = [
[[package]] [[package]]
name = "roaring" name = "roaring"
version = "0.10.6" version = "0.10.7"
source = "git+https://github.com/RoaringBitmap/roaring-rs?branch=clone-iter-slice#8ff028e484fb6192a0acf5a669eaf18c30cada6e" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f81dc953b2244ddd5e7860cb0bb2a790494b898ef321d4aff8e260efab60cc88"
dependencies = [ dependencies = [
"bytemuck", "bytemuck",
"byteorder", "byteorder",
@ -5186,6 +5190,9 @@ name = "spin"
version = "0.9.8" version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]] [[package]]
name = "spm_precompiled" name = "spm_precompiled"

View File

@ -43,6 +43,3 @@ opt-level = 3
opt-level = 3 opt-level = 3
[profile.dev.package.roaring] [profile.dev.package.roaring]
opt-level = 3 opt-level = 3
[patch.crates-io]
roaring = { git = "https://github.com/RoaringBitmap/roaring-rs", branch = "clone-iter-slice" }

View File

@ -24,7 +24,7 @@ tempfile = "3.14.0"
criterion = { version = "0.5.1", features = ["html_reports"] } criterion = { version = "0.5.1", features = ["html_reports"] }
rand = "0.8.5" rand = "0.8.5"
rand_chacha = "0.3.1" rand_chacha = "0.3.1"
roaring = "0.10.6" roaring = "0.10.7"
[build-dependencies] [build-dependencies]
anyhow = "1.0.86" anyhow = "1.0.86"

View File

@ -16,6 +16,7 @@ use rand::seq::SliceRandom;
use rand_chacha::rand_core::SeedableRng; use rand_chacha::rand_core::SeedableRng;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
#[cfg(not(windows))]
#[global_allocator] #[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
@ -157,6 +158,7 @@ fn indexing_songs_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -223,6 +225,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -267,6 +270,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -335,6 +339,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -411,6 +416,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -455,6 +461,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -495,6 +502,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -562,6 +570,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -628,6 +637,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -694,6 +704,7 @@ fn indexing_wiki(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -759,6 +770,7 @@ fn reindexing_wiki(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -803,6 +815,7 @@ fn reindexing_wiki(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -870,6 +883,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -946,6 +960,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -991,6 +1006,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1032,6 +1048,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1098,6 +1115,7 @@ fn indexing_movies_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1163,6 +1181,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1207,6 +1226,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1274,6 +1294,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1321,6 +1342,7 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec<RoaringBi
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1385,6 +1407,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1429,6 +1452,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1469,6 +1493,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1558,6 +1583,7 @@ fn indexing_nested_movies_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1648,6 +1674,7 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1730,6 +1757,7 @@ fn indexing_nested_movies_without_faceted_fields(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1796,6 +1824,7 @@ fn indexing_geo(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1861,6 +1890,7 @@ fn reindexing_geo(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1905,6 +1935,7 @@ fn reindexing_geo(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1972,6 +2003,7 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -5,6 +5,7 @@ use criterion::{criterion_group, criterion_main};
use milli::update::Settings; use milli::update::Settings;
use utils::Conf; use utils::Conf;
#[cfg(not(windows))]
#[global_allocator] #[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

View File

@ -5,6 +5,7 @@ use criterion::{criterion_group, criterion_main};
use milli::update::Settings; use milli::update::Settings;
use utils::Conf; use utils::Conf;
#[cfg(not(windows))]
#[global_allocator] #[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

View File

@ -5,6 +5,7 @@ use criterion::{criterion_group, criterion_main};
use milli::update::Settings; use milli::update::Settings;
use utils::Conf; use utils::Conf;
#[cfg(not(windows))]
#[global_allocator] #[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

View File

@ -117,6 +117,7 @@ pub fn base_setup(conf: &Conf) -> Index {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -17,7 +17,7 @@ http = "1.1.0"
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
once_cell = "1.19.0" once_cell = "1.19.0"
regex = "1.10.5" regex = "1.10.5"
roaring = { version = "0.10.6", features = ["serde"] } roaring = { version = "0.10.7", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] } serde = { version = "1.0.204", features = ["derive"] }
serde_json = { version = "1.0.120", features = ["preserve_order"] } serde_json = { version = "1.0.120", features = ["preserve_order"] }
tar = "0.4.41" tar = "0.4.41"

View File

@ -135,6 +135,7 @@ fn main() {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -24,7 +24,7 @@ meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.6.0" page_size = "0.6.0"
raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" } raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" }
rayon = "1.10.0" rayon = "1.10.0"
roaring = { version = "0.10.6", features = ["serde"] } roaring = { version = "0.10.7", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] } serde = { version = "1.0.204", features = ["derive"] }
serde_json = { version = "1.0.120", features = ["preserve_order"] } serde_json = { version = "1.0.120", features = ["preserve_order"] }
synchronoise = "1.0.1" synchronoise = "1.0.1"
@ -45,7 +45,7 @@ bumpalo = "3.16.0"
[dev-dependencies] [dev-dependencies]
arroy = "0.5.0" arroy = "0.5.0"
big_s = "1.0.2" big_s = "1.0.2"
crossbeam = "0.8.4" crossbeam-channel = "0.5.13"
insta = { version = "1.39.0", features = ["json", "redactions"] } insta = { version = "1.39.0", features = ["json", "redactions"] }
maplit = "1.0.2" maplit = "1.0.2"
meili-snap = { path = "../meili-snap" } meili-snap = { path = "../meili-snap" }

View File

@ -1258,7 +1258,10 @@ impl IndexScheduler {
let pool = match &indexer_config.thread_pool { let pool = match &indexer_config.thread_pool {
Some(pool) => pool, Some(pool) => pool,
None => { None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); local_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|i| format!("indexing-thread-{i}"))
.build()
.unwrap();
&local_pool &local_pool
} }
}; };
@ -1306,21 +1309,19 @@ impl IndexScheduler {
} }
if tasks.iter().any(|res| res.error.is_none()) { if tasks.iter().any(|res| res.error.is_none()) {
pool.install(|| { indexer::index(
indexer::index( index_wtxn,
index_wtxn, index,
index, pool,
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
primary_key, primary_key,
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&send_progress, &send_progress,
) )?;
})
.unwrap()?;
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} }
@ -1396,34 +1397,34 @@ impl IndexScheduler {
let pool = match &indexer_config.thread_pool { let pool = match &indexer_config.thread_pool {
Some(pool) => pool, Some(pool) => pool,
None => { None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); local_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|i| format!("indexing-thread-{i}"))
.build()
.unwrap();
&local_pool &local_pool
} }
}; };
pool.install(|| { let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
let indexer = let document_changes =
UpdateByFunction::new(candidates, context.clone(), code.clone()); pool.install(|| indexer.into_changes(&primary_key)).unwrap()?;
let document_changes = indexer.into_changes(&primary_key)?;
let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?;
indexer::index( let embedders = index.embedding_configs(index_wtxn)?;
index_wtxn, let embedders = self.embedders(embedders)?;
index,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
Result::Ok(()) indexer::index(
}) index_wtxn,
.unwrap()?; index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
None, // cannot change primary key in DocumentEdition
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} }
@ -1548,7 +1549,10 @@ impl IndexScheduler {
let pool = match &indexer_config.thread_pool { let pool = match &indexer_config.thread_pool {
Some(pool) => pool, Some(pool) => pool,
None => { None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); local_pool = ThreadPoolNoAbortBuilder::new()
.thread_name(|i| format!("indexing-thread-{i}"))
.build()
.unwrap();
&local_pool &local_pool
} }
}; };
@ -1559,21 +1563,19 @@ impl IndexScheduler {
let embedders = index.embedding_configs(index_wtxn)?; let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?; let embedders = self.embedders(embedders)?;
pool.install(|| { indexer::index(
indexer::index( index_wtxn,
index_wtxn, index,
index, pool,
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
None, // document deletion never changes primary key None, // document deletion never changes primary key
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&send_progress, &send_progress,
) )?;
})
.unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} }

View File

@ -407,7 +407,7 @@ pub struct IndexScheduler {
/// ///
/// See [self.breakpoint()](`IndexScheduler::breakpoint`) for an explanation. /// See [self.breakpoint()](`IndexScheduler::breakpoint`) for an explanation.
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, test_breakpoint_sdr: crossbeam_channel::Sender<(Breakpoint, bool)>,
/// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler. /// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler.
/// ///
@ -476,7 +476,7 @@ impl IndexScheduler {
/// Create an index scheduler and start its run loop. /// Create an index scheduler and start its run loop.
pub fn new( pub fn new(
options: IndexSchedulerOptions, options: IndexSchedulerOptions,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> { ) -> Result<Self> {
std::fs::create_dir_all(&options.tasks_path)?; std::fs::create_dir_all(&options.tasks_path)?;
@ -2238,7 +2238,7 @@ mod tests {
use std::time::Instant; use std::time::Instant;
use big_s::S; use big_s::S;
use crossbeam::channel::RecvTimeoutError; use crossbeam_channel::RecvTimeoutError;
use file_store::File; use file_store::File;
use insta::assert_json_snapshot; use insta::assert_json_snapshot;
use maplit::btreeset; use maplit::btreeset;
@ -2290,7 +2290,7 @@ mod tests {
configuration: impl Fn(&mut IndexSchedulerOptions), configuration: impl Fn(&mut IndexSchedulerOptions),
) -> (Self, IndexSchedulerHandle) { ) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap(); let tempdir = TempDir::new().unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0); let (sender, receiver) = crossbeam_channel::bounded(0);
let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() }; let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() };
@ -2422,7 +2422,7 @@ mod tests {
pub struct IndexSchedulerHandle { pub struct IndexSchedulerHandle {
_tempdir: TempDir, _tempdir: TempDir,
index_scheduler: IndexScheduler, index_scheduler: IndexScheduler,
test_breakpoint_rcv: crossbeam::channel::Receiver<(Breakpoint, bool)>, test_breakpoint_rcv: crossbeam_channel::Receiver<(Breakpoint, bool)>,
last_breakpoint: Breakpoint, last_breakpoint: Breakpoint,
} }

View File

@ -17,7 +17,7 @@ hmac = "0.12.1"
maplit = "1.0.2" maplit = "1.0.2"
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
rand = "0.8.5" rand = "0.8.5"
roaring = { version = "0.10.6", features = ["serde"] } roaring = { version = "0.10.7", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] } serde = { version = "1.0.204", features = ["derive"] }
serde_json = { version = "1.0.120", features = ["preserve_order"] } serde_json = { version = "1.0.120", features = ["preserve_order"] }
sha2 = "0.10.8" sha2 = "0.10.8"

View File

@ -25,7 +25,7 @@ fst = "0.4.7"
memmap2 = "0.9.4" memmap2 = "0.9.4"
milli = { path = "../milli" } milli = { path = "../milli" }
raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" } raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" }
roaring = { version = "0.10.6", features = ["serde"] } roaring = { version = "0.10.7", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] } serde = { version = "1.0.204", features = ["derive"] }
serde-cs = "0.2.4" serde-cs = "0.2.4"
serde_json = "1.0.120" serde_json = "1.0.120"

View File

@ -214,7 +214,7 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
// We memory map to be able to deserialize into a RawMap that // We memory map to be able to deserialize into a RawMap that
// does not allocate when possible and only materialize the first/top level. // does not allocate when possible and only materialize the first/top level.
let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? }; let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
let mut doc_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1MiB let mut doc_alloc = Bump::with_capacity(1024 * 1024); // 1MiB
let mut out = BufWriter::new(output); let mut out = BufWriter::new(output);
let mut deserializer = serde_json::Deserializer::from_slice(&input); let mut deserializer = serde_json::Deserializer::from_slice(&input);

View File

@ -103,7 +103,7 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
tracing-trace = { version = "0.1.0", path = "../tracing-trace" } tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
tracing-actix-web = "0.7.11" tracing-actix-web = "0.7.11"
build-info = { version = "1.7.0", path = "../build-info" } build-info = { version = "1.7.0", path = "../build-info" }
roaring = "0.10.2" roaring = "0.10.7"
mopa-maintained = "0.2.3" mopa-maintained = "0.2.3"
[dev-dependencies] [dev-dependencies]

View File

@ -20,14 +20,14 @@ use meilisearch::{
LogStderrType, Opt, SubscriberForSecondLayer, LogStderrType, Opt, SubscriberForSecondLayer,
}; };
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE}; use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
use mimalloc::MiMalloc;
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
use tracing::level_filters::LevelFilter; use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::Layer; use tracing_subscriber::Layer;
#[cfg(not(windows))]
#[global_allocator] #[global_allocator]
static ALLOC: MiMalloc = MiMalloc; static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn default_log_route_layer() -> LogRouteType { fn default_log_route_layer() -> LogRouteType {
None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF)) None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF))

View File

@ -42,7 +42,7 @@ obkv = "0.3.0"
once_cell = "1.19.0" once_cell = "1.19.0"
ordered-float = "4.2.1" ordered-float = "4.2.1"
rayon = "1.10.0" rayon = "1.10.0"
roaring = { version = "0.10.6", features = ["serde"] } roaring = { version = "0.10.7", features = ["serde"] }
rstar = { version = "0.12.0", features = ["serde"] } rstar = { version = "0.12.0", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] } serde = { version = "1.0.204", features = ["derive"] }
serde_json = { version = "1.0.120", features = ["preserve_order", "raw_value"] } serde_json = { version = "1.0.120", features = ["preserve_order", "raw_value"] }
@ -98,6 +98,8 @@ allocator-api2 = "0.2.18"
rustc-hash = "2.0.0" rustc-hash = "2.0.0"
uell = "0.1.0" uell = "0.1.0"
enum-iterator = "2.1.0" enum-iterator = "2.1.0"
bbqueue = { git = "https://github.com/kerollmops/bbqueue" }
flume = { version = "0.11.1", default-features = false }
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false } mimalloc = { version = "0.1.43", default-features = false }

View File

@ -3,6 +3,7 @@ use std::convert::Infallible;
use std::fmt::Write; use std::fmt::Write;
use std::{io, str}; use std::{io, str};
use bstr::BString;
use heed::{Error as HeedError, MdbError}; use heed::{Error as HeedError, MdbError};
use rayon::ThreadPoolBuildError; use rayon::ThreadPoolBuildError;
use rhai::EvalAltResult; use rhai::EvalAltResult;
@ -62,9 +63,9 @@ pub enum InternalError {
#[error(transparent)] #[error(transparent)]
Store(#[from] MdbError), Store(#[from] MdbError),
#[error("Cannot delete {key:?} from database {database_name}: {error}")] #[error("Cannot delete {key:?} from database {database_name}: {error}")]
StoreDeletion { database_name: &'static str, key: Vec<u8>, error: heed::Error }, StoreDeletion { database_name: &'static str, key: BString, error: heed::Error },
#[error("Cannot insert {key:?} and value with length {value_length} into database {database_name}: {error}")] #[error("Cannot insert {key:?} and value with length {value_length} into database {database_name}: {error}")]
StorePut { database_name: &'static str, key: Vec<u8>, value_length: usize, error: heed::Error }, StorePut { database_name: &'static str, key: BString, value_length: usize, error: heed::Error },
#[error(transparent)] #[error(transparent)]
Utf8(#[from] str::Utf8Error), Utf8(#[from] str::Utf8Error),
#[error("An indexation process was explicitly aborted")] #[error("An indexation process was explicitly aborted")]

View File

@ -97,7 +97,7 @@ impl<'a> heed::BytesEncode<'a> for FacetGroupValueCodec {
fn bytes_encode(value: &'a Self::EItem) -> Result<Cow<'a, [u8]>, BoxedError> { fn bytes_encode(value: &'a Self::EItem) -> Result<Cow<'a, [u8]>, BoxedError> {
let mut v = vec![value.size]; let mut v = vec![value.size];
CboRoaringBitmapCodec::serialize_into(&value.bitmap, &mut v); CboRoaringBitmapCodec::serialize_into_vec(&value.bitmap, &mut v);
Ok(Cow::Owned(v)) Ok(Cow::Owned(v))
} }
} }

View File

@ -27,18 +27,27 @@ impl CboRoaringBitmapCodec {
} }
} }
pub fn serialize_into(roaring: &RoaringBitmap, vec: &mut Vec<u8>) { pub fn serialize_into_vec(roaring: &RoaringBitmap, vec: &mut Vec<u8>) {
Self::serialize_into_writer(roaring, vec).unwrap()
}
pub fn serialize_into_writer<W: io::Write>(
roaring: &RoaringBitmap,
mut writer: W,
) -> io::Result<()> {
if roaring.len() <= THRESHOLD as u64 { if roaring.len() <= THRESHOLD as u64 {
// If the number of items (u32s) to encode is less than or equal to the threshold // If the number of items (u32s) to encode is less than or equal to the threshold
// it means that it would weigh the same or less than the RoaringBitmap // it means that it would weigh the same or less than the RoaringBitmap
// header, so we directly encode them using ByteOrder instead. // header, so we directly encode them using ByteOrder instead.
for integer in roaring { for integer in roaring {
vec.write_u32::<NativeEndian>(integer).unwrap(); writer.write_u32::<NativeEndian>(integer)?;
} }
} else { } else {
// Otherwise, we use the classic RoaringBitmapCodec that writes a header. // Otherwise, we use the classic RoaringBitmapCodec that writes a header.
roaring.serialize_into(vec).unwrap(); roaring.serialize_into(writer)?;
} }
Ok(())
} }
pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> { pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> {
@ -143,7 +152,7 @@ impl CboRoaringBitmapCodec {
return Ok(None); return Ok(None);
} }
Self::serialize_into(&previous, buffer); Self::serialize_into_vec(&previous, buffer);
Ok(Some(&buffer[..])) Ok(Some(&buffer[..]))
} }
} }
@ -169,7 +178,7 @@ impl heed::BytesEncode<'_> for CboRoaringBitmapCodec {
fn bytes_encode(item: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> { fn bytes_encode(item: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
let mut vec = Vec::with_capacity(Self::serialized_size(item)); let mut vec = Vec::with_capacity(Self::serialized_size(item));
Self::serialize_into(item, &mut vec); Self::serialize_into_vec(item, &mut vec);
Ok(Cow::Owned(vec)) Ok(Cow::Owned(vec))
} }
} }

View File

@ -1821,6 +1821,7 @@ pub(crate) mod tests {
indexer::index( indexer::index(
wtxn, wtxn,
&self.inner, &self.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1911,6 +1912,7 @@ pub(crate) mod tests {
indexer::index( indexer::index(
wtxn, wtxn,
&self.inner, &self.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -1991,6 +1993,7 @@ pub(crate) mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -1,6 +1,7 @@
#![cfg_attr(all(test, fuzzing), feature(no_coverage))] #![cfg_attr(all(test, fuzzing), feature(no_coverage))]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
#[cfg(not(windows))]
#[cfg(test)] #[cfg(test)]
#[global_allocator] #[global_allocator]
pub static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; pub static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

View File

@ -83,6 +83,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -2155,6 +2155,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2216,6 +2217,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2268,6 +2270,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2319,6 +2322,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2372,6 +2376,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2430,6 +2435,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2481,6 +2487,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2532,6 +2539,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2725,6 +2733,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2783,6 +2792,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
@ -2838,6 +2848,7 @@ mod tests {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index.inner, &index.inner,
&crate::ThreadPoolNoAbortBuilder::new().build().unwrap(),
indexer_config.grenad_parameters(), indexer_config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

File diff suppressed because it is too large Load Diff

View File

@ -415,21 +415,21 @@ fn spill_entry_to_sorter(
match deladd { match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => { DelAddRoaringBitmap { del: Some(del), add: None } => {
cbo_buffer.clear(); cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer); CboRoaringBitmapCodec::serialize_into_vec(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?; value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
} }
DelAddRoaringBitmap { del: None, add: Some(add) } => { DelAddRoaringBitmap { del: None, add: Some(add) } => {
cbo_buffer.clear(); cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer); CboRoaringBitmapCodec::serialize_into_vec(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?; value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
} }
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => { DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
cbo_buffer.clear(); cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer); CboRoaringBitmapCodec::serialize_into_vec(&del, cbo_buffer);
value_writer.insert(DelAdd::Deletion, &cbo_buffer)?; value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
cbo_buffer.clear(); cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer); CboRoaringBitmapCodec::serialize_into_vec(&add, cbo_buffer);
value_writer.insert(DelAdd::Addition, &cbo_buffer)?; value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
} }
DelAddRoaringBitmap { del: None, add: None } => return Ok(()), DelAddRoaringBitmap { del: None, add: None } => return Ok(()),

View File

@ -12,13 +12,14 @@ use crate::update::new::thread_local::FullySend;
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::vector::EmbeddingConfigs; use crate::vector::EmbeddingConfigs;
use crate::Result; use crate::Result;
pub struct DocumentsExtractor<'a> {
document_sender: &'a DocumentsSender<'a>, pub struct DocumentsExtractor<'a, 'b> {
document_sender: DocumentsSender<'a, 'b>,
embedders: &'a EmbeddingConfigs, embedders: &'a EmbeddingConfigs,
} }
impl<'a> DocumentsExtractor<'a> { impl<'a, 'b> DocumentsExtractor<'a, 'b> {
pub fn new(document_sender: &'a DocumentsSender<'a>, embedders: &'a EmbeddingConfigs) -> Self { pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs) -> Self {
Self { document_sender, embedders } Self { document_sender, embedders }
} }
} }
@ -29,7 +30,7 @@ pub struct DocumentExtractorData {
pub field_distribution_delta: HashMap<String, i64>, pub field_distribution_delta: HashMap<String, i64>,
} }
impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> { impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
type Data = FullySend<RefCell<DocumentExtractorData>>; type Data = FullySend<RefCell<DocumentExtractorData>>;
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> { fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {

View File

@ -25,14 +25,14 @@ use crate::update::new::DocumentChange;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedExtractorData<'a> { pub struct FacetedExtractorData<'a, 'b> {
attributes_to_extract: &'a [&'a str], attributes_to_extract: &'a [&'a str],
sender: &'a FieldIdDocidFacetSender<'a>, sender: &'a FieldIdDocidFacetSender<'a, 'b>,
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
buckets: usize, buckets: usize,
} }
impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> { impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b> {
type Data = RefCell<BalancedCaches<'extractor>>; type Data = RefCell<BalancedCaches<'extractor>>;
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> { fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
@ -318,7 +318,7 @@ impl<'doc> DelAddFacetValue<'doc> {
docid: DocumentId, docid: DocumentId,
sender: &FieldIdDocidFacetSender, sender: &FieldIdDocidFacetSender,
doc_alloc: &Bump, doc_alloc: &Bump,
) -> std::result::Result<(), crossbeam_channel::SendError<()>> { ) -> crate::Result<()> {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc); let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
for ((fid, value), deladd) in self.strings { for ((fid, value), deladd) in self.strings {
if let Ok(s) = std::str::from_utf8(&value) { if let Ok(s) = std::str::from_utf8(&value) {

View File

@ -18,17 +18,17 @@ use crate::vector::error::{
use crate::vector::{Embedder, Embedding, EmbeddingConfigs}; use crate::vector::{Embedder, Embedding, EmbeddingConfigs};
use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError}; use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
pub struct EmbeddingExtractor<'a> { pub struct EmbeddingExtractor<'a, 'b> {
embedders: &'a EmbeddingConfigs, embedders: &'a EmbeddingConfigs,
sender: &'a EmbeddingSender<'a>, sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes, possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
} }
impl<'a> EmbeddingExtractor<'a> { impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
pub fn new( pub fn new(
embedders: &'a EmbeddingConfigs, embedders: &'a EmbeddingConfigs,
sender: &'a EmbeddingSender<'a>, sender: EmbeddingSender<'a, 'b>,
field_distribution: &'a FieldDistribution, field_distribution: &'a FieldDistribution,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
) -> Self { ) -> Self {
@ -43,7 +43,7 @@ pub struct EmbeddingExtractorData<'extractor>(
unsafe impl MostlySend for EmbeddingExtractorData<'_> {} unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> { impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
type Data = RefCell<EmbeddingExtractorData<'extractor>>; type Data = RefCell<EmbeddingExtractorData<'extractor>>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> { fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
@ -259,7 +259,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
// Currently this is the case as: // Currently this is the case as:
// 1. BVec are inside of the bumpalo // 1. BVec are inside of the bumpalo
// 2. All other fields are either trivial (u8) or references. // 2. All other fields are either trivial (u8) or references.
struct Chunks<'a, 'extractor> { struct Chunks<'a, 'b, 'extractor> {
texts: BVec<'a, &'a str>, texts: BVec<'a, &'a str>,
ids: BVec<'a, DocumentId>, ids: BVec<'a, DocumentId>,
@ -270,11 +270,11 @@ struct Chunks<'a, 'extractor> {
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>, user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
sender: &'a EmbeddingSender<'a>, sender: EmbeddingSender<'a, 'b>,
has_manual_generation: Option<&'a str>, has_manual_generation: Option<&'a str>,
} }
impl<'a, 'extractor> Chunks<'a, 'extractor> { impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
embedder: &'a Embedder, embedder: &'a Embedder,
@ -284,7 +284,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>, user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
sender: &'a EmbeddingSender<'a>, sender: EmbeddingSender<'a, 'b>,
doc_alloc: &'a Bump, doc_alloc: &'a Bump,
) -> Self { ) -> Self {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint(); let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
@ -368,7 +368,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
possible_embedding_mistakes: &PossibleEmbeddingMistakes, possible_embedding_mistakes: &PossibleEmbeddingMistakes,
unused_vectors_distribution: &UnusedVectorsDistributionBump, unused_vectors_distribution: &UnusedVectorsDistributionBump,
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
sender: &EmbeddingSender<'a>, sender: EmbeddingSender<'a, 'b>,
has_manual_generation: Option<&'a str>, has_manual_generation: Option<&'a str>,
) -> Result<()> { ) -> Result<()> {
if let Some(external_docid) = has_manual_generation { if let Some(external_docid) = has_manual_generation {

View File

@ -70,7 +70,7 @@ impl<
F: FnOnce(&'extractor Bump) -> Result<T>, F: FnOnce(&'extractor Bump) -> Result<T>,
{ {
let doc_alloc = let doc_alloc =
doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024)))); doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
let doc_alloc = doc_alloc.0.take(); let doc_alloc = doc_alloc.0.take();
let fields_ids_map = fields_ids_map_store let fields_ids_map = fields_ids_map_store
.get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into()); .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());

View File

@ -62,6 +62,7 @@ mod update_by_function;
pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>( pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
index: &'index Index, index: &'index Index,
pool: &ThreadPoolNoAbort,
grenad_parameters: GrenadParameters, grenad_parameters: GrenadParameters,
db_fields_ids_map: &'indexer FieldsIdsMap, db_fields_ids_map: &'indexer FieldsIdsMap,
new_fields_ids_map: FieldsIdsMap, new_fields_ids_map: FieldsIdsMap,
@ -76,9 +77,28 @@ where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync, SP: Fn(Progress) + Sync,
{ {
let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000); let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false); let finished_extraction = AtomicBool::new(false);
// We compute and remove the allocated BBQueues buffers capacity from the indexing memory.
let minimum_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default
|max_memory| {
// 0.5% of the indexing memory (max_memory / 100 / 2)
let total_bbbuffer_capacity = (max_memory / 100 / 2).min(minimum_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory - total_bbbuffer_capacity),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
let (extractor_sender, mut writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder); let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
let new_fields_ids_map = RwLock::new(new_fields_ids_map); let new_fields_ids_map = RwLock::new(new_fields_ids_map);
@ -96,6 +116,7 @@ where
send_progress, send_progress,
}; };
let mut index_embeddings = index.embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?;
let mut document_ids = index.documents_ids(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?;
@ -107,261 +128,261 @@ where
let field_distribution = &mut field_distribution; let field_distribution = &mut field_distribution;
let document_ids = &mut document_ids; let document_ids = &mut document_ids;
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); pool.install(move || {
let _entered = span.enter(); let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let rtxn = index.read_txn()?;
// document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
let _entered = span.enter(); let _entered = span.enter();
extract(document_changes,
&document_extractor, let rtxn = index.read_txn()?;
indexing_context,
&mut extractor_allocs, // document but we need to create a function that collects and compresses documents.
&datastore, let document_sender = extractor_sender.documents();
Step::ExtractingDocuments, let document_extractor = DocumentsExtractor::new(document_sender, embedders);
)?; let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
} {
{ let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); let _entered = span.enter();
let _entered = span.enter(); extract(
for document_extractor_data in datastore { document_changes,
let document_extractor_data = document_extractor_data.0.into_inner(); &document_extractor,
for (field, delta) in document_extractor_data.field_distribution_delta { indexing_context,
let current = field_distribution.entry(field).or_default(); &mut extractor_allocs,
// adding the delta should never cause a negative result, as we are removing fields that previously existed. &datastore,
*current = current.saturating_add_signed(delta); Step::ExtractingDocuments,
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
let _entered = span.enter();
for document_extractor_data in datastore {
let document_extractor_data = document_extractor_data.0.into_inner();
for (field, delta) in document_extractor_data.field_distribution_delta {
let current = field_distribution.entry(field).or_default();
// adding the delta should never cause a negative result, as we are removing fields that previously existed.
*current = current.saturating_add_signed(delta);
}
document_extractor_data.docids_delta.apply_to(document_ids);
} }
document_extractor_data.docids_delta.apply_to(document_ids);
field_distribution.retain(|_, v| *v != 0);
} }
field_distribution.retain(|_, v| *v != 0); let facet_field_ids_delta;
}
let facet_field_ids_delta; {
let caches = {
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
let _entered = span.enter();
{ FacetedDocidsExtractor::run_extraction(
let caches = { grenad_parameters,
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted"); document_changes,
let _entered = span.enter(); indexing_context,
&mut extractor_allocs,
&extractor_sender.field_id_docid_facet_sender(),
Step::ExtractingFacets
)?
};
FacetedDocidsExtractor::run_extraction( {
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
let _entered = span.enter();
facet_field_ids_delta = merge_and_send_facet_docids(
caches,
FacetDatabases::new(index),
index,
extractor_sender.facet_docids(),
)?;
}
}
{
let WordDocidsCaches {
word_docids,
word_fid_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
WordDocidsExtractors::run_extraction(
grenad_parameters, grenad_parameters,
document_changes, document_changes,
indexing_context, indexing_context,
&mut extractor_allocs, &mut extractor_allocs,
&extractor_sender.field_id_docid_facet_sender(), Step::ExtractingWords
Step::ExtractingFacets
)? )?
}; };
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter(); let _entered = span.enter();
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
index,
extractor_sender.docids::<WordDocids>(),
&indexing_context.must_stop_processing,
)?;
}
facet_field_ids_delta = merge_and_send_facet_docids( {
caches, let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
FacetDatabases::new(index), let _entered = span.enter();
index, merge_and_send_docids(
extractor_sender.facet_docids(), word_fid_docids,
)?; index.word_fid_docids.remap_types(),
} index,
} extractor_sender.docids::<WordFidDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
merge_and_send_docids(
exact_word_docids,
index.exact_word_docids.remap_types(),
index,
extractor_sender.docids::<ExactWordDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
merge_and_send_docids(
word_position_docids,
index.word_position_docids.remap_types(),
index,
extractor_sender.docids::<WordPositionDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let WordDocidsCaches { let _entered = span.enter();
word_docids, merge_and_send_docids(
word_fid_docids, fid_word_count_docids,
exact_word_docids, index.field_id_word_count_docids.remap_types(),
word_position_docids, index,
fid_word_count_docids, extractor_sender.docids::<FidWordCountDocids>(),
} = { &indexing_context.must_stop_processing,
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); )?;
let _entered = span.enter(); }
WordDocidsExtractors::run_extraction(
grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
Step::ExtractingWords
)?
};
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
index,
extractor_sender.docids::<WordDocids>(),
&indexing_context.must_stop_processing,
)?;
} }
{ // run the proximity extraction only if the precision is by word
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); // this works only if the settings didn't change during this transaction.
let _entered = span.enter(); let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
merge_and_send_docids( if proximity_precision == ProximityPrecision::ByWord {
word_fid_docids, let caches = {
index.word_fid_docids.remap_types(), let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
index, let _entered = span.enter();
extractor_sender.docids::<WordFidDocids>(),
&indexing_context.must_stop_processing, <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
)?; grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
Step::ExtractingWordProximity,
)?
};
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
let _entered = span.enter();
merge_and_send_docids(
caches,
index.word_pair_proximity_docids.remap_types(),
index,
extractor_sender.docids::<WordPairProximityDocids>(),
&indexing_context.must_stop_processing,
)?;
}
} }
{ 'vectors: {
let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); if index_embeddings.is_empty() {
let _entered = span.enter(); break 'vectors;
merge_and_send_docids( }
exact_word_docids,
index.exact_word_docids.remap_types(),
index,
extractor_sender.docids::<ExactWordDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{ let embedding_sender = extractor_sender.embeddings();
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads());
let _entered = span.enter(); let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
merge_and_send_docids( {
word_position_docids, let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
index.word_position_docids.remap_types(), let _entered = span.enter();
index,
extractor_sender.docids::<WordPositionDocids>(),
&indexing_context.must_stop_processing,
)?;
}
{ extract(
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); document_changes,
let _entered = span.enter(); &extractor,
merge_and_send_docids( indexing_context,
fid_word_count_docids, &mut extractor_allocs,
index.field_id_word_count_docids.remap_types(), &datastore,
index, Step::ExtractingEmbeddings,
extractor_sender.docids::<FidWordCountDocids>(), )?;
&indexing_context.must_stop_processing, }
)?; {
} let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
} let _entered = span.enter();
// run the proximity extraction only if the precision is by word for config in &mut index_embeddings {
// this works only if the settings didn't change during this transaction. 'data: for data in datastore.iter_mut() {
let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); let data = &mut data.get_mut().0;
if proximity_precision == ProximityPrecision::ByWord { let Some(deladd) = data.remove(&config.name) else { continue 'data; };
let caches = { deladd.apply_to(&mut config.user_provided);
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); }
let _entered = span.enter();
<WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
Step::ExtractingWordProximity,
)?
};
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
let _entered = span.enter();
merge_and_send_docids(
caches,
index.word_pair_proximity_docids.remap_types(),
index,
extractor_sender.docids::<WordPairProximityDocids>(),
&indexing_context.must_stop_processing,
)?;
}
}
'vectors: {
let mut index_embeddings = index.embedding_configs(&rtxn)?;
if index_embeddings.is_empty() {
break 'vectors;
}
let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
let _entered = span.enter();
for config in &mut index_embeddings {
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else { continue 'data; };
deladd.apply_to(&mut config.user_provided);
} }
} }
} }
embedding_sender.finish(index_embeddings).unwrap(); 'geo: {
} let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
break 'geo;
};
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
'geo: { {
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else { let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
break 'geo; let _entered = span.enter();
};
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{ extract(
let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); document_changes,
let _entered = span.enter(); &extractor,
indexing_context,
&mut extractor_allocs,
&datastore,
Step::WritingGeoPoints
)?;
}
extract( merge_and_send_rtree(
document_changes, datastore,
&extractor, &rtxn,
indexing_context, index,
&mut extractor_allocs, extractor_sender.geo(),
&datastore, &indexing_context.must_stop_processing,
Step::WritingGeoPoints
)?; )?;
} }
merge_and_send_rtree( (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));
datastore,
&rtxn,
index,
extractor_sender.geo(),
&indexing_context.must_stop_processing,
)?;
}
(indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase)); finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); Result::Ok((facet_field_ids_delta, index_embeddings))
}).unwrap()
Result::Ok(facet_field_ids_delta)
})?; })?;
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let vector_arroy = index.vector_arroy; let vector_arroy = index.vector_arroy;
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let indexer_span = tracing::Span::current(); let indexer_span = tracing::Span::current();
let arroy_writers: Result<HashMap<_, _>> = embedders let arroy_writers: Result<HashMap<_, _>> = embedders
.inner_as_ref() .inner_as_ref()
@ -384,7 +405,11 @@ where
}) })
.collect(); .collect();
// Used by the ArroySetVector to copy the embedding into an
// aligned memory area, required by arroy to accept a new vector.
let mut aligned_embedding = Vec::new();
let mut arroy_writers = arroy_writers?; let mut arroy_writers = arroy_writers?;
{ {
let span = tracing::trace_span!(target: "indexing::write_db", "all"); let span = tracing::trace_span!(target: "indexing::write_db", "all");
let _entered = span.enter(); let _entered = span.enter();
@ -392,110 +417,93 @@ where
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge"); let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
let mut _entered_post_merge = None; let mut _entered_post_merge = None;
for operation in writer_receiver { while let Some(action) = writer_receiver.recv_action() {
if _entered_post_merge.is_none() if _entered_post_merge.is_none()
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed) && finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
{ {
_entered_post_merge = Some(span.enter()); _entered_post_merge = Some(span.enter());
} }
match operation {
WriterOperation::DbOperation(db_operation) => { match action {
let database = db_operation.database(index); ReceiverAction::WakeUp => (),
let database_name = db_operation.database_name(); ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => {
match db_operation.entry() { let database_name = database.database_name();
EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) { let database = database.database(index);
Ok(false) => unreachable!("We tried to delete an unknown key"), if let Err(error) = database.put(wtxn, &key, &value) {
Ok(_) => (), return Err(Error::InternalError(InternalError::StorePut {
Err(error) => { database_name,
return Err(Error::InternalError( key: bstr::BString::from(&key[..]),
InternalError::StoreDeletion { value_length: value.len(),
database_name, error,
key: e.entry().to_owned(), }));
error,
},
));
}
},
EntryOperation::Write(e) => {
if let Err(error) = database.put(wtxn, e.key(), e.value()) {
return Err(Error::InternalError(InternalError::StorePut {
database_name,
key: e.key().to_owned(),
value_length: e.value().len(),
error,
}));
}
}
} }
} }
WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation { ReceiverAction::LargeVectors(large_vectors) => {
ArroyOperation::DeleteVectors { docid } => { let LargeVectors { docid, embedder_id, .. } = large_vectors;
for ( let (_, _, writer, dimensions) =
_embedder_index, arroy_writers.get(&embedder_id).expect("requested a missing embedder");
(_embedder_name, _embedder, writer, dimensions), let mut embeddings = Embeddings::new(*dimensions);
) in &mut arroy_writers for embedding in large_vectors.read_embeddings(*dimensions) {
{ embeddings.push(embedding.to_vec()).unwrap();
let dimensions = *dimensions;
writer.del_items(wtxn, dimensions, docid)?;
}
} }
ArroyOperation::SetVectors { writer.del_items(wtxn, *dimensions, docid)?;
docid, writer.add_items(wtxn, docid, &embeddings)?;
embedder_id, }
embeddings: raw_embeddings,
} => {
let (_, _, writer, dimensions) = arroy_writers
.get(&embedder_id)
.expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
for embedding in raw_embeddings {
embeddings.append(embedding).unwrap();
}
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_items(wtxn, docid, &embeddings)?;
}
ArroyOperation::SetVector { docid, embedder_id, embedding } => {
let (_, _, writer, dimensions) = arroy_writers
.get(&embedder_id)
.expect("requested a missing embedder");
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_item(wtxn, docid, &embedding)?;
}
ArroyOperation::Finish { configs } => {
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
let _entered = span.enter();
(indexing_context.send_progress)(Progress::from_step(
Step::WritingEmbeddingsToDatabase,
));
for (
_embedder_index,
(_embedder_name, _embedder, writer, dimensions),
) in &mut arroy_writers
{
let dimensions = *dimensions;
writer.build_and_quantize(
wtxn,
&mut rng,
dimensions,
false,
&indexing_context.must_stop_processing,
)?;
}
index.put_embedding_configs(wtxn, configs)?;
}
},
} }
// Every time there is a message in the channel we search
// for new entries in the BBQueue buffers.
write_from_bbqueue(
&mut writer_receiver,
index,
wtxn,
&arroy_writers,
&mut aligned_embedding,
)?;
} }
// Once the extractor/writer channel is closed
// we must process the remaining BBQueue messages.
write_from_bbqueue(
&mut writer_receiver,
index,
wtxn,
&arroy_writers,
&mut aligned_embedding,
)?;
} }
(indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors)); (indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors));
let facet_field_ids_delta = extractor_handle.join().unwrap()?; let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?;
'vectors: {
let span =
tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
let _entered = span.enter();
if index_embeddings.is_empty() {
break 'vectors;
}
(indexing_context.send_progress)(Progress::from_step(
Step::WritingEmbeddingsToDatabase,
));
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers {
let dimensions = *dimensions;
writer.build_and_quantize(
wtxn,
&mut rng,
dimensions,
false,
&indexing_context.must_stop_processing,
)?;
}
index.put_embedding_configs(wtxn, index_embeddings)?;
}
(indexing_context.send_progress)(Progress::from_step(Step::PostProcessingFacets)); (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingFacets));
@ -537,6 +545,72 @@ where
Ok(()) Ok(())
} }
/// A function dedicated to manage all the available BBQueue frames.
///
/// It reads all the available frames, do the corresponding database operations
/// and stops when no frame are available.
fn write_from_bbqueue(
writer_receiver: &mut WriterBbqueueReceiver<'_>,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
aligned_embedding: &mut Vec<f32>,
) -> crate::Result<()> {
while let Some(frame_with_header) = writer_receiver.recv_frame() {
match frame_with_header.header() {
EntryHeader::DbOperation(operation) => {
let database_name = operation.database.database_name();
let database = operation.database.database(index);
let frame = frame_with_header.frame();
match operation.key_value(frame) {
(key, Some(value)) => {
if let Err(error) = database.put(wtxn, key, value) {
return Err(Error::InternalError(InternalError::StorePut {
database_name,
key: key.into(),
value_length: value.len(),
error,
}));
}
}
(key, None) => match database.delete(wtxn, key) {
Ok(false) => {
unreachable!("We tried to delete an unknown key: {key:?}")
}
Ok(_) => (),
Err(error) => {
return Err(Error::InternalError(InternalError::StoreDeletion {
database_name,
key: key.into(),
error,
}));
}
},
}
}
EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
let dimensions = *dimensions;
writer.del_items(wtxn, dimensions, docid)?;
}
}
EntryHeader::ArroySetVectors(asvs) => {
let ArroySetVectors { docid, embedder_id, .. } = asvs;
let frame = frame_with_header.frame();
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
embeddings.append(all_embeddings.to_vec()).unwrap();
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_items(wtxn, docid, &embeddings)?;
}
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn compute_prefix_database( fn compute_prefix_database(
index: &Index, index: &Index,

View File

@ -19,7 +19,7 @@ pub fn merge_and_send_rtree<'extractor, MSP>(
datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>, datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
rtxn: &RoTxn, rtxn: &RoTxn,
index: &Index, index: &Index,
geo_sender: GeoSender<'_>, geo_sender: GeoSender<'_, '_>,
must_stop_processing: &MSP, must_stop_processing: &MSP,
) -> Result<()> ) -> Result<()>
where where
@ -56,25 +56,25 @@ where
let rtree_mmap = unsafe { Mmap::map(&file)? }; let rtree_mmap = unsafe { Mmap::map(&file)? };
geo_sender.set_rtree(rtree_mmap).unwrap(); geo_sender.set_rtree(rtree_mmap).unwrap();
geo_sender.set_geo_faceted(&faceted).unwrap(); geo_sender.set_geo_faceted(&faceted)?;
Ok(()) Ok(())
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_docids<'extractor, MSP>( pub fn merge_and_send_docids<'extractor, MSP, D>(
mut caches: Vec<BalancedCaches<'extractor>>, mut caches: Vec<BalancedCaches<'extractor>>,
database: Database<Bytes, Bytes>, database: Database<Bytes, Bytes>,
index: &Index, index: &Index,
docids_sender: impl DocidsSender + Sync, docids_sender: WordDocidsSender<D>,
must_stop_processing: &MSP, must_stop_processing: &MSP,
) -> Result<()> ) -> Result<()>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
D: DatabaseType + Sync,
{ {
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| { transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
let mut buffer = Vec::new();
if must_stop_processing() { if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into()); return Err(InternalError::AbortedIndexation.into());
} }
@ -82,12 +82,11 @@ where
let current = database.get(&rtxn, key)?; let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); docids_sender.write(key, &bitmap)?;
docids_sender.write(key, value).unwrap();
Ok(()) Ok(())
} }
Operation::Delete => { Operation::Delete => {
docids_sender.delete(key).unwrap(); docids_sender.delete(key)?;
Ok(()) Ok(())
} }
Operation::Ignore => Ok(()), Operation::Ignore => Ok(()),
@ -101,26 +100,24 @@ pub fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<BalancedCaches<'extractor>>, mut caches: Vec<BalancedCaches<'extractor>>,
database: FacetDatabases, database: FacetDatabases,
index: &Index, index: &Index,
docids_sender: impl DocidsSender + Sync, docids_sender: FacetDocidsSender,
) -> Result<FacetFieldIdsDelta> { ) -> Result<FacetFieldIdsDelta> {
transpose_and_freeze_caches(&mut caches)? transpose_and_freeze_caches(&mut caches)?
.into_par_iter() .into_par_iter()
.map(|frozen| { .map(|frozen| {
let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
let mut buffer = Vec::new();
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key); facet_field_ids_delta.register_from_key(key);
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); docids_sender.write(key, &bitmap)?;
docids_sender.write(key, value).unwrap();
Ok(()) Ok(())
} }
Operation::Delete => { Operation::Delete => {
facet_field_ids_delta.register_from_key(key); facet_field_ids_delta.register_from_key(key);
docids_sender.delete(key).unwrap(); docids_sender.delete(key)?;
Ok(()) Ok(())
} }
Operation::Ignore => Ok(()), Operation::Ignore => Ok(()),
@ -252,10 +249,3 @@ fn merge_cbo_bitmaps(
} }
} }
} }
/// Clears `buffer`, serializes `bitmap` into it with
/// `CboRoaringBitmapCodec::serialize_into`, and returns the buffer's content
/// as a byte slice.
///
/// TODO Return the slice directly from the serialize_into method
fn cbo_bitmap_serialize_into_vec<'b>(bitmap: &RoaringBitmap, buffer: &'b mut Vec<u8>) -> &'b [u8] {
    buffer.clear();
    CboRoaringBitmapCodec::serialize_into(bitmap, buffer);
    &buffer[..]
}

View File

@ -5,6 +5,7 @@ pub trait RefCellExt<T: ?Sized> {
&self, &self,
) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>; ) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>;
#[track_caller]
fn borrow_mut_or_yield(&self) -> RefMut<'_, T> { fn borrow_mut_or_yield(&self) -> RefMut<'_, T> {
self.try_borrow_mut_or_yield().unwrap() self.try_borrow_mut_or_yield().unwrap()
} }

View File

@ -11,8 +11,8 @@ pub enum Step {
ExtractingEmbeddings, ExtractingEmbeddings,
WritingGeoPoints, WritingGeoPoints,
WritingToDatabase, WritingToDatabase,
WritingEmbeddingsToDatabase,
WaitingForExtractors, WaitingForExtractors,
WritingEmbeddingsToDatabase,
PostProcessingFacets, PostProcessingFacets,
PostProcessingWords, PostProcessingWords,
Finalizing, Finalizing,
@ -29,8 +29,8 @@ impl Step {
Step::ExtractingEmbeddings => "extracting embeddings", Step::ExtractingEmbeddings => "extracting embeddings",
Step::WritingGeoPoints => "writing geo points", Step::WritingGeoPoints => "writing geo points",
Step::WritingToDatabase => "writing to database", Step::WritingToDatabase => "writing to database",
Step::WritingEmbeddingsToDatabase => "writing embeddings to database",
Step::WaitingForExtractors => "waiting for extractors", Step::WaitingForExtractors => "waiting for extractors",
Step::WritingEmbeddingsToDatabase => "writing embeddings to database",
Step::PostProcessingFacets => "post-processing facets", Step::PostProcessingFacets => "post-processing facets",
Step::PostProcessingWords => "post-processing words", Step::PostProcessingWords => "post-processing words",
Step::Finalizing => "finalizing", Step::Finalizing => "finalizing",

View File

@ -76,7 +76,7 @@ impl WordPrefixDocids {
.union()?; .union()?;
buffer.clear(); buffer.clear();
CboRoaringBitmapCodec::serialize_into(&output, buffer); CboRoaringBitmapCodec::serialize_into_vec(&output, buffer);
index.push(PrefixEntry { prefix, serialized_length: buffer.len() }); index.push(PrefixEntry { prefix, serialized_length: buffer.len() });
file.write_all(buffer) file.write_all(buffer)
})?; })?;
@ -211,7 +211,7 @@ impl WordPrefixIntegerDocids {
.union()?; .union()?;
buffer.clear(); buffer.clear();
CboRoaringBitmapCodec::serialize_into(&output, buffer); CboRoaringBitmapCodec::serialize_into_vec(&output, buffer);
index.push(PrefixIntegerEntry { prefix, pos, serialized_length: buffer.len() }); index.push(PrefixIntegerEntry { prefix, pos, serialized_length: buffer.len() });
file.write_all(buffer)?; file.write_all(buffer)?;
} }

View File

@ -475,7 +475,7 @@ impl<F> Embeddings<F> {
Ok(()) Ok(())
} }
/// Append a flat vector of embeddings a the end of the embeddings. /// Append a flat vector of embeddings at the end of the embeddings.
/// ///
/// If `embeddings.len() % self.dimension != 0`, then the append operation fails. /// If `embeddings.len() % self.dimension != 0`, then the append operation fails.
pub fn append(&mut self, mut embeddings: Vec<F>) -> Result<(), Vec<F>> { pub fn append(&mut self, mut embeddings: Vec<F>) -> Result<(), Vec<F>> {

View File

@ -64,6 +64,7 @@ fn test_facet_distribution_with_no_facet_values() {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -101,6 +101,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -333,6 +333,7 @@ fn criteria_ascdesc() {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,

View File

@ -142,6 +142,7 @@ fn test_typo_disabled_on_word() {
indexer::index( indexer::index(
&mut wtxn, &mut wtxn,
&index, &index,
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
config.grenad_parameters(), config.grenad_parameters(),
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,