Make things to compile again

This commit is contained in:
Clément Renault 2023-08-31 12:15:47 +02:00
parent 95a011af13
commit d7233ecdb8
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
6 changed files with 695 additions and 638 deletions

View File

@ -43,7 +43,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind}; use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task}; use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId}; use crate::{Error, IndexSchedulerInner, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time. /// Represents a combination of tasks that can all be processed at the same time.
/// ///
@ -213,7 +213,7 @@ impl IndexOperation {
} }
} }
impl IndexScheduler { impl IndexSchedulerInner {
/// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`]. /// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`].
/// ///
/// ## Arguments /// ## Arguments
@ -480,8 +480,7 @@ impl IndexScheduler {
if let Some(task_id) = to_cancel.max() { if let Some(task_id) = to_cancel.max() {
// We retrieve the tasks that were processing before this tasks cancelation started. // We retrieve the tasks that were processing before this tasks cancelation started.
// We must *not* reset the processing tasks before calling this method. // We must *not* reset the processing tasks before calling this method.
let ProcessingTasks { started_at, processing, .. } = let ProcessingTasks { started_at, processing, .. } = &*self.processing_tasks.read();
&*self.processing_tasks.read().unwrap();
return Ok(Some(Batch::TaskCancelation { return Ok(Some(Batch::TaskCancelation {
task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?, task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
previous_started_at: *started_at, previous_started_at: *started_at,
@ -1392,7 +1391,7 @@ impl IndexScheduler {
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> { fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
// 1. Remove from this list the tasks that we are not allowed to delete // 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let processing_tasks = &self.processing_tasks.read().processing.clone();
let all_task_ids = self.all_task_ids(wtxn)?; let all_task_ids = self.all_task_ids(wtxn)?;
let mut to_delete_tasks = all_task_ids & matched_tasks; let mut to_delete_tasks = all_task_ids & matched_tasks;

File diff suppressed because it is too large Load Diff

View File

@ -10,9 +10,9 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status
use roaring::{MultiOps, RoaringBitmap}; use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128}; use crate::{Error, IndexSchedulerInner, Result, Task, TaskId, BEI128};
impl IndexScheduler { impl IndexSchedulerInner {
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> { pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
enum_iterator::all().map(|s| self.get_status(rtxn, s)).union() enum_iterator::all().map(|s| self.get_status(rtxn, s)).union()
} }

View File

@ -280,6 +280,7 @@ fn import_dump(
auth: &mut AuthController, auth: &mut AuthController,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let reader = File::open(dump_path)?; let reader = File::open(dump_path)?;
let index_scheduler = index_scheduler.inner();
let mut dump_reader = dump::DumpReader::open(reader)?; let mut dump_reader = dump::DumpReader::open(reader)?;
if let Some(date) = dump_reader.date() { if let Some(date) = dump_reader.date() {

View File

@ -78,6 +78,6 @@ async fn patch_features(
}), }),
Some(&req), Some(&req),
); );
index_scheduler.put_runtime_features(new_features)?; index_scheduler.inner().put_runtime_features(new_features)?;
Ok(HttpResponse::Ok().json(new_features)) Ok(HttpResponse::Ok().json(new_features))
} }

View File

@ -324,11 +324,8 @@ async fn cancel_tasks(
let query = params.into_query(); let query = params.into_query();
let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes( let (tasks, _) =
&index_scheduler.read_txn()?, index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?;
&query,
index_scheduler.filters(),
)?;
let task_cancelation = let task_cancelation =
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
@ -369,11 +366,8 @@ async fn delete_tasks(
); );
let query = params.into_query(); let query = params.into_query();
let (tasks, _) = index_scheduler.get_task_ids_from_authorized_indexes( let (tasks, _) =
&index_scheduler.read_txn()?, index_scheduler.get_task_ids_from_authorized_indexes(&query, index_scheduler.filters())?;
&query,
index_scheduler.filters(),
)?;
let task_deletion = let task_deletion =
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks }; KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };