5332: Fix geo update r=Kerollmops a=dureuill

# Pull Request

## Related issue
Fixes #5331

## What does this PR do?
- use the merged version that contains all fields instead of the updated version that contains only updated fields
- add test that detects the problem
- As it is the second time that `changes.updated` is causing a bug, I'm changing its name to `only_changed_fields`, hopefully better communicating that old fields are not there


Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-02-11 18:51:33 +00:00 committed by GitHub
commit 70305b9f71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 275 additions and 5 deletions

View File

@ -1803,6 +1803,275 @@ async fn add_documents_with_geo_field() {
"finishedAt": "[date]"
}
"###);
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###"
{
"results": [
{
"id": "1"
},
{
"id": "2",
"_geo": null
},
{
"id": "3",
"_geo": {
"lat": 1,
"lng": 1
}
},
{
"id": "4",
"_geo": {
"lat": "1",
"lng": "1"
}
}
],
"offset": 0,
"limit": 20,
"total": 4
}
"###);
let (response, code) = index
.search_post(json!({"sort": ["_geoPoint(50.629973371633746,3.0569447399419567):desc"]}))
.await;
snapshot!(code, @"200 OK");
// we are expecting docs 4 and 3 first as they have geo
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }),
@r###"
{
"hits": [
{
"id": "4",
"_geo": {
"lat": "1",
"lng": "1"
},
"_geoDistance": 5522018
},
{
"id": "3",
"_geo": {
"lat": 1,
"lng": 1
},
"_geoDistance": 5522018
},
{
"id": "1"
},
{
"id": "2",
"_geo": null
}
],
"query": "",
"processingTimeMs": "[time]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 4
}
"###);
}
#[actix_rt::test]
async fn update_documents_with_geo_field() {
let server = Server::new().await;
let index = server.index("doggo");
index.update_settings(json!({"sortableAttributes": ["_geo"]})).await;
let documents = json!([
{
"id": "1",
},
{
"id": "2",
"_geo": null,
},
{
"id": "3",
"_geo": { "lat": 1, "lng": 1 },
},
{
"id": "4",
"_geo": { "lat": "1", "lng": "1" },
},
]);
let (task, _status_code) = index.add_documents(documents, None).await;
let response = index.wait_task(task.uid()).await;
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###"
{
"uid": 1,
"batchUid": 1,
"indexUid": "doggo",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 4,
"indexedDocuments": 4
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
let (response, code) = index
.search_post(json!({"sort": ["_geoPoint(50.629973371633746,3.0569447399419567):desc"]}))
.await;
snapshot!(code, @"200 OK");
// we are expecting docs 4 and 3 first as they have geo
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }),
@r###"
{
"hits": [
{
"id": "4",
"_geo": {
"lat": "1",
"lng": "1"
},
"_geoDistance": 5522018
},
{
"id": "3",
"_geo": {
"lat": 1,
"lng": 1
},
"_geoDistance": 5522018
},
{
"id": "1"
},
{
"id": "2",
"_geo": null
}
],
"query": "",
"processingTimeMs": "[time]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 4
}
"###);
let updated_documents = json!([{
"id": "3",
"doggo": "kefir",
}]);
let (task, _status_code) = index.update_documents(updated_documents, None).await;
let response = index.wait_task(task.uid()).await;
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###"
{
"uid": 2,
"batchUid": 2,
"indexUid": "doggo",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###"
{
"results": [
{
"id": "1"
},
{
"id": "2",
"_geo": null
},
{
"id": "3",
"_geo": {
"lat": 1,
"lng": 1
},
"doggo": "kefir"
},
{
"id": "4",
"_geo": {
"lat": "1",
"lng": "1"
}
}
],
"offset": 0,
"limit": 20,
"total": 4
}
"###);
let (response, code) = index
.search_post(json!({"sort": ["_geoPoint(50.629973371633746,3.0569447399419567):desc"]}))
.await;
snapshot!(code, @"200 OK");
// the search response should not have changed: we are expecting docs 4 and 3 first as they have geo
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }),
@r###"
{
"hits": [
{
"id": "4",
"_geo": {
"lat": "1",
"lng": "1"
},
"_geoDistance": 5522018
},
{
"id": "3",
"_geo": {
"lat": 1,
"lng": 1
},
"doggo": "kefir",
"_geoDistance": 5522018
},
{
"id": "1"
},
{
"id": "2",
"_geo": null
}
],
"query": "",
"processingTimeMs": "[time]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 4
}
"###);
}
#[actix_rt::test]

View File

@ -144,7 +144,7 @@ impl<'doc> Update<'doc> {
)?)
}
pub fn updated(&self) -> DocumentFromVersions<'_, 'doc> {
pub fn only_changed_fields(&self) -> DocumentFromVersions<'_, 'doc> {
DocumentFromVersions::new(&self.new)
}
@ -182,7 +182,7 @@ impl<'doc> Update<'doc> {
let mut cached_current = None;
let mut updated_selected_field_count = 0;
for entry in self.updated().iter_top_level_fields() {
for entry in self.only_changed_fields().iter_top_level_fields() {
let (key, updated_value) = entry?;
if perm_json_p::select_field(key, fields, &[]) == perm_json_p::Selection::Skip {
@ -241,7 +241,7 @@ impl<'doc> Update<'doc> {
Ok(has_deleted_fields)
}
pub fn updated_vectors(
pub fn only_changed_vectors(
&self,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,

View File

@ -199,7 +199,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
.transpose()?;
let updated_geo = update
.updated()
.merged(rtxn, index, db_fields_ids_map)?
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;

View File

@ -99,7 +99,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
context.db_fields_ids_map,
&context.doc_alloc,
)?;
let new_vectors = update.updated_vectors(&context.doc_alloc, self.embedders)?;
let new_vectors =
update.only_changed_vectors(&context.doc_alloc, self.embedders)?;
if let Some(new_vectors) = &new_vectors {
unused_vectors_distribution.append(new_vectors)?;