2022-09-07 06:10:14 +08:00
|
|
|
|
mod batch;
|
2022-09-08 02:37:15 +08:00
|
|
|
|
mod document_formats;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
pub mod error;
|
2022-09-08 02:08:07 +08:00
|
|
|
|
pub mod index;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
pub mod task;
|
2022-09-08 02:08:07 +08:00
|
|
|
|
mod update_file_store;
|
2022-09-07 06:10:14 +08:00
|
|
|
|
mod utils;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
|
2022-09-08 02:08:07 +08:00
|
|
|
|
use batch::Batch;
|
2022-09-07 06:10:14 +08:00
|
|
|
|
pub use error::Error;
|
2022-09-08 02:08:07 +08:00
|
|
|
|
use index::Index;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
|
|
|
|
|
pub use task::Task;
|
2022-09-08 02:08:07 +08:00
|
|
|
|
use task::{Kind, KindWithContent, Status};
|
2022-09-06 22:43:59 +08:00
|
|
|
|
|
|
|
|
|
use std::collections::hash_map::Entry;
|
2022-09-08 02:08:07 +08:00
|
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2022-09-06 22:43:59 +08:00
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::{collections::HashMap, sync::RwLock};
|
|
|
|
|
|
|
|
|
|
use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
|
2022-09-08 02:08:07 +08:00
|
|
|
|
use milli::{RoaringBitmapCodec, BEU32};
|
2022-09-06 22:43:59 +08:00
|
|
|
|
use roaring::RoaringBitmap;
|
2022-09-07 07:06:45 +08:00
|
|
|
|
use serde::Deserialize;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
|
2022-09-07 05:49:19 +08:00
|
|
|
|
/// Shorthand for a [`std::result::Result`] whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
/// Identifier of a task; a plain `u32`.
pub type TaskId = u32;

/// Name of an index, stored as an owned string.
type IndexName = String;

