Compare commits

...

12 Commits

Author SHA1 Message Date
Louis Dureuil
0c576760d8
Merge 68bbf674c9 into a5d7ae23bd 2024-11-12 16:31:57 +01:00
Louis Dureuil
68bbf674c9
Make REST mock thread independent 2024-11-12 16:31:31 +01:00
Louis Dureuil
980921e078
Vector fixes 2024-11-12 16:31:22 +01:00
meili-bors[bot]
a5d7ae23bd
Merge #5044
5044: Adds new metrics to prometheus r=irevoire a=PedroTurik

not 100% confident in this solution, especially because i couldn't make the "Search Queue searches waiting" metric give me any value other than 0 with my local testing 😆. But i believe it solves the Issue.

# Pull Request

## Related issue
Fixes #4998 

## What does this PR do?
### Adds new metrics to prometheus;
- SearchQueue size, 
- SearchQueue searches running, 
- and Search Queue searches waiting.

## PR checklist
Please check if your PR fulfills the following requirements:
- [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)?
- [x] Have you read the contributing guidelines?
- [x] Have you made sure that the title is accurate and descriptive of the changes?

Co-authored-by: Pedro Turik Firmino <pedroturik@gmail.com>
2024-11-07 17:05:43 +00:00
PedroTurik
03886d0012
Applies optimizations to formatted integration tests (#5043) 2024-11-07 15:58:55 +01:00
meili-bors[bot]
b427b9e88f
Merge #5025
5025: test: improve performance of get_documents.rs r=irevoire a=PedroTurik

# Pull Request

## Related issue
Fixes one item from #4840 

## What does this PR do?
- Applies the changes recommended on the issue for `meilisearch/tests/documents/get_documents.rs`

## PR checklist
Please check if your PR fulfills the following requirements:
- [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)?
- [x] Have you read the contributing guidelines?
- [x] Have you made sure that the title is accurate and descriptive of the changes?

Thank you so much for contributing to Meilisearch!


Co-authored-by: Pedro Turik Firmino <pedroturik@gmail.com>
2024-11-07 09:46:34 +00:00
Pedro Turik Firmino
8b95f5ccc6 Adds new metrics to prometheus: SearchQueue size, SearchQueue searches running, and Search Queue searches waiting. 2024-11-06 15:37:16 -03:00
Pedro Turik Firmino
da59a043ba Fixes formatting issues 2024-11-06 09:55:48 -03:00
Pedro Turik Firmino
da4d47b5d0 Fixes formatting issues 2024-11-06 09:54:20 -03:00
Pedro Turik Firmino
d0b1ba20cb Improves usage of shared indexes 2024-11-04 17:26:50 -03:00
Pedro Turik Firmino
c79ca9679b Changes variable name to re-run CI 2024-11-02 18:25:33 -03:00
Pedro Turik Firmino
a934b0ac6a Applies optimizations to some integration tests 2024-10-29 18:49:06 -03:00
12 changed files with 235 additions and 121 deletions

View File

@ -49,4 +49,18 @@ lazy_static! {
pub static ref MEILISEARCH_IS_INDEXING: IntGauge =
register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing"))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCH_QUEUE_SIZE: IntGauge = register_int_gauge!(opts!(
"meilisearch_search_queue_size",
"Meilisearch Search Queue Size"
))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCHES_RUNNING: IntGauge =
register_int_gauge!(opts!("meilisearch_searches_running", "Meilisearch Searches Running"))
.expect("Can't create a metric");
pub static ref MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED: IntGauge =
register_int_gauge!(opts!(
"meilisearch_searches_waiting_to_be_processed",
"Meilisearch Searches Being Processed"
))
.expect("Can't create a metric");
}

View File

