mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
Compare commits
16 Commits
cb44e630a6
...
3c640386af
Author | SHA1 | Date | |
---|---|---|---|
|
3c640386af | ||
|
91c58cfa38 | ||
|
9e8367f1e6 | ||
|
0e3c5d91ab | ||
|
695c2c6b99 | ||
|
94fb55bb6f | ||
|
009709eace | ||
|
a5d7ae23bd | ||
|
03886d0012 | ||
|
b427b9e88f | ||
|
8b95f5ccc6 | ||
|
da59a043ba | ||
|
da4d47b5d0 | ||
|
d0b1ba20cb | ||
|
c79ca9679b | ||
|
a934b0ac6a |
8
.github/workflows/flaky-tests.yml
vendored
8
.github/workflows/flaky-tests.yml
vendored
@ -21,10 +21,10 @@ jobs:
|
|||||||
- name: Install cargo-flaky
|
- name: Install cargo-flaky
|
||||||
run: cargo install cargo-flaky
|
run: cargo install cargo-flaky
|
||||||
- name: Run cargo flaky in the dumps
|
- name: Run cargo flaky in the dumps
|
||||||
run: cd dump; cargo flaky -i 100 --release
|
run: cd crates/dump; cargo flaky -i 100 --release
|
||||||
- name: Run cargo flaky in the index-scheduler
|
- name: Run cargo flaky in the index-scheduler
|
||||||
run: cd index-scheduler; cargo flaky -i 100 --release
|
run: cd crates/index-scheduler; cargo flaky -i 100 --release
|
||||||
- name: Run cargo flaky in the auth
|
- name: Run cargo flaky in the auth
|
||||||
run: cd meilisearch-auth; cargo flaky -i 100 --release
|
run: cd crates/meilisearch-auth; cargo flaky -i 100 --release
|
||||||
- name: Run cargo flaky in meilisearch
|
- name: Run cargo flaky in meilisearch
|
||||||
run: cd meilisearch; cargo flaky -i 100 --release
|
run: cd crates/meilisearch; cargo flaky -i 100 --release
|
||||||
|
@ -39,7 +39,7 @@ use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSe
|
|||||||
use meilisearch_types::milli::vector::parsed_vectors::{
|
use meilisearch_types::milli::vector::parsed_vectors::{
|
||||||
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
|
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
|
||||||
};
|
};
|
||||||
use meilisearch_types::milli::{self, Filter};
|
use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
||||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
||||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
||||||
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
||||||
@ -1277,7 +1277,6 @@ impl IndexScheduler {
|
|||||||
operations,
|
operations,
|
||||||
mut tasks,
|
mut tasks,
|
||||||
} => {
|
} => {
|
||||||
let indexer_config = self.index_mapper.indexer_config();
|
|
||||||
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
|
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
|
||||||
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it
|
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it
|
||||||
// to a fresh thread.
|
// to a fresh thread.
|
||||||
@ -1386,10 +1385,16 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if tasks.iter().any(|res| res.error.is_none()) {
|
if tasks.iter().any(|res| res.error.is_none()) {
|
||||||
/// TODO create a pool if needed
|
let local_pool;
|
||||||
// let pool = indexer_config.thread_pool.unwrap();
|
let pool = match &self.index_mapper.indexer_config().thread_pool {
|
||||||
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
|
Some(pool) => pool,
|
||||||
|
None => {
|
||||||
|
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||||
|
&local_pool
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO we want to multithread this
|
||||||
let document_changes = indexer.into_changes(
|
let document_changes = indexer.into_changes(
|
||||||
&indexer_alloc,
|
&indexer_alloc,
|
||||||
index,
|
index,
|
||||||
@ -1398,18 +1403,20 @@ impl IndexScheduler {
|
|||||||
&mut new_fields_ids_map,
|
&mut new_fields_ids_map,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
pool.install(|| {
|
||||||
indexer::index(
|
indexer::index(
|
||||||
index_wtxn,
|
index_wtxn,
|
||||||
index,
|
index,
|
||||||
&db_fields_ids_map,
|
&db_fields_ids_map,
|
||||||
new_fields_ids_map,
|
new_fields_ids_map,
|
||||||
primary_key_has_been_set.then_some(primary_key),
|
primary_key_has_been_set.then_some(primary_key),
|
||||||
&pool,
|
|
||||||
&document_changes,
|
&document_changes,
|
||||||
embedders,
|
embedders,
|
||||||
&|| must_stop_processing.get(),
|
&|| must_stop_processing.get(),
|
||||||
&send_progress,
|
&send_progress,
|
||||||
)?;
|
)
|
||||||
|
})
|
||||||
|
.unwrap()?;
|
||||||
|
|
||||||
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||||
}
|
}
|
||||||
@ -1489,11 +1496,18 @@ impl IndexScheduler {
|
|||||||
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
|
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
|
||||||
|
|
||||||
if task.error.is_none() {
|
if task.error.is_none() {
|
||||||
/// TODO create a pool if needed
|
let local_pool;
|
||||||
// let pool = indexer_config.thread_pool.unwrap();
|
let pool = match &self.index_mapper.indexer_config().thread_pool {
|
||||||
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
|
Some(pool) => pool,
|
||||||
|
None => {
|
||||||
|
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||||
|
&local_pool
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
|
pool.install(|| {
|
||||||
|
let indexer =
|
||||||
|
UpdateByFunction::new(candidates, context.clone(), code.clone());
|
||||||
let document_changes = indexer.into_changes(&primary_key)?;
|
let document_changes = indexer.into_changes(&primary_key)?;
|
||||||
let embedders = index.embedding_configs(index_wtxn)?;
|
let embedders = index.embedding_configs(index_wtxn)?;
|
||||||
let embedders = self.embedders(embedders)?;
|
let embedders = self.embedders(embedders)?;
|
||||||
@ -1504,13 +1518,16 @@ impl IndexScheduler {
|
|||||||
&db_fields_ids_map,
|
&db_fields_ids_map,
|
||||||
new_fields_ids_map,
|
new_fields_ids_map,
|
||||||
None, // cannot change primary key in DocumentEdition
|
None, // cannot change primary key in DocumentEdition
|
||||||
&pool,
|
|
||||||
&document_changes,
|
&document_changes,
|
||||||
embedders,
|
embedders,
|
||||||
&|| must_stop_processing.get(),
|
&|| must_stop_processing.get(),
|
||||||
&send_progress,
|
&send_progress,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
Result::Ok(())
|
||||||
|
})
|
||||||
|
.unwrap()?;
|
||||||
|
|
||||||
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1629,9 +1646,14 @@ impl IndexScheduler {
|
|||||||
.map_err(milli::Error::from)?;
|
.map_err(milli::Error::from)?;
|
||||||
|
|
||||||
if !tasks.iter().all(|res| res.error.is_some()) {
|
if !tasks.iter().all(|res| res.error.is_some()) {
|
||||||
/// TODO create a pool if needed
|
let local_pool;
|
||||||
// let pool = indexer_config.thread_pool.unwrap();
|
let pool = match &self.index_mapper.indexer_config().thread_pool {
|
||||||
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
|
Some(pool) => pool,
|
||||||
|
None => {
|
||||||
|
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||||
|
&local_pool
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let mut indexer = indexer::DocumentDeletion::new();
|
let mut indexer = indexer::DocumentDeletion::new();
|
||||||
indexer.delete_documents_by_docids(to_delete);
|
indexer.delete_documents_by_docids(to_delete);
|
||||||
@ -1639,18 +1661,20 @@ impl IndexScheduler {
|
|||||||
let embedders = index.embedding_configs(index_wtxn)?;
|
let embedders = index.embedding_configs(index_wtxn)?;
|
||||||
let embedders = self.embedders(embedders)?;
|
let embedders = self.embedders(embedders)?;
|
||||||
|
|
||||||
|
pool.install(|| {
|
||||||
indexer::index(
|
indexer::index(
|
||||||
index_wtxn,
|
index_wtxn,
|
||||||
index,
|
index,
|
||||||
&db_fields_ids_map,
|
&db_fields_ids_map,
|
||||||
new_fields_ids_map,
|
new_fields_ids_map,
|
||||||
None, // document deletion never changes primary key
|
None, // document deletion never changes primary key
|
||||||
&pool,
|
|
||||||
&document_changes,
|
&document_changes,
|
||||||
embedders,
|
embedders,
|
||||||
&|| must_stop_processing.get(),
|
&|| must_stop_processing.get(),
|
||||||
&send_progress,
|
&send_progress,
|
||||||
)?;
|
)
|
||||||
|
})
|
||||||
|
.unwrap()?;
|
||||||
|
|
||||||
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||||
}
|
}
|
||||||
|
@ -49,4 +49,18 @@ lazy_static! {
|
|||||||
pub static ref MEILISEARCH_IS_INDEXING: IntGauge =
|
pub static ref MEILISEARCH_IS_INDEXING: IntGauge =
|
||||||
register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing"))
|
register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing"))
|
||||||
.expect("Can't create a metric");
|
.expect("Can't create a metric");
|
||||||
|
pub static ref MEILISEARCH_SEARCH_QUEUE_SIZE: IntGauge = register_int_gauge!(opts!(
|
||||||
|
"meilisearch_search_queue_size",
|
||||||
|
"Meilisearch Search Queue Size"
|
||||||
|
))
|
||||||
|
.expect("Can't create a metric");
|
||||||
|
pub static ref MEILISEARCH_SEARCHES_RUNNING: IntGauge =
|
||||||
|
register_int_gauge!(opts!("meilisearch_searches_running", "Meilisearch Searches Running"))
|
||||||
|
.expect("Can't create a metric");
|
||||||
|
pub static ref MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED: IntGauge =
|
||||||
|
register_int_gauge!(opts!(
|
||||||
|
"meilisearch_searches_waiting_to_be_processed",
|
||||||
|
"Meilisearch Searches Being Processed"
|
||||||
|
))
|
||||||
|
.expect("Can't create a metric");
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ use prometheus::{Encoder, TextEncoder};
|
|||||||
use crate::extractors::authentication::policies::ActionPolicy;
|
use crate::extractors::authentication::policies::ActionPolicy;
|
||||||
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
||||||
use crate::routes::create_all_stats;
|
use crate::routes::create_all_stats;
|
||||||
|
use crate::search_queue::SearchQueue;
|
||||||
|
|
||||||
pub fn configure(config: &mut web::ServiceConfig) {
|
pub fn configure(config: &mut web::ServiceConfig) {
|
||||||
config.service(web::resource("").route(web::get().to(get_metrics)));
|
config.service(web::resource("").route(web::get().to(get_metrics)));
|
||||||
@ -18,6 +19,7 @@ pub fn configure(config: &mut web::ServiceConfig) {
|
|||||||
pub async fn get_metrics(
|
pub async fn get_metrics(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
|
||||||
auth_controller: Data<AuthController>,
|
auth_controller: Data<AuthController>,
|
||||||
|
search_queue: web::Data<SearchQueue>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
index_scheduler.features().check_metrics()?;
|
index_scheduler.features().check_metrics()?;
|
||||||
let auth_filters = index_scheduler.filters();
|
let auth_filters = index_scheduler.filters();
|
||||||
@ -35,6 +37,11 @@ pub async fn get_metrics(
|
|||||||
crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64);
|
crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64);
|
||||||
crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64);
|
crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64);
|
||||||
|
|
||||||
|
crate::metrics::MEILISEARCH_SEARCH_QUEUE_SIZE.set(search_queue.capacity() as i64);
|
||||||
|
crate::metrics::MEILISEARCH_SEARCHES_RUNNING.set(search_queue.searches_running() as i64);
|
||||||
|
crate::metrics::MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED
|
||||||
|
.set(search_queue.searches_waiting() as i64);
|
||||||
|
|
||||||
for (index, value) in response.indexes.iter() {
|
for (index, value) in response.indexes.iter() {
|
||||||
crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT
|
crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT
|
||||||
.with_label_values(&[index])
|
.with_label_values(&[index])
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
|
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
|
||||||
|
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use rand::rngs::StdRng;
|
use rand::rngs::StdRng;
|
||||||
@ -33,6 +35,8 @@ pub struct SearchQueue {
|
|||||||
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
|
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
|
||||||
/// The client probably already closed the connection, but we have no way to find out.
|
/// The client probably already closed the connection, but we have no way to find out.
|
||||||
time_to_abort: Duration,
|
time_to_abort: Duration,
|
||||||
|
searches_running: Arc<AtomicUsize>,
|
||||||
|
searches_waiting_to_be_processed: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// You should only run search requests while holding this permit.
|
/// You should only run search requests while holding this permit.
|
||||||
@ -68,14 +72,41 @@ impl SearchQueue {
|
|||||||
// so let's not allocate any RAM and keep a capacity of 1.
|
// so let's not allocate any RAM and keep a capacity of 1.
|
||||||
let (sender, receiver) = mpsc::channel(1);
|
let (sender, receiver) = mpsc::channel(1);
|
||||||
|
|
||||||
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
|
let instance = Self {
|
||||||
Self { sender, capacity, time_to_abort: Duration::from_secs(60) }
|
sender,
|
||||||
|
capacity,
|
||||||
|
time_to_abort: Duration::from_secs(60),
|
||||||
|
searches_running: Default::default(),
|
||||||
|
searches_waiting_to_be_processed: Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::task::spawn(Self::run(
|
||||||
|
capacity,
|
||||||
|
paralellism,
|
||||||
|
receiver,
|
||||||
|
Arc::clone(&instance.searches_running),
|
||||||
|
Arc::clone(&instance.searches_waiting_to_be_processed),
|
||||||
|
));
|
||||||
|
|
||||||
|
instance
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
|
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
|
||||||
Self { time_to_abort, ..self }
|
Self { time_to_abort, ..self }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn capacity(&self) -> usize {
|
||||||
|
self.capacity
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn searches_running(&self) -> usize {
|
||||||
|
self.searches_running.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn searches_waiting(&self) -> usize {
|
||||||
|
self.searches_waiting_to_be_processed.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
|
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
|
||||||
/// how many should executes at the same time.
|
/// how many should executes at the same time.
|
||||||
///
|
///
|
||||||
@ -84,6 +115,8 @@ impl SearchQueue {
|
|||||||
capacity: usize,
|
capacity: usize,
|
||||||
parallelism: NonZeroUsize,
|
parallelism: NonZeroUsize,
|
||||||
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
|
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
|
||||||
|
metric_searches_running: Arc<AtomicUsize>,
|
||||||
|
metric_searches_waiting: Arc<AtomicUsize>,
|
||||||
) {
|
) {
|
||||||
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
|
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
|
||||||
let mut rng: StdRng = StdRng::from_entropy();
|
let mut rng: StdRng = StdRng::from_entropy();
|
||||||
@ -133,6 +166,9 @@ impl SearchQueue {
|
|||||||
queue.push(search_request);
|
queue.push(search_request);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metric_searches_running.store(searches_running, Ordering::Relaxed);
|
||||||
|
metric_searches_waiting.store(queue.len(), Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -389,3 +389,25 @@ pub static VECTOR_DOCUMENTS: Lazy<Value> = Lazy::new(|| {
|
|||||||
},
|
},
|
||||||
])
|
])
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pub async fn shared_index_with_test_set() -> &'static Index<'static, Shared> {
|
||||||
|
static INDEX: OnceCell<Index<'static, Shared>> = OnceCell::const_new();
|
||||||
|
INDEX
|
||||||
|
.get_or_init(|| async {
|
||||||
|
let server = Server::new_shared();
|
||||||
|
let index = server._index("SHARED_TEST_SET").to_shared();
|
||||||
|
let url = format!("/indexes/{}/documents", urlencoding::encode(index.uid.as_ref()));
|
||||||
|
let (response, code) = index
|
||||||
|
.service
|
||||||
|
.post_str(
|
||||||
|
url,
|
||||||
|
include_str!("../assets/test_set.json"),
|
||||||
|
vec![("content-type", "application/json")],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(code, 202);
|
||||||
|
index.wait_task(response.uid()).await;
|
||||||
|
index
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
@ -4,24 +4,27 @@ use meili_snap::*;
|
|||||||
use urlencoding::encode as urlencode;
|
use urlencoding::encode as urlencode;
|
||||||
|
|
||||||
use crate::common::encoder::Encoder;
|
use crate::common::encoder::Encoder;
|
||||||
use crate::common::{GetAllDocumentsOptions, Server, Value};
|
use crate::common::{
|
||||||
|
shared_does_not_exists_index, shared_empty_index, shared_index_with_test_set,
|
||||||
|
GetAllDocumentsOptions, Server, Value,
|
||||||
|
};
|
||||||
use crate::json;
|
use crate::json;
|
||||||
|
|
||||||
// TODO: partial test since we are testing error, amd error is not yet fully implemented in
|
// TODO: partial test since we are testing error, amd error is not yet fully implemented in
|
||||||
// transplant
|
// transplant
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_unexisting_index_single_document() {
|
async fn get_unexisting_index_single_document() {
|
||||||
let server = Server::new().await;
|
let (_response, code) = shared_does_not_exists_index().await.get_document(1, None).await;
|
||||||
let (_response, code) = server.index("test").get_document(1, None).await;
|
|
||||||
assert_eq!(code, 404);
|
assert_eq!(code, 404);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn error_get_unexisting_document() {
|
async fn error_get_unexisting_document() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("test");
|
let index = server.unique_index();
|
||||||
index.create(None).await;
|
let (task, _code) = index.create(None).await;
|
||||||
index.wait_task(0).await;
|
index.wait_task(task.uid()).await.succeeded();
|
||||||
|
|
||||||
let (response, code) = index.get_document(1, None).await;
|
let (response, code) = index.get_document(1, None).await;
|
||||||
|
|
||||||
let expected_response = json!({
|
let expected_response = json!({
|
||||||
@ -37,18 +40,19 @@ async fn error_get_unexisting_document() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_document() {
|
async fn get_document() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("test");
|
let index = server.unique_index();
|
||||||
index.create(None).await;
|
let (task, _code) = index.create(None).await;
|
||||||
|
index.wait_task(task.uid()).await.succeeded();
|
||||||
let documents = json!([
|
let documents = json!([
|
||||||
{
|
{
|
||||||
"id": 0,
|
"id": 0,
|
||||||
"nested": { "content": "foobar" },
|
"nested": { "content": "foobar" },
|
||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
let (_, code) = index.add_documents(documents, None).await;
|
let (task, code) = index.add_documents(documents, None).await;
|
||||||
assert_eq!(code, 202);
|
assert_eq!(code, 202);
|
||||||
index.wait_task(1).await;
|
index.wait_task(task.uid()).await.succeeded();
|
||||||
let (response, code) = index.get_document(0, None).await;
|
let (response, code) = index.get_document(0, None).await;
|
||||||
assert_eq!(code, 200);
|
assert_eq!(code, 200);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@ -81,12 +85,11 @@ async fn get_document() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn error_get_unexisting_index_all_documents() {
|
async fn error_get_unexisting_index_all_documents() {
|
||||||
let server = Server::new().await;
|
let index = shared_does_not_exists_index().await;
|
||||||
let (response, code) =
|
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||||
server.index("test").get_all_documents(GetAllDocumentsOptions::default()).await;
|
|
||||||
|
|
||||||
let expected_response = json!({
|
let expected_response = json!({
|
||||||
"message": "Index `test` not found.",
|
"message": "Index `DOES_NOT_EXISTS` not found.",
|
||||||
"code": "index_not_found",
|
"code": "index_not_found",
|
||||||
"type": "invalid_request",
|
"type": "invalid_request",
|
||||||
"link": "https://docs.meilisearch.com/errors#index_not_found"
|
"link": "https://docs.meilisearch.com/errors#index_not_found"
|
||||||
@ -98,12 +101,7 @@ async fn error_get_unexisting_index_all_documents() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_no_document() {
|
async fn get_no_document() {
|
||||||
let server = Server::new().await;
|
let index = shared_empty_index().await;
|
||||||
let index = server.index("test");
|
|
||||||
let (_, code) = index.create(None).await;
|
|
||||||
assert_eq!(code, 202);
|
|
||||||
|
|
||||||
index.wait_task(0).await;
|
|
||||||
|
|
||||||
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||||
assert_eq!(code, 200);
|
assert_eq!(code, 200);
|
||||||
@ -112,14 +110,12 @@ async fn get_no_document() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_all_documents_no_options() {
|
async fn get_all_documents_no_options() {
|
||||||
let server = Server::new().await;
|
let index = shared_index_with_test_set().await;
|
||||||
let index = server.index("test");
|
|
||||||
index.load_test_set().await;
|
|
||||||
|
|
||||||
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||||
assert_eq!(code, 200);
|
assert_eq!(code, 200);
|
||||||
let arr = response["results"].as_array().unwrap();
|
let results = response["results"].as_array().unwrap();
|
||||||
assert_eq!(arr.len(), 20);
|
assert_eq!(results.len(), 20);
|
||||||
let first = json!({
|
let first = json!({
|
||||||
"id":0,
|
"id":0,
|
||||||
"isActive":false,
|
"isActive":false,
|
||||||
@ -138,19 +134,16 @@ async fn get_all_documents_no_options() {
|
|||||||
"longitude":-145.725388,
|
"longitude":-145.725388,
|
||||||
"tags":["bug"
|
"tags":["bug"
|
||||||
,"bug"]});
|
,"bug"]});
|
||||||
assert_eq!(first, arr[0]);
|
assert_eq!(first, results[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_all_documents_no_options_with_response_compression() {
|
async fn get_all_documents_no_options_with_response_compression() {
|
||||||
let server = Server::new().await;
|
let index = shared_index_with_test_set().await;
|
||||||
let index_uid = "test";
|
|
||||||
let index = server.index(index_uid);
|
|
||||||
index.load_test_set().await;
|
|
||||||
|
|
||||||
let app = server.init_web_app().await;
|
let app = Server::new_shared().init_web_app().await;
|
||||||
let req = test::TestRequest::get()
|
let req = test::TestRequest::get()
|
||||||
.uri(&format!("/indexes/{}/documents?", urlencode(index_uid)))
|
.uri(&format!("/indexes/{}/documents?", urlencode(&index.uid)))
|
||||||
.insert_header((ACCEPT_ENCODING, "gzip"))
|
.insert_header((ACCEPT_ENCODING, "gzip"))
|
||||||
.to_request();
|
.to_request();
|
||||||
|
|
||||||
@ -169,9 +162,7 @@ async fn get_all_documents_no_options_with_response_compression() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_get_all_documents_limit() {
|
async fn test_get_all_documents_limit() {
|
||||||
let server = Server::new().await;
|
let index = shared_index_with_test_set().await;
|
||||||
let index = server.index("test");
|
|
||||||
index.load_test_set().await;
|
|
||||||
|
|
||||||
let (response, code) = index
|
let (response, code) = index
|
||||||
.get_all_documents(GetAllDocumentsOptions { limit: Some(5), ..Default::default() })
|
.get_all_documents(GetAllDocumentsOptions { limit: Some(5), ..Default::default() })
|
||||||
@ -186,9 +177,7 @@ async fn test_get_all_documents_limit() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_get_all_documents_offset() {
|
async fn test_get_all_documents_offset() {
|
||||||
let server = Server::new().await;
|
let index = shared_index_with_test_set().await;
|
||||||
let index = server.index("test");
|
|
||||||
index.load_test_set().await;
|
|
||||||
|
|
||||||
let (response, code) = index
|
let (response, code) = index
|
||||||
.get_all_documents(GetAllDocumentsOptions { offset: Some(5), ..Default::default() })
|
.get_all_documents(GetAllDocumentsOptions { offset: Some(5), ..Default::default() })
|
||||||
@ -203,9 +192,7 @@ async fn test_get_all_documents_offset() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_get_all_documents_attributes_to_retrieve() {
|
async fn test_get_all_documents_attributes_to_retrieve() {
|
||||||
let server = Server::new().await;
|
let index = shared_index_with_test_set().await;
|
||||||
let index = server.index("test");
|
|
||||||
index.load_test_set().await;
|
|
||||||
|
|
||||||
let (response, code) = index
|
let (response, code) = index
|
||||||
.get_all_documents(GetAllDocumentsOptions {
|
.get_all_documents(GetAllDocumentsOptions {
|
||||||
@ -286,9 +273,11 @@ async fn test_get_all_documents_attributes_to_retrieve() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_document_s_nested_attributes_to_retrieve() {
|
async fn get_document_s_nested_attributes_to_retrieve() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("test");
|
let index = server.unique_index();
|
||||||
index.create(None).await;
|
let (task, _code) = index.create(None).await;
|
||||||
|
index.wait_task(task.uid()).await.succeeded();
|
||||||
|
|
||||||
let documents = json!([
|
let documents = json!([
|
||||||
{
|
{
|
||||||
"id": 0,
|
"id": 0,
|
||||||
@ -302,9 +291,9 @@ async fn get_document_s_nested_attributes_to_retrieve() {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
let (_, code) = index.add_documents(documents, None).await;
|
let (task, code) = index.add_documents(documents, None).await;
|
||||||
assert_eq!(code, 202);
|
assert_eq!(code, 202);
|
||||||
index.wait_task(1).await;
|
index.wait_task(task.uid()).await.succeeded();
|
||||||
|
|
||||||
let (response, code) = index.get_document(0, Some(json!({ "fields": ["content"] }))).await;
|
let (response, code) = index.get_document(0, Some(json!({ "fields": ["content"] }))).await;
|
||||||
assert_eq!(code, 200);
|
assert_eq!(code, 200);
|
||||||
@ -343,10 +332,10 @@ async fn get_document_s_nested_attributes_to_retrieve() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_documents_displayed_attributes_is_ignored() {
|
async fn get_documents_displayed_attributes_is_ignored() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("test");
|
let index = server.unique_index();
|
||||||
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
|
|
||||||
index.load_test_set().await;
|
index.load_test_set().await;
|
||||||
|
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
|
||||||
|
|
||||||
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||||
assert_eq!(code, 200);
|
assert_eq!(code, 200);
|
||||||
@ -366,10 +355,10 @@ async fn get_documents_displayed_attributes_is_ignored() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn get_document_by_filter() {
|
async fn get_document_by_filter() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("doggo");
|
let index = server.unique_index();
|
||||||
index.update_settings_filterable_attributes(json!(["color"])).await;
|
index.update_settings_filterable_attributes(json!(["color"])).await;
|
||||||
index
|
let (task, _code) = index
|
||||||
.add_documents(
|
.add_documents(
|
||||||
json!([
|
json!([
|
||||||
{ "id": 0, "color": "red" },
|
{ "id": 0, "color": "red" },
|
||||||
@ -380,7 +369,7 @@ async fn get_document_by_filter() {
|
|||||||
Some("id"),
|
Some("id"),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
index.wait_task(1).await;
|
index.wait_task(task.uid()).await.succeeded();
|
||||||
|
|
||||||
let (response, code) = index.get_document_by_filter(json!({})).await;
|
let (response, code) = index.get_document_by_filter(json!({})).await;
|
||||||
let (response2, code2) = index.get_all_documents_raw("").await;
|
let (response2, code2) = index.get_all_documents_raw("").await;
|
||||||
@ -552,7 +541,7 @@ async fn get_document_with_vectors() {
|
|||||||
}))
|
}))
|
||||||
.await;
|
.await;
|
||||||
snapshot!(code, @"202 Accepted");
|
snapshot!(code, @"202 Accepted");
|
||||||
server.wait_task(response.uid()).await;
|
server.wait_task(response.uid()).await.succeeded();
|
||||||
|
|
||||||
let documents = json!([
|
let documents = json!([
|
||||||
{"id": 0, "name": "kefir", "_vectors": { "manual": [0, 0, 0] }},
|
{"id": 0, "name": "kefir", "_vectors": { "manual": [0, 0, 0] }},
|
||||||
@ -560,7 +549,7 @@ async fn get_document_with_vectors() {
|
|||||||
]);
|
]);
|
||||||
let (value, code) = index.add_documents(documents, None).await;
|
let (value, code) = index.add_documents(documents, None).await;
|
||||||
snapshot!(code, @"202 Accepted");
|
snapshot!(code, @"202 Accepted");
|
||||||
index.wait_task(value.uid()).await;
|
index.wait_task(value.uid()).await.succeeded();
|
||||||
|
|
||||||
// by default you shouldn't see the `_vectors` object
|
// by default you shouldn't see the `_vectors` object
|
||||||
let (documents, _code) = index.get_all_documents(Default::default()).await;
|
let (documents, _code) = index.get_all_documents(Default::default()).await;
|
||||||
|
@ -6,14 +6,14 @@ use crate::json;
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn formatted_contain_wildcard() {
|
async fn formatted_contain_wildcard() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("test");
|
let index = server.unique_index();
|
||||||
|
|
||||||
index.update_settings(json!({ "displayedAttributes": ["id", "cattos"] })).await;
|
index.update_settings(json!({ "displayedAttributes": ["id", "cattos"] })).await;
|
||||||
|
|
||||||
let documents = NESTED_DOCUMENTS.clone();
|
let documents = NESTED_DOCUMENTS.clone();
|
||||||
index.add_documents(documents, None).await;
|
let (response, _) = index.add_documents(documents, None).await;
|
||||||
index.wait_task(1).await;
|
index.wait_task(response.uid()).await;
|
||||||
|
|
||||||
index.search(json!({ "q": "pésti", "attributesToRetrieve": ["father", "mother"], "attributesToHighlight": ["father", "mother", "*"], "attributesToCrop": ["doggos"], "showMatchesPosition": true }),
|
index.search(json!({ "q": "pésti", "attributesToRetrieve": ["father", "mother"], "attributesToHighlight": ["father", "mother", "*"], "attributesToCrop": ["doggos"], "showMatchesPosition": true }),
|
||||||
|response, code|
|
|response, code|
|
||||||
@ -135,12 +135,7 @@ async fn formatted_contain_wildcard() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn format_nested() {
|
async fn format_nested() {
|
||||||
let server = Server::new().await;
|
let index = shared_index_with_nested_documents().await;
|
||||||
let index = server.index("test");
|
|
||||||
|
|
||||||
let documents = NESTED_DOCUMENTS.clone();
|
|
||||||
index.add_documents(documents, None).await;
|
|
||||||
index.wait_task(0).await;
|
|
||||||
|
|
||||||
index
|
index
|
||||||
.search(json!({ "q": "pésti", "attributesToRetrieve": ["doggos"] }), |response, code| {
|
.search(json!({ "q": "pésti", "attributesToRetrieve": ["doggos"] }), |response, code| {
|
||||||
@ -340,15 +335,15 @@ async fn format_nested() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn displayedattr_2_smol() {
|
async fn displayedattr_2_smol() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("test");
|
let index = server.unique_index();
|
||||||
|
|
||||||
// not enough displayed for the other settings
|
// not enough displayed for the other settings
|
||||||
index.update_settings(json!({ "displayedAttributes": ["id"] })).await;
|
index.update_settings(json!({ "displayedAttributes": ["id"] })).await;
|
||||||
|
|
||||||
let documents = NESTED_DOCUMENTS.clone();
|
let documents = NESTED_DOCUMENTS.clone();
|
||||||
index.add_documents(documents, None).await;
|
let (response, _) = index.add_documents(documents, None).await;
|
||||||
index.wait_task(1).await;
|
index.wait_task(response.uid()).await;
|
||||||
|
|
||||||
index
|
index
|
||||||
.search(json!({ "attributesToRetrieve": ["father", "id"], "attributesToHighlight": ["mother"], "attributesToCrop": ["cattos"] }),
|
.search(json!({ "attributesToRetrieve": ["father", "id"], "attributesToHighlight": ["mother"], "attributesToCrop": ["cattos"] }),
|
||||||
@ -538,15 +533,15 @@ async fn displayedattr_2_smol() {
|
|||||||
#[cfg(feature = "default")]
|
#[cfg(feature = "default")]
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_cjk_highlight() {
|
async fn test_cjk_highlight() {
|
||||||
let server = Server::new().await;
|
let server = Server::new_shared();
|
||||||
let index = server.index("test");
|
let index = server.unique_index();
|
||||||
|
|
||||||
let documents = json!([
|
let documents = json!([
|
||||||
{ "id": 0, "title": "この度、クーポンで無料で頂きました。" },
|
{ "id": 0, "title": "この度、クーポンで無料で頂きました。" },
|
||||||
{ "id": 1, "title": "大卫到了扫罗那里" },
|
{ "id": 1, "title": "大卫到了扫罗那里" },
|
||||||
]);
|
]);
|
||||||
index.add_documents(documents, None).await;
|
let (response, _) = index.add_documents(documents, None).await;
|
||||||
index.wait_task(0).await;
|
index.wait_task(response.uid()).await;
|
||||||
|
|
||||||
index
|
index
|
||||||
.search(json!({"q": "で", "attributesToHighlight": ["title"]}), |response, code| {
|
.search(json!({"q": "で", "attributesToHighlight": ["title"]}), |response, code| {
|
||||||
|
@ -3,6 +3,9 @@ mod r#match;
|
|||||||
mod matching_words;
|
mod matching_words;
|
||||||
mod simple_token_kind;
|
mod simple_token_kind;
|
||||||
|
|
||||||
|
use std::borrow::Cow;
|
||||||
|
use std::cmp::{max, min};
|
||||||
|
|
||||||
use charabia::{Language, SeparatorKind, Token, Tokenizer};
|
use charabia::{Language, SeparatorKind, Token, Tokenizer};
|
||||||
use either::Either;
|
use either::Either;
|
||||||
pub use matching_words::MatchingWords;
|
pub use matching_words::MatchingWords;
|
||||||
@ -10,10 +13,6 @@ use matching_words::{MatchType, PartialMatch};
|
|||||||
use r#match::{Match, MatchPosition};
|
use r#match::{Match, MatchPosition};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use simple_token_kind::SimpleTokenKind;
|
use simple_token_kind::SimpleTokenKind;
|
||||||
use std::{
|
|
||||||
borrow::Cow,
|
|
||||||
cmp::{max, min},
|
|
||||||
};
|
|
||||||
|
|
||||||
const DEFAULT_CROP_MARKER: &str = "…";
|
const DEFAULT_CROP_MARKER: &str = "…";
|
||||||
const DEFAULT_HIGHLIGHT_PREFIX: &str = "<em>";
|
const DEFAULT_HIGHLIGHT_PREFIX: &str = "<em>";
|
||||||
|
@ -366,14 +366,14 @@ pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender);
|
|||||||
impl FieldIdDocidFacetSender<'_> {
|
impl FieldIdDocidFacetSender<'_> {
|
||||||
pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
|
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &value));
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
|
||||||
self.0
|
self.0
|
||||||
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
|
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
|
||||||
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
|
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
|
||||||
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &[]));
|
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &[]));
|
||||||
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
|
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,7 +58,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
|||||||
context.index,
|
context.index,
|
||||||
&context.db_fields_ids_map,
|
&context.db_fields_ids_map,
|
||||||
)?;
|
)?;
|
||||||
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
let geo_iter =
|
||||||
|
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
for res in content.iter_top_level_fields().chain(geo_iter) {
|
for res in content.iter_top_level_fields().chain(geo_iter) {
|
||||||
let (f, _) = res?;
|
let (f, _) = res?;
|
||||||
let entry = document_extractor_data
|
let entry = document_extractor_data
|
||||||
@ -74,7 +75,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
|||||||
let docid = update.docid();
|
let docid = update.docid();
|
||||||
let content =
|
let content =
|
||||||
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||||
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
let geo_iter =
|
||||||
|
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
for res in content.iter_top_level_fields().chain(geo_iter) {
|
for res in content.iter_top_level_fields().chain(geo_iter) {
|
||||||
let (f, _) = res?;
|
let (f, _) = res?;
|
||||||
let entry = document_extractor_data
|
let entry = document_extractor_data
|
||||||
@ -84,7 +86,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
|||||||
*entry -= 1;
|
*entry -= 1;
|
||||||
}
|
}
|
||||||
let content = update.updated();
|
let content = update.updated();
|
||||||
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
let geo_iter =
|
||||||
|
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
for res in content.iter_top_level_fields().chain(geo_iter) {
|
for res in content.iter_top_level_fields().chain(geo_iter) {
|
||||||
let (f, _) = res?;
|
let (f, _) = res?;
|
||||||
let entry = document_extractor_data
|
let entry = document_extractor_data
|
||||||
@ -114,7 +117,8 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
|||||||
DocumentChange::Insertion(insertion) => {
|
DocumentChange::Insertion(insertion) => {
|
||||||
let docid = insertion.docid();
|
let docid = insertion.docid();
|
||||||
let content = insertion.inserted();
|
let content = insertion.inserted();
|
||||||
let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
let geo_iter =
|
||||||
|
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
|
||||||
for res in content.iter_top_level_fields().chain(geo_iter) {
|
for res in content.iter_top_level_fields().chain(geo_iter) {
|
||||||
let (f, _) = res?;
|
let (f, _) = res?;
|
||||||
let entry = document_extractor_data
|
let entry = document_extractor_data
|
||||||
|
@ -3,7 +3,6 @@ use std::collections::HashMap;
|
|||||||
use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
|
use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::proximity::MAX_DISTANCE;
|
|
||||||
use crate::update::new::document::Document;
|
use crate::update::new::document::Document;
|
||||||
use crate::update::new::extract::perm_json_p::{
|
use crate::update::new::extract::perm_json_p::{
|
||||||
seek_leaf_values_in_array, seek_leaf_values_in_object, select_field,
|
seek_leaf_values_in_array, seek_leaf_values_in_object, select_field,
|
||||||
@ -13,6 +12,9 @@ use crate::{
|
|||||||
MAX_WORD_LENGTH,
|
MAX_WORD_LENGTH,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// todo: should be crate::proximity::MAX_DISTANCE but it has been forgotten
|
||||||
|
const MAX_DISTANCE: u32 = 8;
|
||||||
|
|
||||||
pub struct DocumentTokenizer<'a> {
|
pub struct DocumentTokenizer<'a> {
|
||||||
pub tokenizer: &'a Tokenizer<'a>,
|
pub tokenizer: &'a Tokenizer<'a>,
|
||||||
pub attribute_to_extract: Option<&'a [&'a str]>,
|
pub attribute_to_extract: Option<&'a [&'a str]>,
|
||||||
@ -251,22 +253,22 @@ mod test {
|
|||||||
]: "doggo",
|
]: "doggo",
|
||||||
[
|
[
|
||||||
2,
|
2,
|
||||||
MAX_DISTANCE,
|
8,
|
||||||
]: "doggo",
|
]: "doggo",
|
||||||
[
|
[
|
||||||
2,
|
2,
|
||||||
16,
|
16,
|
||||||
]: "catto",
|
]: "catto",
|
||||||
[
|
[
|
||||||
3,
|
5,
|
||||||
0,
|
0,
|
||||||
]: "10",
|
]: "10",
|
||||||
[
|
[
|
||||||
4,
|
6,
|
||||||
0,
|
0,
|
||||||
]: "pesti",
|
]: "pesti",
|
||||||
[
|
[
|
||||||
5,
|
7,
|
||||||
0,
|
0,
|
||||||
]: "23",
|
]: "23",
|
||||||
}
|
}
|
||||||
|
@ -6,11 +6,9 @@ use grenad::Sorter;
|
|||||||
use heed::types::{Bytes, SerdeJson};
|
use heed::types::{Bytes, SerdeJson};
|
||||||
use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};
|
use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};
|
||||||
|
|
||||||
use super::extract::FacetKind;
|
|
||||||
use super::fst_merger_builder::FstMergerBuilder;
|
use super::fst_merger_builder::FstMergerBuilder;
|
||||||
use super::KvReaderDelAdd;
|
use super::KvReaderDelAdd;
|
||||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
|
use crate::heed_codec::facet::FacetGroupKey;
|
||||||
use crate::heed_codec::StrRefCodec;
|
|
||||||
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
||||||
use crate::update::{create_sorter, MergeDeladdBtreesetString};
|
use crate::update::{create_sorter, MergeDeladdBtreesetString};
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -1,10 +1,12 @@
|
|||||||
use std::{fs::File, io::BufWriter};
|
use std::fs::File;
|
||||||
|
use std::io::BufWriter;
|
||||||
|
|
||||||
use fst::{Set, SetBuilder, Streamer};
|
use fst::{Set, SetBuilder, Streamer};
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
|
|
||||||
use crate::{update::del_add::DelAdd, InternalError, Result};
|
use crate::update::del_add::DelAdd;
|
||||||
|
use crate::{InternalError, Result};
|
||||||
|
|
||||||
pub struct FstMergerBuilder<'a> {
|
pub struct FstMergerBuilder<'a> {
|
||||||
stream: Option<fst::set::Stream<'a>>,
|
stream: Option<fst::set::Stream<'a>>,
|
||||||
|
@ -132,7 +132,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut deletions = DocumentDeletion::new();
|
let mut deletions = DocumentDeletion::new();
|
||||||
deletions.delete_documents_by_docids(vec![0, 2, 42].into_iter().collect());
|
deletions.delete_documents_by_docids(Vec::<u32>::new().into_iter().collect());
|
||||||
let indexer = Bump::new();
|
let indexer = Bump::new();
|
||||||
|
|
||||||
let index = TempIndex::new();
|
let index = TempIndex::new();
|
||||||
|
@ -356,11 +356,11 @@ impl MergeChanges for MergeDocumentForUpdates {
|
|||||||
let has_deletion = last_deletion.is_some();
|
let has_deletion = last_deletion.is_some();
|
||||||
|
|
||||||
if operations.is_empty() {
|
if operations.is_empty() {
|
||||||
return if !is_new {
|
return if is_new {
|
||||||
|
Ok(None)
|
||||||
|
} else {
|
||||||
let deletion = Deletion::create(docid, external_docid);
|
let deletion = Deletion::create(docid, external_docid);
|
||||||
Ok(Some(DocumentChange::Deletion(deletion)))
|
Ok(Some(DocumentChange::Deletion(deletion)))
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,7 +13,6 @@ use itertools::{merge_join_by, EitherOrBoth};
|
|||||||
pub use partial_dump::PartialDump;
|
pub use partial_dump::PartialDump;
|
||||||
use rand::SeedableRng as _;
|
use rand::SeedableRng as _;
|
||||||
use raw_collections::RawMap;
|
use raw_collections::RawMap;
|
||||||
use rayon::ThreadPool;
|
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
pub use update_by_function::UpdateByFunction;
|
pub use update_by_function::UpdateByFunction;
|
||||||
|
|
||||||
@ -136,7 +135,6 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
|
|||||||
db_fields_ids_map: &'indexer FieldsIdsMap,
|
db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||||
new_fields_ids_map: FieldsIdsMap,
|
new_fields_ids_map: FieldsIdsMap,
|
||||||
new_primary_key: Option<PrimaryKey<'pl>>,
|
new_primary_key: Option<PrimaryKey<'pl>>,
|
||||||
pool: &ThreadPool,
|
|
||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
embedders: EmbeddingConfigs,
|
embedders: EmbeddingConfigs,
|
||||||
must_stop_processing: &'indexer MSP,
|
must_stop_processing: &'indexer MSP,
|
||||||
@ -152,9 +150,9 @@ where
|
|||||||
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
|
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
|
||||||
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
|
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
|
||||||
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
|
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
|
||||||
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
|
let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
|
let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
|
let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
|
|
||||||
let indexing_context = IndexingContext {
|
let indexing_context = IndexingContext {
|
||||||
index,
|
index,
|
||||||
@ -179,7 +177,6 @@ where
|
|||||||
let document_ids = &mut document_ids;
|
let document_ids = &mut document_ids;
|
||||||
// TODO manage the errors correctly
|
// TODO manage the errors correctly
|
||||||
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
|
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
|
||||||
let result = pool.in_place_scope(|_s| {
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
|
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
@ -188,9 +185,17 @@ where
|
|||||||
// document but we need to create a function that collects and compresses documents.
|
// document but we need to create a function that collects and compresses documents.
|
||||||
let document_sender = extractor_sender.documents();
|
let document_sender = extractor_sender.documents();
|
||||||
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
|
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
|
||||||
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let (finished_steps, step_name) = steps::extract_documents();
|
let (finished_steps, step_name) = steps::extract_documents();
|
||||||
extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
|
extract(document_changes,
|
||||||
|
&document_extractor,
|
||||||
|
indexing_context,
|
||||||
|
&mut extractor_allocs,
|
||||||
|
&datastore,
|
||||||
|
finished_steps,
|
||||||
|
total_steps,
|
||||||
|
step_name,
|
||||||
|
)?;
|
||||||
|
|
||||||
for document_extractor_data in datastore {
|
for document_extractor_data in datastore {
|
||||||
let document_extractor_data = document_extractor_data.0.into_inner();
|
let document_extractor_data = document_extractor_data.0.into_inner();
|
||||||
@ -223,7 +228,15 @@ where
|
|||||||
let (finished_steps, step_name) = steps::extract_facets();
|
let (finished_steps, step_name) = steps::extract_facets();
|
||||||
|
|
||||||
facet_field_ids_delta = merge_and_send_facet_docids(
|
facet_field_ids_delta = merge_and_send_facet_docids(
|
||||||
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), finished_steps, total_steps, step_name)?,
|
FacetedDocidsExtractor::run_extraction(grenad_parameters,
|
||||||
|
document_changes,
|
||||||
|
indexing_context,
|
||||||
|
&mut extractor_allocs,
|
||||||
|
&extractor_sender.field_id_docid_facet_sender(),
|
||||||
|
finished_steps,
|
||||||
|
total_steps,
|
||||||
|
step_name,
|
||||||
|
)?,
|
||||||
FacetDatabases::new(index),
|
FacetDatabases::new(index),
|
||||||
index,
|
index,
|
||||||
extractor_sender.facet_docids(),
|
extractor_sender.facet_docids(),
|
||||||
@ -360,7 +373,7 @@ where
|
|||||||
|
|
||||||
let embedding_sender = extractor_sender.embeddings();
|
let embedding_sender = extractor_sender.embeddings();
|
||||||
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
|
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
|
||||||
let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
|
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let (finished_steps, step_name) = steps::extract_embeddings();
|
let (finished_steps, step_name) = steps::extract_embeddings();
|
||||||
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
|
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
|
||||||
|
|
||||||
@ -383,7 +396,7 @@ where
|
|||||||
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
|
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
|
||||||
break 'geo;
|
break 'geo;
|
||||||
};
|
};
|
||||||
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let (finished_steps, step_name) = steps::extract_geo_points();
|
let (finished_steps, step_name) = steps::extract_geo_points();
|
||||||
extract(document_changes,
|
extract(document_changes,
|
||||||
&extractor,
|
&extractor,
|
||||||
@ -419,9 +432,6 @@ where
|
|||||||
// - [x] Extract fieldid facet number docids
|
// - [x] Extract fieldid facet number docids
|
||||||
// - [x] Extract fieldid facet string docids
|
// - [x] Extract fieldid facet string docids
|
||||||
|
|
||||||
Result::Ok(facet_field_ids_delta)
|
|
||||||
});
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
@ -429,7 +439,7 @@ where
|
|||||||
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
|
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
|
||||||
}
|
}
|
||||||
|
|
||||||
result
|
Result::Ok(facet_field_ids_delta)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
|
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::io;
|
|
||||||
|
|
||||||
use hashbrown::HashSet;
|
use hashbrown::HashSet;
|
||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
|
@ -286,7 +286,7 @@ impl<'doc> MergedVectorDocument<'doc> {
|
|||||||
) -> Result<Option<Self>> {
|
) -> Result<Option<Self>> {
|
||||||
let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
|
let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
|
||||||
let new_doc =
|
let new_doc =
|
||||||
VectorDocumentFromVersions::new(&external_document_id, versions, doc_alloc, embedders)?;
|
VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
|
||||||
Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
|
Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
|
use std::collections::HashSet;
|
||||||
use std::io::BufWriter;
|
use std::io::BufWriter;
|
||||||
|
|
||||||
use fst::{Set, SetBuilder, Streamer};
|
use fst::{Set, SetBuilder, Streamer};
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
use std::collections::HashSet;
|
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
|
|
||||||
use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
|
|
||||||
|
|
||||||
use super::fst_merger_builder::FstMergerBuilder;
|
use super::fst_merger_builder::FstMergerBuilder;
|
||||||
|
use crate::index::PrefixSettings;
|
||||||
|
use crate::update::del_add::DelAdd;
|
||||||
|
use crate::{InternalError, Prefix, Result};
|
||||||
|
|
||||||
pub struct WordFstBuilder<'a> {
|
pub struct WordFstBuilder<'a> {
|
||||||
word_fst_builder: FstMergerBuilder<'a>,
|
word_fst_builder: FstMergerBuilder<'a>,
|
||||||
|
Loading…
Reference in New Issue
Block a user