Compare commits

..

3 Commits

Author SHA1 Message Date
Louis Dureuil
88b3d2547c
Merge 1fcd5f091e into a5d7ae23bd 2024-11-12 11:23:29 +00:00
Louis Dureuil
1fcd5f091e
Remove progress from task 2024-11-12 12:23:13 +01:00
Louis Dureuil
6094bb299a
Fix user_provided vectors 2024-11-12 10:15:55 +01:00
8 changed files with 24 additions and 56 deletions

View File

@ -148,7 +148,6 @@ pub fn snapshot_task(task: &Task) -> String {
enqueued_at: _, enqueued_at: _,
started_at: _, started_at: _,
finished_at: _, finished_at: _,
progress: _,
error, error,
canceled_by, canceled_by,
details, details,

View File

@ -978,12 +978,7 @@ impl IndexScheduler {
Ok(( Ok((
ret.map(|task| { ret.map(|task| {
if processing.contains(task.uid) { if processing.contains(task.uid) {
Task { Task { status: Status::Processing, started_at: Some(started_at), ..task }
status: Status::Processing,
progress: progress.clone(),
started_at: Some(started_at),
..task
}
} else { } else {
task task
} }
@ -1025,7 +1020,6 @@ impl IndexScheduler {
enqueued_at: OffsetDateTime::now_utc(), enqueued_at: OffsetDateTime::now_utc(),
started_at: None, started_at: None,
finished_at: None, finished_at: None,
progress: None,
error: None, error: None,
canceled_by: None, canceled_by: None,
details: kind.default_details(), details: kind.default_details(),
@ -1606,8 +1600,6 @@ impl<'a> Dump<'a> {
enqueued_at: task.enqueued_at, enqueued_at: task.enqueued_at,
started_at: task.started_at, started_at: task.started_at,
finished_at: task.finished_at, finished_at: task.finished_at,
/// FIXME: should we update dump to contain progress information? 🤔
progress: None,
error: task.error, error: task.error,
canceled_by: task.canceled_by, canceled_by: task.canceled_by,
details: task.details, details: task.details,

View File

@ -345,8 +345,6 @@ impl IndexScheduler {
enqueued_at, enqueued_at,
started_at, started_at,
finished_at, finished_at,
/// FIXME: assert something here? ask tamo 🤔
progress: _,
error: _, error: _,
canceled_by, canceled_by,
details, details,

View File

@ -4,9 +4,7 @@ use time::{Duration, OffsetDateTime};
use crate::error::ResponseError; use crate::error::ResponseError;
use crate::settings::{Settings, Unchecked}; use crate::settings::{Settings, Unchecked};
use crate::tasks::{ use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId};
serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId, TaskProgress,
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@ -29,8 +27,6 @@ pub struct TaskView {
pub started_at: Option<OffsetDateTime>, pub started_at: Option<OffsetDateTime>,
#[serde(with = "time::serde::rfc3339::option", default)] #[serde(with = "time::serde::rfc3339::option", default)]
pub finished_at: Option<OffsetDateTime>, pub finished_at: Option<OffsetDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
pub progress: Option<TaskProgress>,
} }
impl TaskView { impl TaskView {
@ -47,7 +43,6 @@ impl TaskView {
enqueued_at: task.enqueued_at, enqueued_at: task.enqueued_at,
started_at: task.started_at, started_at: task.started_at,
finished_at: task.finished_at, finished_at: task.finished_at,
progress: task.progress.clone(),
} }
} }
} }

View File

@ -31,8 +31,6 @@ pub struct Task {
#[serde(with = "time::serde::rfc3339::option")] #[serde(with = "time::serde::rfc3339::option")]
pub finished_at: Option<OffsetDateTime>, pub finished_at: Option<OffsetDateTime>,
pub progress: Option<TaskProgress>,
pub error: Option<ResponseError>, pub error: Option<ResponseError>,
pub canceled_by: Option<TaskId>, pub canceled_by: Option<TaskId>,
pub details: Option<Details>, pub details: Option<Details>,

View File

@ -2,12 +2,11 @@ use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use hashbrown::HashMap;
use heed::types::Bytes; use heed::types::Bytes;
use roaring::RoaringBitmap;
use super::extract::FacetKind; use super::extract::FacetKind;
use super::StdResult; use super::StdResult;
use crate::index::IndexEmbeddingConfig;
use crate::update::new::KvReaderFieldId; use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding; use crate::vector::Embedding;
use crate::{DocumentId, Index}; use crate::{DocumentId, Index};
@ -87,7 +86,7 @@ pub enum ArroyOperation {
embedding: Embedding, embedding: Embedding,
}, },
Finish { Finish {
user_provided: HashMap<String, RoaringBitmap>, configs: Vec<IndexEmbeddingConfig>,
}, },
} }
@ -418,12 +417,9 @@ impl EmbeddingSender<'_> {
} }
/// Marks all embedders as "to be built" /// Marks all embedders as "to be built"
pub fn finish( pub fn finish(self, configs: Vec<IndexEmbeddingConfig>) -> StdResult<(), SendError<()>> {
self,
user_provided: HashMap<String, RoaringBitmap>,
) -> StdResult<(), SendError<()>> {
self.0 self.0
.send(WriterOperation::ArroyOperation(ArroyOperation::Finish { user_provided })) .send(WriterOperation::ArroyOperation(ArroyOperation::Finish { configs }))
.map_err(|_| SendError(())) .map_err(|_| SendError(()))
} }
} }

View File

@ -85,8 +85,13 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
for change in changes { for change in changes {
let change = change?; let change = change?;
match change { match change {
DocumentChange::Deletion(_deletion) => { DocumentChange::Deletion(deletion) => {
// handled by document sender // vector deletion is handled by document sender,
// we still need to accommodate deletion from user_provided
for chunks in &mut all_chunks {
// regenerate: true means we delete from user_provided
chunks.set_regenerate(deletion.docid(), true);
}
} }
DocumentChange::Update(update) => { DocumentChange::Update(update) => {
let old_vectors = update.current_vectors( let old_vectors = update.current_vectors(
@ -423,9 +428,9 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default(); let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default();
if regenerate { if regenerate {
// regenerate == !user_provided // regenerate == !user_provided
user_provided.del.get_or_insert(Default::default()).insert(docid); user_provided.insert_del_u32(docid);
} else { } else {
user_provided.add.get_or_insert(Default::default()).insert(docid); user_provided.insert_add_u32(docid);
} }
} }

View File

@ -342,35 +342,28 @@ where
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter(); let _entered = span.enter();
let index_embeddings = index.embedding_configs(&rtxn)?; let mut index_embeddings = index.embedding_configs(&rtxn)?;
if index_embeddings.is_empty() { if index_embeddings.is_empty() {
break 'vectors; break 'vectors;
} }
let embedding_sender = extractor_sender.embeddings(); let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings(); let (finished_steps, step_name) = steps::extract_embeddings();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
for config in &mut index_embeddings {
let mut user_provided = HashMap::new(); 'data: for data in datastore.iter_mut() {
for data in datastore { let data = &mut data.get_mut().0;
let data = data.into_inner().0; let Some(deladd) = data.remove(&config.name) else { continue 'data; };
for (embedder, deladd) in data.into_iter() { deladd.apply_to(&mut config.user_provided);
let user_provided = user_provided.entry(embedder).or_insert(Default::default());
if let Some(del) = deladd.del {
*user_provided -= del;
}
if let Some(add) = deladd.add {
*user_provided |= add;
}
} }
} }
embedding_sender.finish(user_provided).unwrap(); embedding_sender.finish(index_embeddings).unwrap();
} }
// TODO THIS IS TOO MUCH // TODO THIS IS TOO MUCH
@ -472,7 +465,7 @@ where
writer.del_items(wtxn, *dimensions, docid)?; writer.del_items(wtxn, *dimensions, docid)?;
writer.add_item(wtxn, docid, &embedding)?; writer.add_item(wtxn, docid, &embedding)?;
} }
ArroyOperation::Finish { mut user_provided } => { ArroyOperation::Finish { configs } => {
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
let _entered = span.enter(); let _entered = span.enter();
@ -497,14 +490,6 @@ where
)?; )?;
} }
let mut configs = index.embedding_configs(wtxn)?;
for config in &mut configs {
if let Some(user_provided) = user_provided.remove(&config.name) {
config.user_provided = user_provided;
}
}
index.put_embedding_configs(wtxn, configs)?; index.put_embedding_configs(wtxn, configs)?;
} }
}, },