Compare commits

...

16 Commits

Author SHA1 Message Date
Louis Dureuil
3c640386af
Merge 91c58cfa38 into 94fb55bb6f 2024-11-14 10:40:21 +00:00
ManyTheFish
91c58cfa38 Fix positional databases 2024-11-14 11:40:12 +01:00
Clément Renault
9e8367f1e6
Move the rayon thread pool outside the extract method 2024-11-14 10:40:32 +01:00
Louis Dureuil
0e3c5d91ab
Document deletion test passes 2024-11-14 08:42:56 +01:00
Louis Dureuil
695c2c6b99
Cosmetic fix 2024-11-14 08:42:39 +01:00
meili-bors[bot]
94fb55bb6f
Merge #5049
Some checks failed
Test suite / Tests on ${{ matrix.os }} (macos-13) (push) Waiting to run
Test suite / Tests on ubuntu-20.04 (push) Failing after 59s
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Run tests in debug (push) Failing after 13s
Test suite / Tests on ${{ matrix.os }} (windows-2022) (push) Failing after 7m4s
Test suite / Run Clippy (push) Successful in 10m58s
Test suite / Run Rustfmt (push) Successful in 2m34s
Run the indexing fuzzer / Setup the action (push) Successful in 1h5m58s
Indexing bench (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of indexing (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for geo (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for songs (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for Wikipedia articles (push) / Run and upload benchmarks (push) Has been cancelled
5049: Fix the path used in the flaky tests CI r=irevoire a=Kerollmops

This PR fixes the path used in [the flaky tests CI](https://github.com/meilisearch/meilisearch/actions/runs/11741717787).

Co-authored-by: Clément Renault <clement@meilisearch.com>
2024-11-13 10:26:50 +00:00
Clément Renault
009709eace
Fix the path used in the flaky tests CI 2024-11-13 09:52:10 +01:00
meili-bors[bot]
a5d7ae23bd
Merge #5044
5044: Adds new metrics to prometheus r=irevoire a=PedroTurik

Not 100% confident in this solution, especially because I couldn't make the "Search Queue searches waiting" metric give me any value other than 0 in my local testing 😆. But I believe it solves the issue.

# Pull Request

## Related issue
Fixes #4998 

## What does this PR do?
### Adds new metrics to Prometheus:
- SearchQueue size, 
- SearchQueue searches running, 
- and Search Queue searches waiting.

## PR checklist
Please check if your PR fulfills the following requirements:
- [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)?
- [x] Have you read the contributing guidelines?
- [x] Have you made sure that the title is accurate and descriptive of the changes?

Co-authored-by: Pedro Turik Firmino <pedroturik@gmail.com>
2024-11-07 17:05:43 +00:00
PedroTurik
03886d0012
Applies optimizations to formatted integration tests (#5043) 2024-11-07 15:58:55 +01:00
meili-bors[bot]
b427b9e88f
Merge #5025
5025: test: improve performance of get_documents.rs r=irevoire a=PedroTurik

# Pull Request

## Related issue
Fixes one item from #4840 

## What does this PR do?
- Applies the changes recommended on the issue for `meilisearch/tests/documents/get_documents.rs`

## PR checklist
Please check if your PR fulfills the following requirements:
- [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)?
- [x] Have you read the contributing guidelines?
- [x] Have you made sure that the title is accurate and descriptive of the changes?

Thank you so much for contributing to Meilisearch!


Co-authored-by: Pedro Turik Firmino <pedroturik@gmail.com>
2024-11-07 09:46:34 +00:00
Pedro Turik Firmino
8b95f5ccc6 Adds new metrics to prometheus: SearchQueue size, SearchQueue searches running, and Search Queue searches waiting. 2024-11-06 15:37:16 -03:00
Pedro Turik Firmino
da59a043ba Fixes formatting issues 2024-11-06 09:55:48 -03:00
Pedro Turik Firmino
da4d47b5d0 Fixes formatting issues 2024-11-06 09:54:20 -03:00
Pedro Turik Firmino
d0b1ba20cb Improves usage of shared indexes 2024-11-04 17:26:50 -03:00
Pedro Turik Firmino
c79ca9679b Changes variable name to re-run CI 2024-11-02 18:25:33 -03:00
Pedro Turik Firmino
a934b0ac6a Applies optimizations to some integration tests 2024-10-29 18:49:06 -03:00
20 changed files with 483 additions and 381 deletions

View File

@ -21,10 +21,10 @@ jobs:
- name: Install cargo-flaky - name: Install cargo-flaky
run: cargo install cargo-flaky run: cargo install cargo-flaky
- name: Run cargo flaky in the dumps - name: Run cargo flaky in the dumps
run: cd dump; cargo flaky -i 100 --release run: cd crates/dump; cargo flaky -i 100 --release
- name: Run cargo flaky in the index-scheduler - name: Run cargo flaky in the index-scheduler
run: cd index-scheduler; cargo flaky -i 100 --release run: cd crates/index-scheduler; cargo flaky -i 100 --release
- name: Run cargo flaky in the auth - name: Run cargo flaky in the auth
run: cd meilisearch-auth; cargo flaky -i 100 --release run: cd crates/meilisearch-auth; cargo flaky -i 100 --release
- name: Run cargo flaky in meilisearch - name: Run cargo flaky in meilisearch
run: cd meilisearch; cargo flaky -i 100 --release run: cd crates/meilisearch; cargo flaky -i 100 --release

View File

@ -39,7 +39,7 @@ use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSe
use meilisearch_types::milli::vector::parsed_vectors::{ use meilisearch_types::milli::vector::parsed_vectors::{
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
}; };
use meilisearch_types::milli::{self, Filter}; use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@ -1277,7 +1277,6 @@ impl IndexScheduler {
operations, operations,
mut tasks, mut tasks,
} => { } => {
let indexer_config = self.index_mapper.indexer_config();
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it // this is made difficult by the fact we're doing private clones of the index scheduler and sending it
// to a fresh thread. // to a fresh thread.
@ -1386,10 +1385,16 @@ impl IndexScheduler {
} }
if tasks.iter().any(|res| res.error.is_none()) { if tasks.iter().any(|res| res.error.is_none()) {
/// TODO create a pool if needed let local_pool;
// let pool = indexer_config.thread_pool.unwrap(); let pool = match &self.index_mapper.indexer_config().thread_pool {
let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
// TODO we want to multithread this
let document_changes = indexer.into_changes( let document_changes = indexer.into_changes(
&indexer_alloc, &indexer_alloc,
index, index,
@ -1398,18 +1403,20 @@ impl IndexScheduler {
&mut new_fields_ids_map, &mut new_fields_ids_map,
)?; )?;
pool.install(|| {
indexer::index( indexer::index(
index_wtxn, index_wtxn,
index, index,
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
primary_key_has_been_set.then_some(primary_key), primary_key_has_been_set.then_some(primary_key),
&pool,
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&send_progress, &send_progress,
)?; )
})
.unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} }
@ -1489,11 +1496,18 @@ impl IndexScheduler {
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>; let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
if task.error.is_none() { if task.error.is_none() {
/// TODO create a pool if needed let local_pool;
// let pool = indexer_config.thread_pool.unwrap(); let pool = match &self.index_mapper.indexer_config().thread_pool {
let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone()); pool.install(|| {
let indexer =
UpdateByFunction::new(candidates, context.clone(), code.clone());
let document_changes = indexer.into_changes(&primary_key)?; let document_changes = indexer.into_changes(&primary_key)?;
let embedders = index.embedding_configs(index_wtxn)?; let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?; let embedders = self.embedders(embedders)?;
@ -1504,13 +1518,16 @@ impl IndexScheduler {
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
None, // cannot change primary key in DocumentEdition None, // cannot change primary key in DocumentEdition
&pool,
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&send_progress, &send_progress,
)?; )?;
Result::Ok(())
})
.unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} }
@ -1629,9 +1646,14 @@ impl IndexScheduler {
.map_err(milli::Error::from)?; .map_err(milli::Error::from)?;
if !tasks.iter().all(|res| res.error.is_some()) { if !tasks.iter().all(|res| res.error.is_some()) {
/// TODO create a pool if needed let local_pool;
// let pool = indexer_config.thread_pool.unwrap(); let pool = match &self.index_mapper.indexer_config().thread_pool {
let pool = rayon::ThreadPoolBuilder::new().build().unwrap(); Some(pool) => pool,
None => {
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
&local_pool
}
};
let mut indexer = indexer::DocumentDeletion::new(); let mut indexer = indexer::DocumentDeletion::new();
indexer.delete_documents_by_docids(to_delete); indexer.delete_documents_by_docids(to_delete);
@ -1639,18 +1661,20 @@ impl IndexScheduler {
let embedders = index.embedding_configs(index_wtxn)?; let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?; let embedders = self.embedders(embedders)?;
pool.install(|| {
indexer::index( indexer::index(
index_wtxn, index_wtxn,
index, index,
&db_fields_ids_map, &db_fields_ids_map,
new_fields_ids_map, new_fields_ids_map,
None, // document deletion never changes primary key None, // document deletion never changes primary key
&pool,
&document_changes, &document_changes,
embedders, embedders,
&|| must_stop_processing.get(), &|| must_stop_processing.get(),
&send_progress, &send_progress,
)?; )
})
.unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} }

View File

@ -49,4 +49,18 @@ lazy_static! {
pub static ref MEILISEARCH_IS_INDEXING: IntGauge = pub static ref MEILISEARCH_IS_INDEXING: IntGauge =
register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing")) register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing"))
.expect("Can't create a metric"); .expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCH_QUEUE_SIZE: IntGauge = register_int_gauge!(opts!(
"meilisearch_search_queue_size",
"Meilisearch Search Queue Size"
))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCHES_RUNNING: IntGauge =
register_int_gauge!(opts!("meilisearch_searches_running", "Meilisearch Searches Running"))
.expect("Can't create a metric");
// Gauge for searches that are queued and not yet running. The help text now
// matches the metric name; running searches are reported by
// `MEILISEARCH_SEARCHES_RUNNING`.
pub static ref MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED: IntGauge =
    register_int_gauge!(opts!(
        "meilisearch_searches_waiting_to_be_processed",
        "Meilisearch Searches Waiting To Be Processed"
    ))
    .expect("Can't create a metric");
} }

View File

@ -10,6 +10,7 @@ use prometheus::{Encoder, TextEncoder};
use crate::extractors::authentication::policies::ActionPolicy; use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::routes::create_all_stats; use crate::routes::create_all_stats;
use crate::search_queue::SearchQueue;
pub fn configure(config: &mut web::ServiceConfig) { pub fn configure(config: &mut web::ServiceConfig) {
config.service(web::resource("").route(web::get().to(get_metrics))); config.service(web::resource("").route(web::get().to(get_metrics)));
@ -18,6 +19,7 @@ pub fn configure(config: &mut web::ServiceConfig) {
pub async fn get_metrics( pub async fn get_metrics(
index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
auth_controller: Data<AuthController>, auth_controller: Data<AuthController>,
search_queue: web::Data<SearchQueue>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_metrics()?; index_scheduler.features().check_metrics()?;
let auth_filters = index_scheduler.filters(); let auth_filters = index_scheduler.filters();
@ -35,6 +37,11 @@ pub async fn get_metrics(
crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64); crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64);
crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64); crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64);
crate::metrics::MEILISEARCH_SEARCH_QUEUE_SIZE.set(search_queue.capacity() as i64);
crate::metrics::MEILISEARCH_SEARCHES_RUNNING.set(search_queue.searches_running() as i64);
crate::metrics::MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED
.set(search_queue.searches_waiting() as i64);
for (index, value) in response.indexes.iter() { for (index, value) in response.indexes.iter() {
crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT
.with_label_values(&[index]) .with_label_values(&[index])

View File

@ -18,6 +18,8 @@
//! And should drop the Permit only once you have freed all the RAM consumed by the method. //! And should drop the Permit only once you have freed all the RAM consumed by the method.
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use rand::rngs::StdRng; use rand::rngs::StdRng;
@ -33,6 +35,8 @@ pub struct SearchQueue {
/// If we have waited longer than this to get a permit, we should abort the search request entirely. /// If we have waited longer than this to get a permit, we should abort the search request entirely.
/// The client probably already closed the connection, but we have no way to find out. /// The client probably already closed the connection, but we have no way to find out.
time_to_abort: Duration, time_to_abort: Duration,
searches_running: Arc<AtomicUsize>,
searches_waiting_to_be_processed: Arc<AtomicUsize>,
} }
/// You should only run search requests while holding this permit. /// You should only run search requests while holding this permit.
@ -68,14 +72,41 @@ impl SearchQueue {
// so let's not allocate any RAM and keep a capacity of 1. // so let's not allocate any RAM and keep a capacity of 1.
let (sender, receiver) = mpsc::channel(1); let (sender, receiver) = mpsc::channel(1);
tokio::task::spawn(Self::run(capacity, paralellism, receiver)); let instance = Self {
Self { sender, capacity, time_to_abort: Duration::from_secs(60) } sender,
capacity,
time_to_abort: Duration::from_secs(60),
searches_running: Default::default(),
searches_waiting_to_be_processed: Default::default(),
};
tokio::task::spawn(Self::run(
capacity,
paralellism,
receiver,
Arc::clone(&instance.searches_running),
Arc::clone(&instance.searches_waiting_to_be_processed),
));
instance
} }
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self { pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
Self { time_to_abort, ..self } Self { time_to_abort, ..self }
} }
/// Returns the `capacity` value this queue was constructed with.
pub fn capacity(&self) -> usize {
self.capacity
}
/// Returns the number of running searches, as last published by the main loop.
///
/// This is a relaxed atomic read of a counter that the `run` loop stores into,
/// so the value is a snapshot and may already be stale when observed.
pub fn searches_running(&self) -> usize {
self.searches_running.load(Ordering::Relaxed)
}
/// Returns the number of searches waiting in the queue, as last published by the main loop.
///
/// This is a relaxed atomic read of a counter that the `run` loop stores its
/// queue length into, so the value is a snapshot and may already be stale when observed.
pub fn searches_waiting(&self) -> usize {
self.searches_waiting_to_be_processed.load(Ordering::Relaxed)
}
/// This function is the main loop, it's in charge on scheduling which search request should execute first and /// This function is the main loop, it's in charge on scheduling which search request should execute first and
/// how many should executes at the same time. /// how many should executes at the same time.
/// ///
@ -84,6 +115,8 @@ impl SearchQueue {
capacity: usize, capacity: usize,
parallelism: NonZeroUsize, parallelism: NonZeroUsize,
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>, mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
metric_searches_running: Arc<AtomicUsize>,
metric_searches_waiting: Arc<AtomicUsize>,
) { ) {
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default(); let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
let mut rng: StdRng = StdRng::from_entropy(); let mut rng: StdRng = StdRng::from_entropy();
@ -133,6 +166,9 @@ impl SearchQueue {
queue.push(search_request); queue.push(search_request);
}, },
} }
metric_searches_running.store(searches_running, Ordering::Relaxed);
metric_searches_waiting.store(queue.len(), Ordering::Relaxed);
} }
} }

