Compare commits

...

33 Commits

Author SHA1 Message Date
Clément Renault
4ea62f9235
Improve geo error handling 2024-11-13 14:15:42 +01:00
ManyTheFish
b311585280 Fix facet strings 2024-11-13 13:50:10 +01:00
Clément Renault
2f8ba6944f
Only register the _geo field 2024-11-13 12:05:27 +01:00
Clément Renault
379cedb096
Make the geo visible to the facet extractor 2024-11-13 11:47:58 +01:00
ManyTheFish
94cedd22a5
Add linear facet databases 2024-11-13 11:47:58 +01:00
Clément Renault
c78f931eb5
Fix some geo issues 2024-11-13 11:47:58 +01:00
Clément Renault
931c93d334
WIP 2024-11-13 11:47:30 +01:00
Clément Renault
2dd729ff43
Accept null as a valid geo value 2024-11-13 11:47:30 +01:00
Clément Renault
19864bcff3
Extract and pack the points to inserted them in the rtree 2024-11-13 11:47:30 +01:00
Clément Renault
f9e5a06699
Always collect the geopoint even when deleting one 2024-11-13 11:47:30 +01:00
Clément Renault
408a7cec6e
Finialize the GeoExtractor 2024-11-13 11:45:19 +01:00
Clément Renault
717a69dc6e
WIP 2024-11-13 11:45:18 +01:00
Clément Renault
bf88409075
WIP 2024-11-13 11:45:18 +01:00
Louis Dureuil
a01bc7b454
Fix error_document_field_limit_reached_in_one_document test 2024-11-13 10:34:54 +01:00
Louis Dureuil
7accfea624
Don't short circuit when we encounter a semantic error while extracting fields and external docid 2024-11-13 10:33:59 +01:00
Louis Dureuil
82dcaba6ca
Fix test: somehow on main vectors where displayed even though retrieveVectors: false 2024-11-12 23:58:25 +01:00
Louis Dureuil
cb1d6613dd
Adjust snapshots 2024-11-12 23:26:30 +01:00
Louis Dureuil
3b0cb5b487
Fix vector error messages 2024-11-12 23:26:16 +01:00
Louis Dureuil
bfdcd1cf33
Space changes 2024-11-12 22:52:45 +01:00
Louis Dureuil
1d13e804f7
Adjust test snapshots 2024-11-12 22:52:41 +01:00
Louis Dureuil
c4e9f761e9
Emit better error messages when parsing vectors 2024-11-12 22:49:22 +01:00
Louis Dureuil
8a6e61c77f
InvalidVectorsEmbedderConf error takes a String rather than a deserr error 2024-11-12 22:47:57 +01:00
Louis Dureuil
68bbf674c9
Make REST mock thread independent 2024-11-12 16:31:31 +01:00
Louis Dureuil
980921e078
Vector fixes 2024-11-12 16:31:22 +01:00
Louis Dureuil
1fcd5f091e
Remove progress from task 2024-11-12 12:23:13 +01:00
Louis Dureuil
6094bb299a
Fix user_provided vectors 2024-11-12 10:15:55 +01:00
Louis Dureuil
bef8fc6cf1
Fix hf embedder 2024-11-08 13:10:17 +01:00
Louis Dureuil
e32677999f
Adapt some snapshots 2024-11-08 00:06:33 +01:00
Louis Dureuil
5185aa21b8
Know if your vectors are implicit when writing them back in documents + don't write empty _vectors 2024-11-08 00:05:36 +01:00
Louis Dureuil
8a314ab81d
Fix primary key fid order 2024-11-08 00:05:12 +01:00
Louis Dureuil
4706a0eb49
Fix vector parsing 2024-11-07 23:26:20 +01:00
Louis Dureuil
d97af4d8e6
fix field order of JSON documents 2024-11-07 22:36:52 +01:00
ManyTheFish
1f5d801271 Fix crashes in facet search indexing 2024-11-07 17:22:30 +01:00
48 changed files with 1624 additions and 461 deletions

13
Cargo.lock generated
View File

