mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 02:27:40 +08:00
Compare commits
2 Commits
bef8fc6cf1
...
1fcd5f091e
Author | SHA1 | Date | |
---|---|---|---|
|
1fcd5f091e | ||
|
6094bb299a |
@ -148,7 +148,6 @@ pub fn snapshot_task(task: &Task) -> String {
|
|||||||
enqueued_at: _,
|
enqueued_at: _,
|
||||||
started_at: _,
|
started_at: _,
|
||||||
finished_at: _,
|
finished_at: _,
|
||||||
progress: _,
|
|
||||||
error,
|
error,
|
||||||
canceled_by,
|
canceled_by,
|
||||||
details,
|
details,
|
||||||
|
@ -978,12 +978,7 @@ impl IndexScheduler {
|
|||||||
Ok((
|
Ok((
|
||||||
ret.map(|task| {
|
ret.map(|task| {
|
||||||
if processing.contains(task.uid) {
|
if processing.contains(task.uid) {
|
||||||
Task {
|
Task { status: Status::Processing, started_at: Some(started_at), ..task }
|
||||||
status: Status::Processing,
|
|
||||||
progress: progress.clone(),
|
|
||||||
started_at: Some(started_at),
|
|
||||||
..task
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
task
|
task
|
||||||
}
|
}
|
||||||
@ -1025,7 +1020,6 @@ impl IndexScheduler {
|
|||||||
enqueued_at: OffsetDateTime::now_utc(),
|
enqueued_at: OffsetDateTime::now_utc(),
|
||||||
started_at: None,
|
started_at: None,
|
||||||
finished_at: None,
|
finished_at: None,
|
||||||
progress: None,
|
|
||||||
error: None,
|
error: None,
|
||||||
canceled_by: None,
|
canceled_by: None,
|
||||||
details: kind.default_details(),
|
details: kind.default_details(),
|
||||||
@ -1606,8 +1600,6 @@ impl<'a> Dump<'a> {
|
|||||||
enqueued_at: task.enqueued_at,
|
enqueued_at: task.enqueued_at,
|
||||||
started_at: task.started_at,
|
started_at: task.started_at,
|
||||||
finished_at: task.finished_at,
|
finished_at: task.finished_at,
|
||||||
/// FIXME: should we update dump to contain progress information? 🤔
|
|
||||||
progress: None,
|
|
||||||
error: task.error,
|
error: task.error,
|
||||||
canceled_by: task.canceled_by,
|
canceled_by: task.canceled_by,
|
||||||
details: task.details,
|
details: task.details,
|
||||||
|
@ -345,8 +345,6 @@ impl IndexScheduler {
|
|||||||
enqueued_at,
|
enqueued_at,
|
||||||
started_at,
|
started_at,
|
||||||
finished_at,
|
finished_at,
|
||||||
/// FIXME: assert something here? ask tamo 🤔
|
|
||||||
progress: _,
|
|
||||||
error: _,
|
error: _,
|
||||||
canceled_by,
|
canceled_by,
|
||||||
details,
|
details,
|
||||||
|
@ -4,9 +4,7 @@ use time::{Duration, OffsetDateTime};
|
|||||||
|
|
||||||
use crate::error::ResponseError;
|
use crate::error::ResponseError;
|
||||||
use crate::settings::{Settings, Unchecked};
|
use crate::settings::{Settings, Unchecked};
|
||||||
use crate::tasks::{
|
use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId};
|
||||||
serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId, TaskProgress,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@ -29,8 +27,6 @@ pub struct TaskView {
|
|||||||
pub started_at: Option<OffsetDateTime>,
|
pub started_at: Option<OffsetDateTime>,
|
||||||
#[serde(with = "time::serde::rfc3339::option", default)]
|
#[serde(with = "time::serde::rfc3339::option", default)]
|
||||||
pub finished_at: Option<OffsetDateTime>,
|
pub finished_at: Option<OffsetDateTime>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub progress: Option<TaskProgress>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TaskView {
|
impl TaskView {
|
||||||
@ -47,7 +43,6 @@ impl TaskView {
|
|||||||
enqueued_at: task.enqueued_at,
|
enqueued_at: task.enqueued_at,
|
||||||
started_at: task.started_at,
|
started_at: task.started_at,
|
||||||
finished_at: task.finished_at,
|
finished_at: task.finished_at,
|
||||||
progress: task.progress.clone(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,8 +31,6 @@ pub struct Task {
|
|||||||
#[serde(with = "time::serde::rfc3339::option")]
|
#[serde(with = "time::serde::rfc3339::option")]
|
||||||
pub finished_at: Option<OffsetDateTime>,
|
pub finished_at: Option<OffsetDateTime>,
|
||||||
|
|
||||||
pub progress: Option<TaskProgress>,
|
|
||||||
|
|
||||||
pub error: Option<ResponseError>,
|
pub error: Option<ResponseError>,
|
||||||
pub canceled_by: Option<TaskId>,
|
pub canceled_by: Option<TaskId>,
|
||||||
pub details: Option<Details>,
|
pub details: Option<Details>,
|
||||||
|
@ -2,12 +2,11 @@ use std::marker::PhantomData;
|
|||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
||||||
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
|
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
|
||||||
use hashbrown::HashMap;
|
|
||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
use roaring::RoaringBitmap;
|
|
||||||
|
|
||||||
use super::extract::FacetKind;
|
use super::extract::FacetKind;
|
||||||
use super::StdResult;
|
use super::StdResult;
|
||||||
|
use crate::index::IndexEmbeddingConfig;
|
||||||
use crate::update::new::KvReaderFieldId;
|
use crate::update::new::KvReaderFieldId;
|
||||||
use crate::vector::Embedding;
|
use crate::vector::Embedding;
|
||||||
use crate::{DocumentId, Index};
|
use crate::{DocumentId, Index};
|
||||||
@ -87,7 +86,7 @@ pub enum ArroyOperation {
|
|||||||
embedding: Embedding,
|
embedding: Embedding,
|
||||||
},
|
},
|
||||||
Finish {
|
Finish {
|
||||||
user_provided: HashMap<String, RoaringBitmap>,
|
configs: Vec<IndexEmbeddingConfig>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -418,12 +417,9 @@ impl EmbeddingSender<'_> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Marks all embedders as "to be built"
|
/// Marks all embedders as "to be built"
|
||||||
pub fn finish(
|
pub fn finish(self, configs: Vec<IndexEmbeddingConfig>) -> StdResult<(), SendError<()>> {
|
||||||
self,
|
|
||||||
user_provided: HashMap<String, RoaringBitmap>,
|
|
||||||
) -> StdResult<(), SendError<()>> {
|
|
||||||
self.0
|
self.0
|
||||||
.send(WriterOperation::ArroyOperation(ArroyOperation::Finish { user_provided }))
|
.send(WriterOperation::ArroyOperation(ArroyOperation::Finish { configs }))
|
||||||
.map_err(|_| SendError(()))
|
.map_err(|_| SendError(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,8 +85,13 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
|||||||
for change in changes {
|
for change in changes {
|
||||||
let change = change?;
|
let change = change?;
|
||||||
match change {
|
match change {
|
||||||
DocumentChange::Deletion(_deletion) => {
|
DocumentChange::Deletion(deletion) => {
|
||||||
// handled by document sender
|
// vector deletion is handled by document sender,
|
||||||
|
// we still need to accomodate deletion from user_provided
|
||||||
|
for chunks in &mut all_chunks {
|
||||||
|
// regenerate: true means we delete from user_provided
|
||||||
|
chunks.set_regenerate(deletion.docid(), true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
DocumentChange::Update(update) => {
|
DocumentChange::Update(update) => {
|
||||||
let old_vectors = update.current_vectors(
|
let old_vectors = update.current_vectors(
|
||||||
@ -423,9 +428,9 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
|
|||||||
let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default();
|
let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default();
|
||||||
if regenerate {
|
if regenerate {
|
||||||
// regenerate == !user_provided
|
// regenerate == !user_provided
|
||||||
user_provided.del.get_or_insert(Default::default()).insert(docid);
|
user_provided.insert_del_u32(docid);
|
||||||
} else {
|
} else {
|
||||||
user_provided.add.get_or_insert(Default::default()).insert(docid);
|
user_provided.insert_add_u32(docid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,35 +342,28 @@ where
|
|||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
let index_embeddings = index.embedding_configs(&rtxn)?;
|
let mut index_embeddings = index.embedding_configs(&rtxn)?;
|
||||||
if index_embeddings.is_empty() {
|
if index_embeddings.is_empty() {
|
||||||
break 'vectors;
|
break 'vectors;
|
||||||
}
|
}
|
||||||
|
|
||||||
let embedding_sender = extractor_sender.embeddings();
|
let embedding_sender = extractor_sender.embeddings();
|
||||||
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
|
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
|
||||||
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
|
let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
|
||||||
let (finished_steps, step_name) = steps::extract_embeddings();
|
let (finished_steps, step_name) = steps::extract_embeddings();
|
||||||
|
|
||||||
|
|
||||||
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
|
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
|
||||||
|
|
||||||
|
for config in &mut index_embeddings {
|
||||||
let mut user_provided = HashMap::new();
|
'data: for data in datastore.iter_mut() {
|
||||||
for data in datastore {
|
let data = &mut data.get_mut().0;
|
||||||
let data = data.into_inner().0;
|
let Some(deladd) = data.remove(&config.name) else { continue 'data; };
|
||||||
for (embedder, deladd) in data.into_iter() {
|
deladd.apply_to(&mut config.user_provided);
|
||||||
let user_provided = user_provided.entry(embedder).or_insert(Default::default());
|
|
||||||
if let Some(del) = deladd.del {
|
|
||||||
*user_provided -= del;
|
|
||||||
}
|
|
||||||
if let Some(add) = deladd.add {
|
|
||||||
*user_provided |= add;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
embedding_sender.finish(user_provided).unwrap();
|
embedding_sender.finish(index_embeddings).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO THIS IS TOO MUCH
|
// TODO THIS IS TOO MUCH
|
||||||
@ -472,7 +465,7 @@ where
|
|||||||
writer.del_items(wtxn, *dimensions, docid)?;
|
writer.del_items(wtxn, *dimensions, docid)?;
|
||||||
writer.add_item(wtxn, docid, &embedding)?;
|
writer.add_item(wtxn, docid, &embedding)?;
|
||||||
}
|
}
|
||||||
ArroyOperation::Finish { mut user_provided } => {
|
ArroyOperation::Finish { configs } => {
|
||||||
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
|
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
@ -497,14 +490,6 @@ where
|
|||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut configs = index.embedding_configs(wtxn)?;
|
|
||||||
|
|
||||||
for config in &mut configs {
|
|
||||||
if let Some(user_provided) = user_provided.remove(&config.name) {
|
|
||||||
config.user_provided = user_provided;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
index.put_embedding_configs(wtxn, configs)?;
|
index.put_embedding_configs(wtxn, configs)?;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
Loading…
Reference in New Issue
Block a user