2022-09-09 18:16:19 +08:00
|
|
|
mod autobatcher;
|
2022-09-07 06:10:14 +08:00
|
|
|
mod batch;
|
2022-09-06 22:43:59 +08:00
|
|
|
pub mod error;
|
2022-09-14 18:35:33 +08:00
|
|
|
mod index_mapper;
|
2022-10-10 18:57:17 +08:00
|
|
|
#[cfg(test)]
|
|
|
|
mod snapshot;
|
2022-09-07 06:10:14 +08:00
|
|
|
mod utils;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
2022-09-07 05:49:19 +08:00
|
|
|
pub type Result<T> = std::result::Result<T, Error>;
|
2022-09-06 22:43:59 +08:00
|
|
|
pub type TaskId = u32;
|
|
|
|
|
2022-10-16 07:39:01 +08:00
|
|
|
use dump::{KindDump, TaskDump, UpdateFile};
|
2022-09-14 22:16:53 +08:00
|
|
|
pub use error::Error;
|
2022-09-15 18:23:41 +08:00
|
|
|
|
2022-10-03 22:15:10 +08:00
|
|
|
use std::path::PathBuf;
|
2022-10-17 19:54:35 +08:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
|
2022-10-03 22:15:10 +08:00
|
|
|
use std::sync::{Arc, RwLock};
|
|
|
|
|
2022-10-16 07:39:01 +08:00
|
|
|
use file_store::FileStore;
|
2022-10-05 22:48:43 +08:00
|
|
|
use meilisearch_types::error::ResponseError;
|
2022-10-17 23:19:17 +08:00
|
|
|
use meilisearch_types::milli;
|
2022-10-03 22:15:10 +08:00
|
|
|
use roaring::RoaringBitmap;
|
2022-10-06 22:53:21 +08:00
|
|
|
use serde::{Deserialize, Serialize};
|
2022-10-03 22:15:10 +08:00
|
|
|
use synchronoise::SignalEvent;
|
|
|
|
use time::OffsetDateTime;
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
2022-10-11 23:42:43 +08:00
|
|
|
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
|
|
|
use meilisearch_types::heed::{self, Database, Env};
|
2022-10-17 19:54:35 +08:00
|
|
|
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
2022-10-11 23:42:43 +08:00
|
|
|
use meilisearch_types::milli::update::IndexerConfig;
|
|
|
|
use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32};
|
2022-10-17 19:54:35 +08:00
|
|
|
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
2022-10-03 22:15:10 +08:00
|
|
|
|
|
|
|
use crate::index_mapper::IndexMapper;
|
|
|
|
|
2022-10-12 06:43:24 +08:00
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
2022-10-03 22:15:10 +08:00
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
pub struct Query {
|
2022-10-13 18:48:23 +08:00
|
|
|
pub limit: Option<u32>,
|
2022-10-03 22:15:10 +08:00
|
|
|
pub from: Option<u32>,
|
|
|
|
pub status: Option<Vec<Status>>,
|
|
|
|
#[serde(rename = "type")]
|
|
|
|
pub kind: Option<Vec<Kind>>,
|
|
|
|
pub index_uid: Option<Vec<String>>,
|
|
|
|
pub uid: Option<Vec<TaskId>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Query {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
2022-10-13 18:48:23 +08:00
|
|
|
limit: None,
|
2022-10-03 22:15:10 +08:00
|
|
|
from: None,
|
|
|
|
status: None,
|
|
|
|
kind: None,
|
|
|
|
index_uid: None,
|
|
|
|
uid: None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Query {
|
2022-10-15 17:17:06 +08:00
|
|
|
/// Return `true` iff every field of the query is set to `None`, such that the query
|
|
|
|
/// would match all tasks.
|
|
|
|
pub fn is_empty(&self) -> bool {
|
|
|
|
matches!(
|
|
|
|
self,
|
|
|
|
Query {
|
|
|
|
limit: None,
|
|
|
|
from: None,
|
|
|
|
status: None,
|
|
|
|
kind: None,
|
|
|
|
index_uid: None,
|
|
|
|
uid: None
|
|
|
|
}
|
|
|
|
)
|
|
|
|
}
|
2022-10-03 22:15:10 +08:00
|
|
|
pub fn with_status(self, status: Status) -> Self {
|
|
|
|
let mut status_vec = self.status.unwrap_or_default();
|
|
|
|
status_vec.push(status);
|
|
|
|
Self {
|
|
|
|
status: Some(status_vec),
|
|
|
|
..self
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn with_kind(self, kind: Kind) -> Self {
|
|
|
|
let mut kind_vec = self.kind.unwrap_or_default();
|
|
|
|
kind_vec.push(kind);
|
|
|
|
Self {
|
|
|
|
kind: Some(kind_vec),
|
|
|
|
..self
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn with_index(self, index_uid: String) -> Self {
|
|
|
|
let mut index_vec = self.index_uid.unwrap_or_default();
|
|
|
|
index_vec.push(index_uid);
|
|
|
|
Self {
|
|
|
|
index_uid: Some(index_vec),
|
|
|
|
..self
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn with_uid(self, uid: TaskId) -> Self {
|
|
|
|
let mut task_vec = self.uid.unwrap_or_default();
|
|
|
|
task_vec.push(uid);
|
|
|
|
Self {
|
|
|
|
uid: Some(task_vec),
|
|
|
|
..self
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn with_limit(self, limit: u32) -> Self {
|
2022-10-13 18:48:23 +08:00
|
|
|
Self {
|
|
|
|
limit: Some(limit),
|
|
|
|
..self
|
|
|
|
}
|
2022-10-03 22:15:10 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-17 19:54:35 +08:00
|
|
|
/// In-memory description of the batch of tasks the scheduler is currently running.
///
/// Cloning this struct clones the `Arc` behind `must_stop`, so a clone shares the
/// same stop flag as the original.
#[derive(Debug, Clone)]
struct ProcessingTasks {
    /// The date and time at which the indexation started.
    ///
    /// Note that `stop_processing_at` overwrites it with the date and time at
    /// which the processing stopped.
    started_at: OffsetDateTime,
    /// The list of tasks ids that are currently running.
    processing: RoaringBitmap,
    /// A boolean that can be set to true to stop the currently processing tasks.
    must_stop: Arc<AtomicBool>,
}
|
|
|
|
|
|
|
|
impl ProcessingTasks {
|
2022-10-17 23:19:17 +08:00
|
|
|
/// Stores the currently processing tasks, the date time at which it started
|
|
|
|
/// and resets the _must stop_ flag.
|
2022-10-17 19:54:35 +08:00
|
|
|
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
|
|
|
|
self.started_at = started_at;
|
|
|
|
self.processing = processing;
|
2022-10-17 23:19:17 +08:00
|
|
|
self.must_stop.store(false, Relaxed);
|
2022-10-17 19:54:35 +08:00
|
|
|
}
|
|
|
|
|
2022-10-17 23:19:17 +08:00
|
|
|
/// Resets the processing tasks to an empty list.
|
2022-10-17 19:54:35 +08:00
|
|
|
fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
|
|
|
|
self.started_at = stopped_at;
|
|
|
|
self.processing = RoaringBitmap::new();
|
|
|
|
}
|
|
|
|
|
2022-10-17 23:19:17 +08:00
|
|
|
/// Forces the currently processing tasks to stop running if necessary.
|
|
|
|
fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) {
|
2022-10-17 19:54:35 +08:00
|
|
|
// If there, at least, is one task that is currently processing we must stop.
|
2022-10-17 23:19:17 +08:00
|
|
|
if !self.processing.is_disjoint(canceled_tasks) {
|
2022-10-17 19:54:35 +08:00
|
|
|
self.must_stop.store(true, Relaxed);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-03 22:15:10 +08:00
|
|
|
/// Database const names for the `IndexScheduler`.
///
/// Each constant is the name under which `IndexScheduler::new` creates the
/// matching database in the scheduler's environment.
mod db_name {
    /// Name of the database backing `IndexScheduler::all_tasks`.
    pub const ALL_TASKS: &str = "all-tasks";
    /// Name of the database backing `IndexScheduler::status`.
    pub const STATUS: &str = "status";
    /// Name of the database backing `IndexScheduler::kind`.
    pub const KIND: &str = "kind";
    /// Name of the database backing `IndexScheduler::index_tasks`.
    pub const INDEX_TASKS: &str = "index-tasks";
}
|
|
|
|
|
|
|
|
/// This structure is responsible for two things:
/// 1. Resolve the name of the indexes.
/// 2. Schedule the tasks.
pub struct IndexScheduler {
    /// The LMDB environment which the DBs are associated with.
    pub(crate) env: Env,

    /// The tasks currently being processed, shared with the scheduling thread.
    pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
    /// The store used to create and delete the update files attached to tasks.
    pub(crate) file_store: FileStore,

    /// The main database, it contains all the tasks accessible by their Id.
    pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,

    /// All the tasks ids grouped by their status.
    // TODO we should not be able to serialize a `Status::Processing` in this database.
    pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
    /// All the tasks ids grouped by their kind.
    pub(crate) kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
    /// Store the tasks associated to an index.
    pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,

    /// In charge of creating, opening, storing and returning indexes.
    pub(crate) index_mapper: IndexMapper,

    /// Get a signal when a batch needs to be processed.
    pub(crate) wake_up: Arc<SignalEvent>,

    /// Whether auto-batching is enabled or not.
    pub(crate) autobatching_enabled: bool,

    /// The path used to create the dumps.
    pub(crate) dumps_path: PathBuf,

    // ================= test
    /// The next entry is dedicated to the tests.
    /// It provides a way to pause the scheduler at several points of a tick:
    /// a `Breakpoint` is sent through this channel each time one is reached.
    #[cfg(test)]
    test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
}
|
|
|
|
|
|
|
|
/// The points of `IndexScheduler::tick` at which a test build notifies the test
/// harness through `test_breakpoint_sdr`.
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Breakpoint {
    /// Sent at the very beginning of a tick.
    Start,
    /// Sent once a batch was created and registered as being processed.
    BatchCreated,
    /// Sent right after `BatchCreated`, just before the batch is processed.
    BeforeProcessing,
    /// Sent at the end of a tick, after the results of the batch were committed.
    AfterProcessing,
}
|
|
|
|
|
|
|
|
impl IndexScheduler {
|
|
|
|
pub fn new(
|
|
|
|
tasks_path: PathBuf,
|
|
|
|
update_file_path: PathBuf,
|
|
|
|
indexes_path: PathBuf,
|
2022-10-13 21:02:59 +08:00
|
|
|
dumps_path: PathBuf,
|
2022-10-03 22:15:10 +08:00
|
|
|
index_size: usize,
|
|
|
|
indexer_config: IndexerConfig,
|
2022-10-10 23:00:56 +08:00
|
|
|
autobatching_enabled: bool,
|
2022-10-03 22:15:10 +08:00
|
|
|
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
|
|
|
|
) -> Result<Self> {
|
|
|
|
std::fs::create_dir_all(&tasks_path)?;
|
|
|
|
std::fs::create_dir_all(&update_file_path)?;
|
|
|
|
std::fs::create_dir_all(&indexes_path)?;
|
2022-10-13 21:02:59 +08:00
|
|
|
std::fs::create_dir_all(&dumps_path)?;
|
2022-10-03 22:15:10 +08:00
|
|
|
|
|
|
|
let mut options = heed::EnvOpenOptions::new();
|
|
|
|
options.max_dbs(6);
|
|
|
|
|
|
|
|
let env = options.open(tasks_path)?;
|
2022-10-17 19:54:35 +08:00
|
|
|
let processing_tasks = ProcessingTasks {
|
|
|
|
started_at: OffsetDateTime::now_utc(),
|
|
|
|
processing: RoaringBitmap::new(),
|
|
|
|
must_stop: Arc::new(AtomicBool::new(false)),
|
|
|
|
};
|
2022-10-03 22:15:10 +08:00
|
|
|
let file_store = FileStore::new(&update_file_path)?;
|
|
|
|
|
|
|
|
// allow unreachable_code to get rids of the warning in the case of a test build.
|
|
|
|
let this = Self {
|
|
|
|
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
|
|
|
|
file_store,
|
|
|
|
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
|
|
|
|
status: env.create_database(Some(db_name::STATUS))?,
|
|
|
|
kind: env.create_database(Some(db_name::KIND))?,
|
|
|
|
index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?,
|
|
|
|
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?,
|
|
|
|
env,
|
|
|
|
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
|
|
|
|
wake_up: Arc::new(SignalEvent::auto(true)),
|
2022-10-10 23:00:56 +08:00
|
|
|
autobatching_enabled,
|
2022-10-13 21:02:59 +08:00
|
|
|
dumps_path,
|
2022-10-03 22:15:10 +08:00
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
test_breakpoint_sdr,
|
|
|
|
};
|
|
|
|
|
|
|
|
this.run();
|
|
|
|
Ok(this)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// This function will execute in a different thread and must be called only once.
|
|
|
|
fn run(&self) {
|
|
|
|
let run = Self {
|
|
|
|
processing_tasks: self.processing_tasks.clone(),
|
|
|
|
file_store: self.file_store.clone(),
|
|
|
|
env: self.env.clone(),
|
|
|
|
all_tasks: self.all_tasks,
|
|
|
|
status: self.status,
|
|
|
|
kind: self.kind,
|
|
|
|
index_tasks: self.index_tasks,
|
|
|
|
index_mapper: self.index_mapper.clone(),
|
|
|
|
wake_up: self.wake_up.clone(),
|
2022-10-10 23:00:56 +08:00
|
|
|
autobatching_enabled: self.autobatching_enabled,
|
2022-10-13 21:02:59 +08:00
|
|
|
dumps_path: self.dumps_path.clone(),
|
2022-10-03 22:15:10 +08:00
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
|
|
|
};
|
|
|
|
|
|
|
|
std::thread::spawn(move || loop {
|
|
|
|
run.wake_up.wait();
|
|
|
|
|
|
|
|
match run.tick() {
|
2022-10-10 22:19:23 +08:00
|
|
|
Ok(0) => (),
|
|
|
|
Ok(_) => run.wake_up.signal(),
|
2022-10-03 22:15:10 +08:00
|
|
|
Err(e) => log::error!("{}", e),
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2022-10-16 07:39:01 +08:00
|
|
|
/// Return the indexer configuration held by the index mapper.
pub fn indexer_config(&self) -> &IndexerConfig {
    let mapper = &self.index_mapper;
    &mapper.indexer_config
}
|
|
|
|
|
2022-10-03 22:15:10 +08:00
|
|
|
/// Return the index corresponding to the name. If it wasn't opened before
|
|
|
|
/// it'll be opened. But if it doesn't exist on disk it'll throw an
|
|
|
|
/// `IndexNotFound` error.
|
|
|
|
pub fn index(&self, name: &str) -> Result<Index> {
|
|
|
|
let rtxn = self.env.read_txn()?;
|
|
|
|
self.index_mapper.index(&rtxn, name)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Return and open all the indexes.
|
2022-10-04 17:06:48 +08:00
|
|
|
pub fn indexes(&self) -> Result<Vec<(String, Index)>> {
|
2022-10-03 22:15:10 +08:00
|
|
|
let rtxn = self.env.read_txn()?;
|
|
|
|
self.index_mapper.indexes(&rtxn)
|
|
|
|
}
|
|
|
|
|
2022-10-13 17:09:00 +08:00
|
|
|
/// Return the task ids corresponding to the query
|
|
|
|
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
|
2022-10-03 22:15:10 +08:00
|
|
|
let rtxn = self.env.read_txn()?;
|
|
|
|
|
|
|
|
// This is the list of all the tasks.
|
2022-10-17 18:58:20 +08:00
|
|
|
let mut tasks = self.all_task_ids(&rtxn)?;
|
2022-10-03 22:15:10 +08:00
|
|
|
|
2022-10-13 17:09:00 +08:00
|
|
|
if let Some(uids) = &query.uid {
|
2022-10-03 22:15:10 +08:00
|
|
|
tasks &= RoaringBitmap::from_iter(uids);
|
|
|
|
}
|
|
|
|
|
2022-10-13 17:09:00 +08:00
|
|
|
if let Some(status) = &query.status {
|
2022-10-03 22:15:10 +08:00
|
|
|
let mut status_tasks = RoaringBitmap::new();
|
|
|
|
for status in status {
|
2022-10-13 17:09:00 +08:00
|
|
|
status_tasks |= self.get_status(&rtxn, *status)?;
|
2022-10-03 22:15:10 +08:00
|
|
|
}
|
|
|
|
tasks &= status_tasks;
|
|
|
|
}
|
|
|
|
|
2022-10-13 17:09:00 +08:00
|
|
|
if let Some(kind) = &query.kind {
|
2022-10-03 22:15:10 +08:00
|
|
|
let mut kind_tasks = RoaringBitmap::new();
|
|
|
|
for kind in kind {
|
2022-10-13 17:09:00 +08:00
|
|
|
kind_tasks |= self.get_kind(&rtxn, *kind)?;
|
2022-10-03 22:15:10 +08:00
|
|
|
}
|
|
|
|
tasks &= kind_tasks;
|
|
|
|
}
|
|
|
|
|
2022-10-13 17:09:00 +08:00
|
|
|
if let Some(index) = &query.index_uid {
|
2022-10-03 22:15:10 +08:00
|
|
|
let mut index_tasks = RoaringBitmap::new();
|
|
|
|
for index in index {
|
2022-10-13 17:07:36 +08:00
|
|
|
index_tasks |= self.index_tasks(&rtxn, &index)?;
|
2022-10-03 22:15:10 +08:00
|
|
|
}
|
|
|
|
tasks &= index_tasks;
|
|
|
|
}
|
2022-10-13 17:09:00 +08:00
|
|
|
rtxn.commit().unwrap();
|
|
|
|
Ok(tasks)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the tasks corresponding to the query.
|
|
|
|
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
|
|
|
|
let tasks = self.get_task_ids(&query)?;
|
2022-10-13 19:04:49 +08:00
|
|
|
let rtxn = self.env.read_txn()?;
|
2022-10-03 22:15:10 +08:00
|
|
|
|
2022-10-13 18:48:23 +08:00
|
|
|
let tasks = self.get_existing_tasks(
|
|
|
|
&rtxn,
|
|
|
|
tasks
|
|
|
|
.into_iter()
|
|
|
|
.rev()
|
|
|
|
.take(query.limit.unwrap_or(u32::MAX) as usize),
|
|
|
|
)?;
|
|
|
|
|
2022-10-17 23:19:17 +08:00
|
|
|
let ProcessingTasks {
|
|
|
|
started_at,
|
|
|
|
processing,
|
|
|
|
..
|
|
|
|
} = self
|
2022-10-03 22:15:10 +08:00
|
|
|
.processing_tasks
|
|
|
|
.read()
|
|
|
|
.map_err(|_| Error::CorruptedTaskQueue)?
|
|
|
|
.clone();
|
|
|
|
|
2022-10-12 09:21:25 +08:00
|
|
|
let ret = tasks.into_iter();
|
2022-10-03 22:15:10 +08:00
|
|
|
if processing.is_empty() {
|
|
|
|
Ok(ret.collect())
|
|
|
|
} else {
|
|
|
|
Ok(ret
|
|
|
|
.map(|task| match processing.contains(task.uid) {
|
2022-10-12 09:21:25 +08:00
|
|
|
true => Task {
|
2022-10-03 22:15:10 +08:00
|
|
|
status: Status::Processing,
|
|
|
|
started_at: Some(started_at),
|
|
|
|
..task
|
|
|
|
},
|
|
|
|
false => task,
|
|
|
|
})
|
|
|
|
.collect())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Register a new task in the scheduler. If it fails and data was associated with the task
|
|
|
|
/// it tries to delete the file.
|
2022-10-17 23:19:17 +08:00
|
|
|
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
|
2022-10-03 22:15:10 +08:00
|
|
|
let mut wtxn = self.env.write_txn()?;
|
|
|
|
|
|
|
|
let task = Task {
|
|
|
|
uid: self.next_task_id(&wtxn)?,
|
|
|
|
enqueued_at: time::OffsetDateTime::now_utc(),
|
|
|
|
started_at: None,
|
|
|
|
finished_at: None,
|
|
|
|
error: None,
|
2022-10-18 19:57:58 +08:00
|
|
|
canceled_by: None,
|
2022-10-17 23:19:17 +08:00
|
|
|
details: kind.default_details(),
|
2022-10-03 22:15:10 +08:00
|
|
|
status: Status::Enqueued,
|
2022-10-17 23:19:17 +08:00
|
|
|
kind: kind.clone(),
|
2022-10-03 22:15:10 +08:00
|
|
|
};
|
|
|
|
self.all_tasks
|
|
|
|
.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
|
|
|
|
|
|
|
|
if let Some(indexes) = task.indexes() {
|
|
|
|
for index in indexes {
|
|
|
|
self.update_index(&mut wtxn, index, |bitmap| {
|
|
|
|
bitmap.insert(task.uid);
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
self.update_status(&mut wtxn, Status::Enqueued, |bitmap| {
|
|
|
|
bitmap.insert(task.uid);
|
|
|
|
})?;
|
|
|
|
|
|
|
|
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
|
|
|
|
(bitmap.insert(task.uid));
|
|
|
|
})?;
|
|
|
|
|
|
|
|
match wtxn.commit() {
|
|
|
|
Ok(()) => (),
|
2022-10-13 21:02:59 +08:00
|
|
|
_e @ Err(_) => {
|
2022-10-18 21:04:14 +08:00
|
|
|
self.delete_persisted_task_data(&task)?;
|
2022-10-13 21:02:59 +08:00
|
|
|
// _e?;
|
2022-10-03 22:15:10 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-17 23:19:17 +08:00
|
|
|
// If the registered task is a task cancelation
|
|
|
|
// we inform the processing tasks to stop (if necessary).
|
|
|
|
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
|
|
|
|
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
|
|
|
|
self.processing_tasks
|
|
|
|
.read()
|
|
|
|
.unwrap()
|
|
|
|
.cancel_processing_tasks(&tasks_to_cancel);
|
|
|
|
}
|
|
|
|
|
2022-10-10 22:20:35 +08:00
|
|
|
// notify the scheduler loop to execute a new tick
|
|
|
|
self.wake_up.signal();
|
2022-10-03 22:15:10 +08:00
|
|
|
|
2022-10-12 09:21:25 +08:00
|
|
|
Ok(task)
|
2022-10-03 22:15:10 +08:00
|
|
|
}
|
|
|
|
|
2022-10-16 07:39:01 +08:00
|
|
|
/// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
///
/// The dumped task keeps its uid, dates, status, details and error. When the
/// task is still `Enqueued` and comes with a content file, the documents are
/// written to a new update file which the task then references.
///
/// # Errors
///
/// Returns `Error::CorruptedDump` when the dumped task lacks an index uid its
/// kind requires, or when an `Enqueued` document import has no content file.
/// Database and update-file errors are propagated as well.
pub fn register_dumped_task(
    &mut self,
    task: TaskDump,
    content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
    // Currently we don't need to access the tasks queue while loading a dump thus I can block everything.
    let mut wtxn = self.env.write_txn()?;

    let content_uuid = match content_file {
        // Only an enqueued task still needs its documents: persist them in a
        // fresh update file.
        Some(content_file) if task.status == Status::Enqueued => {
            let (uuid, mut file) = self.create_update_file()?;
            let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
            for doc in content_file {
                builder.append_json_object(&doc?)?;
            }
            builder.into_inner()?;
            file.persist()?;

            Some(uuid)
        }
        // If the task isn't `Enqueued` then just generate a recognisable `Uuid`
        // in case we try to open it later.
        _ if task.status != Status::Enqueued => Some(Uuid::nil()),
        // Enqueued without content file: a document import will be rejected
        // below as a corrupted dump.
        _ => None,
    };

    // Convert the dumped representation into a regular task; every kind that
    // targets an index requires the dump to provide the index uid.
    let task = Task {
        uid: task.uid,
        enqueued_at: task.enqueued_at,
        started_at: task.started_at,
        finished_at: task.finished_at,
        error: task.error,
        canceled_by: task.canceled_by,
        details: task.details,
        status: task.status,
        kind: match task.kind {
            KindDump::DocumentImport {
                primary_key,
                method,
                documents_count,
                allow_index_creation,
            } => KindWithContent::DocumentImport {
                index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
                primary_key,
                method,
                content_file: content_uuid.ok_or(Error::CorruptedDump)?,
                documents_count,
                allow_index_creation,
            },
            KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
                documents_ids,
                index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
            },
            KindDump::DocumentClear => KindWithContent::DocumentClear {
                index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
            },
            KindDump::Settings {
                settings,
                is_deletion,
                allow_index_creation,
            } => KindWithContent::Settings {
                index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
                new_settings: settings,
                is_deletion,
                allow_index_creation,
            },
            KindDump::IndexDeletion => KindWithContent::IndexDeletion {
                index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
            },
            KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
                index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
                primary_key,
            },
            KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
                index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
                primary_key,
            },
            KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs },
            KindDump::TaskCancelation { query, tasks } => {
                KindWithContent::TaskCancelation { query, tasks }
            }
            KindDump::TasksDeletion { query, tasks } => {
                KindWithContent::TaskDeletion { query, tasks }
            }
            KindDump::DumpExport {
                dump_uid,
                keys,
                instance_uid,
            } => KindWithContent::DumpExport {
                dump_uid,
                keys,
                instance_uid,
            },
            KindDump::Snapshot => KindWithContent::Snapshot,
        },
    };

    // Unlike `register`, the uid comes from the dump, hence `put` rather than
    // `append`.
    self.all_tasks
        .put(&mut wtxn, &BEU32::new(task.uid), &task)?;

    if let Some(indexes) = task.indexes() {
        for index in indexes {
            self.update_index(&mut wtxn, index, |bitmap| {
                bitmap.insert(task.uid);
            })?;
        }
    }

    // The task is referenced under its dumped status, not necessarily `Enqueued`.
    self.update_status(&mut wtxn, task.status, |bitmap| {
        bitmap.insert(task.uid);
    })?;

    self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
        (bitmap.insert(task.uid));
    })?;

    wtxn.commit()?;
    // Wake the scheduler loop up so it can pick up the imported task.
    self.wake_up.signal();

    Ok(task)
}
|
|
|
|
|
|
|
|
/// Create a new index without any associated task.
|
|
|
|
pub fn create_raw_index(&self, name: &str) -> Result<Index> {
|
|
|
|
let mut wtxn = self.env.write_txn()?;
|
2022-10-16 09:14:01 +08:00
|
|
|
let index = self.index_mapper.create_index(&mut wtxn, name)?;
|
|
|
|
wtxn.commit()?;
|
|
|
|
|
|
|
|
Ok(index)
|
2022-10-16 07:39:01 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
|
2022-10-03 22:15:10 +08:00
|
|
|
Ok(self.file_store.new_update()?)
|
|
|
|
}
|
2022-10-17 19:11:12 +08:00
|
|
|
|
2022-10-10 21:51:28 +08:00
|
|
|
/// Test helper: ask the file store for a new update file identified by the
/// provided `uuid` instead of a generated one.
#[cfg(test)]
pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
    self.file_store
        .new_update_with_uuid(uuid)
        .map_err(Error::from)
}
|
2022-10-03 22:15:10 +08:00
|
|
|
|
|
|
|
pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> {
|
|
|
|
Ok(self.file_store.delete(uuid)?)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create and execute and store the result of one batch of registered tasks.
///
/// Returns the number of processed tasks. `0` is returned when there is no
/// batch to process, and also when the batch was interrupted by an
/// `AbortedIndexation` error (nothing is stored in that case).
///
/// In test builds, a `Breakpoint` is sent on `test_breakpoint_sdr` at each step.
fn tick(&self) -> Result<usize> {
    #[cfg(test)]
    self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap();

    let rtxn = self.env.read_txn()?;
    let batch = match self.create_next_batch(&rtxn)? {
        Some(batch) => batch,
        None => return Ok(0),
    };
    // we don't need this transaction any longer.
    drop(rtxn);

    // 1. store the starting date with the bitmap of processing tasks.
    let mut ids = batch.ids();
    // `from_sorted_iter` requires its input to be sorted.
    ids.sort_unstable();
    let processed_tasks = ids.len();
    let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
    let started_at = OffsetDateTime::now_utc();
    self.processing_tasks
        .write()
        .unwrap()
        .start_processing_at(started_at, processing_tasks);

    #[cfg(test)]
    {
        self.test_breakpoint_sdr
            .send(Breakpoint::BatchCreated)
            .unwrap();
        self.test_breakpoint_sdr
            .send(Breakpoint::BeforeProcessing)
            .unwrap();
    }

    // 2. Process the tasks
    let res = self.process_batch(batch);
    // The write transaction is only opened once the batch has been processed.
    let mut wtxn = self.env.write_txn()?;
    let finished_at = OffsetDateTime::now_utc();
    match res {
        Ok(tasks) => {
            for mut task in tasks {
                task.started_at = Some(started_at);
                task.finished_at = Some(finished_at);
                self.update_task(&mut wtxn, &task)?;
                // NOTE(review): the persisted data is deleted before the
                // transaction is committed; a later failure would leave the
                // stored task untouched but its file gone — confirm this is intended.
                self.delete_persisted_task_data(&task)?;
            }
            log::info!("A batch of tasks was successfully completed.");
        }
        // If we have an abortion error we must stop the tick here and re-schedule tasks.
        Err(Error::Milli(milli::Error::InternalError(
            milli::InternalError::AbortedIndexation,
        ))) => {
            // TODO should we add a breakpoint here?
            // NOTE(review): this path returns without calling
            // `stop_processing_at`; the processing set stays as-is until the
            // next `start_processing_at`.
            wtxn.abort()?;
            return Ok(0);
        }
        // In case of a failure we must get back and patch all the tasks with the error.
        Err(err) => {
            let error: ResponseError = err.into();
            for id in ids {
                let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
                task.started_at = Some(started_at);
                task.finished_at = Some(finished_at);
                task.status = Status::Failed;
                task.error = Some(error.clone());

                self.update_task(&mut wtxn, &task)?;
            }
        }
    }
    // The batch is over: empty the processing set, then commit the task updates.
    self.processing_tasks
        .write()
        .unwrap()
        .stop_processing_at(finished_at);
    wtxn.commit()?;

    #[cfg(test)]
    self.test_breakpoint_sdr
        .send(Breakpoint::AfterProcessing)
        .unwrap();

    Ok(processed_tasks)
}
|
2022-10-18 21:04:14 +08:00
|
|
|
|
|
|
|
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
|
|
|
|
match &task.kind {
|
|
|
|
KindWithContent::DocumentImport { content_file, .. } => {
|
|
|
|
self.delete_update_file(*content_file)
|
|
|
|
}
|
|
|
|
KindWithContent::DocumentDeletion { .. }
|
|
|
|
| KindWithContent::DocumentClear { .. }
|
|
|
|
| KindWithContent::Settings { .. }
|
|
|
|
| KindWithContent::IndexDeletion { .. }
|
|
|
|
| KindWithContent::IndexCreation { .. }
|
|
|
|
| KindWithContent::IndexUpdate { .. }
|
|
|
|
| KindWithContent::IndexSwap { .. }
|
2022-10-18 17:02:46 +08:00
|
|
|
| KindWithContent::TaskCancelation { .. }
|
2022-10-18 21:04:14 +08:00
|
|
|
| KindWithContent::TaskDeletion { .. }
|
|
|
|
| KindWithContent::DumpExport { .. }
|
|
|
|
| KindWithContent::Snapshot => Ok(()),
|
|
|
|
}
|
|
|
|
}
|
2022-10-03 22:15:10 +08:00
|
|
|
}
|
|
|
|
|
2022-09-15 18:23:41 +08:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
2022-10-03 22:15:10 +08:00
|
|
|
use big_s::S;
|
2022-10-18 01:24:06 +08:00
|
|
|
use file_store::File;
|
2022-10-13 17:09:00 +08:00
|
|
|
use meili_snap::snapshot;
|
2022-10-11 23:42:43 +08:00
|
|
|
use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments;
|
2022-10-03 22:15:10 +08:00
|
|
|
use tempfile::TempDir;
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
2022-10-10 23:02:28 +08:00
|
|
|
use crate::snapshot::snapshot_index_scheduler;
|
2022-10-03 22:15:10 +08:00
|
|
|
|
|
|
|
use super::*;
|
|
|
|
|
2022-10-10 23:02:28 +08:00
|
|
|
/// Return a `KindWithContent::IndexCreation` task
|
|
|
|
fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent {
|
|
|
|
KindWithContent::IndexCreation {
|
|
|
|
index_uid: S(index),
|
|
|
|
primary_key: Some(S(primary_key)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/// Create a `KindWithContent::DocumentImport` task that imports documents.
|
|
|
|
///
|
|
|
|
/// - `index_uid` is given as parameter
|
|
|
|
/// - `primary_key` is given as parameter
|
|
|
|
/// - `method` is set to `ReplaceDocuments`
|
|
|
|
/// - `content_file` is given as parameter
|
|
|
|
/// - `documents_count` is given as parameter
|
|
|
|
/// - `allow_index_creation` is set to `true`
|
|
|
|
fn replace_document_import_task(
|
|
|
|
index: &'static str,
|
|
|
|
primary_key: Option<&'static str>,
|
|
|
|
content_file_uuid: u128,
|
|
|
|
documents_count: u64,
|
|
|
|
) -> KindWithContent {
|
|
|
|
KindWithContent::DocumentImport {
|
|
|
|
index_uid: S(index),
|
|
|
|
primary_key: primary_key.map(ToOwned::to_owned),
|
|
|
|
method: ReplaceDocuments,
|
|
|
|
content_file: Uuid::from_u128(content_file_uuid),
|
|
|
|
documents_count: documents_count,
|
|
|
|
allow_index_creation: true,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create an update file with the given file uuid.
///
/// The update file contains just one simple document whose id is given by `document_id`.
///
/// The update file and its documents count are returned. The file is returned
/// as created here: this function does not call `persist` on it.
fn sample_documents(
    index_scheduler: &IndexScheduler,
    file_uuid: u128,
    document_id: usize,
) -> (File, u64) {
    let content = format!(
        r#"
    {{
        "id" : "{document_id}"
    }}"#
    );

    let (_uuid, mut file) = index_scheduler
        .create_update_file_with_uuid(file_uuid)
        .unwrap();
    // Convert the JSON payload into the update file; the returned value is the
    // number of documents read.
    let documents_count =
        meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
            .unwrap() as u64;
    (file, documents_count)
}
|
|
|
|
|
2022-10-03 22:15:10 +08:00
|
|
|
impl IndexScheduler {
|
2022-10-10 23:00:56 +08:00
|
|
|
pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) {
|
2022-10-03 22:15:10 +08:00
|
|
|
let tempdir = TempDir::new().unwrap();
|
|
|
|
let (sender, receiver) = crossbeam::channel::bounded(0);
|
|
|
|
|
|
|
|
let index_scheduler = Self::new(
|
|
|
|
tempdir.path().join("db_path"),
|
|
|
|
tempdir.path().join("file_store"),
|
|
|
|
tempdir.path().join("indexes"),
|
2022-10-13 21:02:59 +08:00
|
|
|
tempdir.path().join("dumps"),
|
2022-10-03 22:15:10 +08:00
|
|
|
1024 * 1024,
|
|
|
|
IndexerConfig::default(),
|
2022-10-10 23:00:56 +08:00
|
|
|
autobatching, // enable autobatching
|
2022-10-03 22:15:10 +08:00
|
|
|
sender,
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let index_scheduler_handle = IndexSchedulerHandle {
|
|
|
|
_tempdir: tempdir,
|
|
|
|
test_breakpoint_rcv: receiver,
|
|
|
|
};
|
|
|
|
|
|
|
|
(index_scheduler, index_scheduler_handle)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Test-side companion of an `IndexScheduler` built with `IndexScheduler::test`.
pub struct IndexSchedulerHandle {
    /// The temporary directory given to the scheduler for its files; held here
    /// and never read (hence the leading underscore).
    _tempdir: TempDir,
    /// Receiving end of the channel on which the scheduler sends its breakpoints.
    test_breakpoint_rcv: crossbeam::channel::Receiver<Breakpoint>,
}
|
|
|
|
|
|
|
|
impl IndexSchedulerHandle {
|
|
|
|
/// Wait until the provided breakpoint is reached.
|
|
|
|
fn wait_till(&self, breakpoint: Breakpoint) {
|
|
|
|
self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
|
|
|
|
}
|
|
|
|
|
2022-10-10 23:02:28 +08:00
|
|
|
#[allow(unused)]
|
2022-10-03 22:15:10 +08:00
|
|
|
/// Wait until the provided breakpoint is reached.
|
|
|
|
fn next_breakpoint(&self) -> Breakpoint {
|
|
|
|
self.test_breakpoint_rcv.recv().unwrap()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// The scheduler will not stop on breakpoints anymore.
|
|
|
|
fn dont_block(self) {
|
|
|
|
std::thread::spawn(move || loop {
|
|
|
|
// unroll and ignore all the state the scheduler is going to send us.
|
|
|
|
self.test_breakpoint_rcv.iter().last();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Registering tasks must assign consecutive uids, mark each task as enqueued and
/// preserve its kind.
#[test]
fn register() {
    // In this test, the handle doesn't make any progress, we only check that the tasks are registered
    let (index_scheduler, _handle) = IndexScheduler::test(true);

    let kinds = [
        index_creation_task("catto", "mouse"),
        replace_document_import_task("catto", None, 0, 12),
        KindWithContent::TaskCancelation {
            // plain literal: no formatting is involved, so `format!` is not needed
            query: "uid=0,1".to_owned(),
            tasks: RoaringBitmap::from_iter([0, 1]),
        },
        replace_document_import_task("catto", None, 1, 50),
        replace_document_import_task("doggo", Some("bone"), 2, 5000),
    ];
    for (idx, kind) in kinds.into_iter().enumerate() {
        // remember the kind before `register` takes ownership of the task content
        let k = kind.as_kind();
        let task = index_scheduler.register(kind).unwrap();

        assert_eq!(task.uid, idx as u32);
        assert_eq!(task.status, Status::Enqueued);
        assert_eq!(task.kind.as_kind(), k);
    }

    snapshot!(snapshot_index_scheduler(&index_scheduler));
}
|
|
|
|
|
|
|
|
/// New tasks can still be registered once a batch has been created for an earlier one.
#[test]
fn insert_task_while_another_task_is_processing() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    index_scheduler.register(KindWithContent::Snapshot).unwrap();
    handle.wait_till(Breakpoint::BatchCreated);

    // while the task is processing can we register another task?
    index_scheduler.register(KindWithContent::Snapshot).unwrap();
    let deletion = KindWithContent::IndexDeletion {
        index_uid: S("doggos"),
    };
    index_scheduler.register(deletion).unwrap();

    snapshot!(snapshot_index_scheduler(&index_scheduler));
}
|
|
|
|
|
2022-10-10 22:18:35 +08:00
|
|
|
/// Tasks are registered back to back, so the scheduler is only notified once;
/// every one of them must nevertheless end up processed.
#[test]
fn process_tasks_inserted_without_new_signal() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let kinds = [
        KindWithContent::IndexCreation {
            index_uid: S("doggos"),
            primary_key: None,
        },
        KindWithContent::IndexCreation {
            index_uid: S("cattos"),
            primary_key: None,
        },
        KindWithContent::IndexDeletion {
            index_uid: S("doggos"),
        },
    ];
    for kind in kinds {
        index_scheduler.register(kind).unwrap();
    }

    handle.wait_till(Breakpoint::Start);
    // one `AfterProcessing` breakpoint per registered task
    for _ in 0..3 {
        handle.wait_till(Breakpoint::AfterProcessing);
    }

    let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
    tasks.reverse();
    assert_eq!(tasks.len(), 3);
    for task in tasks.iter() {
        assert_eq!(task.status, Status::Succeeded);
    }
}
|
|
|
|
|
2022-10-10 23:00:56 +08:00
|
|
|
/// With autobatching disabled, each of the four tasks gets its own processing round
/// and all of them succeed.
#[test]
fn process_tasks_without_autobatching() {
    let (index_scheduler, handle) = IndexScheduler::test(false);

    index_scheduler
        .register(KindWithContent::IndexCreation {
            index_uid: S("doggos"),
            primary_key: None,
        })
        .unwrap();
    // three identical document clears on the index created above
    for _ in 0..3 {
        index_scheduler
            .register(KindWithContent::DocumentClear {
                index_uid: S("doggos"),
            })
            .unwrap();
    }

    // one `AfterProcessing` breakpoint per registered task
    for _ in 0..4 {
        handle.wait_till(Breakpoint::AfterProcessing);
    }

    let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
    tasks.reverse();
    assert_eq!(tasks.len(), 4);
    for task in tasks.iter() {
        assert_eq!(task.status, Status::Succeeded);
    }
}
|
|
|
|
|
2022-10-06 22:53:21 +08:00
|
|
|
/// A task deletion that only targets still-enqueued tasks must not delete anything.
#[test]
fn task_deletion_undeleteable() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let to_enqueue = [
        index_creation_task("catto", "mouse"),
        replace_document_import_task("catto", None, 0, 12),
        replace_document_import_task("doggo", Some("bone"), 1, 5000),
    ];
    to_enqueue.into_iter().for_each(|task| {
        index_scheduler.register(task).unwrap();
    });

    // here we have registered all the tasks, but the index scheduler
    // has not progressed at all
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");

    let deletion = KindWithContent::TaskDeletion {
        query: "test_query".to_owned(),
        tasks: RoaringBitmap::from_iter(&[0, 1]),
    };
    index_scheduler.register(deletion).unwrap();

    // again, no progress made at all, but one more task is registered
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_enqueued");

    // now we create the first batch
    handle.wait_till(Breakpoint::BatchCreated);

    // the task deletion should now be "processing"
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processing");

    handle.wait_till(Breakpoint::AfterProcessing);

    // after the task deletion is processed, no task should actually have been deleted,
    // because the tasks with ids 0 and 1 were still "enqueued", and thus undeleteable
    // the "task deletion" task should be marked as "succeeded" and, in its details, the
    // number of deleted tasks should be 0
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_done");
}
|
|
|
|
|
|
|
|
/// Once the first document import has been processed, a task deletion targeting it
/// can itself be processed.
#[test]
fn task_deletion_deleteable() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
    let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);

    let to_enqueue = [
        replace_document_import_task("catto", None, 0, documents_count0),
        replace_document_import_task("doggo", Some("bone"), 1, documents_count1),
    ];
    to_enqueue.into_iter().for_each(|task| {
        index_scheduler.register(task).unwrap();
    });
    // the update files are persisted only after their tasks have been registered
    file0.persist().unwrap();
    file1.persist().unwrap();

    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");

    handle.wait_till(Breakpoint::AfterProcessing);
    // first addition of documents should be successful
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");

    // Now we delete the first task
    let deletion = KindWithContent::TaskDeletion {
        query: "test_query".to_owned(),
        tasks: RoaringBitmap::from_iter(&[0]),
    };
    index_scheduler.register(deletion).unwrap();

    handle.wait_till(Breakpoint::AfterProcessing);
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
}
|
|
|
|
|
2022-10-15 17:38:43 +08:00
|
|
|
/// Registering the same task deletion twice in a row must be handled by the scheduler.
#[test]
fn task_deletion_delete_same_task_twice() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
    let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);

    let to_enqueue = [
        replace_document_import_task("catto", None, 0, documents_count0),
        replace_document_import_task("doggo", Some("bone"), 1, documents_count1),
    ];
    to_enqueue.into_iter().for_each(|task| {
        index_scheduler.register(task).unwrap();
    });
    // the update files are persisted only after their tasks have been registered
    file0.persist().unwrap();
    file1.persist().unwrap();

    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");

    handle.wait_till(Breakpoint::AfterProcessing);
    // first addition of documents should be successful
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");

    // Now we delete the first task multiple times in a row
    (0..2).for_each(|_| {
        let deletion = KindWithContent::TaskDeletion {
            query: "test_query".to_owned(),
            tasks: RoaringBitmap::from_iter(&[0]),
        };
        index_scheduler.register(deletion).unwrap();
    });
    // wait for two processing rounds to complete
    (0..2).for_each(|_| handle.wait_till(Breakpoint::AfterProcessing));

    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
}
|
|
|
|
|
2022-10-03 22:15:10 +08:00
|
|
|
/// Follow a single document import through its registration, batch creation and
/// processing, taking a snapshot of the scheduler at each step.
#[test]
fn document_addition() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let content = r#"
        {
            "id": 1,
            "doggo": "bob"
        }"#;

    let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
    let documents_count =
        meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
            .unwrap() as u64;

    let import = KindWithContent::DocumentImport {
        index_uid: S("doggos"),
        primary_key: Some(S("id")),
        method: ReplaceDocuments,
        content_file: uuid,
        documents_count,
        allow_index_creation: true,
    };
    index_scheduler.register(import).unwrap();
    // the update file is persisted only after its task has been registered
    file.persist().unwrap();

    // state right after registration
    snapshot!(snapshot_index_scheduler(&index_scheduler));

    handle.wait_till(Breakpoint::BatchCreated);

    // state once the batch has been created
    snapshot!(snapshot_index_scheduler(&index_scheduler));

    handle.wait_till(Breakpoint::AfterProcessing);

    // state once the batch has been processed
    snapshot!(snapshot_index_scheduler(&index_scheduler));
}
|
|
|
|
|
2022-10-13 16:57:33 +08:00
|
|
|
/// Tasks that target different indexes must each go through their own processing round.
#[test]
fn do_not_batch_task_of_different_indexes() {
    let (index_scheduler, handle) = IndexScheduler::test(true);
    let index_names = ["doggos", "cattos", "girafos"];
    // one creation plus one document clear per index
    let expected_tasks = index_names.len() * 2;

    for name in index_names {
        let creation = KindWithContent::IndexCreation {
            index_uid: name.to_string(),
            primary_key: None,
        };
        index_scheduler.register(creation).unwrap();
    }

    for name in index_names {
        let clear = KindWithContent::DocumentClear {
            index_uid: name.to_string(),
        };
        index_scheduler.register(clear).unwrap();
    }

    // every task is expected to produce its own `AfterProcessing` breakpoint
    for _ in 0..expected_tasks {
        handle.wait_till(Breakpoint::AfterProcessing);
    }

    let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
    tasks.reverse();
    assert_eq!(tasks.len(), expected_tasks);
    for task in tasks.iter() {
        assert_eq!(task.status, Status::Succeeded);
    }
}
|
|
|
|
|
2022-09-15 18:23:41 +08:00
|
|
|
/// Compare the `Debug` rendering of `$value` with the inline snapshot literal `$snapshot`.
#[macro_export]
macro_rules! debug_snapshot {
    ($value:expr, @$snapshot:literal) => {{
        // render with `{:?}` first, then hand the resulting string over to `meili_snap`
        let debug_repr = format!("{:?}", $value);
        meili_snap::snapshot!(debug_repr, @$snapshot);
    }};
}
|
|
|
|
|
|
|
|
/// Smoke test: building a test scheduler must not panic.
#[test]
fn simple_new() {
    // the scheduler and its handle are dropped right away
    let _ = crate::IndexScheduler::test(true);
}
|
2022-09-15 18:23:41 +08:00
|
|
|
}
|