@ -2623,6 +2623,7 @@ dependencies = [
"meilisearch-types",
"memmap2",
"page_size",
"raw-collections",
"rayon",
"roaring",
"serde",
@ -3538,6 +3539,7 @@ version = "1.11.0"
dependencies = [
"actix-web",
"anyhow",
"bumpalo",
"convert_case 0.6.0",
"csv",
"deserr",
@ -3550,6 +3552,7 @@ dependencies = [
"meili-snap",
"memmap2",
"milli",
"raw-collections",
"roaring",
"serde",
"serde-cs",
@ -3661,6 +3664,7 @@ dependencies = [
"time",
"tokenizers",
"tracing",
"uell",
"ureq",
"url",
"uuid",
@ -5789,6 +5793,15 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "uell"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40de5982e28612e20330e77d81f1559b74f66caf3c7fc10b19ada4843f4b4fd7"
dependencies = [
"bumpalo",
]
[[package]]
name = "unescaper"
version = "0.1.5"

View File

@ -22,6 +22,7 @@ flate2 = "1.0.30"
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.6.0"
raw-collections = { git = "https://github.com/dureuill/raw-collections.git", version = "0.1.0" }
rayon = "1.10.0"
roaring = { version = "0.10.6", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] }

View File

@ -43,6 +43,7 @@ use meilisearch_types::milli::{self, Filter};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
use raw_collections::RawMap;
use roaring::RoaringBitmap;
use time::macros::format_description;
use time::OffsetDateTime;
@ -1318,7 +1319,12 @@ impl IndexScheduler {
index,
&mut new_fields_ids_map,
primary_key.as_deref(),
first_document.as_ref(),
first_document
.map(|raw| RawMap::from_raw_value(raw, &indexer_alloc))
.transpose()
.map_err(|error| {
milli::Error::UserError(milli::UserError::SerdeJson(error))
})?,
)?
.map_err(milli::Error::from)?;

View File

@ -148,7 +148,6 @@ pub fn snapshot_task(task: &Task) -> String {
enqueued_at: _,
started_at: _,
finished_at: _,
progress: _,
error,
canceled_by,
details,

View File

@ -978,12 +978,7 @@ impl IndexScheduler {
Ok((
ret.map(|task| {
if processing.contains(task.uid) {
Task {
status: Status::Processing,
progress: progress.clone(),
started_at: Some(started_at),
..task
}
Task { status: Status::Processing, started_at: Some(started_at), ..task }
} else {
task
}
@ -1025,7 +1020,6 @@ impl IndexScheduler {
enqueued_at: OffsetDateTime::now_utc(),
started_at: None,
finished_at: None,
progress: None,
error: None,
canceled_by: None,
details: kind.default_details(),
@ -1606,8 +1600,6 @@ impl<'a> Dump<'a> {
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
/// FIXME: should we update dump to contain progress information? 🤔
progress: None,
error: task.error,
canceled_by: task.canceled_by,
details: task.details,

View File

@ -1,5 +1,5 @@
---
source: index-scheduler/src/lib.rs
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
@ -22,7 +22,7 @@ succeeded [0,1,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"_vectors": 1, "breed": 1, "doggo": 1, "id": 1} }
doggos: { number_of_documents: 1, field_distribution: {"breed": 1, "doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@ -1,5 +1,5 @@
---
source: index-scheduler/src/lib.rs
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
@ -21,7 +21,7 @@ succeeded [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"_vectors": 1, "breed": 1, "doggo": 1, "id": 1} }
doggos: { number_of_documents: 1, field_distribution: {"breed": 1, "doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@ -345,8 +345,6 @@ impl IndexScheduler {
enqueued_at,
started_at,
finished_at,
/// FIXME: assert something here? ask tamo 🤔
progress: _,
error: _,
canceled_by,
details,

View File

@ -13,6 +13,7 @@ license.workspace = true
[dependencies]
actix-web = { version = "4.8.0", default-features = false }
anyhow = "1.0.86"
bumpalo = "3.16.0"
convert_case = "0.6.0"
csv = "1.3.0"
deserr = { version = "0.6.2", features = ["actix-web"] }
@ -23,6 +24,7 @@ flate2 = "1.0.30"
fst = "0.4.7"
memmap2 = "0.9.4"
milli = { path = "../milli" }
raw-collections = { git = "https://github.com/dureuill/raw-collections.git", version = "0.1.0" }
roaring = { version = "0.10.6", features = ["serde"] }
serde = { version = "1.0.204", features = ["derive"] }
serde-cs = "0.2.4"
@ -70,4 +72,3 @@ swedish-recomposition = ["milli/swedish-recomposition"]
german = ["milli/german"]
# allow turkish normalization
turkish = ["milli/turkish"]

View File

@ -3,13 +3,16 @@ use std::fs::File;
use std::io::{self, BufWriter};
use std::marker::PhantomData;
use bumpalo::Bump;
use memmap2::Mmap;
use milli::documents::Error;
use milli::update::new::TopLevelMap;
use milli::Object;
use raw_collections::RawMap;
use serde::de::{SeqAccess, Visitor};
use serde::{Deserialize, Deserializer};
use serde_json::error::Category;
use serde_json::value::RawValue;
use serde_json::{to_writer, Map, Value};
use crate::error::{Code, ErrorCode};
@ -213,10 +216,15 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
// We memory map to be able to deserailize into a TopLevelMap<'pl> that
// does not allocate when possible and only materialize the first/top level.
let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
let mut doc_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1MiB
let mut out = BufWriter::new(output);
let mut deserializer = serde_json::Deserializer::from_slice(&input);
let count = match array_each(&mut deserializer, |obj: TopLevelMap| to_writer(&mut out, &obj)) {
let count = match array_each(&mut deserializer, |obj: &RawValue| {
doc_alloc.reset();
let map = RawMap::from_raw_value(obj, &doc_alloc)?;
to_writer(&mut out, &map)
}) {
// The json data has been deserialized and does not need to be processed again.
// The data has been transferred to the writer during the deserialization process.
Ok(Ok(count)) => count,

View File

@ -4,9 +4,7 @@ use time::{Duration, OffsetDateTime};
use crate::error::ResponseError;
use crate::settings::{Settings, Unchecked};
use crate::tasks::{
serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId, TaskProgress,
};
use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId};
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
@ -29,8 +27,6 @@ pub struct TaskView {
pub started_at: Option<OffsetDateTime>,
#[serde(with = "time::serde::rfc3339::option", default)]
pub finished_at: Option<OffsetDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
pub progress: Option<TaskProgress>,
}
impl TaskView {
@ -47,7 +43,6 @@ impl TaskView {
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
progress: task.progress.clone(),
}
}
}

View File

@ -31,8 +31,6 @@ pub struct Task {
#[serde(with = "time::serde::rfc3339::option")]
pub finished_at: Option<OffsetDateTime>,
pub progress: Option<TaskProgress>,
pub error: Option<ResponseError>,
pub canceled_by: Option<TaskId>,
pub details: Option<Details>,

View File

@ -1335,7 +1335,6 @@ async fn error_add_documents_missing_document_id() {
}
#[actix_rt::test]
#[should_panic]
async fn error_document_field_limit_reached_in_one_document() {
let server = Server::new().await;
let index = server.index("test");
@ -1352,7 +1351,7 @@ async fn error_document_field_limit_reached_in_one_document() {
let documents = json!([big_object]);
let (response, code) = index.update_documents(documents, Some("id")).await;
snapshot!(code, @"500 Internal Server Error");
snapshot!(code, @"202 Accepted");
let response = index.wait_task(response.uid()).await;
snapshot!(code, @"202 Accepted");
@ -1360,16 +1359,21 @@ async fn error_document_field_limit_reached_in_one_document() {
snapshot!(response,
@r###"
{
"uid": 1,
"uid": "[uid]",
"indexUid": "test",
"status": "succeeded",
"status": "failed",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
"indexedDocuments": 0
},
"error": {
"message": "A document cannot contain more than 65,535 fields.",
"code": "max_fields_limit_exceeded",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#max_fields_limit_exceeded"
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
@ -1660,7 +1664,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "The `_geo` field in the document with the id: `11` is not an object. Was expecting an object with the `_geo.lat` and `_geo.lng` fields but instead got `\"foobar\"`.",
"message": "The `_geo` field in the document with the id: `\"11\"` is not an object. Was expecting an object with the `_geo.lat` and `_geo.lng` fields but instead got `\"foobar\"`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1697,7 +1701,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find latitude nor longitude in the document with the id: `11`. Was expecting `_geo.lat` and `_geo.lng` fields.",
"message": "Could not find latitude nor longitude in the document with the id: `\"11\"`. Was expecting `_geo.lat` and `_geo.lng` fields.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1734,7 +1738,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find latitude nor longitude in the document with the id: `11`. Was expecting `_geo.lat` and `_geo.lng` fields.",
"message": "Could not find latitude nor longitude in the document with the id: `\"11\"`. Was expecting `_geo.lat` and `_geo.lng` fields.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1771,7 +1775,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find longitude in the document with the id: `11`. Was expecting a `_geo.lng` field.",
"message": "Could not find longitude in the document with the id: `\"11\"`. Was expecting a `_geo.lng` field.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1808,7 +1812,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find latitude in the document with the id: `11`. Was expecting a `_geo.lat` field.",
"message": "Could not find latitude in the document with the id: `\"11\"`. Was expecting a `_geo.lat` field.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1845,7 +1849,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find longitude in the document with the id: `11`. Was expecting a `_geo.lng` field.",
"message": "Could not find longitude in the document with the id: `\"11\"`. Was expecting a `_geo.lng` field.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1882,7 +1886,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find latitude in the document with the id: `11`. Was expecting a `_geo.lat` field.",
"message": "Could not find latitude in the document with the id: `\"11\"`. Was expecting a `_geo.lat` field.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1919,7 +1923,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not parse latitude nor longitude in the document with the id: `11`. Was expecting finite numbers but instead got `false` and `true`.",
"message": "Could not parse latitude nor longitude in the document with the id: `\"11\"`. Was expecting finite numbers but instead got `false` and `true`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1956,7 +1960,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find longitude in the document with the id: `11`. Was expecting a `_geo.lng` field.",
"message": "Could not find longitude in the document with the id: `\"11\"`. Was expecting a `_geo.lng` field.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -1993,7 +1997,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not find latitude in the document with the id: `11`. Was expecting a `_geo.lat` field.",
"message": "Could not find latitude in the document with the id: `\"11\"`. Was expecting a `_geo.lat` field.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2030,7 +2034,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not parse latitude nor longitude in the document with the id: `11`. Was expecting finite numbers but instead got `\"doggo\"` and `\"doggo\"`.",
"message": "Could not parse latitude nor longitude in the document with the id: `\"11\"`. Was expecting finite numbers but instead got `\"doggo\"` and `\"doggo\"`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2067,7 +2071,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "The `_geo` field in the document with the id: `11` contains the following unexpected fields: `{\"doggo\":\"are the best\"}`.",
"message": "The `_geo` field in the document with the id: `\"11\"` contains the following unexpected fields: `{\"doggo\":\"are the best\"}`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2105,7 +2109,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not parse longitude in the document with the id: `12`. Was expecting a finite number but instead got `null`.",
"message": "Could not parse longitude in the document with the id: `\"12\"`. Was expecting a finite number but instead got `null`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2141,7 +2145,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not parse latitude in the document with the id: `12`. Was expecting a finite number but instead got `null`.",
"message": "Could not parse latitude in the document with the id: `\"12\"`. Was expecting a finite number but instead got `null`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2177,7 +2181,7 @@ async fn add_documents_invalid_geo_field() {
"indexedDocuments": 0
},
"error": {
"message": "Could not parse latitude nor longitude in the document with the id: `13`. Was expecting finite numbers but instead got `null` and `null`.",
"message": "Could not parse latitude nor longitude in the document with the id: `\"13\"`. Was expecting finite numbers but instead got `null` and `null`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
@ -2197,7 +2201,7 @@ async fn add_invalid_geo_and_then_settings() {
let index = server.index("test");
index.create(Some("id")).await;
// _geo is not an object
// _geo is not a correct object
let documents = json!([
{
"id": "11",
@ -2226,7 +2230,7 @@ async fn add_invalid_geo_and_then_settings() {
}
"###);
let (ret, code) = index.update_settings(json!({"sortableAttributes": ["_geo"]})).await;
let (ret, code) = index.update_settings(json!({ "sortableAttributes": ["_geo"] })).await;
snapshot!(code, @"202 Accepted");
let ret = index.wait_task(ret.uid()).await;
snapshot!(ret, @r###"

View File

@ -750,9 +750,9 @@ async fn test_score_details() {
],
"_vectors": {
"manual": [
-100.0,
231.0,
32.0
-100,
231,
32
]
},
"_rankingScoreDetails": {
@ -1543,9 +1543,9 @@ async fn simple_search_with_strange_synonyms() {
],
"_vectors": {
"manual": [
-100.0,
231.0,
32.0
-100,
231,
32
]
}
}
@ -1568,9 +1568,9 @@ async fn simple_search_with_strange_synonyms() {
],
"_vectors": {
"manual": [
-100.0,
231.0,
32.0
-100,
231,
32
]
}
}
@ -1593,9 +1593,9 @@ async fn simple_search_with_strange_synonyms() {
],
"_vectors": {
"manual": [
-100.0,
231.0,
32.0
-100,
231,
32
]
}
}

View File

@ -113,9 +113,9 @@ async fn simple_search_single_index() {
],
"_vectors": {
"manual": [
-100.0,
340.0,
90.0
-100,
340,
90
]
}
}
@ -138,9 +138,9 @@ async fn simple_search_single_index() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
}
}
@ -182,9 +182,9 @@ async fn federation_single_search_single_index() {
],
"_vectors": {
"manual": [
-100.0,
340.0,
90.0
-100,
340,
90
]
},
"_federation": {
@ -305,9 +305,9 @@ async fn federation_two_search_single_index() {
],
"_vectors": {
"manual": [
-100.0,
340.0,
90.0
-100,
340,
90
]
},
"_federation": {
@ -325,9 +325,9 @@ async fn federation_two_search_single_index() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -480,9 +480,9 @@ async fn simple_search_two_indexes() {
],
"_vectors": {
"manual": [
-100.0,
340.0,
90.0
-100,
340,
90
]
}
}
@ -513,9 +513,9 @@ async fn simple_search_two_indexes() {
"cattos": "pésti",
"_vectors": {
"manual": [
1.0,
2.0,
3.0
1,
2,
3
]
}
},
@ -535,9 +535,9 @@ async fn simple_search_two_indexes() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
}
}
@ -1393,9 +1393,9 @@ async fn federation_sort_same_indexes_same_criterion_same_direction() {
"cattos": "pésti",
"_vectors": {
"manual": [
1.0,
2.0,
3.0
1,
2,
3
]
},
"_federation": {
@ -1414,9 +1414,9 @@ async fn federation_sort_same_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
10.0,
23.0,
32.0
10,
23,
32
]
},
"_federation": {
@ -1442,9 +1442,9 @@ async fn federation_sort_same_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -1474,9 +1474,9 @@ async fn federation_sort_same_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
10.0,
23.0,
32.0
10,
23,
32
]
},
"_federation": {
@ -1716,9 +1716,9 @@ async fn federation_sort_same_indexes_different_criterion_same_direction() {
"cattos": "pésti",
"_vectors": {
"manual": [
1.0,
2.0,
3.0
1,
2,
3
]
},
"_federation": {
@ -1748,9 +1748,9 @@ async fn federation_sort_same_indexes_different_criterion_same_direction() {
],
"_vectors": {
"manual": [
10.0,
23.0,
32.0
10,
23,
32
]
},
"_federation": {
@ -1769,9 +1769,9 @@ async fn federation_sort_same_indexes_different_criterion_same_direction() {
],
"_vectors": {
"manual": [
10.0,
23.0,
32.0
10,
23,
32
]
},
"_federation": {
@ -1797,9 +1797,9 @@ async fn federation_sort_same_indexes_different_criterion_same_direction() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -2103,9 +2103,9 @@ async fn federation_sort_different_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -2124,9 +2124,9 @@ async fn federation_sort_different_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
10.0,
-23.0,
32.0
10,
-23,
32
]
},
"_federation": {
@ -2145,9 +2145,9 @@ async fn federation_sort_different_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
-100.0,
340.0,
90.0
-100,
340,
90
]
},
"_federation": {
@ -2166,9 +2166,9 @@ async fn federation_sort_different_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
-100.0,
231.0,
32.0
-100,
231,
32
]
},
"_federation": {
@ -2187,9 +2187,9 @@ async fn federation_sort_different_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
1.0,
2.0,
3.0
1,
2,
3
]
},
"_federation": {
@ -2228,9 +2228,9 @@ async fn federation_sort_different_indexes_same_criterion_same_direction() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -2415,9 +2415,9 @@ async fn federation_sort_different_ranking_rules() {
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -2436,9 +2436,9 @@ async fn federation_sort_different_ranking_rules() {
],
"_vectors": {
"manual": [
10.0,
-23.0,
32.0
10,
-23,
32
]
},
"_federation": {
@ -2457,9 +2457,9 @@ async fn federation_sort_different_ranking_rules() {
],
"_vectors": {
"manual": [
-100.0,
340.0,
90.0
-100,
340,
90
]
},
"_federation": {
@ -2478,9 +2478,9 @@ async fn federation_sort_different_ranking_rules() {
],
"_vectors": {
"manual": [
-100.0,
231.0,
32.0
-100,
231,
32
]
},
"_federation": {
@ -2499,9 +2499,9 @@ async fn federation_sort_different_ranking_rules() {
],
"_vectors": {
"manual": [
1.0,
2.0,
3.0
1,
2,
3
]
},
"_federation": {
@ -2716,9 +2716,9 @@ async fn federation_sort_different_indexes_different_criterion_same_direction()
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -2757,9 +2757,9 @@ async fn federation_sort_different_indexes_different_criterion_same_direction()
],
"_vectors": {
"manual": [
10.0,
-23.0,
32.0
10,
-23,
32
]
},
"_federation": {
@ -2778,9 +2778,9 @@ async fn federation_sort_different_indexes_different_criterion_same_direction()
],
"_vectors": {
"manual": [
-100.0,
340.0,
90.0
-100,
340,
90
]
},
"_federation": {
@ -2799,9 +2799,9 @@ async fn federation_sort_different_indexes_different_criterion_same_direction()
],
"_vectors": {
"manual": [
-100.0,
231.0,
32.0
-100,
231,
32
]
},
"_federation": {
@ -2820,9 +2820,9 @@ async fn federation_sort_different_indexes_different_criterion_same_direction()
],
"_vectors": {
"manual": [
1.0,
2.0,
3.0
1,
2,
3
]
},
"_federation": {
@ -2881,9 +2881,9 @@ async fn federation_sort_different_indexes_different_criterion_same_direction()
],
"_vectors": {
"manual": [
1.0,
2.0,
54.0
1,
2,
54
]
},
"_federation": {
@ -4346,10 +4346,10 @@ async fn federation_vector_two_indexes() {
let (response, code) = server
.multi_search(json!({"federation": {}, "queries": [
{"indexUid" : "vectors-animal", "vector": [1.0, 0.0, 0.5], "hybrid": {"semanticRatio": 1.0, "embedder": "animal"}},
{"indexUid" : "vectors-animal", "vector": [1.0, 0.0, 0.5], "hybrid": {"semanticRatio": 1.0, "embedder": "animal"}, "retrieveVectors": true},
// joyful and energetic first
{"indexUid": "vectors-sentiment", "vector": [0.8, 0.6], "hybrid": {"semanticRatio": 1.0, "embedder": "sentiment"}},
{"indexUid": "vectors-sentiment", "q": "dog"},
{"indexUid": "vectors-sentiment", "vector": [0.8, 0.6], "hybrid": {"semanticRatio": 1.0, "embedder": "sentiment"}, "retrieveVectors": true},
{"indexUid": "vectors-sentiment", "q": "dog", "retrieveVectors": true},
]}))
.await;
snapshot!(code, @"200 OK");
@ -4364,7 +4364,16 @@ async fn federation_vector_two_indexes() {
0.8,
0.09,
0.8
]
],
"sentiment": {
"embeddings": [
[
0.800000011920929,
0.30000001192092896
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",
@ -4379,7 +4388,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
0.8,
0.3
]
],
"animal": {
"embeddings": [
[
0.800000011920929,
0.09000000357627869,
0.800000011920929
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4394,7 +4413,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
-1.0,
0.1
]
],
"animal": {
"embeddings": [
[
0.8500000238418579,
0.019999999552965164,
0.10000000149011612
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4410,7 +4439,16 @@ async fn federation_vector_two_indexes() {
0.9,
0.8,
0.05
]
],
"sentiment": {
"embeddings": [
[
-0.10000000149011612,
0.550000011920929
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",
@ -4426,7 +4464,16 @@ async fn federation_vector_two_indexes() {
0.85,
0.02,
0.1
]
],
"sentiment": {
"embeddings": [
[
-1.0,
0.10000000149011612
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",
@ -4441,7 +4488,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
-0.2,
0.65
]
],
"animal": {
"embeddings": [
[
0.800000011920929,
0.8999999761581421,
0.5
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4456,7 +4513,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
-0.1,
0.55
]
],
"animal": {
"embeddings": [
[
0.8999999761581421,
0.800000011920929,
0.05000000074505806
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4472,7 +4539,16 @@ async fn federation_vector_two_indexes() {
0.8,
0.9,
0.5
]
],
"sentiment": {
"embeddings": [
[
-0.20000000298023224,
0.6499999761581421
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",
@ -4492,8 +4568,8 @@ async fn federation_vector_two_indexes() {
// hybrid search, distinct embedder
let (response, code) = server
.multi_search(json!({"federation": {}, "queries": [
{"indexUid" : "vectors-animal", "vector": [1.0, 0.0, 0.5], "hybrid": {"semanticRatio": 1.0, "embedder": "animal"}, "showRankingScore": true},
{"indexUid": "vectors-sentiment", "vector": [-1, 0.6], "q": "beagle", "hybrid": {"semanticRatio": 1.0, "embedder": "sentiment"}, "showRankingScore": true},
{"indexUid" : "vectors-animal", "vector": [1.0, 0.0, 0.5], "hybrid": {"semanticRatio": 1.0, "embedder": "animal"}, "showRankingScore": true, "retrieveVectors": true},
{"indexUid": "vectors-sentiment", "vector": [-1, 0.6], "q": "beagle", "hybrid": {"semanticRatio": 1.0, "embedder": "sentiment"}, "showRankingScore": true, "retrieveVectors": true,},
]}))
.await;
snapshot!(code, @"200 OK");
@ -4507,7 +4583,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
0.8,
0.3
]
],
"animal": {
"embeddings": [
[
0.800000011920929,
0.09000000357627869,
0.800000011920929
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4523,7 +4609,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
-1.0,
0.1
]
],
"animal": {
"embeddings": [
[
0.8500000238418579,
0.019999999552965164,
0.10000000149011612
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4540,7 +4636,16 @@ async fn federation_vector_two_indexes() {
0.85,
0.02,
0.1
]
],
"sentiment": {
"embeddings": [
[
-1.0,
0.10000000149011612
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",
@ -4557,7 +4662,16 @@ async fn federation_vector_two_indexes() {
0.8,
0.9,
0.5
]
],
"sentiment": {
"embeddings": [
[
-0.20000000298023224,
0.6499999761581421
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",
@ -4573,7 +4687,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
-0.2,
0.65
]
],
"animal": {
"embeddings": [
[
0.800000011920929,
0.8999999761581421,
0.5
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4589,7 +4713,17 @@ async fn federation_vector_two_indexes() {
"sentiment": [
-0.1,
0.55
]
],
"animal": {
"embeddings": [
[
0.8999999761581421,
0.800000011920929,
0.05000000074505806
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-animal",
@ -4606,7 +4740,16 @@ async fn federation_vector_two_indexes() {
0.9,
0.8,
0.05
]
],
"sentiment": {
"embeddings": [
[
-0.10000000149011612,
0.550000011920929
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",
@ -4623,7 +4766,16 @@ async fn federation_vector_two_indexes() {
0.8,
0.09,
0.8
]
],
"sentiment": {
"embeddings": [
[
0.800000011920929,
0.30000001192092896
]
],
"regenerate": false
}
},
"_federation": {
"indexUid": "vectors-sentiment",

View File

@ -249,7 +249,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Missing field `regenerate` inside `.manual`",
"message": "Bad embedder configuration in the document with id: `0`. Missing field `._vectors.manual.regenerate`\n - note: `._vectors.manual` must be an array of floats, an array of arrays of floats, or an object with field `regenerate`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -278,7 +278,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Missing field `regenerate` inside `.manual`",
"message": "Bad embedder configuration in the document with id: `0`. Missing field `._vectors.manual.regenerate`\n - note: `._vectors.manual` must be an array of floats, an array of arrays of floats, or an object with field `regenerate`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -308,7 +308,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Invalid value type at `.manual.regenerate`: expected a boolean, but found a string: `\"yes please\"`",
"message": "Bad embedder configuration in the document with id: `0`. Could not parse `._vectors.manual.regenerate`: invalid type: string \"yes please\", expected a boolean at line 1 column 26",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -320,8 +320,7 @@ async fn user_provided_embeddings_error() {
}
"###);
let documents =
json!({"id": 0, "name": "kefir", "_vectors": { "manual": { "embeddings": true }}});
let documents = json!({"id": 0, "name": "kefir", "_vectors": { "manual": { "embeddings": true, "regenerate": true }}});
let (value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let task = index.wait_task(value.uid()).await;
@ -337,7 +336,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Invalid value type at `.manual.embeddings`: expected null or an array, but found a boolean: `true`",
"message": "Bad embedder configuration in the document with id: `0`. Invalid value type at `._vectors.manual.embeddings`: expected null or an array, but found a boolean: `true`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -349,8 +348,7 @@ async fn user_provided_embeddings_error() {
}
"###);
let documents =
json!({"id": 0, "name": "kefir", "_vectors": { "manual": { "embeddings": [true] }}});
let documents = json!({"id": 0, "name": "kefir", "_vectors": { "manual": { "embeddings": [true], "regenerate": true }}});
let (value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let task = index.wait_task(value.uid()).await;
@ -366,7 +364,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Invalid value type at `.manual.embeddings[0]`: expected a number or an array, but found a boolean: `true`",
"message": "Bad embedder configuration in the document with id: `0`. Invalid value type at `._vectors.manual.embeddings[0]`: expected a number or an array, but found a boolean: `true`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -378,8 +376,7 @@ async fn user_provided_embeddings_error() {
}
"###);
let documents =
json!({"id": 0, "name": "kefir", "_vectors": { "manual": { "embeddings": [[true]] }}});
let documents = json!({"id": 0, "name": "kefir", "_vectors": { "manual": { "embeddings": [[true]], "regenerate": false }}});
let (value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let task = index.wait_task(value.uid()).await;
@ -395,7 +392,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Invalid value type at `.manual.embeddings[0][0]`: expected a number, but found a boolean: `true`",
"message": "Bad embedder configuration in the document with id: `0`. Invalid value type at `._vectors.manual.embeddings[0][0]`: expected a number, but found a boolean: `true`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -436,7 +433,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Invalid value type at `.manual.embeddings[1]`: expected a number, but found an array: `[0.2,0.3]`",
"message": "Bad embedder configuration in the document with id: `0`. Invalid value type at `._vectors.manual.embeddings[1]`: expected a number, but found an array: `[0.2,0.3]`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -464,7 +461,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Invalid value type at `.manual.embeddings[1]`: expected an array, but found a number: `0.3`",
"message": "Bad embedder configuration in the document with id: `0`. Invalid value type at `._vectors.manual.embeddings[1]`: expected an array, but found a number: `0.3`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -492,7 +489,7 @@ async fn user_provided_embeddings_error() {
"indexedDocuments": 0
},
"error": {
"message": "Bad embedder configuration in the document with id: `\"0\"`. Invalid value type at `.manual.embeddings[0][1]`: expected a number, but found a boolean: `true`",
"message": "Bad embedder configuration in the document with id: `0`. Invalid value type at `._vectors.manual.embeddings[0][1]`: expected a number, but found a boolean: `true`",
"code": "invalid_vectors_type",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_vectors_type"
@ -532,7 +529,7 @@ async fn user_provided_vectors_error() {
"indexedDocuments": 0
},
"error": {
"message": "While embedding documents for embedder `manual`: no vectors provided for document \"40\" and at least 4 other document(s)\n- Note: `manual` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.manual`.\n- Hint: opt-out for a document with `_vectors.manual: null`",
"message": "While embedding documents for embedder `manual`: no vectors provided for document `40` and at least 4 other document(s)\n- Note: `manual` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.manual`.\n- Hint: opt-out for a document with `_vectors.manual: null`",
"code": "vector_embedding_error",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#vector_embedding_error"
@ -561,7 +558,7 @@ async fn user_provided_vectors_error() {
"indexedDocuments": 0
},
"error": {
"message": "While embedding documents for embedder `manual`: no vectors provided for document \"42\"\n- Note: `manual` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.manual`.\n- Hint: try replacing `_vector` by `_vectors` in 1 document(s).",
"message": "While embedding documents for embedder `manual`: no vectors provided for document `42`\n- Note: `manual` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.manual`.\n- Hint: try replacing `_vector` by `_vectors` in 1 document(s).",
"code": "vector_embedding_error",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#vector_embedding_error"
@ -590,7 +587,7 @@ async fn user_provided_vectors_error() {
"indexedDocuments": 0
},
"error": {
"message": "While embedding documents for embedder `manual`: no vectors provided for document \"42\"\n- Note: `manual` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.manual`.\n- Hint: try replacing `_vectors.manaul` by `_vectors.manual` in 1 document(s).",
"message": "While embedding documents for embedder `manual`: no vectors provided for document `42`\n- Note: `manual` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.manual`.\n- Hint: try replacing `_vectors.manaul` by `_vectors.manual` in 1 document(s).",
"code": "vector_embedding_error",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#vector_embedding_error"

View File

@ -1,5 +1,4 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use meili_snap::{json_string, snapshot};
use reqwest::IntoUrl;
@ -13,13 +12,22 @@ use crate::vector::{get_server_vector, GetAllDocumentsOptions};
async fn create_mock() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
.respond_with(move |_req: &Request| {
let counter = counter.fetch_add(1, Ordering::Relaxed);
ResponseTemplate::new(200).set_body_json(json!({ "data": vec![counter; 3] }))
.respond_with(move |req: &Request| {
let text: String = req.body_json().unwrap();
ResponseTemplate::new(200).set_body_json(
json!({ "data": text_to_embedding.get(text.as_str()).unwrap_or(&[99., 99., 99.]) }),
)
})
.mount(&mock_server)
.await;
@ -32,13 +40,14 @@ async fn create_mock() -> (MockServer, Value) {
"request": "{{text}}",
"response": {
"data": "{{embedding}}"
}
},
"documentTemplate": "{{doc.name}}",
});
(mock_server, embedder_settings)
}
async fn create_mock_map() -> (MockServer, Value) {
async fn create_mock_default_template() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let text_to_embedding: BTreeMap<_, _> = vec![
@ -97,7 +106,14 @@ struct SingleResponse {
async fn create_mock_multiple() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
@ -115,8 +131,11 @@ async fn create_mock_multiple() -> (MockServer, Value) {
.input
.into_iter()
.map(|text| SingleResponse {
embedding: text_to_embedding
.get(text.as_str())
.unwrap_or(&[99., 99., 99.])
.to_vec(),
text,
embedding: vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3],
})
.collect();
@ -142,7 +161,8 @@ async fn create_mock_multiple() -> (MockServer, Value) {
},
"{{..}}"
]
}
},
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
@ -156,7 +176,14 @@ struct SingleRequest {
async fn create_mock_single_response_in_array() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
@ -171,8 +198,11 @@ async fn create_mock_single_response_in_array() -> (MockServer, Value) {
};
let output = vec![SingleResponse {
embedding: text_to_embedding
.get(req.input.as_str())
.unwrap_or(&[99., 99., 99.])
.to_vec(),
text: req.input,
embedding: vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3],
}];
let response = MultipleResponse { output };
@ -196,7 +226,8 @@ async fn create_mock_single_response_in_array() -> (MockServer, Value) {
"embedding": "{{embedding}}"
}
]
}
},
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
@ -205,7 +236,14 @@ async fn create_mock_single_response_in_array() -> (MockServer, Value) {
async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
@ -223,7 +261,7 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
}
}
let _req: String = match req.body_json() {
let req: String = match req.body_json() {
Ok(req) => req,
Err(error) => {
return ResponseTemplate::new(400).set_body_json(json!({
@ -232,7 +270,7 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
}
};
let output = vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3];
let output = text_to_embedding.get(req.as_str()).unwrap_or(&[99., 99., 99.]).to_vec();
ResponseTemplate::new(200).set_body_json(output)
})
@ -245,7 +283,8 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
"url": url,
"request": "{{text}}",
"response": "{{embedding}}",
"headers": {"my-nonstandard-auth": "bearer of the ring"}
"headers": {"my-nonstandard-auth": "bearer of the ring"},
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
@ -254,12 +293,19 @@ async fn create_mock_raw_with_custom_header() -> (MockServer, Value) {
async fn create_mock_raw() -> (MockServer, Value) {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
let text_to_embedding: BTreeMap<_, _> = vec![
// text -> embedding
("kefir", [0.0, 0.0, 0.0]),
("intel", [1.0, 1.0, 1.0]),
]
// turn into btree
.into_iter()
.collect();
Mock::given(method("POST"))
.and(path("/"))
.respond_with(move |req: &Request| {
let _req: String = match req.body_json() {
let req: String = match req.body_json() {
Ok(req) => req,
Err(error) => {
return ResponseTemplate::new(400).set_body_json(json!({
@ -268,7 +314,7 @@ async fn create_mock_raw() -> (MockServer, Value) {
}
};
let output = vec![counter.fetch_add(1, Ordering::Relaxed) as f32; 3];
let output = text_to_embedding.get(req.as_str()).unwrap_or(&[99., 99., 99.]).to_vec();
ResponseTemplate::new(200).set_body_json(output)
})
@ -281,29 +327,30 @@ async fn create_mock_raw() -> (MockServer, Value) {
"url": url,
"dimensions": 3,
"request": "{{text}}",
"response": "{{embedding}}"
"response": "{{embedding}}",
"documentTemplate": "{{doc.name}}"
});
(mock_server, embedder_settings)
}
pub async fn post<T: IntoUrl>(url: T) -> reqwest::Result<reqwest::Response> {
reqwest::Client::builder().build()?.post(url).send().await
pub async fn post<T: IntoUrl>(url: T, text: &str) -> reqwest::Result<reqwest::Response> {
reqwest::Client::builder().build()?.post(url).json(&json!(text)).send().await
}
#[actix_rt::test]
async fn dummy_testing_the_mock() {
let (mock, _setting) = create_mock().await;
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0,0,0]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[1,1,1]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[2,2,2]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[3,3,3]}"###);
let body = post(&mock.uri()).await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[4,4,4]}"###);
let body = post(&mock.uri(), "kefir").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0.0,0.0,0.0]}"###);
let body = post(&mock.uri(), "intel").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[1.0,1.0,1.0]}"###);
let body = post(&mock.uri(), "kefir").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0.0,0.0,0.0]}"###);
let body = post(&mock.uri(), "kefir").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[0.0,0.0,0.0]}"###);
let body = post(&mock.uri(), "intel").await.unwrap().text().await.unwrap();
snapshot!(body, @r###"{"data":[1.0,1.0,1.0]}"###);
}
#[actix_rt::test]
@ -953,7 +1000,7 @@ async fn bad_settings() {
let (response, code) = index
.update_settings(json!({
"embedders": {
"rest": json!({ "source": "rest", "url": mock.uri(), "request": "{{text}}", "response": { "data": "{{embedding}}" }, "dimensions": 2 }),
"rest": json!({ "source": "rest", "url": mock.uri(), "request": "{{text}}", "response": { "data": "{{embedding}}" }, "dimensions": 2, "documentTemplate": "{{doc.name}}" }),
},
}))
.await;
@ -1920,6 +1967,7 @@ async fn server_custom_header() {
"embedders": {
"rest": {
"source": "rest",
"documentTemplate": "{{doc.name}}",
"url": "[url]",
"request": "{{text}}",
"response": "{{embedding}}",
@ -1940,7 +1988,7 @@ async fn server_custom_header() {
#[actix_rt::test]
async fn searchable_reindex() {
let (_mock, setting) = create_mock_map().await;
let (_mock, setting) = create_mock_default_template().await;
let server = get_server_vector().await;
let index = server.index("doggo");

View File

@ -100,6 +100,7 @@ bumpalo = "3.16.0"
thread_local = "1.1.8"
allocator-api2 = "0.2.18"
rustc-hash = "2.0.0"
uell = "0.1.0"
[dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false }

View File

@ -122,7 +122,7 @@ and can not be more than 512 bytes.", .document_id.to_string()
#[error("The `_vectors` field in the document with id: `{document_id}` is not an object. Was expecting an object with a key for each embedder with manually provided vectors, but instead got `{value}`")]
InvalidVectorsMapType { document_id: String, value: Value },
#[error("Bad embedder configuration in the document with id: `{document_id}`. {error}")]
InvalidVectorsEmbedderConf { document_id: String, error: deserr::errors::JsonError },
InvalidVectorsEmbedderConf { document_id: String, error: String },
#[error("{0}")]
InvalidFilter(String),
#[error("Invalid type for filter subexpression: expected: {}, found: {1}.", .0.join(", "))]

View File

@ -27,17 +27,34 @@ impl heed::BytesEncode<'_> for OrderedF64Codec {
fn bytes_encode(f: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
let mut buffer = [0u8; 16];
// write the globally ordered float
let bytes = f64_into_bytes(*f).ok_or(InvalidGloballyOrderedFloatError { float: *f })?;
buffer[..8].copy_from_slice(&bytes[..]);
// Then the f64 value just to be able to read it back
let bytes = f.to_be_bytes();
buffer[8..16].copy_from_slice(&bytes[..]);
encode_f64_into_ordered_bytes(*f, &mut buffer)?;
Ok(Cow::Owned(buffer.to_vec()))
}
}
impl OrderedF64Codec {
pub fn serialize_into(
f: f64,
buffer: &mut [u8; 16],
) -> Result<(), InvalidGloballyOrderedFloatError> {
encode_f64_into_ordered_bytes(f, buffer)
}
}
fn encode_f64_into_ordered_bytes(
f: f64,
buffer: &mut [u8; 16],
) -> Result<(), InvalidGloballyOrderedFloatError> {
let bytes = f64_into_bytes(f).ok_or(InvalidGloballyOrderedFloatError { float: f })?;
buffer[..8].copy_from_slice(&bytes[..]);
// Then the f64 value just to be able to read it back
let bytes = f.to_be_bytes();
buffer[8..16].copy_from_slice(&bytes[..]);
Ok(())
}
#[derive(Error, Debug)]
#[error("the float {float} cannot be converted to a globally ordered representation")]
pub struct InvalidGloballyOrderedFloatError {

View File

@ -737,7 +737,7 @@ pub(crate) fn write_typed_chunk_into_index(
}
/// Converts the latitude and longitude back to an xyz GeoPoint.
fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
pub fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap();
let (lng, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap();
let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)];

View File

@ -2,12 +2,16 @@ use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use hashbrown::HashMap;
use heed::types::Bytes;
use heed::BytesDecode;
use memmap2::Mmap;
use roaring::RoaringBitmap;
use super::extract::FacetKind;
use super::StdResult;
use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
use crate::index::IndexEmbeddingConfig;
use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding;
use crate::{DocumentId, Index};
@ -26,9 +30,9 @@ pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver)
)
}
pub struct KeyValueEntry {
pub key_length: usize,
pub data: Box<[u8]>,
pub enum KeyValueEntry {
Small { key_length: usize, data: Box<[u8]> },
Large { key_entry: KeyEntry, data: Mmap },
}
impl KeyValueEntry {
@ -36,14 +40,25 @@ impl KeyValueEntry {
let mut data = Vec::with_capacity(key.len() + value.len());
data.extend_from_slice(key);
data.extend_from_slice(value);
KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
KeyValueEntry::Small { key_length: key.len(), data: data.into_boxed_slice() }
}
fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
KeyValueEntry::Large { key_entry: KeyEntry::from_key(key), data: value }
}
pub fn key(&self) -> &[u8] {
&self.data[..self.key_length]
match self {
KeyValueEntry::Small { key_length, data } => &data[..*key_length],
KeyValueEntry::Large { key_entry, data: _ } => key_entry.entry(),
}
}
pub fn value(&self) -> &[u8] {
&self.data[self.key_length..]
match self {
KeyValueEntry::Small { key_length, data } => &data[*key_length..],
KeyValueEntry::Large { key_entry: _, data } => &data[..],
}
}
}
@ -87,7 +102,7 @@ pub enum ArroyOperation {
embedding: Embedding,
},
Finish {
user_provided: HashMap<String, RoaringBitmap>,
configs: Vec<IndexEmbeddingConfig>,
},
}
@ -98,6 +113,7 @@ pub struct DbOperation {
#[derive(Debug)]
pub enum Database {
Main,
Documents,
ExternalDocumentsIds,
ExactWordDocids,
@ -111,11 +127,14 @@ pub enum Database {
FacetIdExistsDocids,
FacetIdF64NumberDocids,
FacetIdStringDocids,
FieldIdDocidFacetStrings,
FieldIdDocidFacetF64s,
}
impl Database {
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
match self {
Database::Main => index.main.remap_types(),
Database::Documents => index.documents.remap_types(),
Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(),
Database::ExactWordDocids => index.exact_word_docids.remap_types(),
@ -129,6 +148,8 @@ impl Database {
Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(),
Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(),
}
}
}
@ -200,6 +221,10 @@ impl ExtractorSender {
FacetDocidsSender { sender: self }
}
pub fn field_id_docid_facet_sender(&self) -> FieldIdDocidFacetSender<'_> {
FieldIdDocidFacetSender(self)
}
pub fn documents(&self) -> DocumentsSender<'_> {
DocumentsSender(self)
}
@ -208,6 +233,10 @@ impl ExtractorSender {
EmbeddingSender(&self.sender)
}
pub fn geo(&self) -> GeoSender<'_> {
GeoSender(&self.sender)
}
fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
match self
.sender
@ -332,6 +361,36 @@ impl DocidsSender for FacetDocidsSender<'_> {
}
}
pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender);
impl FieldIdDocidFacetSender<'_> {
pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &value));
self.0
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
}
pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &[]));
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
}
pub fn delete_facet_string(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
self.0
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
}
pub fn delete_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
}
}
pub struct DocumentsSender<'a>(&'a ExtractorSender);
impl DocumentsSender<'_> {
@ -418,12 +477,40 @@ impl EmbeddingSender<'_> {
}
/// Marks all embedders as "to be built"
pub fn finish(
self,
user_provided: HashMap<String, RoaringBitmap>,
) -> StdResult<(), SendError<()>> {
pub fn finish(self, configs: Vec<IndexEmbeddingConfig>) -> StdResult<(), SendError<()>> {
self.0
.send(WriterOperation::ArroyOperation(ArroyOperation::Finish { user_provided }))
.send(WriterOperation::ArroyOperation(ArroyOperation::Finish { configs }))
.map_err(|_| SendError(()))
}
}
pub struct GeoSender<'a>(&'a Sender<WriterOperation>);
impl GeoSender<'_> {
pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
self.0
.send(WriterOperation::DbOperation(DbOperation {
database: Database::Main,
entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
GEO_RTREE_KEY.as_bytes(),
value,
)),
}))
.map_err(|_| SendError(()))
}
pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> {
let mut buffer = Vec::new();
bitmap.serialize_into(&mut buffer).unwrap();
self.0
.send(WriterOperation::DbOperation(DbOperation {
database: Database::Main,
entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
&buffer,
)),
}))
.map_err(|_| SendError(()))
}
}

View File

@ -332,18 +332,31 @@ where
}
vectors.insert(
name,
serde_json::json!({
"regenerate": entry.regenerate,
// TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
"embeddings": entry.embeddings,
}),
if entry.implicit {
serde_json::json!(entry.embeddings)
} else {
serde_json::json!({
"regenerate": entry.regenerate,
// TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
"embeddings": entry.embeddings,
})
},
);
}
if vectors.is_empty() {
break 'inject_vectors;
}
vectors_value = serde_json::value::to_raw_value(&vectors).unwrap();
unordered_field_buffer.push((vectors_fid, &vectors_value));
}
if let Some(geo_value) = document.geo_field()? {
let fid = fields_ids_map.id_or_insert("_geo").ok_or(UserError::AttributeLimitReached)?;
unordered_field_buffer.push((fid, geo_value));
}
unordered_field_buffer.sort_by_key(|(fid, _)| *fid);
for (fid, value) in unordered_field_buffer.iter() {
writer.insert(*fid, value.get().as_bytes()).unwrap();
@ -398,6 +411,7 @@ impl<'doc> Versions<'doc> {
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub fn top_level_field(&self, k: &str) -> Option<&'doc RawValue> {
if k == RESERVED_VECTORS_FIELD_NAME || k == "_geo" {
return None;

View File

@ -97,7 +97,7 @@ impl<'doc> Insertion<'doc> {
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<VectorDocumentFromVersions<'doc>>> {
VectorDocumentFromVersions::new(&self.new, doc_alloc, embedders)
VectorDocumentFromVersions::new(self.external_document_id, &self.new, doc_alloc, embedders)
}
}
@ -169,7 +169,7 @@ impl<'doc> Update<'doc> {
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<VectorDocumentFromVersions<'doc>>> {
VectorDocumentFromVersions::new(&self.new, doc_alloc, embedders)
VectorDocumentFromVersions::new(self.external_document_id, &self.new, doc_alloc, embedders)
}
pub fn merged_vectors<Mapper: FieldIdMapper>(
@ -181,10 +181,22 @@ impl<'doc> Update<'doc> {
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<MergedVectorDocument<'doc>>> {
if self.has_deletion {
MergedVectorDocument::without_db(&self.new, doc_alloc, embedders)
MergedVectorDocument::without_db(
self.external_document_id,
&self.new,
doc_alloc,
embedders,
)
} else {
MergedVectorDocument::with_db(
self.docid, index, rtxn, mapper, &self.new, doc_alloc, embedders,
self.docid,
self.external_document_id,
index,
rtxn,
mapper,
&self.new,
doc_alloc,
embedders,
)
}
}

View File

@ -54,7 +54,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
let content = deletion.current(
&context.txn,
&context.rtxn,
context.index,
&context.db_fields_ids_map,
)?;
@ -72,7 +72,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
DocumentChange::Update(update) => {
let docid = update.docid();
let content =
update.current(&context.txn, context.index, &context.db_fields_ids_map)?;
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
for res in content.iter_top_level_fields() {
let (f, _) = res?;
let entry = document_extractor_data
@ -92,9 +92,9 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
}
let content =
update.merged(&context.txn, context.index, &context.db_fields_ids_map)?;
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
let vector_content = update.merged_vectors(
&context.txn,
&context.rtxn,
context.index,
&context.db_fields_ids_map,
&context.doc_alloc,

View File

@ -1,16 +1,21 @@
use std::cell::RefCell;
use std::collections::HashSet;
use std::mem::size_of;
use std::ops::DerefMut as _;
use bumpalo::collections::Vec as BVec;
use bumpalo::Bump;
use heed::RoTxn;
use hashbrown::HashMap;
use heed::{BytesDecode, RoTxn};
use serde_json::Value;
use super::super::cache::BalancedCaches;
use super::facet_document::extract_document_facets;
use super::FacetKind;
use crate::facet::value_encoding::f64_into_bytes;
use crate::update::new::extract::DocidsExtractor;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::update::del_add::DelAdd;
use crate::update::new::channel::FieldIdDocidFacetSender;
use crate::update::new::indexer::document_changes::{
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
Progress, ThreadLocal,
@ -22,6 +27,7 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedExtractorData<'a> {
attributes_to_extract: &'a [&'a str],
sender: &'a FieldIdDocidFacetSender<'a>,
grenad_parameters: GrenadParameters,
buckets: usize,
}
@ -48,6 +54,7 @@ impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
context,
self.attributes_to_extract,
change,
self.sender,
)?
}
Ok(())
@ -61,12 +68,15 @@ impl FacetedDocidsExtractor {
context: &DocumentChangeContext<RefCell<BalancedCaches>>,
attributes_to_extract: &[&str],
document_change: DocumentChange,
sender: &FieldIdDocidFacetSender,
) -> Result<()> {
let index = &context.index;
let rtxn = &context.txn;
let rtxn = &context.rtxn;
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let mut cached_sorter = context.data.borrow_mut_or_yield();
match document_change {
let mut del_add_facet_value = DelAddFacetValue::new(&context.doc_alloc);
let docid = document_change.docid();
let res = match document_change {
DocumentChange::Deletion(inner) => extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?,
@ -76,7 +86,9 @@ impl FacetedDocidsExtractor {
&context.doc_alloc,
cached_sorter.deref_mut(),
BalancedCaches::insert_del_u32,
inner.docid(),
&mut del_add_facet_value,
DelAddFacetValue::insert_del,
docid,
fid,
value,
)
@ -92,7 +104,9 @@ impl FacetedDocidsExtractor {
&context.doc_alloc,
cached_sorter.deref_mut(),
BalancedCaches::insert_del_u32,
inner.docid(),
&mut del_add_facet_value,
DelAddFacetValue::insert_del,
docid,
fid,
value,
)
@ -108,7 +122,9 @@ impl FacetedDocidsExtractor {
&context.doc_alloc,
cached_sorter.deref_mut(),
BalancedCaches::insert_add_u32,
inner.docid(),
&mut del_add_facet_value,
DelAddFacetValue::insert_add,
docid,
fid,
value,
)
@ -124,24 +140,31 @@ impl FacetedDocidsExtractor {
&context.doc_alloc,
cached_sorter.deref_mut(),
BalancedCaches::insert_add_u32,
inner.docid(),
&mut del_add_facet_value,
DelAddFacetValue::insert_add,
docid,
fid,
value,
)
},
),
}
};
del_add_facet_value.send_data(docid, sender, &context.doc_alloc).unwrap();
res
}
fn facet_fn_with_options<'extractor>(
doc_alloc: &Bump,
fn facet_fn_with_options<'extractor, 'doc>(
doc_alloc: &'doc Bump,
cached_sorter: &mut BalancedCaches<'extractor>,
cache_fn: impl Fn(&mut BalancedCaches<'extractor>, &[u8], u32) -> Result<()>,
del_add_facet_value: &mut DelAddFacetValue<'doc>,
facet_fn: impl Fn(&mut DelAddFacetValue<'doc>, FieldId, BVec<'doc, u8>, FacetKind),
docid: DocumentId,
fid: FieldId,
value: &Value,
) -> Result<()> {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
let mut buffer = BVec::new_in(doc_alloc);
// Exists
// key: fid
buffer.push(FacetKind::Exists as u8);
@ -152,15 +175,21 @@ impl FacetedDocidsExtractor {
// Number
// key: fid - level - orderedf64 - orignalf64
Value::Number(number) => {
if let Some((n, ordered)) =
number.as_f64().and_then(|n| f64_into_bytes(n).map(|ordered| (n, ordered)))
let mut ordered = [0u8; 16];
if number
.as_f64()
.and_then(|n| OrderedF64Codec::serialize_into(n, &mut ordered).ok())
.is_some()
{
let mut number = BVec::with_capacity_in(16, doc_alloc);
number.extend_from_slice(&ordered);
facet_fn(del_add_facet_value, fid, number, FacetKind::Number);
buffer.clear();
buffer.push(FacetKind::Number as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(0); // level 0
buffer.extend_from_slice(&ordered);
buffer.extend_from_slice(&n.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid)
} else {
Ok(())
@ -169,6 +198,10 @@ impl FacetedDocidsExtractor {
// String
// key: fid - level - truncated_string
Value::String(s) => {
let mut string = BVec::new_in(doc_alloc);
string.extend_from_slice(s.as_bytes());
facet_fn(del_add_facet_value, fid, string, FacetKind::String);
let normalized = crate::normalize_facet(s);
let truncated = truncate_str(&normalized);
buffer.clear();
@ -211,6 +244,83 @@ impl FacetedDocidsExtractor {
}
}
struct DelAddFacetValue<'doc> {
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
}
impl<'doc> DelAddFacetValue<'doc> {
fn new(doc_alloc: &'doc Bump) -> Self {
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc) }
}
fn insert_add(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
let cache = match kind {
FacetKind::String => &mut self.strings,
FacetKind::Number => &mut self.f64s,
_ => return,
};
let key = (fid, value);
if let Some(DelAdd::Deletion) = cache.get(&key) {
cache.remove(&key);
} else {
cache.insert(key, DelAdd::Addition);
}
}
fn insert_del(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
let cache = match kind {
FacetKind::String => &mut self.strings,
FacetKind::Number => &mut self.f64s,
_ => return,
};
let key = (fid, value);
if let Some(DelAdd::Addition) = cache.get(&key) {
cache.remove(&key);
} else {
cache.insert(key, DelAdd::Deletion);
}
}
fn send_data(
self,
docid: DocumentId,
sender: &FieldIdDocidFacetSender,
doc_alloc: &Bump,
) -> std::result::Result<(), crossbeam_channel::SendError<()>> {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
for ((fid, value), deladd) in self.strings {
if let Ok(s) = std::str::from_utf8(&value) {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.extend_from_slice(&docid.to_be_bytes());
let normalized = crate::normalize_facet(s);
let truncated = truncate_str(&normalized);
buffer.extend_from_slice(truncated.as_bytes());
match deladd {
DelAdd::Deletion => sender.delete_facet_string(&buffer)?,
DelAdd::Addition => sender.write_facet_string(&buffer, &value)?,
}
}
}
for ((fid, value), deladd) in self.f64s {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.extend_from_slice(&docid.to_be_bytes());
buffer.extend_from_slice(&value);
match deladd {
DelAdd::Deletion => sender.delete_facet_f64(&buffer)?,
DelAdd::Addition => sender.write_facet_f64(&buffer)?,
}
}
Ok(())
}
}
/// Truncates a string to the biggest valid LMDB key size.
fn truncate_str(s: &str) -> &str {
let index = s
@ -223,13 +333,23 @@ fn truncate_str(s: &str) -> &str {
&s[..index.unwrap_or(0)]
}
impl DocidsExtractor for FacetedDocidsExtractor {
impl FacetedDocidsExtractor {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP, SP>(
pub fn run_extraction<
'pl,
'fid,
'indexer,
'index,
'extractor,
DC: DocumentChanges<'pl>,
MSP,
SP,
>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
sender: &FieldIdDocidFacetSender,
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
@ -254,6 +374,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
attributes_to_extract: &attributes_to_extract,
grenad_parameters,
buckets: rayon::current_num_threads(),
sender,
};
extract(
document_changes,

View File

@ -10,7 +10,8 @@ pub fn extract_document_facets<'doc>(
field_id_map: &mut GlobalFieldsIdsMap,
facet_fn: &mut impl FnMut(FieldId, &Value) -> Result<()>,
) -> Result<()> {
for res in document.iter_top_level_fields() {
let geo = document.geo_field().transpose().map(|res| res.map(|rval| ("_geo", rval)));
for res in document.iter_top_level_fields().chain(geo) {
let (field_name, value) = res?;
let mut tokenize_field = |name: &str, value: &Value| match field_id_map.id_or_insert(name) {

View File

@ -28,7 +28,6 @@ impl From<u8> for FacetKind {
impl FacetKind {
pub fn extract_from_key(key: &[u8]) -> (FacetKind, &[u8]) {
debug_assert!(key.len() > 3);
(FacetKind::from(key[0]), &key[1..])
}
}

View File

@ -0,0 +1,320 @@
use std::cell::RefCell;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _};
use std::{iter, mem, result};
use bumpalo::Bump;
use bytemuck::{bytes_of, from_bytes, pod_read_unaligned, Pod, Zeroable};
use heed::RoTxn;
use serde_json::value::RawValue;
use serde_json::Value;
use crate::error::GeoError;
use crate::update::new::document::Document;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Object, Result};
pub struct GeoExtractor {
grenad_parameters: GrenadParameters,
}
impl GeoExtractor {
pub fn new(
rtxn: &RoTxn,
index: &Index,
grenad_parameters: GrenadParameters,
) -> Result<Option<Self>> {
let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
if is_sortable || is_filterable {
Ok(Some(GeoExtractor { grenad_parameters }))
} else {
Ok(None)
}
}
}
#[derive(Pod, Zeroable, Copy, Clone)]
#[repr(C, packed)]
pub struct ExtractedGeoPoint {
pub docid: DocumentId,
pub lat_lng: [f64; 2],
}
impl From<ExtractedGeoPoint> for GeoPoint {
/// Converts the latitude and longitude back to an xyz GeoPoint.
fn from(value: ExtractedGeoPoint) -> Self {
let [lat, lng] = value.lat_lng;
let point = [lat, lng];
let xyz_point = lat_lng_to_xyz(&point);
GeoPoint::new(xyz_point, (value.docid, point))
}
}
pub struct GeoExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geo
/// point being updated, we first put it in the deleted and then in the inserted.
removed: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
inserted: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
/// TODO Do the doc
spilled_removed: Option<BufWriter<File>>,
/// TODO Do the doc
spilled_inserted: Option<BufWriter<File>>,
}
impl<'extractor> GeoExtractorData<'extractor> {
pub fn freeze(self) -> Result<FrozenGeoExtractorData<'extractor>> {
let GeoExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
Ok(FrozenGeoExtractorData {
removed: removed.into_bump_slice(),
inserted: inserted.into_bump_slice(),
spilled_removed: spilled_removed
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
.transpose()?,
spilled_inserted: spilled_inserted
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
.transpose()?,
})
}
}
unsafe impl MostlySend for GeoExtractorData<'_> {}
pub struct FrozenGeoExtractorData<'extractor> {
pub removed: &'extractor [ExtractedGeoPoint],
pub inserted: &'extractor [ExtractedGeoPoint],
pub spilled_removed: Option<BufReader<File>>,
pub spilled_inserted: Option<BufReader<File>>,
}
impl<'extractor> FrozenGeoExtractorData<'extractor> {
pub fn iter_and_clear_removed(
&mut self,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
mem::take(&mut self.removed)
.iter()
.copied()
.map(Ok)
.chain(iterator_over_spilled_geopoints(&mut self.spilled_removed))
}
pub fn iter_and_clear_inserted(
&mut self,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
mem::take(&mut self.inserted)
.iter()
.copied()
.map(Ok)
.chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted))
}
}
fn iterator_over_spilled_geopoints(
spilled: &mut Option<BufReader<File>>,
) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
let mut spilled = spilled.take();
iter::from_fn(move || match &mut spilled {
Some(file) => {
let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
match file.read_exact(geopoint_bytes) {
Ok(()) => Some(Ok(pod_read_unaligned(geopoint_bytes))),
Err(e) if e.kind() == ErrorKind::UnexpectedEof => None,
Err(e) => Some(Err(e)),
}
}
None => None,
})
}
impl<'extractor> Extractor<'extractor> for GeoExtractor {
type Data = RefCell<GeoExtractorData<'extractor>>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(GeoExtractorData {
removed: bumpalo::collections::Vec::new_in(extractor_alloc),
// inserted: Uell::new_in(extractor_alloc),
inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
spilled_inserted: None,
spilled_removed: None,
}))
}
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
context: &'doc DocumentChangeContext<Self::Data>,
) -> Result<()> {
let rtxn = &context.rtxn;
let index = context.index;
let max_memory = self.grenad_parameters.max_memory;
let db_fields_ids_map = context.db_fields_ids_map;
let mut data_ref = context.data.borrow_mut_or_yield();
for change in changes {
if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
// We must spill as we allocated too much memory
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
}
match change? {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
let external_id = deletion.external_document_id();
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
let current_geo = current
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if let Some(lat_lng) = current_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.removed.push(geopoint),
}
}
}
DocumentChange::Update(update) => {
let current = update.current(rtxn, index, db_fields_ids_map)?;
let external_id = update.external_document_id();
let docid = update.docid();
let current_geo = current
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
let updated_geo = update
.updated()
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if current_geo != updated_geo {
// If the current and new geo points are different it means that
// we need to replace the current by the new point and therefore
// delete the current point from the RTree.
if let Some(lat_lng) = current_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.removed.push(geopoint),
}
}
if let Some(lat_lng) = updated_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.inserted.push(geopoint),
}
}
}
}
DocumentChange::Insertion(insertion) => {
let external_id = insertion.external_document_id();
let docid = insertion.docid();
let inserted_geo = insertion
.inserted()
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;
if let Some(lat_lng) = inserted_geo.flatten() {
let geopoint = ExtractedGeoPoint { docid, lat_lng };
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(bytes_of(&geopoint))?,
None => data_ref.inserted.push(geopoint),
}
}
}
}
}
Ok(())
}
}
/// Extracts and validate the latitude and latitude from a document geo field.
///
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
fn extract_geo_coordinates(external_id: &str, raw_value: &RawValue) -> Result<Option<[f64; 2]>> {
let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
Value::Null => return Ok(None),
Value::Object(map) => map,
value => {
return Err(
GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
)
}
};
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
(Some(lat), Some(lng)) => {
if geo.is_empty() {
[lat, lng]
} else {
return Err(GeoError::UnexpectedExtraFields {
document_id: Value::from(external_id),
value: Value::from(geo),
}
.into());
}
}
(Some(_), None) => {
return Err(GeoError::MissingLongitude { document_id: Value::from(external_id) }.into())
}
(None, Some(_)) => {
return Err(GeoError::MissingLatitude { document_id: Value::from(external_id) }.into())
}
(None, None) => {
return Err(GeoError::MissingLatitudeAndLongitude {
document_id: Value::from(external_id),
}
.into())
}
};
match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
(Ok(lat), Ok(lng)) => Ok(Some([lat, lng])),
(Ok(_), Err(value)) => {
Err(GeoError::BadLongitude { document_id: Value::from(external_id), value }.into())
}
(Err(value), Ok(_)) => {
Err(GeoError::BadLatitude { document_id: Value::from(external_id), value }.into())
}
(Err(lat), Err(lng)) => Err(GeoError::BadLatitudeAndLongitude {
document_id: Value::from(external_id),
lat,
lng,
}
.into()),
}
}
/// Extracts and validate that a serde JSON Value is actually a finite f64.
pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
let number = match value {
Value::Number(ref n) => match n.as_f64() {
Some(number) => number,
None => return Err(value),
},
Value::String(ref s) => match s.parse::<f64>() {
Ok(number) => number,
Err(_) => return Err(value),
},
value => return Err(value),
};
if number.is_finite() {
Ok(number)
} else {
Err(value)
}
}

View File

@ -1,6 +1,7 @@
mod cache;
mod documents;
mod faceted;
mod geo;
mod searchable;
mod vectors;
@ -8,6 +9,7 @@ use bumpalo::Bump;
pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
pub use documents::*;
pub use faceted::*;
pub use geo::*;
pub use searchable::*;
pub use vectors::EmbeddingExtractor;

View File

@ -326,7 +326,7 @@ impl WordDocidsExtractors {
document_change: DocumentChange,
) -> Result<()> {
let index = &context.index;
let rtxn = &context.txn;
let rtxn = &context.rtxn;
let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
let cached_sorter = cached_sorter_ref.as_mut().unwrap();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();

View File

@ -39,7 +39,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
let doc_alloc = &context.doc_alloc;
let index = context.index;
let rtxn = &context.txn;
let rtxn = &context.rtxn;
let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc);
let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc);

View File

@ -2,13 +2,13 @@ use std::cell::RefCell;
use bumpalo::collections::Vec as BVec;
use bumpalo::Bump;
use hashbrown::HashMap;
use hashbrown::{DefaultHashBuilder, HashMap};
use super::cache::DelAddRoaringBitmap;
use crate::error::FaultSource;
use crate::prompt::Prompt;
use crate::update::new::channel::EmbeddingSender;
use crate::update::new::indexer::document_changes::{Extractor, MostlySend};
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
use crate::update::new::vector_document::VectorDocument;
use crate::update::new::DocumentChange;
use crate::vector::error::{
@ -37,7 +37,7 @@ impl<'a> EmbeddingExtractor<'a> {
}
pub struct EmbeddingExtractorData<'extractor>(
pub HashMap<String, DelAddRoaringBitmap, hashbrown::DefaultHashBuilder, &'extractor Bump>,
pub HashMap<String, DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>,
);
unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
@ -52,9 +52,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
context: &'doc crate::update::new::indexer::document_changes::DocumentChangeContext<
Self::Data,
>,
context: &'doc DocumentChangeContext<Self::Data>,
) -> crate::Result<()> {
let embedders = self.embedders.inner_as_ref();
let mut unused_vectors_distribution =
@ -63,7 +61,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
let embedder_id =
context.index.embedder_category_id.get(&context.txn, embedder_name)?.ok_or_else(
context.index.embedder_category_id.get(&context.rtxn, embedder_name)?.ok_or_else(
|| InternalError::DatabaseMissingEntry {
db_name: "embedder_category_id",
key: None,
@ -85,12 +83,17 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
for change in changes {
let change = change?;
match change {
DocumentChange::Deletion(_deletion) => {
// handled by document sender
DocumentChange::Deletion(deletion) => {
// vector deletion is handled by document sender,
// we still need to accomodate deletion from user_provided
for chunks in &mut all_chunks {
// regenerate: true means we delete from user_provided
chunks.set_regenerate(deletion.docid(), true);
}
}
DocumentChange::Update(update) => {
let old_vectors = update.current_vectors(
&context.txn,
&context.rtxn,
context.index,
context.db_fields_ids_map,
&context.doc_alloc,
@ -110,11 +113,8 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
new_vectors.vectors_for_key(embedder_name).transpose()
}) {
let new_vectors = new_vectors?;
match (old_vectors.regenerate, new_vectors.regenerate) {
(true, true) | (false, false) => todo!(),
_ => {
chunks.set_regenerate(update.docid(), new_vectors.regenerate);
}
if old_vectors.regenerate != new_vectors.regenerate {
chunks.set_regenerate(update.docid(), new_vectors.regenerate);
}
// do we have set embeddings?
if let Some(embeddings) = new_vectors.embeddings {
@ -124,13 +124,13 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
.into_vec(&context.doc_alloc, embedder_name)
.map_err(|error| UserError::InvalidVectorsEmbedderConf {
document_id: update.external_document_id().to_string(),
error,
error: error.to_string(),
})?,
);
} else if new_vectors.regenerate {
let new_rendered = prompt.render_document(
update.current(
&context.txn,
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
@ -139,7 +139,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
)?;
let old_rendered = prompt.render_document(
update.merged(
&context.txn,
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
@ -149,6 +149,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
if new_rendered != old_rendered {
chunks.set_autogenerated(
update.docid(),
update.external_document_id(),
new_rendered,
&unused_vectors_distribution,
)?;
@ -157,7 +158,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
} else if old_vectors.regenerate {
let old_rendered = prompt.render_document(
update.current(
&context.txn,
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
@ -166,7 +167,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
)?;
let new_rendered = prompt.render_document(
update.merged(
&context.txn,
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
@ -176,6 +177,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
if new_rendered != old_rendered {
chunks.set_autogenerated(
update.docid(),
update.external_document_id(),
new_rendered,
&unused_vectors_distribution,
)?;
@ -208,7 +210,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
document_id: insertion
.external_document_id()
.to_string(),
error,
error: error.to_string(),
})?,
);
} else if new_vectors.regenerate {
@ -219,6 +221,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
)?;
chunks.set_autogenerated(
insertion.docid(),
insertion.external_document_id(),
rendered,
&unused_vectors_distribution,
)?;
@ -231,6 +234,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
)?;
chunks.set_autogenerated(
insertion.docid(),
insertion.external_document_id(),
rendered,
&unused_vectors_distribution,
)?;
@ -266,6 +270,7 @@ struct Chunks<'a, 'extractor> {
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
threads: &'a ThreadPoolNoAbort,
sender: &'a EmbeddingSender<'a>,
has_manual_generation: Option<&'a str>,
}
impl<'a, 'extractor> Chunks<'a, 'extractor> {
@ -295,15 +300,22 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
embedder_id,
embedder_name,
user_provided,
has_manual_generation: None,
}
}
pub fn set_autogenerated(
&mut self,
docid: DocumentId,
external_docid: &'a str,
rendered: &'a str,
unused_vectors_distribution: &UnusedVectorsDistributionBump,
) -> Result<()> {
let is_manual = matches!(&self.embedder, &Embedder::UserProvided(_));
if is_manual {
self.has_manual_generation.get_or_insert(external_docid);
}
if self.texts.len() < self.texts.capacity() {
self.texts.push(rendered);
self.ids.push(docid);
@ -320,6 +332,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
unused_vectors_distribution,
self.threads,
self.sender,
self.has_manual_generation.take(),
)
}
@ -337,6 +350,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
unused_vectors_distribution,
self.threads,
self.sender,
self.has_manual_generation,
);
// optimization: don't run bvec dtors as they only contain bumpalo allocated stuff
std::mem::forget(self);
@ -354,7 +368,46 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
unused_vectors_distribution: &UnusedVectorsDistributionBump,
threads: &ThreadPoolNoAbort,
sender: &EmbeddingSender<'a>,
has_manual_generation: Option<&'a str>,
) -> Result<()> {
if let Some(external_docid) = has_manual_generation {
let mut msg = format!(
r"While embedding documents for embedder `{embedder_name}`: no vectors provided for document `{}`{}",
external_docid,
if ids.len() > 1 {
format!(" and at least {} other document(s)", ids.len() - 1)
} else {
"".to_string()
}
);
msg += &format!("\n- Note: `{embedder_name}` has `source: userProvided`, so documents must provide embeddings as an array in `_vectors.{embedder_name}`.");
let mut hint_count = 0;
for (vector_misspelling, count) in possible_embedding_mistakes.vector_mistakes().take(2)
{
msg += &format!("\n- Hint: try replacing `{vector_misspelling}` by `_vectors` in {count} document(s).");
hint_count += 1;
}
for (embedder_misspelling, count) in possible_embedding_mistakes
.embedder_mistakes_bump(embedder_name, unused_vectors_distribution)
.take(2)
{
msg += &format!("\n- Hint: try replacing `_vectors.{embedder_misspelling}` by `_vectors.{embedder_name}` in {count} document(s).");
hint_count += 1;
}
if hint_count == 0 {
msg += &format!(
"\n- Hint: opt-out for a document with `_vectors.{embedder_name}: null`"
);
}
return Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)));
}
let res = match embedder.embed_chunks_ref(texts.as_slice(), threads) {
Ok(embeddings) => {
for (docid, embedding) in ids.into_iter().zip(embeddings) {
@ -423,9 +476,9 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default();
if regenerate {
// regenerate == !user_provided
user_provided.del.get_or_insert(Default::default()).insert(docid);
user_provided.insert_del_u32(docid);
} else {
user_provided.add.get_or_insert(Default::default()).insert(docid);
user_provided.insert_add_u32(docid);
}
}

View File

@ -54,23 +54,12 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
}
}
fn extract_key_data<'k>(&self, key: &'k [u8]) -> Result<Option<FacetGroupKey<&'k str>>> {
match FacetKind::from(key[0]) {
// Only strings are searchable
FacetKind::String => Ok(Some(
FacetGroupKeyCodec::<StrRefCodec>::bytes_decode(&key[1..])
.map_err(heed::Error::Encoding)?,
)),
_ => Ok(None),
}
}
pub fn register_from_key(&mut self, deladd: DelAdd, facet_key: &[u8]) -> Result<()> {
let Some(FacetGroupKey { field_id, level: _level, left_bound }) =
self.extract_key_data(facet_key)?
else {
return Ok(());
};
pub fn register_from_key(
&mut self,
deladd: DelAdd,
facet_key: FacetGroupKey<&str>,
) -> Result<()> {
let FacetGroupKey { field_id, level: _level, left_bound } = facet_key;
if deladd == DelAdd::Addition {
self.registered_facets.entry(field_id).and_modify(|count| *count += 1).or_insert(1);

View File

@ -41,6 +41,11 @@ impl<'de, 'p, 'indexer: 'de, Mapper: MutFieldIdMapper> Visitor<'de>
where
A: serde::de::MapAccess<'de>,
{
// We need to remember if we encountered a semantic error, because raw values don't like to be parsed partially
// (trying to do so results in parsing errors).
// So we'll exhaust all keys and values even if we encounter an error, and we'll then return any error we detected.
let mut attribute_limit_reached = false;
let mut document_id_extraction_error = None;
let mut docid = None;
while let Some(((level_name, right), (fid, fields_ids_map))) =
@ -49,20 +54,36 @@ impl<'de, 'p, 'indexer: 'de, Mapper: MutFieldIdMapper> Visitor<'de>
visitor: MutFieldIdMapVisitor(self.fields_ids_map),
})?
{
let Some(_fid) = fid else {
return Ok(Err(crate::UserError::AttributeLimitReached));
};
self.fields_ids_map = fields_ids_map;
let value: &'de RawValue = map.next_value()?;
if attribute_limit_reached || document_id_extraction_error.is_some() {
continue;
}
let Some(_fid) = fid else {
attribute_limit_reached = true;
continue;
};
match match_component(level_name, right, value, self.indexer, &mut docid) {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(Err(err)) => return Err(serde::de::Error::custom(err)),
ControlFlow::Break(Ok(err)) => return Ok(Ok(Err(err))),
ControlFlow::Break(Ok(err)) => {
document_id_extraction_error = Some(err);
continue;
}
}
}
// return previously detected errors
if attribute_limit_reached {
return Ok(Err(UserError::AttributeLimitReached));
}
if let Some(document_id_extraction_error) = document_id_extraction_error {
return Ok(Ok(Err(document_id_extraction_error)));
}
Ok(Ok(match docid {
Some(docid) => Ok(docid),
None => Err(DocumentIdExtractionError::MissingDocumentId),

View File

@ -197,7 +197,7 @@ pub struct DocumentChangeContext<
/// inside of the DB.
pub db_fields_ids_map: &'indexer FieldsIdsMap,
/// A transaction providing data from the DB before all indexing operations
pub txn: RoTxn<'indexer>,
pub rtxn: RoTxn<'indexer>,
/// Global field id map that is up to date with the current state of the indexing process.
///
@ -255,7 +255,7 @@ impl<
let txn = index.read_txn()?;
Ok(DocumentChangeContext {
index,
txn,
rtxn: txn,
db_fields_ids_map,
new_fields_ids_map: fields_ids_map,
doc_alloc,

View File

@ -63,7 +63,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
where
'pl: 'doc, // the payload must survive the process calls
{
let current = context.index.document(&context.txn, *docid)?;
let current = context.index.document(&context.rtxn, *docid)?;
let external_document_id = self.primary_key.extract_docid_from_db(
current,

View File

@ -12,6 +12,7 @@ use heed::{RoTxn, RwTxn};
use itertools::{merge_join_by, EitherOrBoth};
pub use partial_dump::PartialDump;
use rand::SeedableRng as _;
use raw_collections::RawMap;
use rayon::ThreadPool;
use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction;
@ -24,7 +25,7 @@ use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::{StdResult, TopLevelMap};
use super::StdResult;
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
@ -32,6 +33,7 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::proximity::ProximityPrecision;
use crate::update::del_add::DelAdd;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::InnerIndexSettings;
@ -56,6 +58,7 @@ mod steps {
"extracting words",
"extracting word proximity",
"extracting embeddings",
"writing geo points",
"writing to database",
"writing embeddings to database",
"waiting for extractors",
@ -92,29 +95,33 @@ mod steps {
step(4)
}
pub const fn write_db() -> (u16, &'static str) {
pub const fn extract_geo_points() -> (u16, &'static str) {
step(5)
}
pub const fn write_embedding_db() -> (u16, &'static str) {
pub const fn write_db() -> (u16, &'static str) {
step(6)
}
pub const fn waiting_extractors() -> (u16, &'static str) {
pub const fn write_embedding_db() -> (u16, &'static str) {
step(7)
}
pub const fn post_processing_facets() -> (u16, &'static str) {
pub const fn waiting_extractors() -> (u16, &'static str) {
step(8)
}
pub const fn post_processing_words() -> (u16, &'static str) {
pub const fn post_processing_facets() -> (u16, &'static str) {
step(9)
}
pub const fn finalizing() -> (u16, &'static str) {
pub const fn post_processing_words() -> (u16, &'static str) {
step(10)
}
pub const fn finalizing() -> (u16, &'static str) {
step(11)
}
}
/// This is the main function of this crate.
@ -143,11 +150,8 @@ where
let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
@ -198,7 +202,7 @@ where
document_extractor_data.docids_delta.apply_to(document_ids);
}
field_distribution.retain(|_, v| *v == 0);
field_distribution.retain(|_, v| *v != 0);
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let current_num_threads = rayon::current_num_threads();
@ -219,7 +223,7 @@ where
let (finished_steps, step_name) = steps::extract_facets();
facet_field_ids_delta = merge_and_send_facet_docids(
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?,
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), finished_steps, total_steps, step_name)?,
FacetDatabases::new(index),
index,
extractor_sender.facet_docids(),
@ -327,7 +331,15 @@ where
let (finished_steps, step_name) = steps::extract_word_proximity();
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
finished_steps,
total_steps,
step_name,
)?;
merge_and_send_docids(
caches,
index.word_pair_proximity_docids.remap_types(),
@ -341,35 +353,55 @@ where
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
let index_embeddings = index.embedding_configs(&rtxn)?;
let mut index_embeddings = index.embedding_configs(&rtxn)?;
if index_embeddings.is_empty() {
break 'vectors;
}
let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
let mut user_provided = HashMap::new();
for data in datastore {
let data = data.into_inner().0;
for (embedder, deladd) in data.into_iter() {
let user_provided = user_provided.entry(embedder).or_insert(Default::default());
if let Some(del) = deladd.del {
*user_provided -= del;
}
if let Some(add) = deladd.add {
*user_provided |= add;
}
for config in &mut index_embeddings {
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else { continue 'data; };
deladd.apply_to(&mut config.user_provided);
}
}
embedding_sender.finish(user_provided).unwrap();
embedding_sender.finish(index_embeddings).unwrap();
}
'geo: {
let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
let _entered = span.enter();
// let geo_sender = extractor_sender.geo_points();
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
break 'geo;
};
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_geo_points();
extract(document_changes,
&extractor,
indexing_context,
&mut extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
merge_and_send_rtree(
datastore,
&rtxn,
index,
extractor_sender.geo(),
&indexing_context.must_stop_processing,
)?;
}
// TODO THIS IS TOO MUCH
@ -471,7 +503,7 @@ where
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_item(wtxn, docid, &embedding)?;
}
ArroyOperation::Finish { mut user_provided } => {
ArroyOperation::Finish { configs } => {
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
let _entered = span.enter();
@ -496,14 +528,6 @@ where
)?;
}
let mut configs = index.embedding_configs(wtxn)?;
for config in &mut configs {
if let Some(user_provided) = user_provided.remove(&config.name) {
config.user_provided = user_provided;
}
}
index.put_embedding_configs(wtxn, configs)?;
}
},
@ -681,13 +705,11 @@ fn compute_facet_search_database(
}
EitherOrBoth::Left(result) => {
let (key, _) = result?;
facet_search_builder
.register_from_key(DelAdd::Deletion, key.left_bound.as_ref())?;
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
}
EitherOrBoth::Right(result) => {
let (key, _) = result?;
facet_search_builder
.register_from_key(DelAdd::Addition, key.left_bound.as_ref())?;
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
}
}
}
@ -735,7 +757,7 @@ pub fn retrieve_or_guess_primary_key<'a>(
index: &Index,
new_fields_ids_map: &mut FieldsIdsMap,
primary_key_from_op: Option<&'a str>,
first_document: Option<&'a TopLevelMap<'a>>,
first_document: Option<RawMap<'a>>,
) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
// make sure that we have a declared primary key, either fetching it from the index or attempting to guess it.
@ -771,12 +793,18 @@ pub fn retrieve_or_guess_primary_key<'a>(
None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
};
let mut guesses: Vec<&str> = first_document
let guesses: Result<Vec<&str>> = first_document
.keys()
.map(AsRef::as_ref)
.filter(|name| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
.filter_map(|name| {
let Some(_) = new_fields_ids_map.insert(name) else {
return Some(Err(UserError::AttributeLimitReached.into()));
};
name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name))
})
.collect();
let mut guesses = guesses?;
// sort the keys in lexicographical order, so that fields are always in the same order.
guesses.sort_unstable();

View File

@ -93,7 +93,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
let DocumentChangeContext {
index,
db_fields_ids_map,
txn,
rtxn: txn,
new_fields_ids_map,
doc_alloc,
..

View File

@ -1,68 +1,63 @@
use std::io::{self};
use std::cell::RefCell;
use std::io;
use bincode::ErrorKind;
use hashbrown::HashSet;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use memmap2::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;
use super::channel::*;
use super::extract::{
merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
GeoExtractorData,
};
use super::DocumentChange;
use crate::{
CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, InternalError,
Result,
};
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
pub struct GeoExtractor {
rtree: Option<rstar::RTree<GeoPoint>>,
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_rtree<'extractor, MSP>(
datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
rtxn: &RoTxn,
index: &Index,
geo_sender: GeoSender<'_>,
must_stop_processing: &MSP,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
{
let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default();
let mut faceted = index.geo_faceted_documents_ids(rtxn)?;
impl GeoExtractor {
pub fn new(rtxn: &RoTxn, index: &Index) -> Result<Option<Self>> {
let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
if is_sortable || is_filterable {
Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? }))
} else {
Ok(None)
for data in datastore {
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into());
}
let mut frozen = data.into_inner().freeze()?;
for result in frozen.iter_and_clear_removed() {
let extracted_geo_point = result?;
debug_assert!(rtree.remove(&GeoPoint::from(extracted_geo_point)).is_some());
debug_assert!(faceted.remove(extracted_geo_point.docid));
}
for result in frozen.iter_and_clear_inserted() {
let extracted_geo_point = result?;
rtree.insert(GeoPoint::from(extracted_geo_point));
debug_assert!(faceted.insert(extracted_geo_point.docid));
}
}
pub fn manage_change(
&mut self,
fidmap: &mut GlobalFieldsIdsMap,
change: &DocumentChange,
) -> Result<()> {
match change {
DocumentChange::Deletion(_) => todo!(),
DocumentChange::Update(_) => todo!(),
DocumentChange::Insertion(_) => todo!(),
}
}
let mut file = tempfile::tempfile()?;
/// manage error
bincode::serialize_into(&mut file, &rtree).unwrap();
file.sync_all()?;
pub fn serialize_rtree<W: io::Write>(self, writer: &mut W) -> Result<bool> {
match self.rtree {
Some(rtree) => {
// TODO What should I do?
bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e {
ErrorKind::Io(e) => Error::IoError(e),
ErrorKind::InvalidUtf8Encoding(_) => todo!(),
ErrorKind::InvalidBoolEncoding(_) => todo!(),
ErrorKind::InvalidCharEncoding => todo!(),
ErrorKind::InvalidTagEncoding(_) => todo!(),
ErrorKind::DeserializeAnyNotSupported => todo!(),
ErrorKind::SizeLimit => todo!(),
ErrorKind::SequenceMustHaveLength => todo!(),
ErrorKind::Custom(_) => todo!(),
})
}
None => Ok(false),
}
}
let rtree_mmap = unsafe { Mmap::map(&file)? };
geo_sender.set_rtree(rtree_mmap).unwrap();
geo_sender.set_geo_faceted(&faceted).unwrap();
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]

View File

@ -12,7 +12,7 @@ use super::indexer::de::DeserrRawValue;
use crate::documents::FieldIdMapper;
use crate::index::IndexEmbeddingConfig;
use crate::vector::parsed_vectors::{
RawVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
RawVectors, RawVectorsError, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
};
use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfigs};
use crate::{DocumentId, Index, InternalError, Result, UserError};
@ -71,6 +71,7 @@ pub struct VectorEntry<'doc> {
pub has_configured_embedder: bool,
pub embeddings: Option<Embeddings<'doc>>,
pub regenerate: bool,
pub implicit: bool,
}
pub trait VectorDocument<'doc> {
@ -125,6 +126,7 @@ impl<'t> VectorDocumentFromDb<'t> {
has_configured_embedder: true,
embeddings: Some(Embeddings::FromDb(vectors)),
regenerate: !config.user_provided.contains(self.docid),
implicit: false,
})
}
}
@ -141,7 +143,14 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
Ok((&*config_name, entry))
})
.chain(self.vectors_field.iter().flat_map(|map| map.iter()).map(|(name, value)| {
Ok((name, entry_from_raw_value(value, false).map_err(InternalError::SerdeJson)?))
Ok((
name,
entry_from_raw_value(value, false).map_err(|_| {
InternalError::Serialization(crate::SerializationError::Decoding {
db_name: Some(crate::index::db_name::VECTOR_ARROY),
})
})?,
))
}))
}
@ -153,43 +162,71 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
Some(self.entry_from_db(embedder_id, config)?)
}
None => match self.vectors_field.as_ref().and_then(|obkv| obkv.get(key)) {
Some(embedding_from_doc) => Some(
entry_from_raw_value(embedding_from_doc, false)
.map_err(InternalError::SerdeJson)?,
),
Some(embedding_from_doc) => {
Some(entry_from_raw_value(embedding_from_doc, false).map_err(|_| {
InternalError::Serialization(crate::SerializationError::Decoding {
db_name: Some(crate::index::db_name::VECTOR_ARROY),
})
})?)
}
None => None,
},
})
}
}
fn entry_from_raw_value_user<'doc>(
external_docid: &str,
embedder_name: &str,
value: &'doc RawValue,
has_configured_embedder: bool,
) -> Result<VectorEntry<'doc>> {
entry_from_raw_value(value, has_configured_embedder).map_err(|error| {
UserError::InvalidVectorsEmbedderConf {
document_id: external_docid.to_string(),
error: error.msg(embedder_name),
}
.into()
})
}
fn entry_from_raw_value(
value: &RawValue,
has_configured_embedder: bool,
) -> std::result::Result<VectorEntry<'_>, serde_json::Error> {
let value: RawVectors = serde_json::from_str(value.get())?;
) -> std::result::Result<VectorEntry<'_>, RawVectorsError> {
let value: RawVectors = RawVectors::from_raw_value(value)?;
Ok(match value {
RawVectors::Explicit(raw_explicit_vectors) => VectorEntry {
has_configured_embedder,
embeddings: raw_explicit_vectors.embeddings.map(Embeddings::FromJsonExplicit),
regenerate: raw_explicit_vectors.regenerate,
implicit: false,
},
RawVectors::ImplicitlyUserProvided(value) => VectorEntry {
has_configured_embedder,
embeddings: Some(Embeddings::FromJsonImplicityUserProvided(value)),
// implicitly user provided always provide embeddings
// `None` here means that there are no embeddings
embeddings: Some(
value
.map(Embeddings::FromJsonImplicityUserProvided)
.unwrap_or(Embeddings::FromDb(Default::default())),
),
regenerate: false,
implicit: true,
},
})
}
pub struct VectorDocumentFromVersions<'doc> {
external_document_id: &'doc str,
vectors: RawMap<'doc>,
embedders: &'doc EmbeddingConfigs,
}
impl<'doc> VectorDocumentFromVersions<'doc> {
pub fn new(
external_document_id: &'doc str,
versions: &Versions<'doc>,
bump: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
@ -198,7 +235,7 @@ impl<'doc> VectorDocumentFromVersions<'doc> {
if let Some(vectors_field) = document.vectors_field()? {
let vectors =
RawMap::from_raw_value(vectors_field, bump).map_err(UserError::SerdeJson)?;
Ok(Some(Self { vectors, embedders }))
Ok(Some(Self { external_document_id, vectors, embedders }))
} else {
Ok(None)
}
@ -208,16 +245,24 @@ impl<'doc> VectorDocumentFromVersions<'doc> {
impl<'doc> VectorDocument<'doc> for VectorDocumentFromVersions<'doc> {
fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>> {
self.vectors.iter().map(|(embedder, vectors)| {
let vectors = entry_from_raw_value(vectors, self.embedders.contains(embedder))
.map_err(UserError::SerdeJson)?;
let vectors = entry_from_raw_value_user(
self.external_document_id,
embedder,
vectors,
self.embedders.contains(embedder),
)?;
Ok((embedder, vectors))
})
}
fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>> {
let Some(vectors) = self.vectors.get(key) else { return Ok(None) };
let vectors = entry_from_raw_value(vectors, self.embedders.contains(key))
.map_err(UserError::SerdeJson)?;
let vectors = entry_from_raw_value_user(
self.external_document_id,
key,
vectors,
self.embedders.contains(key),
)?;
Ok(Some(vectors))
}
}
@ -228,8 +273,10 @@ pub struct MergedVectorDocument<'doc> {
}
impl<'doc> MergedVectorDocument<'doc> {
#[allow(clippy::too_many_arguments)]
pub fn with_db<Mapper: FieldIdMapper>(
docid: DocumentId,
external_document_id: &'doc str,
index: &'doc Index,
rtxn: &'doc RoTxn,
db_fields_ids_map: &'doc Mapper,
@ -238,16 +285,20 @@ impl<'doc> MergedVectorDocument<'doc> {
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<Self>> {
let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
let new_doc = VectorDocumentFromVersions::new(versions, doc_alloc, embedders)?;
let new_doc =
VectorDocumentFromVersions::new(&external_document_id, versions, doc_alloc, embedders)?;
Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
}
pub fn without_db(
external_document_id: &'doc str,
versions: &Versions<'doc>,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,
) -> Result<Option<Self>> {
let Some(new_doc) = VectorDocumentFromVersions::new(versions, doc_alloc, embedders)? else {
let Some(new_doc) =
VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?
else {
return Ok(None);
};
Ok(Some(Self { new_doc: Some(new_doc), db: None }))

View File

@ -183,14 +183,17 @@ impl Embedder {
let token_ids = if token_ids.len() > 512 { &token_ids[..512] } else { token_ids };
let token_ids =
Tensor::new(token_ids, &self.model.device).map_err(EmbedError::tensor_shape)?;
let token_ids = Tensor::stack(&[token_ids], 0).map_err(EmbedError::tensor_shape)?;
let token_type_ids = token_ids.zeros_like().map_err(EmbedError::tensor_shape)?;
let embeddings =
self.model.forward(&token_ids, &token_type_ids).map_err(EmbedError::model_forward)?;
// Apply some avg-pooling by taking the mean embedding value for all tokens (including padding)
let (n_tokens, _hidden_size) = embeddings.dims2().map_err(EmbedError::tensor_shape)?;
let embedding = (embeddings.sum(0).map_err(EmbedError::tensor_value)? / (n_tokens as f64))
let (_n_sentence, n_tokens, _hidden_size) =
embeddings.dims3().map_err(EmbedError::tensor_shape)?;
let embedding = (embeddings.sum(1).map_err(EmbedError::tensor_value)? / (n_tokens as f64))
.map_err(EmbedError::tensor_shape)?;
let embedding = embedding.squeeze(0).map_err(EmbedError::tensor_shape)?;
let embedding: Embedding = embedding.to_vec1().map_err(EmbedError::tensor_shape)?;
Ok(embedding)
}

View File

@ -648,7 +648,7 @@ impl Embedder {
Embedder::HuggingFace(embedder) => embedder.chunk_count_hint(),
Embedder::OpenAi(embedder) => embedder.chunk_count_hint(),
Embedder::Ollama(embedder) => embedder.chunk_count_hint(),
Embedder::UserProvided(_) => 1,
Embedder::UserProvided(_) => 100,
Embedder::Rest(embedder) => embedder.chunk_count_hint(),
}
}

View File

@ -113,7 +113,7 @@ impl Embedder {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.chunk_count_hint())
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk))
.collect();

View File

@ -266,7 +266,7 @@ impl Embedder {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.chunk_count_hint())
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk))
.collect();

View File

@ -12,11 +12,248 @@ use crate::{DocumentId, FieldId, InternalError, UserError};
pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors";
#[derive(serde::Serialize, serde::Deserialize, Debug)]
#[derive(serde::Serialize, Debug)]
#[serde(untagged)]
pub enum RawVectors<'doc> {
Explicit(#[serde(borrow)] RawExplicitVectors<'doc>),
ImplicitlyUserProvided(#[serde(borrow)] &'doc RawValue),
ImplicitlyUserProvided(#[serde(borrow)] Option<&'doc RawValue>),
}
pub enum RawVectorsError {
DeserializeSeq { index: usize, error: String },
DeserializeKey { error: String },
DeserializeRegenerate { error: String },
DeserializeEmbeddings { error: String },
UnknownField { field: String },
MissingRegenerate,
WrongKind { kind: &'static str, value: String },
Parsing(serde_json::Error),
}
impl RawVectorsError {
pub fn msg(self, embedder_name: &str) -> String {
match self {
RawVectorsError::DeserializeSeq { index, error } => format!(
"Could not parse `._vectors.{embedder_name}[{index}]`: {error}"
),
RawVectorsError::DeserializeKey { error } => format!(
"Could not parse a field at `._vectors.{embedder_name}`: {error}"
),
RawVectorsError::DeserializeRegenerate { error } => format!(
"Could not parse `._vectors.{embedder_name}.regenerate`: {error}"
),
RawVectorsError::DeserializeEmbeddings { error } => format!(
"Could not parse `._vectors.{embedder_name}.embeddings`: {error}"
),
RawVectorsError::UnknownField { field } => format!(
"Unexpected field `._vectors.{embedder_name}.{field}`\n \
- note: the allowed fields are `regenerate` and `embeddings`"
),
RawVectorsError::MissingRegenerate => format!(
"Missing field `._vectors.{embedder_name}.regenerate`\n \
- note: `._vectors.{embedder_name}` must be an array of floats, an array of arrays of floats, or an object with field `regenerate`"
),
RawVectorsError::WrongKind { kind, value } => format!(
"Expected `._vectors.{embedder_name}` to be an array of floats, an array of arrays of floats, or an object with at least the field `regenerate`, but got the {kind} `{value}`"
),
RawVectorsError::Parsing(error) => format!(
"Could not parse `._vectors.{embedder_name}`: {error}"
),
}
}
}
impl<'doc> RawVectors<'doc> {
pub fn from_raw_value(raw: &'doc RawValue) -> Result<Self, RawVectorsError> {
use serde::de::Deserializer as _;
Ok(match raw.deserialize_any(RawVectorsVisitor).map_err(RawVectorsError::Parsing)?? {
RawVectorsVisitorValue::ImplicitNone => RawVectors::ImplicitlyUserProvided(None),
RawVectorsVisitorValue::Implicit => RawVectors::ImplicitlyUserProvided(Some(raw)),
RawVectorsVisitorValue::Explicit { regenerate, embeddings } => {
RawVectors::Explicit(RawExplicitVectors { embeddings, regenerate })
}
})
}
}
struct RawVectorsVisitor;
enum RawVectorsVisitorValue<'doc> {
ImplicitNone,
Implicit,
Explicit { regenerate: bool, embeddings: Option<&'doc RawValue> },
}
impl<'doc> serde::de::Visitor<'doc> for RawVectorsVisitor {
type Value = std::result::Result<RawVectorsVisitorValue<'doc>, RawVectorsError>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a map containing at least `regenerate`, or an array of floats`")
}
fn visit_none<E>(self) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Ok(RawVectorsVisitorValue::ImplicitNone))
}
fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'doc>,
{
deserializer.deserialize_any(self)
}
fn visit_unit<E>(self) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Ok(RawVectorsVisitorValue::ImplicitNone))
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'doc>,
{
let mut index = 0;
// must consume all elements or parsing fails
loop {
match seq.next_element::<&RawValue>() {
Ok(Some(_)) => index += 1,
Err(error) => {
return Ok(Err(RawVectorsError::DeserializeSeq {
index,
error: error.to_string(),
}))
}
Ok(None) => break,
};
}
Ok(Ok(RawVectorsVisitorValue::Implicit))
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'doc>,
{
let mut regenerate = None;
let mut embeddings = None;
loop {
match map.next_key::<&str>() {
Ok(Some("regenerate")) => {
let value: bool = match map.next_value() {
Ok(value) => value,
Err(error) => {
return Ok(Err(RawVectorsError::DeserializeRegenerate {
error: error.to_string(),
}))
}
};
regenerate = Some(value);
}
Ok(Some("embeddings")) => {
let value: &RawValue = match map.next_value() {
Ok(value) => value,
Err(error) => {
return Ok(Err(RawVectorsError::DeserializeEmbeddings {
error: error.to_string(),
}))
}
};
embeddings = Some(value);
}
Ok(Some(other)) => {
return Ok(Err(RawVectorsError::UnknownField { field: other.to_string() }))
}
Ok(None) => break,
Err(error) => {
return Ok(Err(RawVectorsError::DeserializeKey { error: error.to_string() }))
}
}
}
let Some(regenerate) = regenerate else {
return Ok(Err(RawVectorsError::MissingRegenerate));
};
Ok(Ok(RawVectorsVisitorValue::Explicit { regenerate, embeddings }))
}
fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "boolean", value: v.to_string() }))
}
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "integer", value: v.to_string() }))
}
fn visit_i128<E>(self, v: i128) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "integer", value: v.to_string() }))
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "integer", value: v.to_string() }))
}
fn visit_u128<E>(self, v: u128) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "integer", value: v.to_string() }))
}
fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "number", value: v.to_string() }))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "string", value: v.to_string() }))
}
fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "string", value: v }))
}
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Err(RawVectorsError::WrongKind { kind: "bytes", value: format!("{v:?}") }))
}
fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'doc>,
{
deserializer.deserialize_any(self)
}
fn visit_enum<A>(self, _data: A) -> Result<Self::Value, A::Error>
where
A: serde::de::EnumAccess<'doc>,
{
Ok(Err(RawVectorsError::WrongKind { kind: "enum", value: "a variant".to_string() }))
}
}
#[derive(serde::Serialize, Debug)]
@ -86,7 +323,7 @@ impl<'doc> RawVectors<'doc> {
}
pub fn embeddings(&self) -> Option<&'doc RawValue> {
match self {
RawVectors::ImplicitlyUserProvided(embeddings) => Some(embeddings),
RawVectors::ImplicitlyUserProvided(embeddings) => *embeddings,
RawVectors::Explicit(RawExplicitVectors { embeddings, regenerate: _ }) => *embeddings,
}
}
@ -258,7 +495,7 @@ impl Error {
Error::InvalidEmbedderConf { error } => {
crate::Error::UserError(UserError::InvalidVectorsEmbedderConf {
document_id,
error,
error: error.to_string(),
})
}
Error::InternalSerdeJson(error) => {

View File

@ -193,7 +193,7 @@ impl Embedder {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.chunk_count_hint())
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk))
.collect();