mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-19 17:38:28 +08:00
Extract the must_stop flag out of the RwLock
This commit is contained in:
parent
07db4cfab0
commit
fd73306793
@ -680,10 +680,10 @@ impl IndexScheduler {
|
|||||||
self.index_mapper.indexer_config(),
|
self.index_mapper.indexer_config(),
|
||||||
);
|
);
|
||||||
builder.set_primary_key(primary_key);
|
builder.set_primary_key(primary_key);
|
||||||
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
|
let must_stop_processing = self.must_stop_processing.clone();
|
||||||
builder.execute(
|
builder.execute(
|
||||||
|indexing_step| debug!("update: {:?}", indexing_step),
|
|indexing_step| debug!("update: {:?}", indexing_step),
|
||||||
|| must_stop.load(Relaxed),
|
|| must_stop_processing.get(),
|
||||||
)?;
|
)?;
|
||||||
index_wtxn.commit()?;
|
index_wtxn.commit()?;
|
||||||
}
|
}
|
||||||
@ -762,7 +762,7 @@ impl IndexScheduler {
|
|||||||
content_files,
|
content_files,
|
||||||
mut tasks,
|
mut tasks,
|
||||||
} => {
|
} => {
|
||||||
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
|
let must_stop_processing = self.must_stop_processing.clone();
|
||||||
let indexer_config = self.index_mapper.indexer_config();
|
let indexer_config = self.index_mapper.indexer_config();
|
||||||
// TODO use the code from the IndexCreate operation
|
// TODO use the code from the IndexCreate operation
|
||||||
if let Some(primary_key) = primary_key {
|
if let Some(primary_key) = primary_key {
|
||||||
@ -772,7 +772,7 @@ impl IndexScheduler {
|
|||||||
builder.set_primary_key(primary_key);
|
builder.set_primary_key(primary_key);
|
||||||
builder.execute(
|
builder.execute(
|
||||||
|indexing_step| debug!("update: {:?}", indexing_step),
|
|indexing_step| debug!("update: {:?}", indexing_step),
|
||||||
|| must_stop.clone().load(Relaxed),
|
|| must_stop_processing.clone().get(),
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -788,7 +788,7 @@ impl IndexScheduler {
|
|||||||
indexer_config,
|
indexer_config,
|
||||||
config,
|
config,
|
||||||
|indexing_step| debug!("update: {:?}", indexing_step),
|
|indexing_step| debug!("update: {:?}", indexing_step),
|
||||||
|| must_stop.load(Relaxed),
|
|| must_stop_processing.get(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let mut results = Vec::new();
|
let mut results = Vec::new();
|
||||||
@ -882,10 +882,10 @@ impl IndexScheduler {
|
|||||||
let mut builder =
|
let mut builder =
|
||||||
milli::update::Settings::new(index_wtxn, index, indexer_config);
|
milli::update::Settings::new(index_wtxn, index, indexer_config);
|
||||||
apply_settings_to_builder(&checked_settings, &mut builder);
|
apply_settings_to_builder(&checked_settings, &mut builder);
|
||||||
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
|
let must_stop_processing = self.must_stop_processing.clone();
|
||||||
builder.execute(
|
builder.execute(
|
||||||
|indexing_step| debug!("update: {:?}", indexing_step),
|
|indexing_step| debug!("update: {:?}", indexing_step),
|
||||||
|| must_stop.load(Relaxed),
|
|| must_stop_processing.get(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
task.status = Status::Succeeded;
|
task.status = Status::Succeeded;
|
||||||
|
@ -125,32 +125,42 @@ struct ProcessingTasks {
|
|||||||
started_at: OffsetDateTime,
|
started_at: OffsetDateTime,
|
||||||
/// The list of tasks ids that are currently running.
|
/// The list of tasks ids that are currently running.
|
||||||
processing: RoaringBitmap,
|
processing: RoaringBitmap,
|
||||||
/// A boolean that can be set to true to stop the currently processing tasks.
|
|
||||||
must_stop: Arc<AtomicBool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ProcessingTasks {
|
impl ProcessingTasks {
|
||||||
/// Stores the currently processing tasks, the date time at which it started
|
/// Stores the currently processing tasks, and the date time at which it started.
|
||||||
/// and resets the _must stop_ flag.
|
|
||||||
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
|
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
|
||||||
self.started_at = started_at;
|
self.started_at = started_at;
|
||||||
self.processing = processing;
|
self.processing = processing;
|
||||||
self.must_stop.store(false, Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resets the processing tasks to an empty list.
|
/// Set the processing tasks to an empty list.
|
||||||
fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
|
fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
|
||||||
self.started_at = stopped_at;
|
self.started_at = stopped_at;
|
||||||
self.processing = RoaringBitmap::new();
|
self.processing = RoaringBitmap::new();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Forces the currently processing tasks to stop running if necessary.
|
/// Returns `true` if there, at least, is one task that is currently processing we must stop.
|
||||||
fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) {
|
fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
|
||||||
// If there, at least, is one task that is currently processing we must stop.
|
!self.processing.is_disjoint(canceled_tasks)
|
||||||
if !self.processing.is_disjoint(canceled_tasks) {
|
|
||||||
self.must_stop.store(true, Relaxed);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Clone, Debug)]
|
||||||
|
struct MustStopProcessing(Arc<AtomicBool>);
|
||||||
|
|
||||||
|
impl MustStopProcessing {
|
||||||
|
fn get(&self) -> bool {
|
||||||
|
self.0.load(Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn must_stop(&self) {
|
||||||
|
self.0.store(true, Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&self) {
|
||||||
|
self.0.store(false, Relaxed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Database const names for the `IndexScheduler`.
|
/// Database const names for the `IndexScheduler`.
|
||||||
@ -168,6 +178,8 @@ pub struct IndexScheduler {
|
|||||||
/// The LMDB environment which the DBs are associated with.
|
/// The LMDB environment which the DBs are associated with.
|
||||||
pub(crate) env: Env,
|
pub(crate) env: Env,
|
||||||
|
|
||||||
|
/// A boolean that can be set to true to stop the currently processing tasks.
|
||||||
|
pub(crate) must_stop_processing: MustStopProcessing,
|
||||||
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
|
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
|
||||||
pub(crate) file_store: FileStore,
|
pub(crate) file_store: FileStore,
|
||||||
|
|
||||||
@ -233,12 +245,12 @@ impl IndexScheduler {
|
|||||||
let processing_tasks = ProcessingTasks {
|
let processing_tasks = ProcessingTasks {
|
||||||
started_at: OffsetDateTime::now_utc(),
|
started_at: OffsetDateTime::now_utc(),
|
||||||
processing: RoaringBitmap::new(),
|
processing: RoaringBitmap::new(),
|
||||||
must_stop: Arc::new(AtomicBool::new(false)),
|
|
||||||
};
|
};
|
||||||
let file_store = FileStore::new(&update_file_path)?;
|
let file_store = FileStore::new(&update_file_path)?;
|
||||||
|
|
||||||
// allow unreachable_code to get rids of the warning in the case of a test build.
|
// allow unreachable_code to get rids of the warning in the case of a test build.
|
||||||
let this = Self {
|
let this = Self {
|
||||||
|
must_stop_processing: MustStopProcessing::default(),
|
||||||
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
|
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
|
||||||
file_store,
|
file_store,
|
||||||
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
|
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
|
||||||
@ -263,6 +275,7 @@ impl IndexScheduler {
|
|||||||
/// This function will execute in a different thread and must be called only once.
|
/// This function will execute in a different thread and must be called only once.
|
||||||
fn run(&self) {
|
fn run(&self) {
|
||||||
let run = Self {
|
let run = Self {
|
||||||
|
must_stop_processing: MustStopProcessing::default(),
|
||||||
processing_tasks: self.processing_tasks.clone(),
|
processing_tasks: self.processing_tasks.clone(),
|
||||||
file_store: self.file_store.clone(),
|
file_store: self.file_store.clone(),
|
||||||
env: self.env.clone(),
|
env: self.env.clone(),
|
||||||
@ -433,10 +446,14 @@ impl IndexScheduler {
|
|||||||
// we inform the processing tasks to stop (if necessary).
|
// we inform the processing tasks to stop (if necessary).
|
||||||
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
|
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
|
||||||
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
|
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
|
||||||
self.processing_tasks
|
if self
|
||||||
|
.processing_tasks
|
||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.cancel_processing_tasks(&tasks_to_cancel);
|
.must_cancel_processing_tasks(&tasks_to_cancel)
|
||||||
|
{
|
||||||
|
self.must_stop_processing.must_stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify the scheduler loop to execute a new tick
|
// notify the scheduler loop to execute a new tick
|
||||||
@ -612,6 +629,9 @@ impl IndexScheduler {
|
|||||||
let processed_tasks = ids.len();
|
let processed_tasks = ids.len();
|
||||||
let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
|
let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
|
||||||
let started_at = OffsetDateTime::now_utc();
|
let started_at = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
|
// We reset the must_stop flag to be sure that we don't stop processing tasks
|
||||||
|
self.must_stop_processing.reset();
|
||||||
self.processing_tasks
|
self.processing_tasks
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -14,6 +14,7 @@ use crate::{index_mapper::IndexMapper, IndexScheduler, Kind, Status};
|
|||||||
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||||
let IndexScheduler {
|
let IndexScheduler {
|
||||||
autobatching_enabled,
|
autobatching_enabled,
|
||||||
|
must_stop_processing,
|
||||||
processing_tasks,
|
processing_tasks,
|
||||||
file_store,
|
file_store,
|
||||||
env,
|
env,
|
||||||
|
Loading…
Reference in New Issue
Block a user