/// Unique identifier of an index, stored as an owned string.
type IndexUuid = String;
|
|
|
|
|
|
2022-09-07 07:06:45 +08:00
|
|
|
|
/// Produces the value used for `Query::limit` when the field is absent: `20`.
///
/// It is a function pointer rather than a plain integer so that it can be
/// referenced by name from `#[serde(default = "...")]`.
const DEFAULT_LIMIT: fn() -> u32 = || -> u32 { 20 };
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Deserialize)]
|
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
|
pub struct Query {
|
|
|
|
|
#[serde(default = "DEFAULT_LIMIT")]
|
|
|
|
|
limit: u32,
|
|
|
|
|
from: Option<u32>,
|
|
|
|
|
status: Option<Vec<Status>>,
|
|
|
|
|
#[serde(rename = "type")]
|
|
|
|
|
kind: Option<Vec<Kind>>,
|
|
|
|
|
index_uid: Option<Vec<String>>,
|
|
|
|
|
}
|
|
|
|
|
|
2022-09-06 22:43:59 +08:00
|
|
|
|
/// This module is responsible for two things;
|
|
|
|
|
/// 1. Resolve the name of the indexes.
|
|
|
|
|
/// 2. Schedule the tasks.
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct IndexScheduler {
|
|
|
|
|
// Keep track of the opened indexes and is used
|
|
|
|
|
// mainly by the index resolver.
|
2022-09-08 02:08:07 +08:00
|
|
|
|
index_map: Arc<RwLock<HashMap<String, Index>>>,
|
2022-09-06 22:43:59 +08:00
|
|
|
|
|
|
|
|
|
/// The list of tasks currently processing.
|
|
|
|
|
processing_tasks: Arc<RwLock<RoaringBitmap>>,
|
|
|
|
|
|
|
|
|
|
/// The LMDB environment which the DBs are associated with.
|
|
|
|
|
env: Env,
|
|
|
|
|
|
|
|
|
|
// The main database, it contains all the tasks accessible by their Id.
|
|
|
|
|
all_tasks: Database<OwnedType<BEU32>, SerdeBincode<Task>>,
|
|
|
|
|
|
|
|
|
|
// All the tasks ids grouped by their status.
|
|
|
|
|
status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
|
|
|
|
|
// All the tasks ids grouped by their kind.
|
2022-09-07 05:49:19 +08:00
|
|
|
|
kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
|
2022-09-06 22:43:59 +08:00
|
|
|
|
|
2022-09-08 02:08:07 +08:00
|
|
|
|
// Tell you if an index is currently available.
|
|
|
|
|
available_index: Database<Str, SerdeBincode<bool>>,
|
2022-09-06 22:43:59 +08:00
|
|
|
|
// Store the tasks associated to an index.
|
2022-09-07 05:49:19 +08:00
|
|
|
|
index_tasks: Database<Str, RoaringBitmapCodec>,
|
2022-09-06 22:43:59 +08:00
|
|
|
|
|
|
|
|
|
// set to true when there is work to do.
|
|
|
|
|
wake_up: Arc<AtomicBool>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl IndexScheduler {
|
2022-09-07 17:21:53 +08:00
|
|
|
|
/// Return the index corresponding to the name. If it wasn't opened before
|
|
|
|
|
/// it'll be opened. But if it doesn't exist on disk it'll throw an
|
|
|
|
|
/// `IndexNotFound` error.
|
2022-09-06 22:43:59 +08:00
|
|
|
|
pub fn index(&self, name: &str) -> Result<Index> {
|
|
|
|
|
let rtxn = self.env.read_txn()?;
|
2022-09-08 02:08:07 +08:00
|
|
|
|
|
|
|
|
|
self.available_index
|
2022-09-06 22:43:59 +08:00
|
|
|
|
.get(&rtxn, name)?
|
2022-09-08 02:08:07 +08:00
|
|
|
|
.ok_or(Error::IndexNotFound(name.to_string()))?;
|
|
|
|
|
|
2022-09-06 22:43:59 +08:00
|
|
|
|
// we clone here to drop the lock before entering the match
|
2022-09-08 02:08:07 +08:00
|
|
|
|
let index = self.index_map.read().unwrap().get(name).cloned();
|
2022-09-06 22:43:59 +08:00
|
|
|
|
let index = match index {
|
|
|
|
|
Some(index) => index,
|
|
|
|
|
// since we're lazy, it's possible that the index doesn't exist yet.
|
|
|
|
|
// We need to open it ourselves.
|
|
|
|
|
None => {
|
|
|
|
|
let mut index_map = self.index_map.write().unwrap();
|
|
|
|
|
// between the read lock and the write lock it's not impossible
|
|
|
|
|
// that someone already opened the index (eg if two search happens
|
|
|
|
|
// at the same time), thus before opening it we check a second time
|
|
|
|
|
// if it's not already there.
|
|
|
|
|
// Since there is a good chance it's not already there we can use
|
|
|
|
|
// the entry method.
|
2022-09-08 02:08:07 +08:00
|
|
|
|
match index_map.entry(name.to_string()) {
|
2022-09-06 22:43:59 +08:00
|
|
|
|
Entry::Vacant(entry) => {
|
2022-09-08 02:08:07 +08:00
|
|
|
|
// TODO: TAMO: get the args from somewhere.
|
|
|
|
|
let index = Index::open(
|
|
|
|
|
name.to_string(),
|
|
|
|
|
name.to_string(),
|
|
|
|
|
100_000_000,
|
|
|
|
|
Arc::default(),
|
|
|
|
|
)?;
|
2022-09-06 22:43:59 +08:00
|
|
|
|
entry.insert(index.clone());
|
|
|
|
|
index
|
|
|
|
|
}
|
|
|
|
|
Entry::Occupied(entry) => entry.get().clone(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(index)
|
|
|
|
|
}
|
|
|
|
|
|
2022-09-07 07:06:45 +08:00
|
|
|
|
/// Returns the tasks corresponding to the query.
|
|
|
|
|
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
|
|
|
|
|
let rtxn = self.env.read_txn()?;
|
|
|
|
|
let last_task_id = match self.last_task_id(&rtxn)? {
|
|
|
|
|
Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid),
|
|
|
|
|
None => return Ok(Vec::new()),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// This is the list of all the tasks.
|
|
|
|
|
let mut tasks = RoaringBitmap::from_iter(0..last_task_id);
|
|
|
|
|
|
|
|
|
|
if let Some(status) = query.status {
|
|
|
|
|
let mut status_tasks = RoaringBitmap::new();
|
|
|
|
|
for status in status {
|
|
|
|
|
status_tasks |= self.get_status(&rtxn, status)?;
|
|
|
|
|
}
|
|
|
|
|
tasks &= status_tasks;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Some(kind) = query.kind {
|
|
|
|
|
let mut kind_tasks = RoaringBitmap::new();
|
|
|
|
|
for kind in kind {
|
|
|
|
|
kind_tasks |= self.get_kind(&rtxn, kind)?;
|
|
|
|
|
}
|
|
|
|
|
tasks &= kind_tasks;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Some(index) = query.index_uid {
|
|
|
|
|
let mut index_tasks = RoaringBitmap::new();
|
|
|
|
|
for index in index {
|
|
|
|
|
index_tasks |= self.get_index(&rtxn, &index)?;
|
|
|
|
|
}
|
|
|
|
|
tasks &= index_tasks;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))
|
2022-09-06 22:43:59 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Register a new task in the scheduler. If it fails and data was associated with the task
|
|
|
|
|
/// it tries to delete the file.
|
|
|
|
|
pub fn register(&self, task: Task) -> Result<()> {
|
|
|
|
|
let mut wtxn = self.env.write_txn()?;
|
|
|
|
|
|
|
|
|
|
let task_id = self.next_task_id(&wtxn)?;
|
|
|
|
|
|
|
|
|
|
self.all_tasks
|
|
|
|
|
.append(&mut wtxn, &BEU32::new(task_id), &task)?;
|
|
|
|
|
|
2022-09-07 06:22:58 +08:00
|
|
|
|
if let Some(indexes) = task.indexes() {
|
|
|
|
|
for index in indexes {
|
2022-09-08 02:44:33 +08:00
|
|
|
|
self.update_index(&mut wtxn, index, |bitmap| drop(bitmap.insert(task_id)))?;
|
2022-09-07 06:22:58 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-09-08 02:44:33 +08:00
|
|
|
|
self.update_status(&mut wtxn, Status::Enqueued, |bitmap| {
|
2022-09-06 22:43:59 +08:00
|
|
|
|
bitmap.insert(task_id);
|
|
|
|
|
})?;
|
|
|
|
|
|
2022-09-08 02:44:33 +08:00
|
|
|
|
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
|
|
|
|
|
(bitmap.insert(task_id));
|
2022-09-06 22:43:59 +08:00
|
|
|
|
})?;
|
|
|
|
|
|
|
|
|
|
// we persist the file in last to be sure everything before was applied successfuly
|
|
|
|
|
task.persist()?;
|
|
|
|
|
|
|
|
|
|
match wtxn.commit() {
|
|
|
|
|
Ok(()) => (),
|
|
|
|
|
e @ Err(_) => {
|
|
|
|
|
task.remove_data()?;
|
|
|
|
|
e?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.notify();
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2022-09-08 02:08:07 +08:00
|
|
|
|
/// This worker function must be run in a different thread and must be run only once.
|
|
|
|
|
fn run(&self) {
|
|
|
|
|
loop {
|
|
|
|
|
// TODO: TAMO: remove this horrible spinlock in favor of a sleep / channel / we’ll see
|
|
|
|
|
while !self.wake_up.swap(false, Ordering::Relaxed) {}
|
|
|
|
|
|
|
|
|
|
let mut wtxn = match self.env.write_txn() {
|
|
|
|
|
Ok(wtxn) => wtxn,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
log::error!("{}", e);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
2022-09-08 02:38:57 +08:00
|
|
|
|
let mut batch = match self.get_next_batch(&wtxn) {
|
2022-09-08 02:08:07 +08:00
|
|
|
|
Ok(batch) => batch,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
log::error!("{}", e);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let res = self.process_batch(&mut wtxn, &mut batch);
|
|
|
|
|
|
|
|
|
|
// TODO: TAMO: do this later
|
|
|
|
|
// self.handle_batch_result(res);
|
2022-09-08 03:27:06 +08:00
|
|
|
|
|
|
|
|
|
match wtxn.commit() {
|
|
|
|
|
Ok(()) => log::info!("A batch of tasks was successfully completed."),
|
|
|
|
|
Err(e) => {
|
|
|
|
|
log::error!("{}", e);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-09-08 02:08:07 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> {
|
|
|
|
|
match batch {
|
2022-09-08 02:38:57 +08:00
|
|
|
|
Batch::One(task) => match &task.kind {
|
2022-09-08 02:08:07 +08:00
|
|
|
|
KindWithContent::ClearAllDocuments { index_name } => {
|
|
|
|
|
self.index(&index_name)?.clear_documents()?;
|
|
|
|
|
}
|
|
|
|
|
KindWithContent::RenameIndex {
|
|
|
|
|
index_name,
|
|
|
|
|
new_name,
|
|
|
|
|
} => {
|
|
|
|
|
if self.available_index.get(wtxn, &new_name)?.unwrap_or(false) {
|
2022-09-08 02:30:33 +08:00
|
|
|
|
return Err(Error::IndexAlreadyExists(new_name.to_string()));
|
2022-09-08 02:08:07 +08:00
|
|
|
|
}
|
|
|
|
|
todo!("wait for @guigui insight");
|
|
|
|
|
}
|
|
|
|
|
KindWithContent::CreateIndex {
|
|
|
|
|
index_name,
|
|
|
|
|
primary_key,
|
|
|
|
|
} => {
|
|
|
|
|
if self
|
|
|
|
|
.available_index
|
|
|
|
|
.get(wtxn, &index_name)?
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
{
|
|
|
|
|
return Err(Error::IndexAlreadyExists(index_name.to_string()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.available_index.put(wtxn, &index_name, &true);
|
2022-09-08 03:27:06 +08:00
|
|
|
|
// TODO: TAMO: give real info to the index
|
|
|
|
|
let index = Index::open(
|
|
|
|
|
index_name.to_string(),
|
|
|
|
|
index_name.to_string(),
|
|
|
|
|
100_000_000,
|
|
|
|
|
Arc::default(),
|
|
|
|
|
)?;
|
|
|
|
|
if let Some(primary_key) = primary_key {
|
|
|
|
|
index.update_primary_key(primary_key.to_string())?;
|
|
|
|
|
}
|
|
|
|
|
self.index_map
|
|
|
|
|
.write()
|
|
|
|
|
.map_err(|_| Error::CorruptedTaskQueue)?
|
|
|
|
|
.insert(index_name.to_string(), index.clone());
|
2022-09-08 02:08:07 +08:00
|
|
|
|
}
|
|
|
|
|
KindWithContent::DeleteIndex { index_name } => {
|
|
|
|
|
self.index_map.write();
|
|
|
|
|
if !self.available_index.delete(wtxn, &index_name)? {
|
|
|
|
|
return Err(Error::IndexNotFound(index_name.to_string()));
|
|
|
|
|
}
|
2022-09-08 03:27:06 +08:00
|
|
|
|
if let Some(index) = self
|
|
|
|
|
.index_map
|
|
|
|
|
.write()
|
|
|
|
|
.map_err(|_| Error::CorruptedTaskQueue)?
|
|
|
|
|
.remove(index_name)
|
|
|
|
|
{
|
|
|
|
|
index.delete()?;
|
|
|
|
|
} else {
|
|
|
|
|
// TODO: TAMO: fix the path
|
|
|
|
|
std::fs::remove_file(index_name)?;
|
|
|
|
|
}
|
2022-09-08 02:08:07 +08:00
|
|
|
|
}
|
|
|
|
|
KindWithContent::SwapIndex { lhs, rhs } => {
|
|
|
|
|
if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) {
|
|
|
|
|
return Err(Error::IndexNotFound(lhs.to_string()));
|
|
|
|
|
}
|
|
|
|
|
if !self.available_index.get(wtxn, &rhs)?.unwrap_or(false) {
|
|
|
|
|
return Err(Error::IndexNotFound(rhs.to_string()));
|
|
|
|
|
}
|
|
|
|
|
|
2022-09-08 03:27:06 +08:00
|
|
|
|
let lhs_bitmap = self.index_tasks.get(wtxn, lhs)?;
|
|
|
|
|
let rhs_bitmap = self.index_tasks.get(wtxn, rhs)?;
|
|
|
|
|
// the bitmap are lazily created and thus may not exists.
|
|
|
|
|
if let Some(bitmap) = rhs_bitmap {
|
|
|
|
|
self.index_tasks.put(wtxn, lhs, &bitmap)?;
|
|
|
|
|
}
|
|
|
|
|
if let Some(bitmap) = lhs_bitmap {
|
|
|
|
|
self.index_tasks.put(wtxn, rhs, &bitmap)?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut index_map = self
|
2022-09-08 02:30:33 +08:00
|
|
|
|
.index_map
|
|
|
|
|
.write()
|
|
|
|
|
.map_err(|_| Error::CorruptedTaskQueue)?;
|
2022-09-08 02:08:07 +08:00
|
|
|
|
|
2022-09-08 03:27:06 +08:00
|
|
|
|
let lhs_index = index_map.remove(lhs).unwrap();
|
|
|
|
|
let rhs_index = index_map.remove(rhs).unwrap();
|
|
|
|
|
|
|
|
|
|
index_map.insert(lhs.to_string(), rhs_index);
|
|
|
|
|
index_map.insert(rhs.to_string(), lhs_index);
|
2022-09-08 02:08:07 +08:00
|
|
|
|
}
|
|
|
|
|
_ => unreachable!(),
|
|
|
|
|
},
|
|
|
|
|
Batch::Cancel(_) => todo!(),
|
|
|
|
|
Batch::Snapshot(_) => todo!(),
|
|
|
|
|
Batch::Dump(_) => todo!(),
|
|
|
|
|
Batch::Contiguous { tasks, kind } => todo!(),
|
|
|
|
|
Batch::Empty => todo!(),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2022-09-07 17:21:53 +08:00
|
|
|
|
/// Notify the scheduler there is or may be work to do.
|
2022-09-06 22:43:59 +08:00
|
|
|
|
pub fn notify(&self) {
|
|
|
|
|
self.wake_up
|
|
|
|
|
.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
|
}
|
2022-09-07 05:49:19 +08:00
|
|
|
|
}
|