diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index e0e2bfb75..cf603e996 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -5201,9 +5201,10 @@ mod tests {
         let configs = index_scheduler.embedders(configs).unwrap();
         let (hf_embedder, _, _) = configs.get(&simple_hf_name).unwrap();
-        let beagle_embed = hf_embedder.embed_one(S("Intel the beagle best doggo")).unwrap();
-        let lab_embed = hf_embedder.embed_one(S("Max the lab best doggo")).unwrap();
-        let patou_embed = hf_embedder.embed_one(S("kefir the patou best doggo")).unwrap();
+        let beagle_embed =
+            hf_embedder.embed_one(S("Intel the beagle best doggo"), None).unwrap();
+        let lab_embed = hf_embedder.embed_one(S("Max the lab best doggo"), None).unwrap();
+        let patou_embed = hf_embedder.embed_one(S("kefir the patou best doggo"), None).unwrap();
         (fakerest_name, simple_hf_name, beagle_embed, lab_embed, patou_embed)
     };
diff --git a/meilisearch/src/search/mod.rs b/meilisearch/src/search/mod.rs
index 7832c1761..60e48b196 100644
--- a/meilisearch/src/search/mod.rs
+++ b/meilisearch/src/search/mod.rs
@@ -796,8 +796,10 @@ fn prepare_search<'t>(
             let span = tracing::trace_span!(target: "search::vector", "embed_one");
             let _entered = span.enter();
 
+            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
+
             embedder
-                .embed_one(query.q.clone().unwrap())
+                .embed_one(query.q.clone().unwrap(), Some(deadline))
                 .map_err(milli::vector::Error::from)
                 .map_err(milli::Error::from)?
         }
diff --git a/meilisearch/tests/vector/openai.rs b/meilisearch/tests/vector/openai.rs
index 04c068c40..94291ebea 100644
--- a/meilisearch/tests/vector/openai.rs
+++ b/meilisearch/tests/vector/openai.rs
@@ -137,13 +137,14 @@ fn long_text() -> &'static str {
 }
 
 async fn create_mock_tokenized() -> (MockServer, Value) {
-    create_mock_with_template("{{doc.text}}", ModelDimensions::Large, false).await
+    create_mock_with_template("{{doc.text}}", ModelDimensions::Large, false, false).await
 }
 
 async fn create_mock_with_template(
     document_template: &str,
     model_dimensions: ModelDimensions,
     fallible: bool,
+    slow: bool,
 ) -> (MockServer, Value) {
     let mock_server = MockServer::start().await;
     const API_KEY: &str = "my-api-key";
@@ -154,7 +155,11 @@ async fn create_mock_with_template(
     Mock::given(method("POST"))
         .and(path("/"))
         .respond_with(move |req: &Request| {
-            // 0. maybe return 500
+            // 0. wait for a long time
+            if slow {
+                std::thread::sleep(std::time::Duration::from_secs(1));
+            }
+            // 1. maybe return 500
             if fallible {
                 let attempt = attempt.fetch_add(1, Ordering::Relaxed);
                 let failed = matches!(attempt % 4, 0 | 1 | 3);
@@ -167,7 +172,7 @@ async fn create_mock_with_template(
                 }))
                 }
             }
-            // 1. check API key
+            // 2. check API key
             match req.headers.get("Authorization") {
                 Some(api_key) if api_key == API_KEY_BEARER => {
                     {}
                 }
@@ -202,7 +207,7 @@ async fn create_mock_with_template(
                     )
                 }
             }
-            // 2. parse text inputs
+            // 3. parse text inputs
             let query: serde_json::Value = match req.body_json() {
                 Ok(query) => query,
                 Err(_error) => return ResponseTemplate::new(400).set_body_json(
@@ -223,7 +228,7 @@ async fn create_mock_with_template(
                 panic!("Expected {model_dimensions:?}, got {query_model_dimensions:?}")
            }
 
-            // 3. for each text, find embedding in responses
+            // 4. for each text, find embedding in responses
             let serde_json::Value::Array(inputs) = &query["input"] else {
                 panic!("Unexpected `input` value")
             };
@@ -283,7 +288,7 @@ async fn create_mock_with_template(
                 "embedding": embedding,
             })).collect();
 
-            // 4. produce output from embeddings
+            // 5. produce output from embeddings
             ResponseTemplate::new(200).set_body_json(json!({
                 "object": "list",
                 "data": data,
@@ -317,23 +322,27 @@ const DOGGO_TEMPLATE: &str = r#"{%- if doc.gender == "F" -%}Une chienne nommée
 {%- endif %}, de race {{doc.breed}}."#;
 
 async fn create_mock() -> (MockServer, Value) {
-    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, false).await
+    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, false, false).await
 }
 
 async fn create_mock_dimensions() -> (MockServer, Value) {
-    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large512, false).await
+    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large512, false, false).await
 }
 
 async fn create_mock_small_embedding_model() -> (MockServer, Value) {
-    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Small, false).await
+    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Small, false, false).await
 }
 
 async fn create_mock_legacy_embedding_model() -> (MockServer, Value) {
-    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Ada, false).await
+    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Ada, false, false).await
 }
 
 async fn create_fallible_mock() -> (MockServer, Value) {
-    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, true).await
+    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, true, false).await
+}
+
+async fn create_slow_mock() -> (MockServer, Value) {
+    create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, true, true).await
 }
 
 // basic test "it works"
@@ -1873,4 +1882,114 @@ async fn it_still_works() {
         ]
         "###);
 }
+
+// test with a server that answers slowly (1s per request) and still 500s on 3 out of 4 calls
+#[actix_rt::test]
+async fn timeout() {
+    let (_mock, setting) = create_slow_mock().await;
+    let server = get_server_vector().await;
+    let index = server.index("doggo");
+
+    let (response, code) = index
+        .update_settings(json!({
+            "embedders": {
+                "default": setting,
+            },
+        }))
+        .await;
+    snapshot!(code, @"202 Accepted");
+    let task = server.wait_task(response.uid()).await;
+    snapshot!(task["status"], @r###""succeeded""###);
+    let documents = json!([
+        {"id": 0, "name": "kefir", "gender": "M", "birthyear": 2023, "breed": "Patou"},
+    ]);
+    let (value, code) = index.add_documents(documents, None).await;
+    snapshot!(code, @"202 Accepted");
+    let task = index.wait_task(value.uid()).await;
+    snapshot!(task, @r###"
+    {
+      "uid": "[uid]",
+      "indexUid": "doggo",
+      "status": "succeeded",
+      "type": "documentAdditionOrUpdate",
+      "canceledBy": null,
+      "details": {
+        "receivedDocuments": 1,
+        "indexedDocuments": 1
+      },
+      "error": null,
+      "duration": "[duration]",
+      "enqueuedAt": "[date]",
+      "startedAt": "[date]",
+      "finishedAt": "[date]"
+    }
+    "###);
+
+    let (documents, _code) = index
+        .get_all_documents(GetAllDocumentsOptions { retrieve_vectors: true, ..Default::default() })
+        .await;
+    snapshot!(json_string!(documents, {".results.*._vectors.default.embeddings" => "[vector]"}), @r###"
+    {
+      "results": [
+        {
+          "id": 0,
+          "name": "kefir",
+          "gender": "M",
+          "birthyear": 2023,
+          "breed": "Patou",
+          "_vectors": {
+            "default": {
+              "embeddings": "[vector]",
+              "regenerate": true
+            }
+          }
+        }
+      ],
+      "offset": 0,
+      "limit": 20,
+      "total": 1
+    }
+    "###);
+
+    let (response, code) = index
+        .search_post(json!({
+            "q": "grand chien de berger des montagnes",
+            "hybrid": {"semanticRatio": 0.99, "embedder": "default"}
+        }))
+        .await;
+    snapshot!(code, @"200 OK");
+    snapshot!(json_string!(response["semanticHitCount"]), @"0");
+    snapshot!(json_string!(response["hits"]), @"[]");
+
+    let (response, code) = index
+        .search_post(json!({
+            "q": "grand chien de berger des montagnes",
+            "hybrid": {"semanticRatio": 0.99, "embedder": "default"}
+        }))
+        .await;
+    snapshot!(code, @"200 OK");
+    snapshot!(json_string!(response["semanticHitCount"]), @"1");
+    snapshot!(json_string!(response["hits"]), @r###"
+    [
+      {
+        "id": 0,
+        "name": "kefir",
+        "gender": "M",
+        "birthyear": 2023,
+        "breed": "Patou"
+      }
+    ]
+    "###);
+
+    let (response, code) = index
+        .search_post(json!({
+            "q": "grand chien de berger des montagnes",
+            "hybrid": {"semanticRatio": 0.99, "embedder": "default"}
+        }))
+        .await;
+    snapshot!(code, @"200 OK");
+    snapshot!(json_string!(response["semanticHitCount"]), @"0");
+    snapshot!(json_string!(response["hits"]), @"[]");
+}
+
 // test with a server that wrongly responds 400
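Why the three searches in `timeout()` expect `semanticHitCount` 0, then 1, then 0: the slow mock sleeps one second per request and is also fallible, failing attempts 0, 1 and 3 of every window of 4. Each search fits only a couple of attempts inside the embedder deadline, so whether it gets a semantic result depends on where its requests land in that cycle. (The document addition that precedes the searches succeeds regardless, because indexing passes no deadline and simply keeps retrying.) A minimal sketch of the cycle, with a hypothetical helper mirroring the mock's `attempt % 4` logic above:

```rust
// Hypothetical helper mirroring the mock: attempts 0, 1 and 3 of every
// window of 4 return an error; only attempt 2 returns embeddings.
fn mock_succeeds(attempt: usize) -> bool {
    !matches!(attempt % 4, 0 | 1 | 3)
}

fn main() {
    // A search that lands on failing attempts burns its whole deadline and
    // falls back to keyword results; one that lands on a succeeding attempt
    // gets its embedding in time.
    let outcomes: Vec<bool> = (0..8).map(mock_succeeds).collect();
    println!("{outcomes:?}"); // [false, false, true, false, false, false, true, false]
}
```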
diff --git a/milli/src/search/hybrid.rs b/milli/src/search/hybrid.rs
index 8b274804c..5187b572b 100644
--- a/milli/src/search/hybrid.rs
+++ b/milli/src/search/hybrid.rs
@@ -201,7 +201,9 @@ impl<'a> Search<'a> {
             let span = tracing::trace_span!(target: "search::hybrid", "embed_one");
             let _entered = span.enter();
 
-            match embedder.embed_one(query) {
+            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
+
+            match embedder.embed_one(query, Some(deadline)) {
                 Ok(embedding) => embedding,
                 Err(error) => {
                     tracing::error!(error=%error, "Embedding failed");
diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs
index 571c02c8c..bdf6754a2 100644
--- a/milli/src/vector/mod.rs
+++ b/milli/src/vector/mod.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Instant;
 
 use arroy::distances::{BinaryQuantizedCosine, Cosine};
 use arroy::ItemId;
@@ -594,18 +595,23 @@ impl Embedder {
     pub fn embed(
         &self,
         texts: Vec<String>,
+        deadline: Option<Instant>,
     ) -> std::result::Result<Vec<Embeddings<f32>>, EmbedError> {
         match self {
             Embedder::HuggingFace(embedder) => embedder.embed(texts),
-            Embedder::OpenAi(embedder) => embedder.embed(texts),
-            Embedder::Ollama(embedder) => embedder.embed(texts),
+            Embedder::OpenAi(embedder) => embedder.embed(texts, deadline),
+            Embedder::Ollama(embedder) => embedder.embed(texts, deadline),
             Embedder::UserProvided(embedder) => embedder.embed(texts),
-            Embedder::Rest(embedder) => embedder.embed(texts),
+            Embedder::Rest(embedder) => embedder.embed(texts, deadline),
         }
     }
 
-    pub fn embed_one(&self, text: String) -> std::result::Result<Embedding, EmbedError> {
-        let mut embeddings = self.embed(vec![text])?;
+    pub fn embed_one(
+        &self,
+        text: String,
+        deadline: Option<Instant>,
+    ) -> std::result::Result<Embedding, EmbedError> {
+        let mut embeddings = self.embed(vec![text], deadline)?;
         let embeddings = embeddings.pop().ok_or_else(EmbedError::missing_embedding)?;
         Ok(if embeddings.iter().nth(1).is_some() {
             tracing::warn!("Ignoring embeddings past the first one in long search query");
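Both call sites above follow the same pattern: the deadline is an absolute `Instant` rather than a per-request timeout, so every retry inside the embedder counts against the same budget (10 seconds in meilisearch's `prepare_search`, 3 seconds in milli's hybrid search). A minimal caller-side sketch, assuming `Embedder` and `Embedding` are re-exported from `milli::vector` as the hunks above suggest:

```rust
use std::time::{Duration, Instant};

use milli::vector::{Embedder, Embedding};

// Sketch of a search-side caller: give the embedder an absolute deadline and
// fall back to keyword-only results if no vector arrives in time.
fn embed_query(embedder: &Embedder, query: String) -> Option<Embedding> {
    let deadline = Instant::now() + Duration::from_secs(3);
    match embedder.embed_one(query, Some(deadline)) {
        Ok(embedding) => Some(embedding),
        Err(error) => {
            // Mirrors hybrid search: log the failure, degrade gracefully.
            tracing::error!(error=%error, "Embedding failed");
            None
        }
    }
}
```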
diff --git a/milli/src/vector/ollama.rs b/milli/src/vector/ollama.rs
index 7d41ab4e9..ef422a57e 100644
--- a/milli/src/vector/ollama.rs
+++ b/milli/src/vector/ollama.rs
@@ -1,3 +1,5 @@
+use std::time::Instant;
+
 use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _};
 
 use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
@@ -75,8 +77,12 @@ impl Embedder {
         Ok(Self { rest_embedder })
     }
 
-    pub fn embed(&self, texts: Vec<String>) -> Result<Vec<Embeddings<f32>>, EmbedError> {
-        match self.rest_embedder.embed(texts) {
+    pub fn embed(
+        &self,
+        texts: Vec<String>,
+        deadline: Option<Instant>,
+    ) -> Result<Vec<Embeddings<f32>>, EmbedError> {
+        match self.rest_embedder.embed(texts, deadline) {
             Ok(embeddings) => Ok(embeddings),
             Err(EmbedError { kind: EmbedErrorKind::RestOtherStatusCode(404, error), fault: _ }) => {
                 Err(EmbedError::ollama_model_not_found(error))
@@ -92,7 +98,7 @@ impl Embedder {
     ) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
         threads
             .install(move || {
-                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
+                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
             })
             .map_err(|error| EmbedError {
                 kind: EmbedErrorKind::PanicInThreadPool(error),
diff --git a/milli/src/vector/openai.rs b/milli/src/vector/openai.rs
index 152d1fb7a..b303e8720 100644
--- a/milli/src/vector/openai.rs
+++ b/milli/src/vector/openai.rs
@@ -1,3 +1,5 @@
+use std::time::Instant;
+
 use ordered_float::OrderedFloat;
 use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
 
@@ -206,32 +208,40 @@ impl Embedder {
         Ok(Self { options, rest_embedder, tokenizer })
     }
 
-    pub fn embed(&self, texts: Vec<String>) -> Result<Vec<Embeddings<f32>>, EmbedError> {
-        match self.rest_embedder.embed_ref(&texts) {
+    pub fn embed(
+        &self,
+        texts: Vec<String>,
+        deadline: Option<Instant>,
+    ) -> Result<Vec<Embeddings<f32>>, EmbedError> {
+        match self.rest_embedder.embed_ref(&texts, deadline) {
             Ok(embeddings) => Ok(embeddings),
             Err(EmbedError { kind: EmbedErrorKind::RestBadRequest(error, _), fault: _ }) => {
                 tracing::warn!(error=?error, "OpenAI: received `BAD_REQUEST`. Input was maybe too long, retrying on tokenized version. For best performance, limit the size of your document template.");
-                self.try_embed_tokenized(&texts)
+                self.try_embed_tokenized(&texts, deadline)
             }
             Err(error) => Err(error),
         }
     }
 
-    fn try_embed_tokenized(&self, text: &[String]) -> Result<Vec<Embeddings<f32>>, EmbedError> {
+    fn try_embed_tokenized(
+        &self,
+        text: &[String],
+        deadline: Option<Instant>,
+    ) -> Result<Vec<Embeddings<f32>>, EmbedError> {
         let mut all_embeddings = Vec::with_capacity(text.len());
         for text in text {
             let max_token_count = self.options.embedding_model.max_token();
             let encoded = self.tokenizer.encode_ordinary(text.as_str());
             let len = encoded.len();
             if len < max_token_count {
-                all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text])?);
+                all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline)?);
                 continue;
             }
 
             let tokens = &encoded.as_slice()[0..max_token_count];
             let mut embeddings_for_prompt = Embeddings::new(self.dimensions());
-            let embedding = self.rest_embedder.embed_tokens(tokens)?;
+            let embedding = self.rest_embedder.embed_tokens(tokens, deadline)?;
             embeddings_for_prompt.append(embedding.into_inner()).map_err(|got| {
                 EmbedError::rest_unexpected_dimension(self.dimensions(), got.len())
             })?;
@@ -248,7 +258,7 @@ impl Embedder {
     ) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
         threads
             .install(move || {
-                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
+                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
             })
             .map_err(|error| EmbedError {
                 kind: EmbedErrorKind::PanicInThreadPool(error),
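Note that every `embed_chunks` implementation (Ollama above, OpenAI here, and the REST embedder below) passes `None`: document indexing runs in the background and keeps the full retry budget, while only latency-sensitive search paths pass `Some(deadline)`. A sketch of the two flavors, with hypothetical helper names (`EmbedError` lives in milli's vector error module, as the Ollama imports above show):

```rust
use std::time::{Duration, Instant};

use milli::vector::error::EmbedError;
use milli::vector::{Embedder, Embeddings};

// Indexing flavor: no deadline, so a busy remote embedder is retried with
// the full backoff schedule instead of failing the task.
fn embed_for_indexing(
    embedder: &Embedder,
    chunk: Vec<String>,
) -> Result<Vec<Embeddings<f32>>, EmbedError> {
    embedder.embed(chunk, None)
}

// Search flavor: an absolute deadline keeps a slow embedder from stalling a query.
fn embed_for_search(
    embedder: &Embedder,
    query: String,
) -> Result<Vec<Embeddings<f32>>, EmbedError> {
    let deadline = Instant::now() + Duration::from_secs(3);
    embedder.embed(vec![query], Some(deadline))
}
```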
diff --git a/milli/src/vector/rest.rs b/milli/src/vector/rest.rs
index 2538f2fff..0568ec67f 100644
--- a/milli/src/vector/rest.rs
+++ b/milli/src/vector/rest.rs
@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::time::Instant;
 
 use deserr::Deserr;
 use rand::Rng;
@@ -154,19 +155,31 @@ impl Embedder {
         Ok(Self { data, dimensions, distribution: options.distribution })
     }
 
-    pub fn embed(&self, texts: Vec<String>) -> Result<Vec<Embeddings<f32>>, EmbedError> {
-        embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions))
+    pub fn embed(
+        &self,
+        texts: Vec<String>,
+        deadline: Option<Instant>,
+    ) -> Result<Vec<Embeddings<f32>>, EmbedError> {
+        embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline)
     }
 
-    pub fn embed_ref<S>(&self, texts: &[S]) -> Result<Vec<Embeddings<f32>>, EmbedError>
+    pub fn embed_ref<S>(
+        &self,
+        texts: &[S],
+        deadline: Option<Instant>,
+    ) -> Result<Vec<Embeddings<f32>>, EmbedError>
     where
         S: AsRef<str> + Serialize,
     {
-        embed(&self.data, texts, texts.len(), Some(self.dimensions))
+        embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline)
     }
 
-    pub fn embed_tokens(&self, tokens: &[usize]) -> Result<Embeddings<f32>, EmbedError> {
-        let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions))?;
+    pub fn embed_tokens(
+        &self,
+        tokens: &[usize],
+        deadline: Option<Instant>,
+    ) -> Result<Embeddings<f32>, EmbedError> {
+        let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline)?;
         // unwrap: guaranteed that embeddings.len() == 1, otherwise the previous line terminated in error
         Ok(embeddings.pop().unwrap())
     }
@@ -178,7 +191,7 @@ impl Embedder {
     ) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
         threads
             .install(move || {
-                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
+                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
             })
             .map_err(|error| EmbedError {
                 kind: EmbedErrorKind::PanicInThreadPool(error),
@@ -207,7 +220,7 @@ impl Embedder {
 }
 
 fn infer_dimensions(data: &EmbedderData) -> Result<usize, NewEmbedderError> {
-    let v = embed(data, ["test"].as_slice(), 1, None)
+    let v = embed(data, ["test"].as_slice(), 1, None, None)
         .map_err(NewEmbedderError::could_not_determine_dimension)?;
     // unwrap: guaranteed that v.len() == 1, otherwise the previous line terminated in error
     Ok(v.first().unwrap().dimension())
 }
@@ -218,6 +231,7 @@ fn embed<S>(
     data: &EmbedderData,
     inputs: &[S],
     expected_count: usize,
     expected_dimension: Option<usize>,
+    deadline: Option<Instant>,
 ) -> Result<Vec<Embeddings<f32>>, EmbedError>
 where
     S: Serialize,
 {
             }
             Err(retry) => {
                 tracing::warn!("Failed: {}", retry.error);
-                retry.into_duration(attempt)
+                if let Some(deadline) = deadline {
+                    let now = std::time::Instant::now();
+                    if now > deadline {
+                        tracing::warn!("Could not embed due to deadline");
+                        return Err(retry.into_error());
+                    }
+
+                    let duration_to_deadline = deadline - now;
+                    retry.into_duration(attempt).map(|duration| duration.min(duration_to_deadline))
+                } else {
+                    retry.into_duration(attempt)
+                }
             }
         }?;
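The last hunk is the heart of the change: when a deadline is set, each retry's backoff sleep is clamped to the time remaining, and the loop gives up with the last error once the deadline has passed. A self-contained sketch of that policy, with a simplified `Retry` stand-in for the bookkeeping in rest.rs (the real `into_duration` derives its backoff from the error kind and returns a `Result`, hence the `.map` in the hunk above):

```rust
use std::time::{Duration, Instant};

// Simplified stand-in for the retry bookkeeping in milli/src/vector/rest.rs.
struct Retry {
    error: String,
}

impl Retry {
    fn into_error(self) -> String {
        self.error
    }

    // Exponential backoff; the real implementation derives this from the error.
    fn into_duration(&self, attempt: u32) -> Duration {
        Duration::from_millis(100 * 2u64.pow(attempt))
    }
}

// Deadline-aware backoff: sleep at most until the deadline, and stop retrying
// (surfacing the last error) once the deadline has passed.
fn backoff(retry: Retry, attempt: u32, deadline: Option<Instant>) -> Result<Duration, String> {
    match deadline {
        Some(deadline) => {
            let now = Instant::now();
            if now > deadline {
                // Out of time: report the last failure instead of sleeping again.
                return Err(retry.into_error());
            }
            let duration_to_deadline = deadline - now;
            Ok(retry.into_duration(attempt).min(duration_to_deadline))
        }
        None => Ok(retry.into_duration(attempt)),
    }
}

fn main() {
    let deadline = Instant::now() + Duration::from_millis(250);
    let retry = Retry { error: "HTTP 500".into() };
    // Attempt 3 would normally back off for 800ms; the deadline caps it at ~250ms.
    println!("{:?}", backoff(retry, 3, Some(deadline)));
}
```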