make Task index_uid an option

Not all task relate to an index. Tasks that don't have an index_uid set
to None
This commit is contained in:
ad hoc 2022-05-16 19:50:45 +02:00
parent 9935db86c7
commit aa50acb031
No known key found for this signature in database
GPG Key ID: 4F00A782990CC643
8 changed files with 79 additions and 35 deletions

View File

@ -137,7 +137,7 @@ fn serialize_duration<S: Serializer>(
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct TaskView { pub struct TaskView {
uid: TaskId, uid: TaskId,
index_uid: String, index_uid: Option<String>,
status: TaskStatus, status: TaskStatus,
#[serde(rename = "type")] #[serde(rename = "type")]
task_type: TaskType, task_type: TaskType,
@ -313,7 +313,7 @@ impl From<Task> for TaskView {
Self { Self {
uid: id, uid: id,
index_uid: index_uid.into_inner(), index_uid: index_uid.map(|u| u.into_inner()),
status, status,
task_type, task_type,
details, details,
@ -342,7 +342,7 @@ impl From<Vec<TaskView>> for TaskListView {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct SummarizedTaskView { pub struct SummarizedTaskView {
uid: TaskId, uid: TaskId,
index_uid: String, index_uid: Option<String>,
status: TaskStatus, status: TaskStatus,
#[serde(rename = "type")] #[serde(rename = "type")]
task_type: TaskType, task_type: TaskType,
@ -365,7 +365,7 @@ impl From<Task> for SummarizedTaskView {
Self { Self {
uid: other.id, uid: other.id,
index_uid: other.index_uid.to_string(), index_uid: other.index_uid.map(|u| u.into_inner()),
status: TaskStatus::Enqueued, status: TaskStatus::Enqueued,
task_type: other.content.into(), task_type: other.content.into(),
enqueued_at, enqueued_at,

View File

@ -187,7 +187,7 @@ impl From<(UpdateStatus, String, TaskId)> for Task {
// Dummy task // Dummy task
let mut task = Task { let mut task = Task {
id: task_id, id: task_id,
index_uid: IndexUid::new(uid).unwrap(), index_uid: Some(IndexUid::new(uid).unwrap()),
content: TaskContent::IndexDeletion, content: TaskContent::IndexDeletion,
events: Vec::new(), events: Vec::new(),
}; };

View File

@ -419,7 +419,7 @@ where
Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { primary_key }, Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { primary_key },
}; };
let task = self.task_store.register(uid, content).await?; let task = self.task_store.register(Some(uid), content).await?;
self.scheduler.read().await.notify(); self.scheduler.read().await.notify();
Ok(task) Ok(task)
@ -569,7 +569,12 @@ where
// Check if the currently indexing update is from our index. // Check if the currently indexing update is from our index.
let is_indexing = processing_tasks let is_indexing = processing_tasks
.first() .first()
.map(|task| task.index_uid.as_str() == uid) .map(|task| {
task.index_uid
.as_ref()
.map(|u| u.as_str() == uid)
.unwrap_or(false)
})
.unwrap_or_default(); .unwrap_or_default();
let index = self.index_resolver.get_index(uid).await?; let index = self.index_resolver.get_index(uid).await?;
@ -605,7 +610,7 @@ where
// Check if the currently indexing update is from our index. // Check if the currently indexing update is from our index.
stats.is_indexing = processing_tasks stats.is_indexing = processing_tasks
.first() .first()
.map(|p| p.index_uid.as_str() == index_uid) .and_then(|p| p.index_uid.as_ref().map(|u| u.as_str() == index_uid))
.or(Some(false)); .or(Some(false));
indexes.insert(index_uid, stats); indexes.insert(index_uid, stats);

View File

@ -204,7 +204,7 @@ where
match batch.tasks.first() { match batch.tasks.first() {
Some(Task { Some(Task {
index_uid, index_uid: Some(ref index_uid),
id, id,
content: content:
TaskContent::DocumentAddition { TaskContent::DocumentAddition {
@ -285,7 +285,7 @@ where
TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"), TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => { TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => {
let ids = ids.clone(); let ids = ids.clone();
let index = self.get_index(index_uid.into_inner()).await?; let index = self.get_index(index_uid.unwrap().into_inner()).await?;
let DocumentDeletionResult { let DocumentDeletionResult {
deleted_documents, .. deleted_documents, ..
@ -294,7 +294,7 @@ where
Ok(TaskResult::DocumentDeletion { deleted_documents }) Ok(TaskResult::DocumentDeletion { deleted_documents })
} }
TaskContent::DocumentDeletion(DocumentDeletion::Clear) => { TaskContent::DocumentDeletion(DocumentDeletion::Clear) => {
let index = self.get_index(index_uid.into_inner()).await?; let index = self.get_index(index_uid.unwrap().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> { let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
let number_documents = index.stats()?.number_of_documents; let number_documents = index.stats()?.number_of_documents;
index.clear_documents()?; index.clear_documents()?;
@ -310,9 +310,10 @@ where
allow_index_creation, allow_index_creation,
} => { } => {
let index = if *is_deletion || !*allow_index_creation { let index = if *is_deletion || !*allow_index_creation {
self.get_index(index_uid.into_inner()).await? self.get_index(index_uid.unwrap().into_inner()).await?
} else { } else {
self.get_or_create_index(index_uid, task.id).await? self.get_or_create_index(index_uid.unwrap(), task.id)
.await?
}; };
let settings = settings.clone(); let settings = settings.clone();
@ -321,7 +322,7 @@ where
Ok(TaskResult::Other) Ok(TaskResult::Other)
} }
TaskContent::IndexDeletion => { TaskContent::IndexDeletion => {
let index = self.delete_index(index_uid.into_inner()).await?; let index = self.delete_index(index_uid.unwrap().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> { let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
Ok(index.stats()?.number_of_documents) Ok(index.stats()?.number_of_documents)
@ -331,7 +332,7 @@ where
Ok(TaskResult::ClearAll { deleted_documents }) Ok(TaskResult::ClearAll { deleted_documents })
} }
TaskContent::IndexCreation { primary_key } => { TaskContent::IndexCreation { primary_key } => {
let index = self.create_index(index_uid, task.id).await?; let index = self.create_index(index_uid.unwrap(), task.id).await?;
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone(); let primary_key = primary_key.clone();
@ -341,7 +342,7 @@ where
Ok(TaskResult::Other) Ok(TaskResult::Other)
} }
TaskContent::IndexUpdate { primary_key } => { TaskContent::IndexUpdate { primary_key } => {
let index = self.get_index(index_uid.into_inner()).await?; let index = self.get_index(index_uid.unwrap().into_inner()).await?;
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone(); let primary_key = primary_key.clone();
@ -503,7 +504,7 @@ mod test {
proptest! { proptest! {
#[test] #[test]
fn test_process_task( fn test_process_task(
task in any::<Task>(), task in any::<Task>().prop_filter("uid must be Some", |t| t.index_uid.is_some()),
index_exists in any::<bool>(), index_exists in any::<bool>(),
index_op_fails in any::<bool>(), index_op_fails in any::<bool>(),
any_int in any::<u64>(), any_int in any::<u64>(),

View File

@ -125,7 +125,8 @@ struct TaskQueue {
impl TaskQueue { impl TaskQueue {
fn insert(&mut self, task: Task) { fn insert(&mut self, task: Task) {
let uid = task.index_uid.into_inner(); // TODO(marin): The index uid should be remaped to a task queue identifier here
let uid = task.index_uid.unwrap().into_inner();
let id = task.id; let id = task.id;
let kind = match task.content { let kind = match task.content {
TaskContent::DocumentAddition { TaskContent::DocumentAddition {
@ -443,7 +444,7 @@ mod test {
fn gen_task(id: TaskId, index_uid: &str, content: TaskContent) -> Task { fn gen_task(id: TaskId, index_uid: &str, content: TaskContent) -> Task {
Task { Task {
id, id,
index_uid: IndexUid::new_unchecked(index_uid), index_uid: Some(IndexUid::new_unchecked(index_uid)),
content, content,
events: vec![], events: vec![],
} }

View File

@ -74,7 +74,11 @@ pub enum TaskEvent {
#[cfg_attr(test, derive(proptest_derive::Arbitrary))] #[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct Task { pub struct Task {
pub id: TaskId, pub id: TaskId,
pub index_uid: IndexUid, /// The name of the index the task is targeting. If it isn't targeting any idex (i.e Dump task)
/// then this is None
// TODO: when next forward breaking dumps, it would be a good idea to move this field inside of
// the TaskContent.
pub index_uid: Option<IndexUid>,
pub content: TaskContent, pub content: TaskContent,
pub events: Vec<TaskEvent>, pub events: Vec<TaskEvent>,
} }

View File

@ -30,10 +30,14 @@ pub struct TaskFilter {
impl TaskFilter { impl TaskFilter {
fn pass(&self, task: &Task) -> bool { fn pass(&self, task: &Task) -> bool {
self.indexes match task.index_uid {
.as_ref() Some(ref index_uid) => self
.map(|indexes| indexes.contains(&*task.index_uid)) .indexes
.unwrap_or(true) .as_ref()
.map(|indexes| indexes.contains(index_uid.as_str()))
.unwrap_or(true),
None => false,
}
} }
/// Adds an index to the filter, so the filter must match this index. /// Adds an index to the filter, so the filter must match this index.
@ -66,7 +70,11 @@ impl TaskStore {
Ok(Self { store }) Ok(Self { store })
} }
pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result<Task> { pub async fn register(
&self,
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
debug!("registering update: {:?}", content); debug!("registering update: {:?}", content);
let store = self.store.clone(); let store = self.store.clone();
let task = tokio::task::spawn_blocking(move || -> Result<Task> { let task = tokio::task::spawn_blocking(move || -> Result<Task> {
@ -305,7 +313,11 @@ pub mod test {
} }
} }
pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result<Task> { pub async fn register(
&self,
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
match self { match self {
Self::Real(s) => s.register(index_uid, content).await, Self::Real(s) => s.register(index_uid, content).await,
Self::Mock(_m) => todo!(), Self::Mock(_m) => todo!(),
@ -335,7 +347,7 @@ pub mod test {
let gen_task = |id: TaskId| Task { let gen_task = |id: TaskId| Task {
id, id,
index_uid: IndexUid::new_unchecked("test"), index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexCreation { primary_key: None }, content: TaskContent::IndexCreation { primary_key: None },
events: Vec::new(), events: Vec::new(),
}; };

View File

@ -109,7 +109,8 @@ impl Store {
pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> { pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
self.tasks.put(txn, &BEU64::new(task.id), task)?; self.tasks.put(txn, &BEU64::new(task.id), task)?;
self.uids_task_ids self.uids_task_ids
.put(txn, &(&task.index_uid, task.id), &())?; // TODO(marin): The index uid should be remaped to a task queue identifier here
.put(txn, &(&task.index_uid.as_ref().unwrap(), task.id), &())?;
Ok(()) Ok(())
} }
@ -325,7 +326,7 @@ pub mod test {
let tasks = (0..100) let tasks = (0..100)
.map(|_| Task { .map(|_| Task {
id: rand::random(), id: rand::random(),
index_uid: IndexUid::new_unchecked("test"), index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion, content: TaskContent::IndexDeletion,
events: vec![], events: vec![],
}) })
@ -356,14 +357,14 @@ pub mod test {
let task_1 = Task { let task_1 = Task {
id: 1, id: 1,
index_uid: IndexUid::new_unchecked("test"), index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion, content: TaskContent::IndexDeletion,
events: vec![], events: vec![],
}; };
let task_2 = Task { let task_2 = Task {
id: 0, id: 0,
index_uid: IndexUid::new_unchecked("test1"), index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion, content: TaskContent::IndexDeletion,
events: vec![], events: vec![],
}; };
@ -379,18 +380,28 @@ pub mod test {
txn.abort().unwrap(); txn.abort().unwrap();
assert_eq!(tasks.len(), 1); assert_eq!(tasks.len(), 1);
assert_eq!(&*tasks.first().unwrap().index_uid, "test"); assert_eq!(
tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
"test"
);
// same thing but invert the ids // same thing but invert the ids
let task_1 = Task { let task_1 = Task {
id: 0, id: 0,
index_uid: IndexUid::new_unchecked("test"), index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion, content: TaskContent::IndexDeletion,
events: vec![], events: vec![],
}; };
let task_2 = Task { let task_2 = Task {
id: 1, id: 1,
index_uid: IndexUid::new_unchecked("test1"), index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion, content: TaskContent::IndexDeletion,
events: vec![], events: vec![],
}; };
@ -405,7 +416,17 @@ pub mod test {
let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap(); let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
assert_eq!(tasks.len(), 1); assert_eq!(tasks.len(), 1);
assert_eq!(&*tasks.first().unwrap().index_uid, "test"); assert_eq!(
&*tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
"test"
);
} }
proptest! { proptest! {