@ -10,6 +10,7 @@ use prometheus::{Encoder, TextEncoder};
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::routes::create_all_stats;
use crate::search_queue::SearchQueue;
pub fn configure(config: &mut web::ServiceConfig) {
config.service(web::resource("").route(web::get().to(get_metrics)));
@ -18,6 +19,7 @@ pub fn configure(config: &mut web::ServiceConfig) {
pub async fn get_metrics(
index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
auth_controller: Data<AuthController>,
search_queue: web::Data<SearchQueue>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_metrics()?;
let auth_filters = index_scheduler.filters();
@ -35,6 +37,11 @@ pub async fn get_metrics(
crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64);
crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64);
crate::metrics::MEILISEARCH_SEARCH_QUEUE_SIZE.set(search_queue.capacity() as i64);
crate::metrics::MEILISEARCH_SEARCHES_RUNNING.set(search_queue.searches_running() as i64);
crate::metrics::MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED
.set(search_queue.searches_waiting() as i64);
for (index, value) in response.indexes.iter() {
crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT
.with_label_values(&[index])

View File

@ -18,6 +18,8 @@
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use rand::rngs::StdRng;
@ -33,6 +35,8 @@ pub struct SearchQueue {
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
/// The client probably already closed the connection, but we have no way to find out.
time_to_abort: Duration,
searches_running: Arc<AtomicUsize>,
searches_waiting_to_be_processed: Arc<AtomicUsize>,
}
/// You should only run search requests while holding this permit.
@ -68,14 +72,41 @@ impl SearchQueue {
// so let's not allocate any RAM and keep a capacity of 1.
let (sender, receiver) = mpsc::channel(1);
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
Self { sender, capacity, time_to_abort: Duration::from_secs(60) }
let instance = Self {
sender,
capacity,
time_to_abort: Duration::from_secs(60),
searches_running: Default::default(),
searches_waiting_to_be_processed: Default::default(),
};
tokio::task::spawn(Self::run(
capacity,
paralellism,
receiver,
Arc::clone(&instance.searches_running),
Arc::clone(&instance.searches_waiting_to_be_processed),
));
instance
}
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
Self { time_to_abort, ..self }
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn searches_running(&self) -> usize {
self.searches_running.load(Ordering::Relaxed)
}
pub fn searches_waiting(&self) -> usize {
self.searches_waiting_to_be_processed.load(Ordering::Relaxed)
}
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
/// how many should executes at the same time.
///
@ -84,6 +115,8 @@ impl SearchQueue {
capacity: usize,
parallelism: NonZeroUsize,
mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>,
metric_searches_running: Arc<AtomicUsize>,
metric_searches_waiting: Arc<AtomicUsize>,
) {
let mut queue: Vec<oneshot::Sender<Permit>> = Default::default();
let mut rng: StdRng = StdRng::from_entropy();
@ -133,6 +166,9 @@ impl SearchQueue {
queue.push(search_request);
},
}
metric_searches_running.store(searches_running, Ordering::Relaxed);
metric_searches_waiting.store(queue.len(), Ordering::Relaxed);
}
}

View File

@ -389,3 +389,25 @@ pub static VECTOR_DOCUMENTS: Lazy<Value> = Lazy::new(|| {
},
])
});
pub async fn shared_index_with_test_set() -> &'static Index<'static, Shared> {
static INDEX: OnceCell<Index<'static, Shared>> = OnceCell::const_new();
INDEX
.get_or_init(|| async {
let server = Server::new_shared();
let index = server._index("SHARED_TEST_SET").to_shared();
let url = format!("/indexes/{}/documents", urlencoding::encode(index.uid.as_ref()));
let (response, code) = index
.service
.post_str(
url,
include_str!("../assets/test_set.json"),
vec![("content-type", "application/json")],
)
.await;
assert_eq!(code, 202);
index.wait_task(response.uid()).await;
index
})
.await
}

View File

