mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-25 19:45:05 +08:00
Compare commits
1 Commits
75d006d060
...
e186a250df
Author | SHA1 | Date | |
---|---|---|---|
|
e186a250df |
@ -32,15 +32,18 @@ use meilisearch_types::error::Code;
|
||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
|
||||
use meilisearch_types::milli::heed::CompactionOption;
|
||||
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
|
||||
use meilisearch_types::milli::update::new::indexer::{
|
||||
self, retrieve_or_guess_primary_key, UpdateByFunction,
|
||||
};
|
||||
use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSettings};
|
||||
use meilisearch_types::milli::vector::parsed_vectors::{
|
||||
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
|
||||
};
|
||||
use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder};
|
||||
use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
||||
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
||||
use raw_collections::RawMap;
|
||||
use roaring::RoaringBitmap;
|
||||
use time::macros::format_description;
|
||||
use time::OffsetDateTime;
|
||||
@ -1226,7 +1229,9 @@ impl IndexScheduler {
|
||||
const PRINT_SECS_DELTA: u64 = 1;
|
||||
|
||||
let processing_tasks = self.processing_tasks.clone();
|
||||
|
||||
let must_stop_processing = self.must_stop_processing.clone();
|
||||
|
||||
let send_progress = |progress| {
|
||||
let now = std::time::Instant::now();
|
||||
let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed);
|
||||
@ -1275,6 +1280,16 @@ impl IndexScheduler {
|
||||
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
|
||||
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it
|
||||
// to a fresh thread.
|
||||
|
||||
/// TODO manage errors correctly
|
||||
let first_addition_uuid = operations
|
||||
.iter()
|
||||
.find_map(|op| match op {
|
||||
DocumentOperation::Add(content_uuid) => Some(content_uuid),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let mut content_files = Vec::new();
|
||||
for operation in &operations {
|
||||
if let DocumentOperation::Add(content_uuid) = operation {
|
||||
@ -1290,30 +1305,88 @@ impl IndexScheduler {
|
||||
let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
|
||||
let mut new_fields_ids_map = db_fields_ids_map.clone();
|
||||
|
||||
let first_document = match content_files.first() {
|
||||
Some(mmap) => {
|
||||
let mut iter = serde_json::Deserializer::from_slice(mmap).into_iter();
|
||||
iter.next().transpose().map_err(|e| e.into()).map_err(Error::IoError)?
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let (primary_key, primary_key_has_been_set) = retrieve_or_guess_primary_key(
|
||||
&rtxn,
|
||||
index,
|
||||
&mut new_fields_ids_map,
|
||||
primary_key.as_deref(),
|
||||
first_document
|
||||
.map(|raw| RawMap::from_raw_value(raw, &indexer_alloc))
|
||||
.transpose()
|
||||
.map_err(|error| {
|
||||
milli::Error::UserError(milli::UserError::SerdeJson(error))
|
||||
})?,
|
||||
)?
|
||||
.map_err(milli::Error::from)?;
|
||||
|
||||
let mut content_files_iter = content_files.iter();
|
||||
let mut indexer = indexer::DocumentOperation::new(method);
|
||||
let embedders = index.embedding_configs(index_wtxn)?;
|
||||
let embedders = self.embedders(embedders)?;
|
||||
for operation in operations {
|
||||
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
|
||||
match operation {
|
||||
DocumentOperation::Add(_content_uuid) => {
|
||||
let mmap = content_files_iter.next().unwrap();
|
||||
indexer.add_documents(mmap)?;
|
||||
let stats = indexer.add_documents(mmap)?;
|
||||
// builder = builder.with_embedders(embedders.clone());
|
||||
|
||||
let received_documents =
|
||||
if let Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents,
|
||||
..
|
||||
}) = task.details
|
||||
{
|
||||
received_documents
|
||||
} else {
|
||||
// In the case of a `documentAdditionOrUpdate` the details MUST be set
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
task.status = Status::Succeeded;
|
||||
task.details = Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents,
|
||||
indexed_documents: Some(stats.document_count as u64),
|
||||
})
|
||||
}
|
||||
DocumentOperation::Delete(document_ids) => {
|
||||
let count = document_ids.len();
|
||||
let document_ids: bumpalo::collections::vec::Vec<_> = document_ids
|
||||
.iter()
|
||||
.map(|s| &*indexer_alloc.alloc_str(s))
|
||||
.collect_in(&indexer_alloc);
|
||||
indexer.delete_documents(document_ids.into_bump_slice());
|
||||
// Uses Invariant: remove documents actually always returns Ok for the inner result
|
||||
// let count = user_result.unwrap();
|
||||
let provided_ids =
|
||||
if let Some(Details::DocumentDeletion { provided_ids, .. }) =
|
||||
task.details
|
||||
{
|
||||
provided_ids
|
||||
} else {
|
||||
// In the case of a `documentAdditionOrUpdate` the details MUST be set
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
task.status = Status::Succeeded;
|
||||
task.details = Some(Details::DocumentDeletion {
|
||||
provided_ids,
|
||||
deleted_documents: Some(count as u64),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tasks.iter().any(|res| res.error.is_none()) {
|
||||
let local_pool;
|
||||
let indexer_config = self.index_mapper.indexer_config();
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool = match &self.index_mapper.indexer_config().thread_pool {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||
@ -1321,55 +1394,22 @@ impl IndexScheduler {
|
||||
}
|
||||
};
|
||||
|
||||
let (document_changes, operation_stats, primary_key) = indexer.into_changes(
|
||||
// TODO we want to multithread this
|
||||
let document_changes = indexer.into_changes(
|
||||
&indexer_alloc,
|
||||
index,
|
||||
&rtxn,
|
||||
primary_key.as_deref(),
|
||||
&primary_key,
|
||||
&mut new_fields_ids_map,
|
||||
)?;
|
||||
|
||||
let mut addition = 0;
|
||||
for (stats, task) in operation_stats.into_iter().zip(&mut tasks) {
|
||||
addition += stats.document_count;
|
||||
match stats.error {
|
||||
Some(error) => {
|
||||
task.status = Status::Failed;
|
||||
task.error = Some(milli::Error::UserError(error).into());
|
||||
}
|
||||
None => task.status = Status::Succeeded,
|
||||
}
|
||||
|
||||
task.details = match task.details {
|
||||
Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) => {
|
||||
Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents,
|
||||
indexed_documents: Some(stats.document_count),
|
||||
})
|
||||
}
|
||||
Some(Details::DocumentDeletion { provided_ids, .. }) => {
|
||||
Some(Details::DocumentDeletion {
|
||||
provided_ids,
|
||||
deleted_documents: Some(stats.document_count),
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
// In the case of a `documentAdditionOrUpdate` or `DocumentDeletion`
|
||||
// the details MUST be set to either addition or deletion
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tasks.iter().any(|res| res.error.is_none()) {
|
||||
pool.install(|| {
|
||||
indexer::index(
|
||||
index_wtxn,
|
||||
index,
|
||||
indexer_config.grenad_parameters(),
|
||||
&db_fields_ids_map,
|
||||
new_fields_ids_map,
|
||||
primary_key,
|
||||
primary_key_has_been_set.then_some(primary_key),
|
||||
&document_changes,
|
||||
embedders,
|
||||
&|| must_stop_processing.get(),
|
||||
@ -1378,7 +1418,7 @@ impl IndexScheduler {
|
||||
})
|
||||
.unwrap()?;
|
||||
|
||||
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||
}
|
||||
// else if primary_key_has_been_set {
|
||||
// // Everything failed but we've set a primary key.
|
||||
@ -1395,14 +1435,12 @@ impl IndexScheduler {
|
||||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::DocumentEdition { mut task, .. } => {
|
||||
let (filter, code) = if let KindWithContent::DocumentEdition {
|
||||
filter_expr,
|
||||
context: _,
|
||||
function,
|
||||
..
|
||||
let (filter, context, code) =
|
||||
if let KindWithContent::DocumentEdition {
|
||||
filter_expr, context, function, ..
|
||||
} = &task.kind
|
||||
{
|
||||
(filter_expr, function)
|
||||
(filter_expr, context, function)
|
||||
} else {
|
||||
unreachable!()
|
||||
};
|
||||
@ -1459,8 +1497,7 @@ impl IndexScheduler {
|
||||
|
||||
if task.error.is_none() {
|
||||
let local_pool;
|
||||
let indexer_config = self.index_mapper.indexer_config();
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool = match &self.index_mapper.indexer_config().thread_pool {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||
@ -1478,7 +1515,6 @@ impl IndexScheduler {
|
||||
indexer::index(
|
||||
index_wtxn,
|
||||
index,
|
||||
indexer_config.grenad_parameters(),
|
||||
&db_fields_ids_map,
|
||||
new_fields_ids_map,
|
||||
None, // cannot change primary key in DocumentEdition
|
||||
@ -1611,8 +1647,7 @@ impl IndexScheduler {
|
||||
|
||||
if !tasks.iter().all(|res| res.error.is_some()) {
|
||||
let local_pool;
|
||||
let indexer_config = self.index_mapper.indexer_config();
|
||||
let pool = match &indexer_config.thread_pool {
|
||||
let pool = match &self.index_mapper.indexer_config().thread_pool {
|
||||
Some(pool) => pool,
|
||||
None => {
|
||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||
@ -1630,7 +1665,6 @@ impl IndexScheduler {
|
||||
indexer::index(
|
||||
index_wtxn,
|
||||
index,
|
||||
indexer_config.grenad_parameters(),
|
||||
&db_fields_ids_map,
|
||||
new_fields_ids_map,
|
||||
None, // document deletion never changes primary key
|
||||
|
@ -971,8 +971,6 @@ impl IndexScheduler {
|
||||
let ProcessingTasks { started_at, processing, progress, .. } =
|
||||
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
|
||||
|
||||
let _ = progress;
|
||||
|
||||
let ret = tasks.into_iter();
|
||||
if processing.is_empty() {
|
||||
Ok((ret.collect(), total))
|
||||
@ -4298,11 +4296,11 @@ mod tests {
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
|
||||
|
||||
// The second batch should fail.
|
||||
handle.advance_one_successful_batch();
|
||||
handle.advance_one_failed_batch();
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
|
||||
|
||||
// The second batch should fail.
|
||||
handle.advance_one_successful_batch();
|
||||
handle.advance_one_failed_batch();
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_fails");
|
||||
|
||||
// Is the primary key still what we expect?
|
||||
@ -4363,7 +4361,7 @@ mod tests {
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
|
||||
|
||||
// The second batch should fail and contains two tasks.
|
||||
handle.advance_one_successful_batch();
|
||||
handle.advance_one_failed_batch();
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_and_third_tasks_fails");
|
||||
|
||||
// Is the primary key still what we expect?
|
||||
@ -4442,8 +4440,7 @@ mod tests {
|
||||
snapshot!(primary_key, @"id");
|
||||
|
||||
// We're trying to `bork` again, but now there is already a primary key set for this index.
|
||||
// NOTE: it's marked as successful because the batch didn't fails, it's the individual tasks that failed.
|
||||
handle.advance_one_successful_batch();
|
||||
handle.advance_one_failed_batch();
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fourth_task_fails");
|
||||
|
||||
// Finally the last task should succeed since its primary key is the same as the valid one.
|
||||
@ -4603,7 +4600,7 @@ mod tests {
|
||||
snapshot!(primary_key.is_none(), @"false");
|
||||
|
||||
// The second batch should contains only one task that fails because it tries to update the primary key to `bork`.
|
||||
handle.advance_one_successful_batch();
|
||||
handle.advance_one_failed_batch();
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
|
||||
|
||||
// The third batch should succeed and only contains one task.
|
||||
@ -5216,10 +5213,9 @@ mod tests {
|
||||
|
||||
let configs = index_scheduler.embedders(configs).unwrap();
|
||||
let (hf_embedder, _, _) = configs.get(&simple_hf_name).unwrap();
|
||||
let beagle_embed =
|
||||
hf_embedder.embed_one(S("Intel the beagle best doggo"), None).unwrap();
|
||||
let lab_embed = hf_embedder.embed_one(S("Max the lab best doggo"), None).unwrap();
|
||||
let patou_embed = hf_embedder.embed_one(S("kefir the patou best doggo"), None).unwrap();
|
||||
let beagle_embed = hf_embedder.embed_one(S("Intel the beagle best doggo")).unwrap();
|
||||
let lab_embed = hf_embedder.embed_one(S("Max the lab best doggo")).unwrap();
|
||||
let patou_embed = hf_embedder.embed_one(S("kefir the patou best doggo")).unwrap();
|
||||
(fakerest_name, simple_hf_name, beagle_embed, lab_embed, patou_embed)
|
||||
};
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
---
|
||||
source: crates/index-scheduler/src/lib.rs
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
### Autobatching Enabled = true
|
||||
### Processing Tasks:
|
||||
@ -22,7 +22,7 @@ succeeded [0,1,2,]
|
||||
doggos [0,1,2,]
|
||||
----------------------------------------------------------------------
|
||||
### Index Mapper:
|
||||
doggos: { number_of_documents: 1, field_distribution: {"breed": 1, "doggo": 1, "id": 1} }
|
||||
doggos: { number_of_documents: 1, field_distribution: {"_vectors": 1, "breed": 1, "doggo": 1, "id": 1} }
|
||||
|
||||
----------------------------------------------------------------------
|
||||
### Canceled By:
|
||||
|
@ -220,12 +220,11 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
|
||||
|
||||
let mut out = BufWriter::new(output);
|
||||
let mut deserializer = serde_json::Deserializer::from_slice(&input);
|
||||
let res = array_each(&mut deserializer, |obj: &RawValue| {
|
||||
let count = match array_each(&mut deserializer, |obj: &RawValue| {
|
||||
doc_alloc.reset();
|
||||
let map = RawMap::from_raw_value(obj, &doc_alloc)?;
|
||||
to_writer(&mut out, &map)
|
||||
});
|
||||
let count = match res {
|
||||
}) {
|
||||
// The json data has been deserialized and does not need to be processed again.
|
||||
// The data has been transferred to the writer during the deserialization process.
|
||||
Ok(Ok(count)) => count,
|
||||
|
@ -796,10 +796,8 @@ fn prepare_search<'t>(
|
||||
let span = tracing::trace_span!(target: "search::vector", "embed_one");
|
||||
let _entered = span.enter();
|
||||
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
|
||||
|
||||
embedder
|
||||
.embed_one(query.q.clone().unwrap(), Some(deadline))
|
||||
.embed_one(query.q.clone().unwrap())
|
||||
.map_err(milli::vector::Error::from)
|
||||
.map_err(milli::Error::from)?
|
||||
}
|
||||
|
@ -585,9 +585,9 @@ async fn federation_two_search_two_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
-100,
|
||||
340,
|
||||
90
|
||||
-100.0,
|
||||
340.0,
|
||||
90.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -613,9 +613,9 @@ async fn federation_two_search_two_indexes() {
|
||||
"cattos": "pésti",
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
1.0,
|
||||
2.0,
|
||||
3.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -640,9 +640,9 @@ async fn federation_two_search_two_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
54
|
||||
1.0,
|
||||
2.0,
|
||||
54.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -707,9 +707,9 @@ async fn federation_multiple_search_multiple_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
-100,
|
||||
340,
|
||||
90
|
||||
-100.0,
|
||||
340.0,
|
||||
90.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -735,9 +735,9 @@ async fn federation_multiple_search_multiple_indexes() {
|
||||
"cattos": "pésti",
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
1.0,
|
||||
2.0,
|
||||
3.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -773,9 +773,9 @@ async fn federation_multiple_search_multiple_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
54
|
||||
1.0,
|
||||
2.0,
|
||||
54.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -793,9 +793,9 @@ async fn federation_multiple_search_multiple_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
10,
|
||||
-23,
|
||||
32
|
||||
10.0,
|
||||
-23.0,
|
||||
32.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -824,9 +824,9 @@ async fn federation_multiple_search_multiple_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
10,
|
||||
23,
|
||||
32
|
||||
10.0,
|
||||
23.0,
|
||||
32.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -869,9 +869,9 @@ async fn federation_multiple_search_multiple_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
54
|
||||
1.0,
|
||||
2.0,
|
||||
54.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -898,9 +898,9 @@ async fn federation_multiple_search_multiple_indexes() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
-100,
|
||||
231,
|
||||
32
|
||||
-100.0,
|
||||
231.0,
|
||||
32.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -1522,9 +1522,9 @@ async fn federation_sort_same_indexes_same_criterion_same_direction() {
|
||||
"cattos": "pésti",
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
1.0,
|
||||
2.0,
|
||||
3.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -1550,9 +1550,9 @@ async fn federation_sort_same_indexes_same_criterion_same_direction() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
54
|
||||
1.0,
|
||||
2.0,
|
||||
54.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -1582,9 +1582,9 @@ async fn federation_sort_same_indexes_same_criterion_same_direction() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
10,
|
||||
23,
|
||||
32
|
||||
10.0,
|
||||
23.0,
|
||||
32.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -1845,9 +1845,9 @@ async fn federation_sort_same_indexes_different_criterion_same_direction() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
54
|
||||
1.0,
|
||||
2.0,
|
||||
54.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -1874,9 +1874,9 @@ async fn federation_sort_same_indexes_different_criterion_same_direction() {
|
||||
"cattos": "pésti",
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
1.0,
|
||||
2.0,
|
||||
3.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
@ -1906,9 +1906,9 @@ async fn federation_sort_same_indexes_different_criterion_same_direction() {
|
||||
],
|
||||
"_vectors": {
|
||||
"manual": [
|
||||
10,
|
||||
23,
|
||||
32
|
||||
10.0,
|
||||
23.0,
|
||||
32.0
|
||||
]
|
||||
},
|
||||
"_federation": {
|
||||
|
@ -137,14 +137,13 @@ fn long_text() -> &'static str {
|
||||
}
|
||||
|
||||
async fn create_mock_tokenized() -> (MockServer, Value) {
|
||||
create_mock_with_template("{{doc.text}}", ModelDimensions::Large, false, false).await
|
||||
create_mock_with_template("{{doc.text}}", ModelDimensions::Large, false).await
|
||||
}
|
||||
|
||||
async fn create_mock_with_template(
|
||||
document_template: &str,
|
||||
model_dimensions: ModelDimensions,
|
||||
fallible: bool,
|
||||
slow: bool,
|
||||
) -> (MockServer, Value) {
|
||||
let mock_server = MockServer::start().await;
|
||||
const API_KEY: &str = "my-api-key";
|
||||
@ -155,11 +154,7 @@ async fn create_mock_with_template(
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/"))
|
||||
.respond_with(move |req: &Request| {
|
||||
// 0. wait for a long time
|
||||
if slow {
|
||||
std::thread::sleep(std::time::Duration::from_secs(1));
|
||||
}
|
||||
// 1. maybe return 500
|
||||
// 0. maybe return 500
|
||||
if fallible {
|
||||
let attempt = attempt.fetch_add(1, Ordering::Relaxed);
|
||||
let failed = matches!(attempt % 4, 0 | 1 | 3);
|
||||
@ -172,7 +167,7 @@ async fn create_mock_with_template(
|
||||
}))
|
||||
}
|
||||
}
|
||||
// 3. check API key
|
||||
// 1. check API key
|
||||
match req.headers.get("Authorization") {
|
||||
Some(api_key) if api_key == API_KEY_BEARER => {
|
||||
{}
|
||||
@ -207,7 +202,7 @@ async fn create_mock_with_template(
|
||||
)
|
||||
}
|
||||
}
|
||||
// 3. parse text inputs
|
||||
// 2. parse text inputs
|
||||
let query: serde_json::Value = match req.body_json() {
|
||||
Ok(query) => query,
|
||||
Err(_error) => return ResponseTemplate::new(400).set_body_json(
|
||||
@ -228,7 +223,7 @@ async fn create_mock_with_template(
|
||||
panic!("Expected {model_dimensions:?}, got {query_model_dimensions:?}")
|
||||
}
|
||||
|
||||
// 4. for each text, find embedding in responses
|
||||
// 3. for each text, find embedding in responses
|
||||
let serde_json::Value::Array(inputs) = &query["input"] else {
|
||||
panic!("Unexpected `input` value")
|
||||
};
|
||||
@ -288,7 +283,7 @@ async fn create_mock_with_template(
|
||||
"embedding": embedding,
|
||||
})).collect();
|
||||
|
||||
// 5. produce output from embeddings
|
||||
// 4. produce output from embeddings
|
||||
ResponseTemplate::new(200).set_body_json(json!({
|
||||
"object": "list",
|
||||
"data": data,
|
||||
@ -322,27 +317,23 @@ const DOGGO_TEMPLATE: &str = r#"{%- if doc.gender == "F" -%}Une chienne nommée
|
||||
{%- endif %}, de race {{doc.breed}}."#;
|
||||
|
||||
async fn create_mock() -> (MockServer, Value) {
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, false, false).await
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, false).await
|
||||
}
|
||||
|
||||
async fn create_mock_dimensions() -> (MockServer, Value) {
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large512, false, false).await
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large512, false).await
|
||||
}
|
||||
|
||||
async fn create_mock_small_embedding_model() -> (MockServer, Value) {
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Small, false, false).await
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Small, false).await
|
||||
}
|
||||
|
||||
async fn create_mock_legacy_embedding_model() -> (MockServer, Value) {
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Ada, false, false).await
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Ada, false).await
|
||||
}
|
||||
|
||||
async fn create_fallible_mock() -> (MockServer, Value) {
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, true, false).await
|
||||
}
|
||||
|
||||
async fn create_slow_mock() -> (MockServer, Value) {
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, true, true).await
|
||||
create_mock_with_template(DOGGO_TEMPLATE, ModelDimensions::Large, true).await
|
||||
}
|
||||
|
||||
// basic test "it works"
|
||||
@ -1882,114 +1873,4 @@ async fn it_still_works() {
|
||||
]
|
||||
"###);
|
||||
}
|
||||
|
||||
// test with a server that responds 500 on 3 out of 4 calls
|
||||
#[actix_rt::test]
|
||||
async fn timeout() {
|
||||
let (_mock, setting) = create_slow_mock().await;
|
||||
let server = get_server_vector().await;
|
||||
let index = server.index("doggo");
|
||||
|
||||
let (response, code) = index
|
||||
.update_settings(json!({
|
||||
"embedders": {
|
||||
"default": setting,
|
||||
},
|
||||
}))
|
||||
.await;
|
||||
snapshot!(code, @"202 Accepted");
|
||||
let task = server.wait_task(response.uid()).await;
|
||||
snapshot!(task["status"], @r###""succeeded""###);
|
||||
let documents = json!([
|
||||
{"id": 0, "name": "kefir", "gender": "M", "birthyear": 2023, "breed": "Patou"},
|
||||
]);
|
||||
let (value, code) = index.add_documents(documents, None).await;
|
||||
snapshot!(code, @"202 Accepted");
|
||||
let task = index.wait_task(value.uid()).await;
|
||||
snapshot!(task, @r###"
|
||||
{
|
||||
"uid": "[uid]",
|
||||
"indexUid": "doggo",
|
||||
"status": "succeeded",
|
||||
"type": "documentAdditionOrUpdate",
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"receivedDocuments": 1,
|
||||
"indexedDocuments": 1
|
||||
},
|
||||
"error": null,
|
||||
"duration": "[duration]",
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]"
|
||||
}
|
||||
"###);
|
||||
|
||||
let (documents, _code) = index
|
||||
.get_all_documents(GetAllDocumentsOptions { retrieve_vectors: true, ..Default::default() })
|
||||
.await;
|
||||
snapshot!(json_string!(documents, {".results.*._vectors.default.embeddings" => "[vector]"}), @r###"
|
||||
{
|
||||
"results": [
|
||||
{
|
||||
"id": 0,
|
||||
"name": "kefir",
|
||||
"gender": "M",
|
||||
"birthyear": 2023,
|
||||
"breed": "Patou",
|
||||
"_vectors": {
|
||||
"default": {
|
||||
"embeddings": "[vector]",
|
||||
"regenerate": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"offset": 0,
|
||||
"limit": 20,
|
||||
"total": 1
|
||||
}
|
||||
"###);
|
||||
|
||||
let (response, code) = index
|
||||
.search_post(json!({
|
||||
"q": "grand chien de berger des montagnes",
|
||||
"hybrid": {"semanticRatio": 0.99, "embedder": "default"}
|
||||
}))
|
||||
.await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["semanticHitCount"]), @"0");
|
||||
snapshot!(json_string!(response["hits"]), @"[]");
|
||||
|
||||
let (response, code) = index
|
||||
.search_post(json!({
|
||||
"q": "grand chien de berger des montagnes",
|
||||
"hybrid": {"semanticRatio": 0.99, "embedder": "default"}
|
||||
}))
|
||||
.await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["semanticHitCount"]), @"1");
|
||||
snapshot!(json_string!(response["hits"]), @r###"
|
||||
[
|
||||
{
|
||||
"id": 0,
|
||||
"name": "kefir",
|
||||
"gender": "M",
|
||||
"birthyear": 2023,
|
||||
"breed": "Patou"
|
||||
}
|
||||
]
|
||||
"###);
|
||||
|
||||
let (response, code) = index
|
||||
.search_post(json!({
|
||||
"q": "grand chien de berger des montagnes",
|
||||
"hybrid": {"semanticRatio": 0.99, "embedder": "default"}
|
||||
}))
|
||||
.await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["semanticHitCount"]), @"0");
|
||||
snapshot!(json_string!(response["hits"]), @"[]");
|
||||
}
|
||||
|
||||
// test with a server that wrongly responds 400
|
||||
|
@ -201,9 +201,7 @@ impl<'a> Search<'a> {
|
||||
let span = tracing::trace_span!(target: "search::hybrid", "embed_one");
|
||||
let _entered = span.enter();
|
||||
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);
|
||||
|
||||
match embedder.embed_one(query, Some(deadline)) {
|
||||
match embedder.embed_one(query) {
|
||||
Ok(embedding) => embedding,
|
||||
Err(error) => {
|
||||
tracing::error!(error=%error, "Embedding failed");
|
||||
|
@ -119,8 +119,12 @@ impl GrenadParameters {
|
||||
///
|
||||
/// This should be called inside of a rayon thread pool,
|
||||
/// otherwise, it will take the global number of threads.
|
||||
///
|
||||
/// The max memory cannot exceed a given reasonable value.
|
||||
pub fn max_memory_by_thread(&self) -> Option<usize> {
|
||||
self.max_memory.map(|max_memory| (max_memory / rayon::current_num_threads()))
|
||||
self.max_memory.map(|max_memory| {
|
||||
(max_memory / rayon::current_num_threads()).min(MAX_GRENAD_SORTER_USAGE)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
use grenad::CompressionType;
|
||||
|
||||
use super::GrenadParameters;
|
||||
use crate::thread_pool_no_abort::ThreadPoolNoAbort;
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -16,17 +15,6 @@ pub struct IndexerConfig {
|
||||
pub skip_index_budget: bool,
|
||||
}
|
||||
|
||||
impl IndexerConfig {
|
||||
pub fn grenad_parameters(&self) -> GrenadParameters {
|
||||
GrenadParameters {
|
||||
chunk_compression_type: self.chunk_compression_type,
|
||||
chunk_compression_level: self.chunk_compression_level,
|
||||
max_memory: self.max_memory,
|
||||
max_nb_chunks: self.max_nb_chunks,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for IndexerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
@ -70,30 +70,28 @@ impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
|
||||
fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
|
||||
let mut it = self.content.iter();
|
||||
|
||||
std::iter::from_fn(move || loop {
|
||||
std::iter::from_fn(move || {
|
||||
let (fid, value) = it.next()?;
|
||||
let name = match self.fields_ids_map.name(fid).ok_or(
|
||||
|
||||
let res = (|| loop {
|
||||
let name = self.fields_ids_map.name(fid).ok_or(
|
||||
InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
|
||||
field_id: fid,
|
||||
process: "getting current document",
|
||||
}),
|
||||
) {
|
||||
Ok(name) => name,
|
||||
Err(error) => return Some(Err(error.into())),
|
||||
};
|
||||
)?;
|
||||
|
||||
if name == RESERVED_VECTORS_FIELD_NAME || name == "_geo" {
|
||||
continue;
|
||||
}
|
||||
|
||||
let res = (|| {
|
||||
let value =
|
||||
serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
|
||||
|
||||
Ok((name, value))
|
||||
return Ok((name, value));
|
||||
})();
|
||||
|
||||
return Some(res);
|
||||
Some(res)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -79,7 +79,7 @@ use roaring::RoaringBitmap;
|
||||
use rustc_hash::FxBuildHasher;
|
||||
|
||||
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::indexer::document_changes::MostlySend;
|
||||
use crate::update::new::KvReaderDelAdd;
|
||||
use crate::update::MergeDeladdCboRoaringBitmaps;
|
||||
use crate::{CboRoaringBitmapCodec, Result};
|
||||
|
@ -6,9 +6,8 @@ use hashbrown::HashMap;
|
||||
use super::DelAddRoaringBitmap;
|
||||
use crate::update::new::channel::DocumentsSender;
|
||||
use crate::update::new::document::{write_to_obkv, Document as _};
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, FullySend};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::FullySend;
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::vector::EmbeddingConfigs;
|
||||
use crate::Result;
|
||||
@ -44,12 +43,10 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
||||
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
|
||||
let mut document_extractor_data = context.data.0.borrow_mut_or_yield();
|
||||
|
||||
for change in changes {
|
||||
let change = change?;
|
||||
// **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
|
||||
// Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
|
||||
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
|
||||
|
||||
for change in changes {
|
||||
let change = change?;
|
||||
let external_docid = change.external_docid().to_owned();
|
||||
|
||||
// document but we need to create a function that collects and compresses documents.
|
||||
|
@ -15,10 +15,10 @@ use crate::heed_codec::facet::OrderedF64Codec;
|
||||
use crate::update::del_add::DelAdd;
|
||||
use crate::update::new::channel::FieldIdDocidFacetSender;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
|
||||
Progress, ThreadLocal,
|
||||
};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
|
||||
@ -36,7 +36,7 @@ impl<'a, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a> {
|
||||
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||
Ok(RefCell::new(BalancedCaches::new_in(
|
||||
self.buckets,
|
||||
self.grenad_parameters.max_memory_by_thread(),
|
||||
self.grenad_parameters.max_memory,
|
||||
extractor_alloc,
|
||||
)))
|
||||
}
|
||||
@ -156,7 +156,6 @@ impl FacetedDocidsExtractor {
|
||||
res
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn facet_fn_with_options<'extractor, 'doc>(
|
||||
doc_alloc: &'doc Bump,
|
||||
cached_sorter: &mut BalancedCaches<'extractor>,
|
||||
@ -337,7 +336,6 @@ fn truncate_str(s: &str) -> &str {
|
||||
}
|
||||
|
||||
impl FacetedDocidsExtractor {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
|
||||
pub fn run_extraction<
|
||||
'pl,
|
||||
|
@ -11,9 +11,8 @@ use serde_json::Value;
|
||||
|
||||
use crate::error::GeoError;
|
||||
use crate::update::new::document::Document;
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
|
||||
@ -151,7 +150,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
||||
) -> Result<()> {
|
||||
let rtxn = &context.rtxn;
|
||||
let index = context.index;
|
||||
let max_memory = self.grenad_parameters.max_memory_by_thread();
|
||||
let max_memory = self.grenad_parameters.max_memory;
|
||||
let db_fields_ids_map = context.db_fields_ids_map;
|
||||
let mut data_ref = context.data.borrow_mut_or_yield();
|
||||
|
||||
|
@ -13,8 +13,9 @@ pub use geo::*;
|
||||
pub use searchable::*;
|
||||
pub use vectors::EmbeddingExtractor;
|
||||
|
||||
use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress};
|
||||
use super::thread_local::{FullySend, ThreadLocal};
|
||||
use super::indexer::document_changes::{
|
||||
DocumentChanges, FullySend, IndexingContext, Progress, ThreadLocal,
|
||||
};
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::Result;
|
||||
|
||||
|
@ -11,10 +11,10 @@ use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
||||
use crate::update::new::extract::cache::BalancedCaches;
|
||||
use crate::update::new::extract::perm_json_p::contained_in;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
|
||||
MostlySend, Progress, ThreadLocal,
|
||||
};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
||||
@ -214,7 +214,7 @@ impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> {
|
||||
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||
Ok(RefCell::new(Some(WordDocidsBalancedCaches::new_in(
|
||||
self.buckets,
|
||||
self.grenad_parameters.max_memory_by_thread(),
|
||||
self.grenad_parameters.max_memory,
|
||||
extractor_alloc,
|
||||
))))
|
||||
}
|
||||
|
@ -14,9 +14,9 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
||||
use super::cache::BalancedCaches;
|
||||
use super::DocidsExtractor;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
|
||||
Progress, ThreadLocal,
|
||||
};
|
||||
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
||||
@ -36,7 +36,7 @@ impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
|
||||
fn init_data(&self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||
Ok(RefCell::new(BalancedCaches::new_in(
|
||||
self.buckets,
|
||||
self.grenad_parameters.max_memory_by_thread(),
|
||||
self.grenad_parameters.max_memory,
|
||||
extractor_alloc,
|
||||
)))
|
||||
}
|
||||
|
@ -8,8 +8,7 @@ use super::cache::DelAddRoaringBitmap;
|
||||
use crate::error::FaultSource;
|
||||
use crate::prompt::Prompt;
|
||||
use crate::update::new::channel::EmbeddingSender;
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
|
||||
use crate::update::new::vector_document::VectorDocument;
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::vector::error::{
|
||||
|
@ -8,9 +8,182 @@ use rayon::iter::IndexedParallelIterator;
|
||||
use super::super::document_change::DocumentChange;
|
||||
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
||||
|
||||
/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
|
||||
///
|
||||
/// The primary example of such a type is `&T`, with `T: !Sync`.
|
||||
///
|
||||
/// In the authors' understanding, a type can be `!Send` for two distinct reasons:
|
||||
///
|
||||
/// 1. Because it contains data that *genuinely* cannot be moved between threads, such as thread-local data.
|
||||
/// 2. Because sending the type would allow concurrent access to a `!Sync` type, which is undefined behavior.
|
||||
///
|
||||
/// `MostlySend` exists to be used in bounds where you need a type whose data is **not** *attached* to a thread
|
||||
/// because you might access it from a different thread, but where you will never access the type **concurrently** from
|
||||
/// multiple threads.
|
||||
///
|
||||
/// Like [`Send`], `MostlySend` assumes properties on types that cannot be verified by the compiler, which is why implementing
|
||||
/// this trait is unsafe.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// Implementers of this trait promises that the following properties hold on the implementing type:
|
||||
///
|
||||
/// 1. Its data can be accessed from any thread and will be the same regardless of the thread accessing it.
|
||||
/// 2. Any operation that can be performed on the type does not depend on the thread that executes it.
|
||||
///
|
||||
/// As these properties are subtle and are not generally tracked by the Rust type system, great care should be taken before
|
||||
/// implementing `MostlySend` on a type, especially a foreign type.
|
||||
///
|
||||
/// - An example of a type that verifies (1) and (2) is [`std::rc::Rc`] (when `T` is `Send` and `Sync`).
|
||||
/// - An example of a type that doesn't verify (1) is thread-local data.
|
||||
/// - An example of a type that doesn't verify (2) is [`std::sync::MutexGuard`]: a lot of mutex implementations require that
|
||||
/// a lock is returned to the operating system on the same thread that initially locked the mutex, failing to uphold this
|
||||
/// invariant will cause Undefined Behavior
|
||||
/// (see last § in [the nomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html)).
|
||||
///
|
||||
/// It is **always safe** to implement this trait on a type that is `Send`, but no placeholder impl is provided due to limitations in
|
||||
/// coherency. Use the [`FullySend`] wrapper in this situation.
|
||||
pub unsafe trait MostlySend {}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
pub struct FullySend<T>(pub T);
|
||||
|
||||
// SAFETY: a type **fully** send is always mostly send as well.
|
||||
unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
|
||||
|
||||
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
|
||||
|
||||
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
|
||||
|
||||
impl<T> FullySend<T> {
|
||||
pub fn into(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<T> for FullySend<T> {
|
||||
fn from(value: T) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
struct MostlySendWrapper<T>(T);
|
||||
|
||||
impl<T: MostlySend> MostlySendWrapper<T> {
|
||||
/// # Safety
|
||||
///
|
||||
/// - (P1) Users of this type will never access the type concurrently from multiple threads without synchronization
|
||||
unsafe fn new(t: T) -> Self {
|
||||
Self(t)
|
||||
}
|
||||
|
||||
fn as_ref(&self) -> &T {
|
||||
&self.0
|
||||
}
|
||||
|
||||
fn as_mut(&mut self) -> &mut T {
|
||||
&mut self.0
|
||||
}
|
||||
|
||||
fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// 1. `T` is [`MostlySend`], so by its safety contract it can be accessed by any thread and all of its operations are available
|
||||
/// from any thread.
|
||||
/// 2. (P1) of `MostlySendWrapper::new` forces the user to never access the value from multiple threads concurrently.
|
||||
unsafe impl<T: MostlySend> Send for MostlySendWrapper<T> {}
|
||||
|
||||
/// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s.
|
||||
#[derive(Default)]
|
||||
pub struct ThreadLocal<T: MostlySend> {
|
||||
inner: thread_local::ThreadLocal<MostlySendWrapper<T>>,
|
||||
// FIXME: this should be necessary
|
||||
//_no_send: PhantomData<*mut ()>,
|
||||
}
|
||||
|
||||
impl<T: MostlySend> ThreadLocal<T> {
|
||||
pub fn new() -> Self {
|
||||
Self { inner: thread_local::ThreadLocal::new() }
|
||||
}
|
||||
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
Self { inner: thread_local::ThreadLocal::with_capacity(capacity) }
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.inner.clear()
|
||||
}
|
||||
|
||||
pub fn get(&self) -> Option<&T> {
|
||||
self.inner.get().map(|t| t.as_ref())
|
||||
}
|
||||
|
||||
pub fn get_or<F>(&self, create: F) -> &T
|
||||
where
|
||||
F: FnOnce() -> T,
|
||||
{
|
||||
/// TODO: move ThreadLocal, MostlySend, FullySend to a dedicated file
|
||||
self.inner.get_or(|| unsafe { MostlySendWrapper::new(create()) }).as_ref()
|
||||
}
|
||||
|
||||
pub fn get_or_try<F, E>(&self, create: F) -> std::result::Result<&T, E>
|
||||
where
|
||||
F: FnOnce() -> std::result::Result<T, E>,
|
||||
{
|
||||
self.inner
|
||||
.get_or_try(|| unsafe { Ok(MostlySendWrapper::new(create()?)) })
|
||||
.map(MostlySendWrapper::as_ref)
|
||||
}
|
||||
|
||||
pub fn get_or_default(&self) -> &T
|
||||
where
|
||||
T: Default,
|
||||
{
|
||||
self.inner.get_or_default().as_ref()
|
||||
}
|
||||
|
||||
pub fn iter_mut(&mut self) -> IterMut<T> {
|
||||
IterMut(self.inner.iter_mut())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: MostlySend> IntoIterator for ThreadLocal<T> {
|
||||
type Item = T;
|
||||
|
||||
type IntoIter = IntoIter<T>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
IntoIter(self.inner.into_iter())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IterMut<'a, T: MostlySend>(thread_local::IterMut<'a, MostlySendWrapper<T>>);
|
||||
|
||||
impl<'a, T: MostlySend> Iterator for IterMut<'a, T> {
|
||||
type Item = &'a mut T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.0.next().map(|t| t.as_mut())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IntoIter<T: MostlySend>(thread_local::IntoIter<MostlySendWrapper<T>>);
|
||||
|
||||
impl<T: MostlySend> Iterator for IntoIter<T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.0.next().map(|t| t.into_inner())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DocumentChangeContext<
|
||||
'doc, // covariant lifetime of a single `process` call
|
||||
'extractor: 'doc, // invariant lifetime of the extractor_allocs
|
||||
|
@ -4,9 +4,8 @@ use rayon::iter::IndexedParallelIterator;
|
||||
use rayon::slice::ParallelSlice as _;
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{Deletion, DocumentChange};
|
||||
use crate::{DocumentId, Result};
|
||||
|
||||
@ -93,10 +92,9 @@ mod test {
|
||||
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
|
||||
use crate::index::tests::TempIndex;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentChangeContext, Extractor, IndexingContext,
|
||||
extract, DocumentChangeContext, Extractor, IndexingContext, MostlySend, ThreadLocal,
|
||||
};
|
||||
use crate::update::new::indexer::DocumentDeletion;
|
||||
use crate::update::new::thread_local::{MostlySend, ThreadLocal};
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::DocumentId;
|
||||
|
||||
|
@ -1,377 +1,27 @@
|
||||
use bumpalo::collections::CollectIn;
|
||||
use bumpalo::Bump;
|
||||
use hashbrown::hash_map::Entry;
|
||||
use heed::RoTxn;
|
||||
use memmap2::Mmap;
|
||||
use raw_collections::RawMap;
|
||||
use rayon::slice::ParallelSlice;
|
||||
use serde_json::value::RawValue;
|
||||
use serde_json::Deserializer;
|
||||
use IndexDocumentsMethod as Idm;
|
||||
|
||||
use super::super::document_change::DocumentChange;
|
||||
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
||||
use super::retrieve_or_guess_primary_key;
|
||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::update::new::document::Versions;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{Deletion, Insertion, Update};
|
||||
use crate::update::{AvailableIds, IndexDocumentsMethod};
|
||||
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
|
||||
use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
|
||||
|
||||
pub struct DocumentOperation<'pl> {
|
||||
operations: Vec<Payload<'pl>>,
|
||||
method: MergeMethod,
|
||||
}
|
||||
|
||||
impl<'pl> DocumentOperation<'pl> {
|
||||
pub fn new(method: IndexDocumentsMethod) -> Self {
|
||||
Self { operations: Default::default(), method: MergeMethod::from(method) }
|
||||
}
|
||||
|
||||
/// TODO please give me a type
|
||||
/// The payload is expected to be in the grenad format
|
||||
pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<()> {
|
||||
payload.advise(memmap2::Advice::Sequential)?;
|
||||
self.operations.push(Payload::Addition(&payload[..]));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) {
|
||||
self.operations.push(Payload::Deletion(to_delete))
|
||||
}
|
||||
|
||||
pub fn into_changes(
|
||||
self,
|
||||
indexer: &'pl Bump,
|
||||
index: &Index,
|
||||
rtxn: &'pl RoTxn<'pl>,
|
||||
primary_key_from_op: Option<&'pl str>,
|
||||
new_fields_ids_map: &mut FieldsIdsMap,
|
||||
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)> {
|
||||
let Self { operations, method } = self;
|
||||
|
||||
let documents_ids = index.documents_ids(rtxn)?;
|
||||
let mut operations_stats = Vec::new();
|
||||
let mut available_docids = AvailableIds::new(&documents_ids);
|
||||
let mut docids_version_offsets = hashbrown::HashMap::new();
|
||||
let mut primary_key = None;
|
||||
|
||||
for operation in operations {
|
||||
let (bytes, document_count, result) = match operation {
|
||||
Payload::Addition(payload) => extract_addition_payload_changes(
|
||||
indexer,
|
||||
index,
|
||||
rtxn,
|
||||
primary_key_from_op,
|
||||
&mut primary_key,
|
||||
new_fields_ids_map,
|
||||
&mut available_docids,
|
||||
&docids_version_offsets,
|
||||
method,
|
||||
payload,
|
||||
),
|
||||
Payload::Deletion(to_delete) => extract_deletion_payload_changes(
|
||||
index,
|
||||
rtxn,
|
||||
&mut available_docids,
|
||||
&docids_version_offsets,
|
||||
method,
|
||||
to_delete,
|
||||
),
|
||||
};
|
||||
|
||||
let error = match result {
|
||||
Ok(new_docids_version_offsets) => {
|
||||
// If we don't have any error then we can merge the content of this payload
|
||||
// into to main payload. Else we just drop this payload extraction.
|
||||
merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets);
|
||||
None
|
||||
}
|
||||
Err(Error::UserError(user_error)) => Some(user_error),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
operations_stats.push(PayloadStats { document_count, bytes, error });
|
||||
}
|
||||
|
||||
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
|
||||
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
|
||||
docids_version_offsets.drain().collect_in(indexer);
|
||||
|
||||
// Reorder the offsets to make sure we iterate on the file sequentially
|
||||
// And finally sort them
|
||||
docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations));
|
||||
|
||||
let docids_version_offsets = docids_version_offsets.into_bump_slice();
|
||||
Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn extract_addition_payload_changes<'r, 'pl: 'r>(
|
||||
indexer: &'pl Bump,
|
||||
index: &Index,
|
||||
rtxn: &'r RoTxn<'r>,
|
||||
primary_key_from_op: Option<&'r str>,
|
||||
primary_key: &mut Option<PrimaryKey<'r>>,
|
||||
new_fields_ids_map: &mut FieldsIdsMap,
|
||||
available_docids: &mut AvailableIds,
|
||||
main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
|
||||
method: MergeMethod,
|
||||
payload: &'pl [u8],
|
||||
) -> (u64, u64, Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>>) {
|
||||
let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
|
||||
|
||||
/// TODO manage the error
|
||||
let mut previous_offset = 0;
|
||||
let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
|
||||
loop {
|
||||
let optdoc = match iter.next().transpose() {
|
||||
Ok(optdoc) => optdoc,
|
||||
Err(e) => {
|
||||
return (
|
||||
payload.len() as u64,
|
||||
new_docids_version_offsets.len() as u64,
|
||||
Err(InternalError::SerdeJson(e).into()),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
// Only guess the primary key if it is the first document
|
||||
let retrieved_primary_key = if previous_offset == 0 {
|
||||
let optdoc = match optdoc {
|
||||
Some(doc) => match RawMap::from_raw_value(doc, indexer) {
|
||||
Ok(docmap) => Some(docmap),
|
||||
Err(error) => {
|
||||
return (
|
||||
payload.len() as u64,
|
||||
new_docids_version_offsets.len() as u64,
|
||||
Err(Error::UserError(UserError::SerdeJson(error))),
|
||||
)
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
|
||||
let result = retrieve_or_guess_primary_key(
|
||||
rtxn,
|
||||
index,
|
||||
new_fields_ids_map,
|
||||
primary_key_from_op,
|
||||
optdoc,
|
||||
);
|
||||
|
||||
let (pk, _has_been_changed) = match result {
|
||||
Ok(Ok(pk)) => pk,
|
||||
Ok(Err(user_error)) => {
|
||||
return (
|
||||
payload.len() as u64,
|
||||
new_docids_version_offsets.len() as u64,
|
||||
Err(Error::UserError(user_error)),
|
||||
)
|
||||
}
|
||||
Err(error) => {
|
||||
return (
|
||||
payload.len() as u64,
|
||||
new_docids_version_offsets.len() as u64,
|
||||
Err(error),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
primary_key.get_or_insert(pk)
|
||||
} else {
|
||||
primary_key.as_ref().unwrap()
|
||||
};
|
||||
|
||||
let doc = match optdoc {
|
||||
Some(doc) => doc,
|
||||
None => break,
|
||||
};
|
||||
|
||||
let external_id = match retrieved_primary_key.extract_fields_and_docid(
|
||||
doc,
|
||||
new_fields_ids_map,
|
||||
indexer,
|
||||
) {
|
||||
Ok(edi) => edi,
|
||||
Err(e) => {
|
||||
return (payload.len() as u64, new_docids_version_offsets.len() as u64, Err(e))
|
||||
}
|
||||
};
|
||||
|
||||
let external_id = external_id.to_de();
|
||||
let current_offset = iter.byte_offset();
|
||||
let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] };
|
||||
|
||||
match main_docids_version_offsets.get(external_id) {
|
||||
None => {
|
||||
let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) {
|
||||
Ok(Some(docid)) => (docid, false),
|
||||
Ok(None) => (
|
||||
match available_docids.next() {
|
||||
Some(docid) => docid,
|
||||
None => {
|
||||
return (
|
||||
payload.len() as u64,
|
||||
new_docids_version_offsets.len() as u64,
|
||||
Err(UserError::DocumentLimitReached.into()),
|
||||
)
|
||||
}
|
||||
},
|
||||
true,
|
||||
),
|
||||
Err(e) => {
|
||||
return (
|
||||
payload.len() as u64,
|
||||
new_docids_version_offsets.len() as u64,
|
||||
Err(e.into()),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
match new_docids_version_offsets.entry(external_id) {
|
||||
Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(PayloadOperations::new_addition(
|
||||
method,
|
||||
docid,
|
||||
is_new,
|
||||
document_offset,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
|
||||
Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(PayloadOperations::new_addition(
|
||||
method,
|
||||
payload_operations.docid,
|
||||
payload_operations.is_new,
|
||||
document_offset,
|
||||
));
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
previous_offset = iter.byte_offset();
|
||||
}
|
||||
|
||||
(payload.len() as u64, new_docids_version_offsets.len() as u64, Ok(new_docids_version_offsets))
|
||||
}
|
||||
|
||||
fn extract_deletion_payload_changes<'s, 'pl: 's>(
|
||||
index: &Index,
|
||||
rtxn: &RoTxn,
|
||||
available_docids: &mut AvailableIds,
|
||||
main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
|
||||
method: MergeMethod,
|
||||
to_delete: &'pl [&'pl str],
|
||||
) -> (u64, u64, Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>>) {
|
||||
let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
|
||||
let mut document_count = 0;
|
||||
|
||||
for external_id in to_delete {
|
||||
match main_docids_version_offsets.get(external_id) {
|
||||
None => {
|
||||
let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) {
|
||||
Ok(Some(docid)) => (docid, false),
|
||||
Ok(None) => (
|
||||
match available_docids.next() {
|
||||
Some(docid) => docid,
|
||||
None => {
|
||||
return (
|
||||
0,
|
||||
new_docids_version_offsets.len() as u64,
|
||||
Err(UserError::DocumentLimitReached.into()),
|
||||
)
|
||||
}
|
||||
},
|
||||
true,
|
||||
),
|
||||
Err(e) => return (0, new_docids_version_offsets.len() as u64, Err(e.into())),
|
||||
};
|
||||
|
||||
match new_docids_version_offsets.entry(external_id) {
|
||||
Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(PayloadOperations::new_deletion(method, docid, is_new));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(payload_operations) => match new_docids_version_offsets.entry(external_id) {
|
||||
Entry::Occupied(mut entry) => entry.get_mut().push_deletion(),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(PayloadOperations::new_deletion(
|
||||
method,
|
||||
payload_operations.docid,
|
||||
payload_operations.is_new,
|
||||
));
|
||||
}
|
||||
},
|
||||
}
|
||||
document_count += 1;
|
||||
}
|
||||
|
||||
(0, document_count, Ok(new_docids_version_offsets))
|
||||
}
|
||||
|
||||
fn merge_version_offsets<'s, 'pl>(
|
||||
main: &mut hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
|
||||
new: hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
|
||||
) {
|
||||
// We cannot swap like nothing because documents
|
||||
// operations must be in the right order.
|
||||
if main.is_empty() {
|
||||
return *main = new;
|
||||
}
|
||||
|
||||
for (key, new_payload) in new {
|
||||
match main.entry(key) {
|
||||
Entry::Occupied(mut entry) => entry.get_mut().append_operations(new_payload.operations),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(new_payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
|
||||
type Item = (&'pl str, PayloadOperations<'pl>);
|
||||
|
||||
fn iter(
|
||||
&self,
|
||||
chunk_size: usize,
|
||||
) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
|
||||
self.docids_version_offsets.par_chunks(chunk_size)
|
||||
}
|
||||
|
||||
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
|
||||
&'doc self,
|
||||
context: &'doc DocumentChangeContext<T>,
|
||||
item: &'doc Self::Item,
|
||||
) -> Result<Option<DocumentChange<'doc>>>
|
||||
where
|
||||
'pl: 'doc,
|
||||
{
|
||||
let (external_doc, payload_operations) = item;
|
||||
payload_operations.merge_method.merge(
|
||||
payload_operations.docid,
|
||||
external_doc,
|
||||
payload_operations.is_new,
|
||||
&context.doc_alloc,
|
||||
&payload_operations.operations[..],
|
||||
)
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.docids_version_offsets.len()
|
||||
}
|
||||
index_documents_method: IndexDocumentsMethod,
|
||||
}
|
||||
|
||||
pub struct DocumentOperationChanges<'pl> {
|
||||
docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
|
||||
docids_version_offsets: &'pl [(&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]))],
|
||||
index_documents_method: IndexDocumentsMethod,
|
||||
}
|
||||
|
||||
pub enum Payload<'pl> {
|
||||
@ -380,57 +30,8 @@ pub enum Payload<'pl> {
|
||||
}
|
||||
|
||||
pub struct PayloadStats {
|
||||
pub document_count: usize,
|
||||
pub bytes: u64,
|
||||
pub document_count: u64,
|
||||
pub error: Option<UserError>,
|
||||
}
|
||||
|
||||
pub struct PayloadOperations<'pl> {
|
||||
/// The internal document id of the document.
|
||||
pub docid: DocumentId,
|
||||
/// Wether this document is not in the current database (visible by the rtxn).
|
||||
pub is_new: bool,
|
||||
/// The operations to perform, in order, on this document.
|
||||
pub operations: Vec<InnerDocOp<'pl>>,
|
||||
/// The merge method we are using to merge payloads and documents.
|
||||
merge_method: MergeMethod,
|
||||
}
|
||||
|
||||
impl<'pl> PayloadOperations<'pl> {
|
||||
fn new_deletion(merge_method: MergeMethod, docid: DocumentId, is_new: bool) -> Self {
|
||||
Self { docid, is_new, operations: vec![InnerDocOp::Deletion], merge_method }
|
||||
}
|
||||
|
||||
fn new_addition(
|
||||
merge_method: MergeMethod,
|
||||
docid: DocumentId,
|
||||
is_new: bool,
|
||||
offset: DocumentOffset<'pl>,
|
||||
) -> Self {
|
||||
Self { docid, is_new, operations: vec![InnerDocOp::Addition(offset)], merge_method }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'pl> PayloadOperations<'pl> {
|
||||
fn push_addition(&mut self, offset: DocumentOffset<'pl>) {
|
||||
if self.merge_method.useless_previous_changes() {
|
||||
self.operations.clear();
|
||||
}
|
||||
self.operations.push(InnerDocOp::Addition(offset))
|
||||
}
|
||||
|
||||
fn push_deletion(&mut self) {
|
||||
self.operations.clear();
|
||||
self.operations.push(InnerDocOp::Deletion);
|
||||
}
|
||||
|
||||
fn append_operations(&mut self, mut operations: Vec<InnerDocOp<'pl>>) {
|
||||
debug_assert!(!operations.is_empty());
|
||||
if self.merge_method.useless_previous_changes() {
|
||||
self.operations.clear();
|
||||
}
|
||||
self.operations.append(&mut operations);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -447,15 +48,213 @@ pub struct DocumentOffset<'pl> {
|
||||
pub content: &'pl [u8],
|
||||
}
|
||||
|
||||
impl<'pl> DocumentOperation<'pl> {
|
||||
pub fn new(method: IndexDocumentsMethod) -> Self {
|
||||
Self { operations: Default::default(), index_documents_method: method }
|
||||
}
|
||||
|
||||
/// TODO please give me a type
|
||||
/// The payload is expected to be in the grenad format
|
||||
pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<PayloadStats> {
|
||||
payload.advise(memmap2::Advice::Sequential)?;
|
||||
let document_count =
|
||||
memchr::memmem::find_iter(&payload[..], "}{").count().saturating_add(1);
|
||||
self.operations.push(Payload::Addition(&payload[..]));
|
||||
Ok(PayloadStats { bytes: payload.len() as u64, document_count })
|
||||
}
|
||||
|
||||
pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) {
|
||||
self.operations.push(Payload::Deletion(to_delete))
|
||||
}
|
||||
|
||||
pub fn into_changes(
|
||||
self,
|
||||
indexer: &'pl Bump,
|
||||
index: &Index,
|
||||
rtxn: &RoTxn,
|
||||
primary_key: &PrimaryKey,
|
||||
new_fields_ids_map: &mut FieldsIdsMap,
|
||||
) -> Result<DocumentOperationChanges<'pl>> {
|
||||
// will contain nodes from the intermediate hashmap
|
||||
let document_changes_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1 MiB
|
||||
|
||||
let documents_ids = index.documents_ids(rtxn)?;
|
||||
let mut available_docids = AvailableIds::new(&documents_ids);
|
||||
let mut docids_version_offsets =
|
||||
hashbrown::HashMap::<&'pl str, _, _, _>::new_in(&document_changes_alloc);
|
||||
|
||||
for operation in self.operations {
|
||||
match operation {
|
||||
Payload::Addition(payload) => {
|
||||
let mut iter =
|
||||
serde_json::Deserializer::from_slice(payload).into_iter::<&RawValue>();
|
||||
|
||||
/// TODO manage the error
|
||||
let mut previous_offset = 0;
|
||||
while let Some(document) =
|
||||
iter.next().transpose().map_err(UserError::SerdeJson)?
|
||||
{
|
||||
let external_document_id = primary_key.extract_fields_and_docid(
|
||||
document,
|
||||
new_fields_ids_map,
|
||||
indexer,
|
||||
)?;
|
||||
|
||||
let external_document_id = external_document_id.to_de();
|
||||
|
||||
let current_offset = iter.byte_offset();
|
||||
let document_operation = InnerDocOp::Addition(DocumentOffset {
|
||||
content: &payload[previous_offset..current_offset],
|
||||
});
|
||||
|
||||
match docids_version_offsets.get_mut(external_document_id) {
|
||||
None => {
|
||||
let (docid, is_new) = match index
|
||||
.external_documents_ids()
|
||||
.get(rtxn, external_document_id)?
|
||||
{
|
||||
Some(docid) => (docid, false),
|
||||
None => (
|
||||
available_docids.next().ok_or(Error::UserError(
|
||||
UserError::DocumentLimitReached,
|
||||
))?,
|
||||
true,
|
||||
),
|
||||
};
|
||||
|
||||
docids_version_offsets.insert(
|
||||
external_document_id,
|
||||
(
|
||||
(docid, is_new),
|
||||
bumpalo::vec![in indexer; document_operation],
|
||||
),
|
||||
);
|
||||
}
|
||||
Some((_, offsets)) => {
|
||||
let useless_previous_addition = match self.index_documents_method {
|
||||
IndexDocumentsMethod::ReplaceDocuments => {
|
||||
MergeDocumentForReplacement::USELESS_PREVIOUS_CHANGES
|
||||
}
|
||||
IndexDocumentsMethod::UpdateDocuments => {
|
||||
MergeDocumentForUpdates::USELESS_PREVIOUS_CHANGES
|
||||
}
|
||||
};
|
||||
|
||||
if useless_previous_addition {
|
||||
offsets.clear();
|
||||
}
|
||||
|
||||
offsets.push(document_operation);
|
||||
}
|
||||
}
|
||||
|
||||
previous_offset = iter.byte_offset();
|
||||
}
|
||||
}
|
||||
Payload::Deletion(to_delete) => {
|
||||
for external_document_id in to_delete {
|
||||
match docids_version_offsets.get_mut(external_document_id) {
|
||||
None => {
|
||||
let (docid, is_new) = match index
|
||||
.external_documents_ids()
|
||||
.get(rtxn, external_document_id)?
|
||||
{
|
||||
Some(docid) => (docid, false),
|
||||
None => (
|
||||
available_docids.next().ok_or(Error::UserError(
|
||||
UserError::DocumentLimitReached,
|
||||
))?,
|
||||
true,
|
||||
),
|
||||
};
|
||||
|
||||
docids_version_offsets.insert(
|
||||
external_document_id,
|
||||
(
|
||||
(docid, is_new),
|
||||
bumpalo::vec![in indexer; InnerDocOp::Deletion],
|
||||
),
|
||||
);
|
||||
}
|
||||
Some((_, offsets)) => {
|
||||
offsets.clear();
|
||||
offsets.push(InnerDocOp::Deletion);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
|
||||
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = docids_version_offsets
|
||||
.drain()
|
||||
.map(|(item, (docid, v))| (item, (docid, v.into_bump_slice())))
|
||||
.collect_in(indexer);
|
||||
// Reorder the offsets to make sure we iterate on the file sequentially
|
||||
let sort_function_key = match self.index_documents_method {
|
||||
Idm::ReplaceDocuments => MergeDocumentForReplacement::sort_key,
|
||||
Idm::UpdateDocuments => MergeDocumentForUpdates::sort_key,
|
||||
};
|
||||
|
||||
// And finally sort them
|
||||
docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops));
|
||||
let docids_version_offsets = docids_version_offsets.into_bump_slice();
|
||||
Ok(DocumentOperationChanges {
|
||||
docids_version_offsets,
|
||||
index_documents_method: self.index_documents_method,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
|
||||
type Item = (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]));
|
||||
|
||||
fn iter(
|
||||
&self,
|
||||
chunk_size: usize,
|
||||
) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
|
||||
self.docids_version_offsets.par_chunks(chunk_size)
|
||||
}
|
||||
|
||||
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
|
||||
&'doc self,
|
||||
context: &'doc DocumentChangeContext<T>,
|
||||
item: &'doc Self::Item,
|
||||
) -> Result<Option<DocumentChange<'doc>>>
|
||||
where
|
||||
'pl: 'doc,
|
||||
{
|
||||
let document_merge_function = match self.index_documents_method {
|
||||
Idm::ReplaceDocuments => MergeDocumentForReplacement::merge,
|
||||
Idm::UpdateDocuments => MergeDocumentForUpdates::merge,
|
||||
};
|
||||
|
||||
let (external_doc, ((internal_docid, is_new), operations)) = *item;
|
||||
|
||||
let change = document_merge_function(
|
||||
internal_docid,
|
||||
external_doc,
|
||||
is_new,
|
||||
&context.doc_alloc,
|
||||
operations,
|
||||
)?;
|
||||
Ok(change)
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.docids_version_offsets.len()
|
||||
}
|
||||
}
|
||||
|
||||
trait MergeChanges {
|
||||
/// Whether the payloads in the list of operations are useless or not.
|
||||
fn useless_previous_changes(&self) -> bool;
|
||||
const USELESS_PREVIOUS_CHANGES: bool;
|
||||
|
||||
/// Returns a key that is used to order the payloads the right way.
|
||||
fn sort_key(&self, docops: &[InnerDocOp]) -> usize;
|
||||
fn sort_key(docops: &[InnerDocOp]) -> usize;
|
||||
|
||||
fn merge<'doc>(
|
||||
&self,
|
||||
docid: DocumentId,
|
||||
external_docid: &'doc str,
|
||||
is_new: bool,
|
||||
@ -464,69 +263,13 @@ trait MergeChanges {
|
||||
) -> Result<Option<DocumentChange<'doc>>>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum MergeMethod {
|
||||
ForReplacement(MergeDocumentForReplacement),
|
||||
ForUpdates(MergeDocumentForUpdates),
|
||||
}
|
||||
|
||||
impl MergeChanges for MergeMethod {
|
||||
fn useless_previous_changes(&self) -> bool {
|
||||
match self {
|
||||
MergeMethod::ForReplacement(merge) => merge.useless_previous_changes(),
|
||||
MergeMethod::ForUpdates(merge) => merge.useless_previous_changes(),
|
||||
}
|
||||
}
|
||||
|
||||
fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
|
||||
match self {
|
||||
MergeMethod::ForReplacement(merge) => merge.sort_key(docops),
|
||||
MergeMethod::ForUpdates(merge) => merge.sort_key(docops),
|
||||
}
|
||||
}
|
||||
|
||||
fn merge<'doc>(
|
||||
&self,
|
||||
docid: DocumentId,
|
||||
external_docid: &'doc str,
|
||||
is_new: bool,
|
||||
doc_alloc: &'doc Bump,
|
||||
operations: &'doc [InnerDocOp],
|
||||
) -> Result<Option<DocumentChange<'doc>>> {
|
||||
match self {
|
||||
MergeMethod::ForReplacement(merge) => {
|
||||
merge.merge(docid, external_docid, is_new, doc_alloc, operations)
|
||||
}
|
||||
MergeMethod::ForUpdates(merge) => {
|
||||
merge.merge(docid, external_docid, is_new, doc_alloc, operations)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<IndexDocumentsMethod> for MergeMethod {
|
||||
fn from(method: IndexDocumentsMethod) -> Self {
|
||||
match method {
|
||||
IndexDocumentsMethod::ReplaceDocuments => {
|
||||
MergeMethod::ForReplacement(MergeDocumentForReplacement)
|
||||
}
|
||||
IndexDocumentsMethod::UpdateDocuments => {
|
||||
MergeMethod::ForUpdates(MergeDocumentForUpdates)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct MergeDocumentForReplacement;
|
||||
|
||||
impl MergeChanges for MergeDocumentForReplacement {
|
||||
fn useless_previous_changes(&self) -> bool {
|
||||
true
|
||||
}
|
||||
const USELESS_PREVIOUS_CHANGES: bool = true;
|
||||
|
||||
/// Reorders to read only the last change.
|
||||
fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
|
||||
fn sort_key(docops: &[InnerDocOp]) -> usize {
|
||||
let f = |ido: &_| match ido {
|
||||
InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
|
||||
InnerDocOp::Deletion => None,
|
||||
@ -538,7 +281,6 @@ impl MergeChanges for MergeDocumentForReplacement {
|
||||
///
|
||||
/// This function is only meant to be used when doing a replacement and not an update.
|
||||
fn merge<'doc>(
|
||||
&self,
|
||||
docid: DocumentId,
|
||||
external_doc: &'doc str,
|
||||
is_new: bool,
|
||||
@ -579,16 +321,13 @@ impl MergeChanges for MergeDocumentForReplacement {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct MergeDocumentForUpdates;
|
||||
|
||||
impl MergeChanges for MergeDocumentForUpdates {
|
||||
fn useless_previous_changes(&self) -> bool {
|
||||
false
|
||||
}
|
||||
const USELESS_PREVIOUS_CHANGES: bool = false;
|
||||
|
||||
/// Reorders to read the first changes first so that it's faster to read the first one and then the rest.
|
||||
fn sort_key(&self, docops: &[InnerDocOp]) -> usize {
|
||||
fn sort_key(docops: &[InnerDocOp]) -> usize {
|
||||
let f = |ido: &_| match ido {
|
||||
InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
|
||||
InnerDocOp::Deletion => None,
|
||||
@ -601,7 +340,6 @@ impl MergeChanges for MergeDocumentForUpdates {
|
||||
///
|
||||
/// This function is only meant to be used when doing an update and not a replacement.
|
||||
fn merge<'doc>(
|
||||
&self,
|
||||
docid: DocumentId,
|
||||
external_docid: &'doc str,
|
||||
is_new: bool,
|
||||
|
@ -3,9 +3,9 @@ use std::sync::{OnceLock, RwLock};
|
||||
use std::thread::{self, Builder};
|
||||
|
||||
use big_s::S;
|
||||
use document_changes::{extract, DocumentChanges, IndexingContext, Progress};
|
||||
use document_changes::{extract, DocumentChanges, IndexingContext, Progress, ThreadLocal};
|
||||
pub use document_deletion::DocumentDeletion;
|
||||
pub use document_operation::{DocumentOperation, PayloadStats};
|
||||
pub use document_operation::DocumentOperation;
|
||||
use hashbrown::HashMap;
|
||||
use heed::types::{Bytes, DecodeIgnore, Str};
|
||||
use heed::{RoTxn, RwTxn};
|
||||
@ -20,7 +20,6 @@ use super::channel::*;
|
||||
use super::extract::*;
|
||||
use super::facet_search_builder::FacetSearchBuilder;
|
||||
use super::merger::FacetFieldIdsDelta;
|
||||
use super::thread_local::ThreadLocal;
|
||||
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
|
||||
use super::words_prefix_docids::{
|
||||
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
|
||||
@ -133,7 +132,6 @@ mod steps {
|
||||
pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
|
||||
wtxn: &mut RwTxn,
|
||||
index: &'index Index,
|
||||
grenad_parameters: GrenadParameters,
|
||||
db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
new_fields_ids_map: FieldsIdsMap,
|
||||
new_primary_key: Option<PrimaryKey<'pl>>,
|
||||
@ -211,6 +209,16 @@ where
|
||||
|
||||
field_distribution.retain(|_, v| *v != 0);
|
||||
|
||||
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
|
||||
let current_num_threads = rayon::current_num_threads();
|
||||
let max_memory = TEN_GIB / current_num_threads;
|
||||
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
|
||||
|
||||
let grenad_parameters = GrenadParameters {
|
||||
max_memory: Some(max_memory),
|
||||
..GrenadParameters::default()
|
||||
};
|
||||
|
||||
let facet_field_ids_delta;
|
||||
|
||||
{
|
||||
@ -220,8 +228,7 @@ where
|
||||
let (finished_steps, step_name) = steps::extract_facets();
|
||||
|
||||
facet_field_ids_delta = merge_and_send_facet_docids(
|
||||
FacetedDocidsExtractor::run_extraction(
|
||||
grenad_parameters,
|
||||
FacetedDocidsExtractor::run_extraction(grenad_parameters,
|
||||
document_changes,
|
||||
indexing_context,
|
||||
&mut extractor_allocs,
|
||||
@ -337,8 +344,7 @@ where
|
||||
|
||||
let (finished_steps, step_name) = steps::extract_word_proximity();
|
||||
|
||||
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
|
||||
grenad_parameters,
|
||||
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
|
||||
document_changes,
|
||||
indexing_context,
|
||||
&mut extractor_allocs,
|
||||
@ -392,8 +398,7 @@ where
|
||||
};
|
||||
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||
let (finished_steps, step_name) = steps::extract_geo_points();
|
||||
extract(
|
||||
document_changes,
|
||||
extract(document_changes,
|
||||
&extractor,
|
||||
indexing_context,
|
||||
&mut extractor_allocs,
|
||||
@ -772,10 +777,16 @@ pub fn retrieve_or_guess_primary_key<'a>(
|
||||
match primary_key_from_op {
|
||||
// we did, and it is different from the DB one
|
||||
Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => {
|
||||
// is the index empty?
|
||||
if index.number_of_documents(rtxn)? == 0 {
|
||||
// change primary key
|
||||
(primary_key_from_op, true)
|
||||
} else {
|
||||
return Ok(Err(UserError::PrimaryKeyCannotBeChanged(
|
||||
primary_key_from_db.to_string(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
_ => (primary_key_from_db, false),
|
||||
}
|
||||
} else {
|
||||
|
@ -3,12 +3,11 @@ use std::ops::DerefMut;
|
||||
use rayon::iter::IndexedParallelIterator;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
||||
use crate::update::new::document::Versions;
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{DocumentChange, Insertion};
|
||||
use crate::{Error, InternalError, Result, UserError};
|
||||
|
||||
|
@ -4,14 +4,13 @@ use rayon::slice::ParallelSlice as _;
|
||||
use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
use super::document_changes::DocumentChangeContext;
|
||||
use super::document_changes::{DocumentChangeContext, MostlySend};
|
||||
use super::DocumentChanges;
|
||||
use crate::documents::Error::InvalidDocumentFormat;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::error::{FieldIdMapMissingEntry, InternalError};
|
||||
use crate::update::new::document::Versions;
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
|
||||
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
|
||||
|
||||
|
@ -17,7 +17,6 @@ pub mod indexer;
|
||||
mod merger;
|
||||
mod parallel_iterator_ext;
|
||||
mod ref_cell_ext;
|
||||
pub(crate) mod thread_local;
|
||||
mod top_level_map;
|
||||
pub mod vector_document;
|
||||
mod word_fst_builder;
|
||||
|
@ -1,16 +1,37 @@
|
||||
use std::cell::{RefCell, RefMut};
|
||||
use std::cell::{Ref, RefCell, RefMut};
|
||||
|
||||
pub trait RefCellExt<T: ?Sized> {
|
||||
fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError>;
|
||||
fn try_borrow_mut_or_yield(
|
||||
&self,
|
||||
) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>;
|
||||
|
||||
fn borrow_or_yield(&self) -> Ref<'_, T> {
|
||||
self.try_borrow_or_yield().unwrap()
|
||||
}
|
||||
|
||||
fn borrow_mut_or_yield(&self) -> RefMut<'_, T> {
|
||||
self.try_borrow_mut_or_yield().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> RefCellExt<T> for RefCell<T> {
|
||||
fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError> {
|
||||
/// TODO: move this trait and impl elsewhere
|
||||
loop {
|
||||
match self.try_borrow() {
|
||||
Ok(borrow) => break Ok(borrow),
|
||||
Err(error) => {
|
||||
tracing::warn!("dynamic borrow failed, yielding to local tasks");
|
||||
match rayon::yield_local() {
|
||||
Some(rayon::Yield::Executed) => continue,
|
||||
_ => return Err(error),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_borrow_mut_or_yield(
|
||||
&self,
|
||||
) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError> {
|
||||
|
@ -1,174 +0,0 @@
|
||||
use std::cell::RefCell;
|
||||
|
||||
/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
|
||||
///
|
||||
/// The primary example of such a type is `&T`, with `T: !Sync`.
|
||||
///
|
||||
/// In the authors' understanding, a type can be `!Send` for two distinct reasons:
|
||||
///
|
||||
/// 1. Because it contains data that *genuinely* cannot be moved between threads, such as thread-local data.
|
||||
/// 2. Because sending the type would allow concurrent access to a `!Sync` type, which is undefined behavior.
|
||||
///
|
||||
/// `MostlySend` exists to be used in bounds where you need a type whose data is **not** *attached* to a thread
|
||||
/// because you might access it from a different thread, but where you will never access the type **concurrently** from
|
||||
/// multiple threads.
|
||||
///
|
||||
/// Like [`Send`], `MostlySend` assumes properties on types that cannot be verified by the compiler, which is why implementing
|
||||
/// this trait is unsafe.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// Implementers of this trait promises that the following properties hold on the implementing type:
|
||||
///
|
||||
/// 1. Its data can be accessed from any thread and will be the same regardless of the thread accessing it.
|
||||
/// 2. Any operation that can be performed on the type does not depend on the thread that executes it.
|
||||
///
|
||||
/// As these properties are subtle and are not generally tracked by the Rust type system, great care should be taken before
|
||||
/// implementing `MostlySend` on a type, especially a foreign type.
|
||||
///
|
||||
/// - An example of a type that verifies (1) and (2) is [`std::rc::Rc`] (when `T` is `Send` and `Sync`).
|
||||
/// - An example of a type that doesn't verify (1) is thread-local data.
|
||||
/// - An example of a type that doesn't verify (2) is [`std::sync::MutexGuard`]: a lot of mutex implementations require that
|
||||
/// a lock is returned to the operating system on the same thread that initially locked the mutex, failing to uphold this
|
||||
/// invariant will cause Undefined Behavior
|
||||
/// (see last § in [the nomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html)).
|
||||
///
|
||||
/// It is **always safe** to implement this trait on a type that is `Send`, but no placeholder impl is provided due to limitations in
|
||||
/// coherency. Use the [`FullySend`] wrapper in this situation.
|
||||
pub unsafe trait MostlySend {}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
pub struct FullySend<T>(pub T);
|
||||
|
||||
// SAFETY: a type **fully** send is always mostly send as well.
|
||||
unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
|
||||
|
||||
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
|
||||
|
||||
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
|
||||
|
||||
impl<T> FullySend<T> {
|
||||
pub fn into(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<T> for FullySend<T> {
|
||||
fn from(value: T) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
struct MostlySendWrapper<T>(T);
|
||||
|
||||
impl<T: MostlySend> MostlySendWrapper<T> {
|
||||
/// # Safety
|
||||
///
|
||||
/// - (P1) Users of this type will never access the type concurrently from multiple threads without synchronization
|
||||
unsafe fn new(t: T) -> Self {
|
||||
Self(t)
|
||||
}
|
||||
|
||||
fn as_ref(&self) -> &T {
|
||||
&self.0
|
||||
}
|
||||
|
||||
fn as_mut(&mut self) -> &mut T {
|
||||
&mut self.0
|
||||
}
|
||||
|
||||
fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// 1. `T` is [`MostlySend`], so by its safety contract it can be accessed by any thread and all of its operations are available
|
||||
/// from any thread.
|
||||
/// 2. (P1) of `MostlySendWrapper::new` forces the user to never access the value from multiple threads concurrently.
|
||||
unsafe impl<T: MostlySend> Send for MostlySendWrapper<T> {}
|
||||
|
||||
/// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s.
|
||||
#[derive(Default)]
|
||||
pub struct ThreadLocal<T: MostlySend> {
|
||||
inner: thread_local::ThreadLocal<MostlySendWrapper<T>>,
|
||||
// FIXME: this should be necessary
|
||||
//_no_send: PhantomData<*mut ()>,
|
||||
}
|
||||
|
||||
impl<T: MostlySend> ThreadLocal<T> {
|
||||
pub fn new() -> Self {
|
||||
Self { inner: thread_local::ThreadLocal::new() }
|
||||
}
|
||||
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
Self { inner: thread_local::ThreadLocal::with_capacity(capacity) }
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.inner.clear()
|
||||
}
|
||||
|
||||
pub fn get(&self) -> Option<&T> {
|
||||
self.inner.get().map(|t| t.as_ref())
|
||||
}
|
||||
|
||||
pub fn get_or<F>(&self, create: F) -> &T
|
||||
where
|
||||
F: FnOnce() -> T,
|
||||
{
|
||||
self.inner.get_or(|| unsafe { MostlySendWrapper::new(create()) }).as_ref()
|
||||
}
|
||||
|
||||
pub fn get_or_try<F, E>(&self, create: F) -> std::result::Result<&T, E>
|
||||
where
|
||||
F: FnOnce() -> std::result::Result<T, E>,
|
||||
{
|
||||
self.inner
|
||||
.get_or_try(|| unsafe { Ok(MostlySendWrapper::new(create()?)) })
|
||||
.map(MostlySendWrapper::as_ref)
|
||||
}
|
||||
|
||||
pub fn get_or_default(&self) -> &T
|
||||
where
|
||||
T: Default,
|
||||
{
|
||||
self.inner.get_or_default().as_ref()
|
||||
}
|
||||
|
||||
pub fn iter_mut(&mut self) -> IterMut<T> {
|
||||
IterMut(self.inner.iter_mut())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: MostlySend> IntoIterator for ThreadLocal<T> {
|
||||
type Item = T;
|
||||
|
||||
type IntoIter = IntoIter<T>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
IntoIter(self.inner.into_iter())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IterMut<'a, T: MostlySend>(thread_local::IterMut<'a, MostlySendWrapper<T>>);
|
||||
|
||||
impl<'a, T: MostlySend> Iterator for IterMut<'a, T> {
|
||||
type Item = &'a mut T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.0.next().map(|t| t.as_mut())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IntoIter<T: MostlySend>(thread_local::IntoIter<MostlySendWrapper<T>>);
|
||||
|
||||
impl<T: MostlySend> Iterator for IntoIter<T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.0.next().map(|t| t.into_inner())
|
||||
}
|
||||
}
|
@ -60,7 +60,7 @@ pub enum EmbedErrorKind {
|
||||
ManualEmbed(String),
|
||||
#[error("model not found. Meilisearch will not automatically download models from the Ollama library, please pull the model manually{}", option_info(.0.as_deref(), "server replied with "))]
|
||||
OllamaModelNotFoundError(Option<String>),
|
||||
#[error("error deserializing the response body as JSON:\n - {0}")]
|
||||
#[error("error deserialization the response body as JSON:\n - {0}")]
|
||||
RestResponseDeserialization(std::io::Error),
|
||||
#[error("expected a response containing {0} embeddings, got only {1}")]
|
||||
RestResponseEmbeddingCount(usize, usize),
|
||||
|
@ -1,6 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use arroy::distances::{BinaryQuantizedCosine, Cosine};
|
||||
use arroy::ItemId;
|
||||
@ -596,26 +595,18 @@ impl Embedder {
|
||||
/// Embed one or multiple texts.
|
||||
///
|
||||
/// Each text can be embedded as one or multiple embeddings.
|
||||
pub fn embed(
|
||||
&self,
|
||||
texts: Vec<String>,
|
||||
deadline: Option<Instant>,
|
||||
) -> std::result::Result<Vec<Embedding>, EmbedError> {
|
||||
pub fn embed(&self, texts: Vec<String>) -> std::result::Result<Vec<Embedding>, EmbedError> {
|
||||
match self {
|
||||
Embedder::HuggingFace(embedder) => embedder.embed(texts),
|
||||
Embedder::OpenAi(embedder) => embedder.embed(&texts, deadline),
|
||||
Embedder::Ollama(embedder) => embedder.embed(&texts, deadline),
|
||||
Embedder::OpenAi(embedder) => embedder.embed(&texts),
|
||||
Embedder::Ollama(embedder) => embedder.embed(&texts),
|
||||
Embedder::UserProvided(embedder) => embedder.embed(&texts),
|
||||
Embedder::Rest(embedder) => embedder.embed(texts, deadline),
|
||||
Embedder::Rest(embedder) => embedder.embed(texts),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn embed_one(
|
||||
&self,
|
||||
text: String,
|
||||
deadline: Option<Instant>,
|
||||
) -> std::result::Result<Embedding, EmbedError> {
|
||||
let mut embedding = self.embed(vec![text], deadline)?;
|
||||
pub fn embed_one(&self, text: String) -> std::result::Result<Embedding, EmbedError> {
|
||||
let mut embedding = self.embed(vec![text])?;
|
||||
let embedding = embedding.pop().ok_or_else(EmbedError::missing_embedding)?;
|
||||
Ok(embedding)
|
||||
}
|
||||
|
@ -1,5 +1,3 @@
|
||||
use std::time::Instant;
|
||||
|
||||
use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _};
|
||||
use rayon::slice::ParallelSlice as _;
|
||||
|
||||
@ -82,9 +80,8 @@ impl Embedder {
|
||||
pub fn embed<S: AsRef<str> + serde::Serialize>(
|
||||
&self,
|
||||
texts: &[S],
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
match self.rest_embedder.embed_ref(texts, deadline) {
|
||||
match self.rest_embedder.embed_ref(texts) {
|
||||
Ok(embeddings) => Ok(embeddings),
|
||||
Err(EmbedError { kind: EmbedErrorKind::RestOtherStatusCode(404, error), fault: _ }) => {
|
||||
Err(EmbedError::ollama_model_not_found(error))
|
||||
@ -100,7 +97,7 @@ impl Embedder {
|
||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk)).collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
@ -117,7 +114,7 @@ impl Embedder {
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.map(move |chunk| self.embed(chunk))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
|
@ -1,5 +1,3 @@
|
||||
use std::time::Instant;
|
||||
|
||||
use ordered_float::OrderedFloat;
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
|
||||
use rayon::slice::ParallelSlice as _;
|
||||
@ -213,23 +211,18 @@ impl Embedder {
|
||||
pub fn embed<S: AsRef<str> + serde::Serialize>(
|
||||
&self,
|
||||
texts: &[S],
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
match self.rest_embedder.embed_ref(texts, deadline) {
|
||||
match self.rest_embedder.embed_ref(texts) {
|
||||
Ok(embeddings) => Ok(embeddings),
|
||||
Err(EmbedError { kind: EmbedErrorKind::RestBadRequest(error, _), fault: _ }) => {
|
||||
tracing::warn!(error=?error, "OpenAI: received `BAD_REQUEST`. Input was maybe too long, retrying on tokenized version. For best performance, limit the size of your document template.");
|
||||
self.try_embed_tokenized(texts, deadline)
|
||||
self.try_embed_tokenized(texts)
|
||||
}
|
||||
Err(error) => Err(error),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_embed_tokenized<S: AsRef<str>>(
|
||||
&self,
|
||||
text: &[S],
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
fn try_embed_tokenized<S: AsRef<str>>(&self, text: &[S]) -> Result<Vec<Embedding>, EmbedError> {
|
||||
let mut all_embeddings = Vec::with_capacity(text.len());
|
||||
for text in text {
|
||||
let text = text.as_ref();
|
||||
@ -237,13 +230,13 @@ impl Embedder {
|
||||
let encoded = self.tokenizer.encode_ordinary(text);
|
||||
let len = encoded.len();
|
||||
if len < max_token_count {
|
||||
all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline)?);
|
||||
all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text])?);
|
||||
continue;
|
||||
}
|
||||
|
||||
let tokens = &encoded.as_slice()[0..max_token_count];
|
||||
|
||||
let embedding = self.rest_embedder.embed_tokens(tokens, deadline)?;
|
||||
let embedding = self.rest_embedder.embed_tokens(tokens)?;
|
||||
|
||||
all_embeddings.push(embedding);
|
||||
}
|
||||
@ -257,7 +250,7 @@ impl Embedder {
|
||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk)).collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
@ -274,7 +267,7 @@ impl Embedder {
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.map(move |chunk| self.embed(chunk))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
|
@ -1,5 +1,4 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Instant;
|
||||
|
||||
use deserr::Deserr;
|
||||
use rand::Rng;
|
||||
@ -154,31 +153,19 @@ impl Embedder {
|
||||
Ok(Self { data, dimensions, distribution: options.distribution })
|
||||
}
|
||||
|
||||
pub fn embed(
|
||||
&self,
|
||||
texts: Vec<String>,
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline)
|
||||
pub fn embed(&self, texts: Vec<String>) -> Result<Vec<Embedding>, EmbedError> {
|
||||
embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions))
|
||||
}
|
||||
|
||||
pub fn embed_ref<S>(
|
||||
&self,
|
||||
texts: &[S],
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<Vec<Embedding>, EmbedError>
|
||||
pub fn embed_ref<S>(&self, texts: &[S]) -> Result<Vec<Embedding>, EmbedError>
|
||||
where
|
||||
S: AsRef<str> + Serialize,
|
||||
{
|
||||
embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline)
|
||||
embed(&self.data, texts, texts.len(), Some(self.dimensions))
|
||||
}
|
||||
|
||||
pub fn embed_tokens(
|
||||
&self,
|
||||
tokens: &[usize],
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<Embedding, EmbedError> {
|
||||
let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline)?;
|
||||
pub fn embed_tokens(&self, tokens: &[usize]) -> Result<Embedding, EmbedError> {
|
||||
let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions))?;
|
||||
// unwrap: guaranteed that embeddings.len() == 1, otherwise the previous line terminated in error
|
||||
Ok(embeddings.pop().unwrap())
|
||||
}
|
||||
@ -190,7 +177,7 @@ impl Embedder {
|
||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
@ -207,7 +194,7 @@ impl Embedder {
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None))
|
||||
.map(move |chunk| self.embed_ref(chunk))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
@ -240,7 +227,7 @@ impl Embedder {
|
||||
}
|
||||
|
||||
fn infer_dimensions(data: &EmbedderData) -> Result<usize, NewEmbedderError> {
|
||||
let v = embed(data, ["test"].as_slice(), 1, None, None)
|
||||
let v = embed(data, ["test"].as_slice(), 1, None)
|
||||
.map_err(NewEmbedderError::could_not_determine_dimension)?;
|
||||
// unwrap: guaranteed that v.len() == 1, otherwise the previous line terminated in error
|
||||
Ok(v.first().unwrap().len())
|
||||
@ -251,7 +238,6 @@ fn embed<S>(
|
||||
inputs: &[S],
|
||||
expected_count: usize,
|
||||
expected_dimension: Option<usize>,
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<Vec<Embedding>, EmbedError>
|
||||
where
|
||||
S: Serialize,
|
||||
@ -271,27 +257,16 @@ where
|
||||
|
||||
for attempt in 0..10 {
|
||||
let response = request.clone().send_json(&body);
|
||||
let result = check_response(response, data.configuration_source).and_then(|response| {
|
||||
response_to_embedding(response, data, expected_count, expected_dimension)
|
||||
});
|
||||
let result = check_response(response, data.configuration_source);
|
||||
|
||||
let retry_duration = match result {
|
||||
Ok(response) => return Ok(response),
|
||||
Ok(response) => {
|
||||
return response_to_embedding(response, data, expected_count, expected_dimension)
|
||||
}
|
||||
Err(retry) => {
|
||||
tracing::warn!("Failed: {}", retry.error);
|
||||
if let Some(deadline) = deadline {
|
||||
let now = std::time::Instant::now();
|
||||
if now > deadline {
|
||||
tracing::warn!("Could not embed due to deadline");
|
||||
return Err(retry.into_error());
|
||||
}
|
||||
|
||||
let duration_to_deadline = deadline - now;
|
||||
retry.into_duration(attempt).map(|duration| duration.min(duration_to_deadline))
|
||||
} else {
|
||||
retry.into_duration(attempt)
|
||||
}
|
||||
}
|
||||
}?;
|
||||
|
||||
let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
|
||||
@ -308,7 +283,6 @@ where
|
||||
let result = check_response(response, data.configuration_source);
|
||||
result.map_err(Retry::into_error).and_then(|response| {
|
||||
response_to_embedding(response, data, expected_count, expected_dimension)
|
||||
.map_err(Retry::into_error)
|
||||
})
|
||||
}
|
||||
|
||||
@ -350,28 +324,20 @@ fn response_to_embedding(
|
||||
data: &EmbedderData,
|
||||
expected_count: usize,
|
||||
expected_dimensions: Option<usize>,
|
||||
) -> Result<Vec<Embedding>, Retry> {
|
||||
let response: serde_json::Value = response
|
||||
.into_json()
|
||||
.map_err(EmbedError::rest_response_deserialization)
|
||||
.map_err(Retry::retry_later)?;
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
let response: serde_json::Value =
|
||||
response.into_json().map_err(EmbedError::rest_response_deserialization)?;
|
||||
|
||||
let embeddings = data.response.extract_embeddings(response).map_err(Retry::give_up)?;
|
||||
let embeddings = data.response.extract_embeddings(response)?;
|
||||
|
||||
if embeddings.len() != expected_count {
|
||||
return Err(Retry::give_up(EmbedError::rest_response_embedding_count(
|
||||
expected_count,
|
||||
embeddings.len(),
|
||||
)));
|
||||
return Err(EmbedError::rest_response_embedding_count(expected_count, embeddings.len()));
|
||||
}
|
||||
|
||||
if let Some(dimensions) = expected_dimensions {
|
||||
for embedding in &embeddings {
|
||||
if embedding.len() != dimensions {
|
||||
return Err(Retry::give_up(EmbedError::rest_unexpected_dimension(
|
||||
dimensions,
|
||||
embedding.len(),
|
||||
)));
|
||||
return Err(EmbedError::rest_unexpected_dimension(dimensions, embedding.len()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user