View File

@ -389,3 +389,25 @@ pub static VECTOR_DOCUMENTS: Lazy<Value> = Lazy::new(|| {
}, },
]) ])
}); });
/// Returns a shared index populated with the documents from `../assets/test_set.json`.
///
/// The index is built once, on first call, and cached in a function-local static
/// `OnceCell`; every later call returns a reference to that same index.
///
/// # Panics
///
/// Panics if the document addition request is not answered with status code 202.
pub async fn shared_index_with_test_set() -> &'static Index<'static, Shared> {
static INDEX: OnceCell<Index<'static, Shared>> = OnceCell::const_new();
INDEX
.get_or_init(|| async {
let server = Server::new_shared();
let index = server._index("SHARED_TEST_SET").to_shared();
// The uid is URL-encoded before being placed in the route.
let url = format!("/indexes/{}/documents", urlencoding::encode(index.uid.as_ref()));
// The test set is embedded at compile time and posted as a raw JSON body.
let (response, code) = index
.service
.post_str(
url,
include_str!("../assets/test_set.json"),
vec![("content-type", "application/json")],
)
.await;
assert_eq!(code, 202);
// Wait for the enqueued task before handing the index out.
// NOTE(review): the task outcome is not checked here — confirm whether it should be asserted.
index.wait_task(response.uid()).await;
index
})
.await
}