@ -4,24 +4,27 @@ use meili_snap::*;
use urlencoding::encode as urlencode;
use crate::common::encoder::Encoder;
use crate::common::{GetAllDocumentsOptions, Server, Value};
use crate::common::{
shared_does_not_exists_index, shared_empty_index, shared_index_with_test_set,
GetAllDocumentsOptions, Server, Value,
};
use crate::json;
// TODO: partial test since we are testing error, amd error is not yet fully implemented in
// transplant
#[actix_rt::test]
async fn get_unexisting_index_single_document() {
let server = Server::new().await;
let (_response, code) = server.index("test").get_document(1, None).await;
let (_response, code) = shared_does_not_exists_index().await.get_document(1, None).await;
assert_eq!(code, 404);
}
#[actix_rt::test]
async fn error_get_unexisting_document() {
let server = Server::new().await;
let index = server.index("test");
index.create(None).await;
index.wait_task(0).await;
let server = Server::new_shared();
let index = server.unique_index();
let (task, _code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document(1, None).await;
let expected_response = json!({
@ -37,18 +40,19 @@ async fn error_get_unexisting_document() {
#[actix_rt::test]
async fn get_document() {
let server = Server::new().await;
let index = server.index("test");
index.create(None).await;
let server = Server::new_shared();
let index = server.unique_index();
let (task, _code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded();
let documents = json!([
{
"id": 0,
"nested": { "content": "foobar" },
}
]);
let (_, code) = index.add_documents(documents, None).await;
let (task, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202);
index.wait_task(1).await;
index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document(0, None).await;
assert_eq!(code, 200);
assert_eq!(
@ -81,12 +85,11 @@ async fn get_document() {
#[actix_rt::test]
async fn error_get_unexisting_index_all_documents() {
let server = Server::new().await;
let (response, code) =
server.index("test").get_all_documents(GetAllDocumentsOptions::default()).await;
let index = shared_does_not_exists_index().await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
let expected_response = json!({
"message": "Index `test` not found.",
"message": "Index `DOES_NOT_EXISTS` not found.",
"code": "index_not_found",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_not_found"
@ -98,12 +101,7 @@ async fn error_get_unexisting_index_all_documents() {
#[actix_rt::test]
async fn get_no_document() {
let server = Server::new().await;
let index = server.index("test");
let (_, code) = index.create(None).await;
assert_eq!(code, 202);
index.wait_task(0).await;
let index = shared_empty_index().await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
assert_eq!(code, 200);
@ -112,14 +110,12 @@ async fn get_no_document() {
#[actix_rt::test]
async fn get_all_documents_no_options() {
let server = Server::new().await;
let index = server.index("test");
index.load_test_set().await;
let index = shared_index_with_test_set().await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
assert_eq!(code, 200);
let arr = response["results"].as_array().unwrap();
assert_eq!(arr.len(), 20);
let results = response["results"].as_array().unwrap();
assert_eq!(results.len(), 20);
let first = json!({
"id":0,
"isActive":false,
@ -138,19 +134,16 @@ async fn get_all_documents_no_options() {
"longitude":-145.725388,
"tags":["bug"
,"bug"]});
assert_eq!(first, arr[0]);
assert_eq!(first, results[0]);
}
#[actix_rt::test]
async fn get_all_documents_no_options_with_response_compression() {
let server = Server::new().await;
let index_uid = "test";
let index = server.index(index_uid);
index.load_test_set().await;
let index = shared_index_with_test_set().await;
let app = server.init_web_app().await;
let app = Server::new_shared().init_web_app().await;
let req = test::TestRequest::get()
.uri(&format!("/indexes/{}/documents?", urlencode(index_uid)))
.uri(&format!("/indexes/{}/documents?", urlencode(&index.uid)))
.insert_header((ACCEPT_ENCODING, "gzip"))
.to_request();
@ -169,9 +162,7 @@ async fn get_all_documents_no_options_with_response_compression() {
#[actix_rt::test]
async fn test_get_all_documents_limit() {
let server = Server::new().await;
let index = server.index("test");
index.load_test_set().await;
let index = shared_index_with_test_set().await;
let (response, code) = index
.get_all_documents(GetAllDocumentsOptions { limit: Some(5), ..Default::default() })
@ -186,9 +177,7 @@ async fn test_get_all_documents_limit() {
#[actix_rt::test]
async fn test_get_all_documents_offset() {
let server = Server::new().await;
let index = server.index("test");
index.load_test_set().await;
let index = shared_index_with_test_set().await;
let (response, code) = index
.get_all_documents(GetAllDocumentsOptions { offset: Some(5), ..Default::default() })
@ -203,9 +192,7 @@ async fn test_get_all_documents_offset() {
#[actix_rt::test]
async fn test_get_all_documents_attributes_to_retrieve() {
let server = Server::new().await;
let index = server.index("test");
index.load_test_set().await;
let index = shared_index_with_test_set().await;
let (response, code) = index
.get_all_documents(GetAllDocumentsOptions {
@ -286,9 +273,11 @@ async fn test_get_all_documents_attributes_to_retrieve() {
#[actix_rt::test]
async fn get_document_s_nested_attributes_to_retrieve() {
let server = Server::new().await;
let index = server.index("test");
index.create(None).await;
let server = Server::new_shared();
let index = server.unique_index();
let (task, _code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded();
let documents = json!([
{
"id": 0,
@ -302,9 +291,9 @@ async fn get_document_s_nested_attributes_to_retrieve() {
},
},
]);
let (_, code) = index.add_documents(documents, None).await;
let (task, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202);
index.wait_task(1).await;
index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document(0, Some(json!({ "fields": ["content"] }))).await;
assert_eq!(code, 200);
@ -343,10 +332,10 @@ async fn get_document_s_nested_attributes_to_retrieve() {
#[actix_rt::test]
async fn get_documents_displayed_attributes_is_ignored() {
let server = Server::new().await;
let index = server.index("test");
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
let server = Server::new_shared();
let index = server.unique_index();
index.load_test_set().await;
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
assert_eq!(code, 200);
@ -366,10 +355,10 @@ async fn get_documents_displayed_attributes_is_ignored() {
#[actix_rt::test]
async fn get_document_by_filter() {
let server = Server::new().await;
let index = server.index("doggo");
let server = Server::new_shared();
let index = server.unique_index();
index.update_settings_filterable_attributes(json!(["color"])).await;
index
let (task, _code) = index
.add_documents(
json!([
{ "id": 0, "color": "red" },
@ -380,7 +369,7 @@ async fn get_document_by_filter() {
Some("id"),
)
.await;
index.wait_task(1).await;
index.wait_task(task.uid()).await.succeeded();
let (response, code) = index.get_document_by_filter(json!({})).await;
let (response2, code2) = index.get_all_documents_raw("").await;
@ -552,7 +541,7 @@ async fn get_document_with_vectors() {
}))
.await;
snapshot!(code, @"202 Accepted");
server.wait_task(response.uid()).await;
server.wait_task(response.uid()).await.succeeded();
let documents = json!([
{"id": 0, "name": "kefir", "_vectors": { "manual": [0, 0, 0] }},
@ -560,7 +549,7 @@ async fn get_document_with_vectors() {
]);
let (value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
index.wait_task(value.uid()).await;
index.wait_task(value.uid()).await.succeeded();
// by default you shouldn't see the `_vectors` object
let (documents, _code) = index.get_all_documents(Default::default()).await;

View File

@ -6,14 +6,14 @@ use crate::json;
#[actix_rt::test]
async fn formatted_contain_wildcard() {
let server = Server::new().await;
let index = server.index("test");
let server = Server::new_shared();
let index = server.unique_index();
index.update_settings(json!({ "displayedAttributes": ["id", "cattos"] })).await;
let documents = NESTED_DOCUMENTS.clone();
index.add_documents(documents, None).await;
index.wait_task(1).await;
let (response, _) = index.add_documents(documents, None).await;
index.wait_task(response.uid()).await;
index.search(json!({ "q": "pésti", "attributesToRetrieve": ["father", "mother"], "attributesToHighlight": ["father", "mother", "*"], "attributesToCrop": ["doggos"], "showMatchesPosition": true }),
|response, code|
@ -135,12 +135,7 @@ async fn formatted_contain_wildcard() {
#[actix_rt::test]
async fn format_nested() {
let server = Server::new().await;
let index = server.index("test");
let documents = NESTED_DOCUMENTS.clone();
index.add_documents(documents, None).await;
index.wait_task(0).await;
let index = shared_index_with_nested_documents().await;
index
.search(json!({ "q": "pésti", "attributesToRetrieve": ["doggos"] }), |response, code| {
@ -340,15 +335,15 @@ async fn format_nested() {
#[actix_rt::test]
async fn displayedattr_2_smol() {
let server = Server::new().await;
let index = server.index("test");
let server = Server::new_shared();
let index = server.unique_index();
// not enough displayed for the other settings
index.update_settings(json!({ "displayedAttributes": ["id"] })).await;
let documents = NESTED_DOCUMENTS.clone();
index.add_documents(documents, None).await;
index.wait_task(1).await;
let (response, _) = index.add_documents(documents, None).await;
index.wait_task(response.uid()).await;
index
.search(json!({ "attributesToRetrieve": ["father", "id"], "attributesToHighlight": ["mother"], "attributesToCrop": ["cattos"] }),
@ -538,15 +533,15 @@ async fn displayedattr_2_smol() {
#[cfg(feature = "default")]
#[actix_rt::test]
async fn test_cjk_highlight() {
let server = Server::new().await;
let index = server.index("test");
let server = Server::new_shared();
let index = server.unique_index();
let documents = json!([
{ "id": 0, "title": "この度、クーポンで無料で頂きました。" },
{ "id": 1, "title": "大卫到了扫罗那里" },
]);
index.add_documents(documents, None).await;
index.wait_task(0).await;
let (response, _) = index.add_documents(documents, None).await;
index.wait_task(response.uid()).await;
index
.search(json!({"q": "", "attributesToHighlight": ["title"]}), |response, code| {

View File

@ -1,5 +1,4 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use meili_snap::{json_string, snapshot};
use reqwest::IntoUrl;
@ -13,13 +12,22 @@ use crate::vector::{get_server_vector, GetAllDocumentsOptions};
async fn create_mock() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
.respond_with(move |_req: &Request| {
let counter = counter.fetch_add(1, Ordering::Relaxed);
ResponseTemplate::new(200).set_body_json(json!({ "data": vec![counter; 3] }))
.respond_with(move |req: &Request| {
let text: String = req.body_json().unwrap();
ResponseTemplate::new(200).set_body_json(
json!({ "data": text_to_embedding.get(text.as_str()).unwrap_or(&[99., 99., 99.]) }),
)
})
.mount(&mock_server)
.await;
@ -32,13 +40,14 @@ async fn create_mock() -> (MockServer, Value) {
"request": "{{text}}",
"response": {
"data": "{{embedding}}"
}
},
"documentTemplate": "{{doc.name}}",
});
(mock_server, embedder_settings)
}
async fn create_mock_map() -> (MockServer, Value) {
async fn create_mock_default_template() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let text_to_embedding: BTreeMap<_, _> = vec![
@ -97,7 +106,14 @@ struct SingleResponse {
async fn create_mock_multiple() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
@ -115,8 +131,11 @@ async fn create_mock_multiple() -> (MockServer, Value) {
.input
.into_iter()
.map(|text| SingleResponse {
embedding: text_to_embedding
.get(text.as_str())
.unwrap_or(&[99., 99., 99.])
.to_vec(),
text,
embedding: vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3],
})
.collect();
@ -142,7 +161,8 @@ async fn create_mock_multiple() -> (MockServer, Value) {
},
"{{..}}"
]
}
},
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
@ -156,7 +176,14 @@ struct SingleRequest {
async fn create_mock_single_response_in_array() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
@ -171,8 +198,11 @@ async fn create_mock_single_response_in_array() -> (MockServer, Value) {
};
let output = vec![SingleResponse {
embedding: text_to_embedding
.get(req.input.as_str())
.unwrap_or(&[99., 99., 99.])
.to_vec(),
text: req.input,
embedding: vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3],
}];
let response = MultipleResponse { output };
@ -196,7 +226,8 @@ async fn create_mock_single_response_in_array() -> (MockServer, Value) {
"embedding": "{{embedding}}"
}
]
}
},
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
@ -205,7 +236,14 @@ async fn create_mock_single_response_in_array() -> (MockServer, Value) {
async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
@ -223,7 +261,7 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
}
}
let _req: String = match req.body_json() {
let req: String = match req.body_json() {
Ok(req) => req,
Err(error) => {
return ResponseTemplate::new(400).set_body_json(json!({
@ -232,7 +270,7 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
}
};
let output = vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3];
let output = text_to_embedding.get(req.as_str()).unwrap_or(&[99., 99., 99.]).to_vec();
ResponseTemplate::new(200).set_body_json(output)
})
@ -245,7 +283,8 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
"url": url,
"request": "{{text}}",
"response": "{{embedding}}",
"headers": {"my-nonstandard-auth": "bearer of the ring"}
"headers": {"my-nonstandard-auth": "bearer of the ring"},
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
@ -254,12 +293,19 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
async fn create_mock_raw() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
.respond_with(move |req: &Request| {
let _req: String = match req.body_json() {
let req: String = match req.body_json() {
Ok(req) => req,
Err(error) => {
return ResponseTemplate::new(400).set_body_json(json!({
@ -268,7 +314,7 @@ async fn create_mock_raw() -> (MockServer, Value) {
}
};
let output = vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3];
let output = text_to_embedding.get(req.as_str()).unwrap_or(&[99., 99., 99.]).to_vec();
ResponseTemplate::new(200).set_body_json(output)
})
@ -281,29 +327,30 @@ async fn create_mock_raw() -> (MockServer, Value) {
"url": url,
"dimensions": 3,
"request": "{{text}}",
"response": "{{embedding}}"
"response": "{{embedding}}",
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
}
pub async fn post<T: IntoUrl>(url: T) -> reqwest::Result<reqwest::Response> {
reqwest::Client::builder().build()?.post(url).send().await
pub async fn post<T: IntoUrl>(url: T, text: &str) -> reqwest::Result<reqwest::Response> {
reqwest::Client::builder().build()?.post(url).json(&json!(text)).send().await
}
#[actix_rt::test]
async fn dummy_testing_the_mock() {
let (mock, _setting) = create_mock().await;
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0,0,0]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[1,1,1]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[2,2,2]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[3,3,3]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[4,4,4]}"###);
let body = post(&mock.uri(), "kefir").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0.0,0.0,0.0]}"###);
let body = post(&mock.uri(), "intel").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[1.0,1.0,1.0]}"###);
let body = post(&mock.uri(), "kefir").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0.0,0.0,0.0]}"###);
let body = post(&mock.uri(), "kefir").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0.0,0.0,0.0]}"###);
let body = post(&mock.uri(), "intel").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[1.0,1.0,1.0]}"###);
}
#[actix_rt::test]
@ -953,7 +1000,7 @@ async fn bad_settings() {
let (response, code) = index
.update_settings(json!({
"embedders": {
"rest": json!({ "source": "rest", "url": mock.uri(), "request": "{{text}}", "response": { "data": "{{embedding}}" }, "dimensions": 2 }),
"rest": json!({ "source": "rest", "url": mock.uri(), "request": "{{text}}", "response": { "data": "{{embedding}}" }, "dimensions": 2, "documentTemplate": "{{doc.name}}" }),
},
}))
.await;
@ -1920,6 +1967,7 @@ async fn server_custom_header() {
"embedders": {
"rest": {
"source": "rest",
"documentTemplate": "{{doc.name}}",
"url": "[url]",
"request": "{{text}}",
"response": "{{embedding}}",
@ -1940,7 +1988,7 @@ async fn server_custom_header() {
#[actix_rt::test]
async fn searchable_reindex() {
let (_mock, setting) = create_mock_map().await;
let (_mock, setting) = create_mock_default_template().await;
let server = get_server_vector().await;
let index = server.index("doggo");

View File

@ -115,11 +115,8 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
new_vectors.vectors_for_key(embedder_name).transpose()
}) {
let new_vectors = new_vectors?;
match (old_vectors.regenerate, new_vectors.regenerate) {
(true, true) | (false, false) => todo!(),
_ => {
chunks.set_regenerate(update.docid(), new_vectors.regenerate);
}
if old_vectors.regenerate != new_vectors.regenerate {
chunks.set_regenerate(update.docid(), new_vectors.regenerate);
}
// do we have set embeddings?
if let Some(embeddings) = new_vectors.embeddings {

View File

@ -180,7 +180,13 @@ fn entry_from_raw_value(
},
RawVectors::ImplicitlyUserProvided(value) => VectorEntry {
has_configured_embedder,
embeddings: value.map(Embeddings::FromJsonImplicityUserProvided),
// implicitly user provided always provide embeddings
// `None` here means that there are no embeddings
embeddings: Some(
value
.map(Embeddings::FromJsonImplicityUserProvided)
.unwrap_or(Embeddings::FromDb(Default::default())),
),
regenerate: false,
implicit: true,
},

View File

@ -113,7 +113,7 @@ impl Embedder {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.chunk_count_hint())
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk))
.collect();

View File

@ -266,7 +266,7 @@ impl Embedder {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.chunk_count_hint())
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk))
.collect();

View File

@ -193,7 +193,7 @@ impl Embedder {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.chunk_count_hint())
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk))
.collect();