View File

@ -4,24 +4,27 @@ use meili_snap::*;
use urlencoding::encode as urlencode; use urlencoding::encode as urlencode;
use crate::common::encoder::Encoder; use crate::common::encoder::Encoder;
use crate::common::{GetAllDocumentsOptions, Server, Value}; use crate::common::{
shared_does_not_exists_index, shared_empty_index, shared_index_with_test_set,
GetAllDocumentsOptions, Server, Value,
};
use crate::json; use crate::json;
// TODO: partial test since we are testing error, amd error is not yet fully implemented in // TODO: partial test since we are testing error, amd error is not yet fully implemented in
// transplant // transplant
#[actix_rt::test] #[actix_rt::test]
async fn get_unexisting_index_single_document() { async fn get_unexisting_index_single_document() {
let server = Server::new().await; let (_response, code) = shared_does_not_exists_index().await.get_document(1, None).await;
let (_response, code) = server.index("test").get_document(1, None).await;
assert_eq!(code, 404); assert_eq!(code, 404);
} }
#[actix_rt::test] #[actix_rt::test]
async fn error_get_unexisting_document() { async fn error_get_unexisting_document() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("test"); let index = server.unique_index();
index.create(None).await; let (task, _code) = index.create(None).await;
index.wait_task(0).await; index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document(1, None).await; let (response, code) = index.get_document(1, None).await;
let expected_response = json!({ let expected_response = json!({
@ -37,18 +40,19 @@ async fn error_get_unexisting_document() {
#[actix_rt::test] #[actix_rt::test]
async fn get_document() { async fn get_document() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("test"); let index = server.unique_index();
index.create(None).await; let (task, _code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded();
let documents = json!([ let documents = json!([
{ {
"id": 0, "id": 0,
"nested": { "content": "foobar" }, "nested": { "content": "foobar" },
} }
]); ]);
let (_, code) = index.add_documents(documents, None).await; let (task, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202); assert_eq!(code, 202);
index.wait_task(1).await; index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document(0, None).await; let (response, code) = index.get_document(0, None).await;
assert_eq!(code, 200); assert_eq!(code, 200);
assert_eq!( assert_eq!(
@ -81,12 +85,11 @@ async fn get_document() {
#[actix_rt::test] #[actix_rt::test]
async fn error_get_unexisting_index_all_documents() { async fn error_get_unexisting_index_all_documents() {
let server = Server::new().await; let index = shared_does_not_exists_index().await;
let (response, code) = let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
server.index("test").get_all_documents(GetAllDocumentsOptions::default()).await;
let expected_response = json!({ let expected_response = json!({
"message": "Index `test` not found.", "message": "Index `DOES_NOT_EXISTS` not found.",
"code": "index_not_found", "code": "index_not_found",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_not_found" "link": "https://docs.meilisearch.com/errors#index_not_found"
@ -98,12 +101,7 @@ async fn error_get_unexisting_index_all_documents() {
#[actix_rt::test] #[actix_rt::test]
async fn get_no_document() { async fn get_no_document() {
let server = Server::new().await; let index = shared_empty_index().await;
let index = server.index("test");
let (_, code) = index.create(None).await;
assert_eq!(code, 202);
index.wait_task(0).await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await; let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
assert_eq!(code, 200); assert_eq!(code, 200);
@ -112,14 +110,12 @@ async fn get_no_document() {
#[actix_rt::test] #[actix_rt::test]
async fn get_all_documents_no_options() { async fn get_all_documents_no_options() {
let server = Server::new().await; let index = shared_index_with_test_set().await;
let index = server.index("test");
index.load_test_set().await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await; let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
assert_eq!(code, 200); assert_eq!(code, 200);
let arr = response["results"].as_array().unwrap(); let results = response["results"].as_array().unwrap();
assert_eq!(arr.len(), 20); assert_eq!(results.len(), 20);
let first = json!({ let first = json!({
"id":0, "id":0,
"isActive":false, "isActive":false,
@ -138,19 +134,16 @@ async fn get_all_documents_no_options() {
"longitude":-145.725388, "longitude":-145.725388,
"tags":["bug" "tags":["bug"
,"bug"]}); ,"bug"]});
assert_eq!(first, arr[0]); assert_eq!(first, results[0]);
} }
#[actix_rt::test] #[actix_rt::test]
async fn get_all_documents_no_options_with_response_compression() { async fn get_all_documents_no_options_with_response_compression() {
let server = Server::new().await; let index = shared_index_with_test_set().await;
let index_uid = "test";
let index = server.index(index_uid);
index.load_test_set().await;
let app = server.init_web_app().await; let app = Server::new_shared().init_web_app().await;
let req = test::TestRequest::get() let req = test::TestRequest::get()
.uri(&format!("/indexes/{}/documents?", urlencode(index_uid))) .uri(&format!("/indexes/{}/documents?", urlencode(&index.uid)))
.insert_header((ACCEPT_ENCODING, "gzip")) .insert_header((ACCEPT_ENCODING, "gzip"))
.to_request(); .to_request();
@ -169,9 +162,7 @@ async fn get_all_documents_no_options_with_response_compression() {
#[actix_rt::test] #[actix_rt::test]
async fn test_get_all_documents_limit() { async fn test_get_all_documents_limit() {
let server = Server::new().await; let index = shared_index_with_test_set().await;
let index = server.index("test");
index.load_test_set().await;
let (response, code) = index let (response, code) = index
.get_all_documents(GetAllDocumentsOptions { limit: Some(5), ..Default::default() }) .get_all_documents(GetAllDocumentsOptions { limit: Some(5), ..Default::default() })
@ -186,9 +177,7 @@ async fn test_get_all_documents_limit() {
#[actix_rt::test] #[actix_rt::test]
async fn test_get_all_documents_offset() { async fn test_get_all_documents_offset() {
let server = Server::new().await; let index = shared_index_with_test_set().await;
let index = server.index("test");
index.load_test_set().await;
let (response, code) = index let (response, code) = index
.get_all_documents(GetAllDocumentsOptions { offset: Some(5), ..Default::default() }) .get_all_documents(GetAllDocumentsOptions { offset: Some(5), ..Default::default() })
@ -203,9 +192,7 @@ async fn test_get_all_documents_offset() {
#[actix_rt::test] #[actix_rt::test]
async fn test_get_all_documents_attributes_to_retrieve() { async fn test_get_all_documents_attributes_to_retrieve() {
let server = Server::new().await; let index = shared_index_with_test_set().await;
let index = server.index("test");
index.load_test_set().await;
let (response, code) = index let (response, code) = index
.get_all_documents(GetAllDocumentsOptions { .get_all_documents(GetAllDocumentsOptions {
@ -286,9 +273,11 @@ async fn test_get_all_documents_attributes_to_retrieve() {
#[actix_rt::test] #[actix_rt::test]
async fn get_document_s_nested_attributes_to_retrieve() { async fn get_document_s_nested_attributes_to_retrieve() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("test"); let index = server.unique_index();
index.create(None).await; let (task, _code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded();
let documents = json!([ let documents = json!([
{ {
"id": 0, "id": 0,
@ -302,9 +291,9 @@ async fn get_document_s_nested_attributes_to_retrieve() {
}, },
}, },
]); ]);
let (_, code) = index.add_documents(documents, None).await; let (task, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202); assert_eq!(code, 202);
index.wait_task(1).await; index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document(0, Some(json!({ "fields": ["content"] }))).await; let (response, code) = index.get_document(0, Some(json!({ "fields": ["content"] }))).await;
assert_eq!(code, 200); assert_eq!(code, 200);
@ -343,10 +332,10 @@ async fn get_document_s_nested_attributes_to_retrieve() {
#[actix_rt::test] #[actix_rt::test]
async fn get_documents_displayed_attributes_is_ignored() { async fn get_documents_displayed_attributes_is_ignored() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("test"); let index = server.unique_index();
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
index.load_test_set().await; index.load_test_set().await;
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await; let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
assert_eq!(code, 200); assert_eq!(code, 200);
@ -366,10 +355,10 @@ async fn get_documents_displayed_attributes_is_ignored() {
#[actix_rt::test] #[actix_rt::test]
async fn get_document_by_filter() { async fn get_document_by_filter() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("doggo"); let index = server.unique_index();
index.update_settings_filterable_attributes(json!(["color"])).await; index.update_settings_filterable_attributes(json!(["color"])).await;
index let (task, _code) = index
.add_documents( .add_documents(
json!([ json!([
{ "id": 0, "color": "red" }, { "id": 0, "color": "red" },
@ -380,7 +369,7 @@ async fn get_document_by_filter() {
Some("id"), Some("id"),
) )
.await; .await;
index.wait_task(1).await; index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document_by_filter(json!({})).await; let (response, code) = index.get_document_by_filter(json!({})).await;
let (response2, code2) = index.get_all_documents_raw("").await; let (response2, code2) = index.get_all_documents_raw("").await;
@ -552,7 +541,7 @@ async fn get_document_with_vectors() {
})) }))
.await; .await;
snapshot!(code, @"202 Accepted"); snapshot!(code, @"202 Accepted");
server.wait_task(response.uid()).await; server.wait_task(response.uid()).await.succeeded();
let documents = json!([ let documents = json!([
{"id": 0, "name": "kefir", "_vectors": { "manual": [0, 0, 0] }}, {"id": 0, "name": "kefir", "_vectors": { "manual": [0, 0, 0] }},
@ -560,7 +549,7 @@ async fn get_document_with_vectors() {
]); ]);
let (value, code) = index.add_documents(documents, None).await; let (value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted"); snapshot!(code, @"202 Accepted");
index.wait_task(value.uid()).await; index.wait_task(value.uid()).await.succeeded();
// by default you shouldn't see the `_vectors` object // by default you shouldn't see the `_vectors` object
let (documents, _code) = index.get_all_documents(Default::default()).await; let (documents, _code) = index.get_all_documents(Default::default()).await;

View File

@ -6,14 +6,14 @@ use crate::json;
#[actix_rt::test] #[actix_rt::test]
async fn formatted_contain_wildcard() { async fn formatted_contain_wildcard() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("test"); let index = server.unique_index();
index.update_settings(json!({ "displayedAttributes": ["id", "cattos"] })).await; index.update_settings(json!({ "displayedAttributes": ["id", "cattos"] })).await;
let documents = NESTED_DOCUMENTS.clone(); let documents = NESTED_DOCUMENTS.clone();
index.add_documents(documents, None).await; let (response, _) = index.add_documents(documents, None).await;
index.wait_task(1).await; index.wait_task(response.uid()).await;
index.search(json!({ "q": "pésti", "attributesToRetrieve": ["father", "mother"], "attributesToHighlight": ["father", "mother", "*"], "attributesToCrop": ["doggos"], "showMatchesPosition": true }), index.search(json!({ "q": "pésti", "attributesToRetrieve": ["father", "mother"], "attributesToHighlight": ["father", "mother", "*"], "attributesToCrop": ["doggos"], "showMatchesPosition": true }),
|response, code| |response, code|
@ -135,12 +135,7 @@ async fn formatted_contain_wildcard() {
#[actix_rt::test] #[actix_rt::test]
async fn format_nested() { async fn format_nested() {
let server = Server::new().await; let index = shared_index_with_nested_documents().await;
let index = server.index("test");
let documents = NESTED_DOCUMENTS.clone();
index.add_documents(documents, None).await;
index.wait_task(0).await;
index index
.search(json!({ "q": "pésti", "attributesToRetrieve": ["doggos"] }), |response, code| { .search(json!({ "q": "pésti", "attributesToRetrieve": ["doggos"] }), |response, code| {
@ -340,15 +335,15 @@ async fn format_nested() {
#[actix_rt::test] #[actix_rt::test]
async fn displayedattr_2_smol() { async fn displayedattr_2_smol() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("test"); let index = server.unique_index();
// not enough displayed for the other settings // not enough displayed for the other settings
index.update_settings(json!({ "displayedAttributes": ["id"] })).await; index.update_settings(json!({ "displayedAttributes": ["id"] })).await;
let documents = NESTED_DOCUMENTS.clone(); let documents = NESTED_DOCUMENTS.clone();
index.add_documents(documents, None).await; let (response, _) = index.add_documents(documents, None).await;
index.wait_task(1).await; index.wait_task(response.uid()).await;
index index
.search(json!({ "attributesToRetrieve": ["father", "id"], "attributesToHighlight": ["mother"], "attributesToCrop": ["cattos"] }), .search(json!({ "attributesToRetrieve": ["father", "id"], "attributesToHighlight": ["mother"], "attributesToCrop": ["cattos"] }),
@ -538,15 +533,15 @@ async fn displayedattr_2_smol() {
#[cfg(feature = "default")] #[cfg(feature = "default")]
#[actix_rt::test] #[actix_rt::test]
async fn test_cjk_highlight() { async fn test_cjk_highlight() {
let server = Server::new().await; let server = Server::new_shared();
let index = server.index("test"); let index = server.unique_index();
let documents = json!([ let documents = json!([
{ "id": 0, "title": "この度、クーポンで無料で頂きました。" }, { "id": 0, "title": "この度、クーポンで無料で頂きました。" },
{ "id": 1, "title": "大卫到了扫罗那里" }, { "id": 1, "title": "大卫到了扫罗那里" },
]); ]);
index.add_documents(documents, None).await; let (response, _) = index.add_documents(documents, None).await;
index.wait_task(0).await; index.wait_task(response.uid()).await;
index index
.search(json!({"q": "", "attributesToHighlight": ["title"]}), |response, code| { .search(json!({"q": "", "attributesToHighlight": ["title"]}), |response, code| {

View File

@ -3,6 +3,9 @@ mod r#match;
mod matching_words; mod matching_words;
mod simple_token_kind; mod simple_token_kind;
use std::borrow::Cow;
use std::cmp::{max, min};
use charabia::{Language, SeparatorKind, Token, Tokenizer}; use charabia::{Language, SeparatorKind, Token, Tokenizer};
use either::Either; use either::Either;
pub use matching_words::MatchingWords; pub use matching_words::MatchingWords;
@ -10,10 +13,6 @@ use matching_words::{MatchType, PartialMatch};
use r#match::{Match, MatchPosition}; use r#match::{Match, MatchPosition};
use serde::Serialize; use serde::Serialize;
use simple_token_kind::SimpleTokenKind; use simple_token_kind::SimpleTokenKind;
use std::{
borrow::Cow,
cmp::{max, min},
};
const DEFAULT_CROP_MARKER: &str = ""; const DEFAULT_CROP_MARKER: &str = "";
const DEFAULT_HIGHLIGHT_PREFIX: &str = "<em>"; const DEFAULT_HIGHLIGHT_PREFIX: &str = "<em>";

View File

@ -366,14 +366,14 @@ pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender);
impl FieldIdDocidFacetSender<'_> { impl FieldIdDocidFacetSender<'_> {
pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok()); debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &value)); let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
self.0 self.0
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry }) .send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
} }
pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> { pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok()); debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &[])); let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &[]));
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry }) self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
} }

View File

@ -58,7 +58,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
context.index, context.index,
&context.db_fields_ids_map, &context.db_fields_ids_map,
)?; )?;
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); let geo_iter =
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
for res in content.iter_top_level_fields().chain(geo_iter) { for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data
@ -74,7 +75,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
let docid = update.docid(); let docid = update.docid();
let content = let content =
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?; update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); let geo_iter =
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
for res in content.iter_top_level_fields().chain(geo_iter) { for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data
@ -84,7 +86,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
*entry -= 1; *entry -= 1;
} }
let content = update.updated(); let content = update.updated();
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); let geo_iter =
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
for res in content.iter_top_level_fields().chain(geo_iter) { for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data
@ -114,7 +117,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
DocumentChange::Insertion(insertion) => { DocumentChange::Insertion(insertion) => {
let docid = insertion.docid(); let docid = insertion.docid();
let content = insertion.inserted(); let content = insertion.inserted();
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); let geo_iter =
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
for res in content.iter_top_level_fields().chain(geo_iter) { for res in content.iter_top_level_fields().chain(geo_iter) {
let (f, _) = res?; let (f, _) = res?;
let entry = document_extractor_data let entry = document_extractor_data

View File

@ -3,7 +3,6 @@ use std::collections::HashMap;
use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder}; use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
use serde_json::Value; use serde_json::Value;
use crate::proximity::MAX_DISTANCE;
use crate::update::new::document::Document; use crate::update::new::document::Document;
use crate::update::new::extract::perm_json_p::{ use crate::update::new::extract::perm_json_p::{
seek_leaf_values_in_array, seek_leaf_values_in_object, select_field, seek_leaf_values_in_array, seek_leaf_values_in_object, select_field,
@ -13,6 +12,9 @@ use crate::{
MAX_WORD_LENGTH, MAX_WORD_LENGTH,
}; };
// todo: should be crate::proximity::MAX_DISTANCE but it has been forgotten
const MAX_DISTANCE: u32 = 8;
pub struct DocumentTokenizer<'a> { pub struct DocumentTokenizer<'a> {
pub tokenizer: &'a Tokenizer<'a>, pub tokenizer: &'a Tokenizer<'a>,
pub attribute_to_extract: Option<&'a [&'a str]>, pub attribute_to_extract: Option<&'a [&'a str]>,
@ -251,22 +253,22 @@ mod test {
]: "doggo", ]: "doggo",
[ [
2, 2,
MAX_DISTANCE, 8,
]: "doggo", ]: "doggo",
[ [
2, 2,
16, 16,
]: "catto", ]: "catto",
[ [
3, 5,
0, 0,
]: "10", ]: "10",
[ [
4, 6,
0, 0,
]: "pesti", ]: "pesti",
[ [
5, 7,
0, 0,
]: "23", ]: "23",
} }

View File

@ -6,11 +6,9 @@ use grenad::Sorter;
use heed::types::{Bytes, SerdeJson}; use heed::types::{Bytes, SerdeJson};
use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn}; use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};
use super::extract::FacetKind;
use super::fst_merger_builder::FstMergerBuilder; use super::fst_merger_builder::FstMergerBuilder;
use super::KvReaderDelAdd; use super::KvReaderDelAdd;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec}; use crate::heed_codec::facet::FacetGroupKey;
use crate::heed_codec::StrRefCodec;
use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::{create_sorter, MergeDeladdBtreesetString}; use crate::update::{create_sorter, MergeDeladdBtreesetString};
use crate::{ use crate::{

View File

@ -1,10 +1,12 @@
use std::{fs::File, io::BufWriter}; use std::fs::File;
use std::io::BufWriter;
use fst::{Set, SetBuilder, Streamer}; use fst::{Set, SetBuilder, Streamer};
use memmap2::Mmap; use memmap2::Mmap;
use tempfile::tempfile; use tempfile::tempfile;
use crate::{update::del_add::DelAdd, InternalError, Result}; use crate::update::del_add::DelAdd;
use crate::{InternalError, Result};
pub struct FstMergerBuilder<'a> { pub struct FstMergerBuilder<'a> {
stream: Option<fst::set::Stream<'a>>, stream: Option<fst::set::Stream<'a>>,

View File

@ -132,7 +132,7 @@ mod test {
} }
let mut deletions = DocumentDeletion::new(); let mut deletions = DocumentDeletion::new();
deletions.delete_documents_by_docids(vec![0, 2, 42].into_iter().collect()); deletions.delete_documents_by_docids(Vec::<u32>::new().into_iter().collect());
let indexer = Bump::new(); let indexer = Bump::new();
let index = TempIndex::new(); let index = TempIndex::new();

View File

@ -356,11 +356,11 @@ impl MergeChanges for MergeDocumentForUpdates {
let has_deletion = last_deletion.is_some(); let has_deletion = last_deletion.is_some();
if operations.is_empty() { if operations.is_empty() {
return if !is_new { return if is_new {
Ok(None)
} else {
let deletion = Deletion::create(docid, external_docid); let deletion = Deletion::create(docid, external_docid);
Ok(Some(DocumentChange::Deletion(deletion))) Ok(Some(DocumentChange::Deletion(deletion)))
} else {
Ok(None)
}; };
} }

View File

@ -13,7 +13,6 @@ use itertools::{merge_join_by, EitherOrBoth};
pub use partial_dump::PartialDump; pub use partial_dump::PartialDump;
use rand::SeedableRng as _; use rand::SeedableRng as _;
use raw_collections::RawMap; use raw_collections::RawMap;
use rayon::ThreadPool;
use time::OffsetDateTime; use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction; pub use update_by_function::UpdateByFunction;
@ -136,7 +135,6 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
db_fields_ids_map: &'indexer FieldsIdsMap, db_fields_ids_map: &'indexer FieldsIdsMap,
new_fields_ids_map: FieldsIdsMap, new_fields_ids_map: FieldsIdsMap,
new_primary_key: Option<PrimaryKey<'pl>>, new_primary_key: Option<PrimaryKey<'pl>>,
pool: &ThreadPool,
document_changes: &DC, document_changes: &DC,
embedders: EmbeddingConfigs, embedders: EmbeddingConfigs,
must_stop_processing: &'indexer MSP, must_stop_processing: &'indexer MSP,
@ -152,9 +150,9 @@ where
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder); let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
let new_fields_ids_map = RwLock::new(new_fields_ids_map); let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads()); let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let indexing_context = IndexingContext { let indexing_context = IndexingContext {
index, index,
@ -179,7 +177,6 @@ where
let document_ids = &mut document_ids; let document_ids = &mut document_ids;
// TODO manage the errors correctly // TODO manage the errors correctly
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
let result = pool.in_place_scope(|_s| {
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter(); let _entered = span.enter();
@ -188,9 +185,17 @@ where
// document but we need to create a function that collects and compresses documents. // document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.documents(); let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender, embedders); let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let (finished_steps, step_name) = steps::extract_documents(); let (finished_steps, step_name) = steps::extract_documents();
extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; extract(document_changes,
&document_extractor,
indexing_context,
&mut extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
for document_extractor_data in datastore { for document_extractor_data in datastore {
let document_extractor_data = document_extractor_data.0.into_inner(); let document_extractor_data = document_extractor_data.0.into_inner();
@ -223,7 +228,15 @@ where
let (finished_steps, step_name) = steps::extract_facets(); let (finished_steps, step_name) = steps::extract_facets();
facet_field_ids_delta = merge_and_send_facet_docids( facet_field_ids_delta = merge_and_send_facet_docids(
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), finished_steps, total_steps, step_name)?, FacetedDocidsExtractor::run_extraction(grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
&extractor_sender.field_id_docid_facet_sender(),
finished_steps,
total_steps,
step_name,
)?,
FacetDatabases::new(index), FacetDatabases::new(index),
index, index,
extractor_sender.facet_docids(), extractor_sender.facet_docids(),
@ -360,7 +373,7 @@ where
let embedding_sender = extractor_sender.embeddings(); let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings(); let (finished_steps, step_name) = steps::extract_embeddings();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
@ -383,7 +396,7 @@ where
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else { let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
break 'geo; break 'geo;
}; };
let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let (finished_steps, step_name) = steps::extract_geo_points(); let (finished_steps, step_name) = steps::extract_geo_points();
extract(document_changes, extract(document_changes,
&extractor, &extractor,
@ -419,9 +432,6 @@ where
// - [x] Extract fieldid facet number docids // - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids // - [x] Extract fieldid facet string docids
Result::Ok(facet_field_ids_delta)
});
{ {
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
let _entered = span.enter(); let _entered = span.enter();
@ -429,7 +439,7 @@ where
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None }); (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
} }
result Result::Ok(facet_field_ids_delta)
})?; })?;
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);

View File

@ -1,5 +1,4 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::io;
use hashbrown::HashSet; use hashbrown::HashSet;
use heed::types::Bytes; use heed::types::Bytes;

View File

@ -286,7 +286,7 @@ impl<'doc> MergedVectorDocument<'doc> {
) -> Result<Option<Self>> { ) -> Result<Option<Self>> {
let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?; let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
let new_doc = let new_doc =
VectorDocumentFromVersions::new(&external_document_id, versions, doc_alloc, embedders)?; VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) }) Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
} }

View File

@ -1,13 +1,14 @@
use std::collections::HashSet;
use std::io::BufWriter; use std::io::BufWriter;
use fst::{Set, SetBuilder, Streamer}; use fst::{Set, SetBuilder, Streamer};
use memmap2::Mmap; use memmap2::Mmap;
use std::collections::HashSet;
use tempfile::tempfile; use tempfile::tempfile;
use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
use super::fst_merger_builder::FstMergerBuilder; use super::fst_merger_builder::FstMergerBuilder;
use crate::index::PrefixSettings;
use crate::update::del_add::DelAdd;
use crate::{InternalError, Prefix, Result};
pub struct WordFstBuilder<'a> { pub struct WordFstBuilder<'a> {
word_fst_builder: FstMergerBuilder<'a>, word_fst_builder: FstMergerBuilder<'a>,