mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
Merge pull request #299 from meilisearch/default-update-callbacks
Prefer using a global update callback common to all indexes
This commit is contained in:
commit
d30e5f6231
@ -104,14 +104,14 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
|
|||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn =
|
||||||
|
move |_name: &str, update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
||||||
let index = match database.open_index(&command.index_name) {
|
let index = match database.open_index(&command.index_name) {
|
||||||
Some(index) => index,
|
Some(index) => index,
|
||||||
None => database.create_index(&command.index_name).unwrap(),
|
None => database.create_index(&command.index_name).unwrap(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let done = database.set_update_callback(&command.index_name, Box::new(update_fn));
|
database.set_update_callback(Box::new(update_fn));
|
||||||
assert!(done, "could not set the index update function");
|
|
||||||
|
|
||||||
let env = &database.env;
|
let env = &database.env;
|
||||||
|
|
||||||
|
@ -11,14 +11,15 @@ use log::debug;
|
|||||||
|
|
||||||
use crate::{store, update, Index, MResult};
|
use crate::{store, update, Index, MResult};
|
||||||
|
|
||||||
pub type BoxUpdateFn = Box<dyn Fn(update::ProcessedUpdateResult) + Send + Sync + 'static>;
|
pub type BoxUpdateFn = Box<dyn Fn(&str, update::ProcessedUpdateResult) + Send + Sync + 'static>;
|
||||||
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
|
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
|
||||||
|
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
pub env: heed::Env,
|
pub env: heed::Env,
|
||||||
common_store: heed::PolyDatabase,
|
common_store: heed::PolyDatabase,
|
||||||
indexes_store: heed::Database<Str, Unit>,
|
indexes_store: heed::Database<Str, Unit>,
|
||||||
indexes: RwLock<HashMap<String, (Index, Arc<ArcSwapFn>, thread::JoinHandle<()>)>>,
|
indexes: RwLock<HashMap<String, (Index, thread::JoinHandle<()>)>>,
|
||||||
|
update_fn: Arc<ArcSwapFn>,
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! r#break_try {
|
macro_rules! r#break_try {
|
||||||
@ -41,7 +42,13 @@ pub enum UpdateEvent {
|
|||||||
pub type UpdateEvents = Receiver<UpdateEvent>;
|
pub type UpdateEvents = Receiver<UpdateEvent>;
|
||||||
pub type UpdateEventsEmitter = Sender<UpdateEvent>;
|
pub type UpdateEventsEmitter = Sender<UpdateEvent>;
|
||||||
|
|
||||||
fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc<ArcSwapFn>, index: Index) {
|
fn update_awaiter(
|
||||||
|
receiver: UpdateEvents,
|
||||||
|
env: heed::Env,
|
||||||
|
index_name: &str,
|
||||||
|
update_fn: Arc<ArcSwapFn>,
|
||||||
|
index: Index,
|
||||||
|
) {
|
||||||
let mut receiver = receiver.into_iter();
|
let mut receiver = receiver.into_iter();
|
||||||
while let Some(UpdateEvent::NewUpdate) = receiver.next() {
|
while let Some(UpdateEvent::NewUpdate) = receiver.next() {
|
||||||
loop {
|
loop {
|
||||||
@ -84,7 +91,7 @@ fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc<ArcSwap
|
|||||||
|
|
||||||
// call the user callback when the update and the result are written consistently
|
// call the user callback when the update and the result are written consistently
|
||||||
if let Some(ref callback) = *update_fn.load() {
|
if let Some(ref callback) = *update_fn.load() {
|
||||||
(callback)(status);
|
(callback)(index_name, status);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -103,6 +110,7 @@ impl Database {
|
|||||||
|
|
||||||
let common_store = env.create_poly_database(Some("common"))?;
|
let common_store = env.create_poly_database(Some("common"))?;
|
||||||
let indexes_store = env.create_database::<Str, Unit>(Some("indexes"))?;
|
let indexes_store = env.create_database::<Str, Unit>(Some("indexes"))?;
|
||||||
|
let update_fn = Arc::new(ArcSwapFn::empty());
|
||||||
|
|
||||||
// list all indexes that needs to be opened
|
// list all indexes that needs to be opened
|
||||||
let mut must_open = Vec::new();
|
let mut must_open = Vec::new();
|
||||||
@ -128,21 +136,27 @@ impl Database {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let update_fn = Arc::new(ArcSwapFn::empty());
|
|
||||||
|
|
||||||
let env_clone = env.clone();
|
let env_clone = env.clone();
|
||||||
let index_clone = index.clone();
|
let index_clone = index.clone();
|
||||||
|
let name_clone = index_name.clone();
|
||||||
let update_fn_clone = update_fn.clone();
|
let update_fn_clone = update_fn.clone();
|
||||||
|
|
||||||
let handle = thread::spawn(move || {
|
let handle = thread::spawn(move || {
|
||||||
update_awaiter(receiver, env_clone, update_fn_clone, index_clone)
|
update_awaiter(
|
||||||
|
receiver,
|
||||||
|
env_clone,
|
||||||
|
&name_clone,
|
||||||
|
update_fn_clone,
|
||||||
|
index_clone,
|
||||||
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
// send an update notification to make sure that
|
// send an update notification to make sure that
|
||||||
// possible pre-boot updates are consumed
|
// possible pre-boot updates are consumed
|
||||||
sender.send(UpdateEvent::NewUpdate).unwrap();
|
sender.send(UpdateEvent::NewUpdate).unwrap();
|
||||||
|
|
||||||
let result = indexes.insert(index_name, (index, update_fn, handle));
|
let result = indexes.insert(index_name, (index, handle));
|
||||||
assert!(
|
assert!(
|
||||||
result.is_none(),
|
result.is_none(),
|
||||||
"The index should not have been already open"
|
"The index should not have been already open"
|
||||||
@ -154,6 +168,7 @@ impl Database {
|
|||||||
common_store,
|
common_store,
|
||||||
indexes_store,
|
indexes_store,
|
||||||
indexes: RwLock::new(indexes),
|
indexes: RwLock::new(indexes),
|
||||||
|
update_fn,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,16 +195,21 @@ impl Database {
|
|||||||
|
|
||||||
let env_clone = self.env.clone();
|
let env_clone = self.env.clone();
|
||||||
let index_clone = index.clone();
|
let index_clone = index.clone();
|
||||||
|
let name_clone = name.to_owned();
|
||||||
let no_update_fn = Arc::new(ArcSwapFn::empty());
|
let update_fn_clone = self.update_fn.clone();
|
||||||
let no_update_fn_clone = no_update_fn.clone();
|
|
||||||
|
|
||||||
let handle = thread::spawn(move || {
|
let handle = thread::spawn(move || {
|
||||||
update_awaiter(receiver, env_clone, no_update_fn_clone, index_clone)
|
update_awaiter(
|
||||||
|
receiver,
|
||||||
|
env_clone,
|
||||||
|
&name_clone,
|
||||||
|
update_fn_clone,
|
||||||
|
index_clone,
|
||||||
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
writer.commit()?;
|
writer.commit()?;
|
||||||
entry.insert((index.clone(), no_update_fn, handle));
|
entry.insert((index.clone(), handle));
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
}
|
}
|
||||||
@ -201,7 +221,7 @@ impl Database {
|
|||||||
let mut indexes_lock = self.indexes.write().unwrap();
|
let mut indexes_lock = self.indexes.write().unwrap();
|
||||||
|
|
||||||
match indexes_lock.remove_entry(name) {
|
match indexes_lock.remove_entry(name) {
|
||||||
Some((name, (index, _fn, handle))) => {
|
Some((name, (index, handle))) => {
|
||||||
// remove the index name from the list of indexes
|
// remove the index name from the list of indexes
|
||||||
// and clear all the LMDB dbi
|
// and clear all the LMDB dbi
|
||||||
let mut writer = self.env.write_txn()?;
|
let mut writer = self.env.write_txn()?;
|
||||||
@ -218,27 +238,13 @@ impl Database {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_update_callback(&self, name: impl AsRef<str>, update_fn: BoxUpdateFn) -> bool {
|
pub fn set_update_callback(&self, update_fn: BoxUpdateFn) {
|
||||||
let indexes_lock = self.indexes.read().unwrap();
|
let update_fn = Some(Arc::new(update_fn));
|
||||||
match indexes_lock.get(name.as_ref()) {
|
self.update_fn.swap(update_fn);
|
||||||
Some((_, current_update_fn, _)) => {
|
|
||||||
let update_fn = Some(Arc::new(update_fn));
|
|
||||||
current_update_fn.swap(update_fn);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
None => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn unset_update_callback(&self, name: impl AsRef<str>) -> bool {
|
pub fn unset_update_callback(&self) {
|
||||||
let indexes_lock = self.indexes.read().unwrap();
|
self.update_fn.swap(None);
|
||||||
match indexes_lock.get(name.as_ref()) {
|
|
||||||
Some((_, current_update_fn, _)) => {
|
|
||||||
current_update_fn.swap(None);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
None => false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn copy_and_compact_to_path<P: AsRef<Path>>(&self, path: P) -> ZResult<File> {
|
pub fn copy_and_compact_to_path<P: AsRef<Path>>(&self, path: P) -> ZResult<File> {
|
||||||
@ -272,11 +278,12 @@ mod tests {
|
|||||||
let env = &database.env;
|
let env = &database.env;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
|
||||||
|
sender.send(update.update_id).unwrap()
|
||||||
|
};
|
||||||
let index = database.create_index("test").unwrap();
|
let index = database.create_index("test").unwrap();
|
||||||
|
|
||||||
let done = database.set_update_callback("test", Box::new(update_fn));
|
database.set_update_callback(Box::new(update_fn));
|
||||||
assert!(done, "could not set the index update function");
|
|
||||||
|
|
||||||
let schema = {
|
let schema = {
|
||||||
let data = r#"
|
let data = r#"
|
||||||
@ -334,11 +341,12 @@ mod tests {
|
|||||||
let env = &database.env;
|
let env = &database.env;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
|
||||||
|
sender.send(update.update_id).unwrap()
|
||||||
|
};
|
||||||
let index = database.create_index("test").unwrap();
|
let index = database.create_index("test").unwrap();
|
||||||
|
|
||||||
let done = database.set_update_callback("test", Box::new(update_fn));
|
database.set_update_callback(Box::new(update_fn));
|
||||||
assert!(done, "could not set the index update function");
|
|
||||||
|
|
||||||
let schema = {
|
let schema = {
|
||||||
let data = r#"
|
let data = r#"
|
||||||
@ -395,11 +403,12 @@ mod tests {
|
|||||||
let env = &database.env;
|
let env = &database.env;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
|
||||||
|
sender.send(update.update_id).unwrap()
|
||||||
|
};
|
||||||
let index = database.create_index("test").unwrap();
|
let index = database.create_index("test").unwrap();
|
||||||
|
|
||||||
let done = database.set_update_callback("test", Box::new(update_fn));
|
database.set_update_callback(Box::new(update_fn));
|
||||||
assert!(done, "could not set the index update function");
|
|
||||||
|
|
||||||
let schema = {
|
let schema = {
|
||||||
let data = r#"
|
let data = r#"
|
||||||
@ -445,11 +454,12 @@ mod tests {
|
|||||||
let env = &database.env;
|
let env = &database.env;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
|
||||||
|
sender.send(update.update_id).unwrap()
|
||||||
|
};
|
||||||
let index = database.create_index("test").unwrap();
|
let index = database.create_index("test").unwrap();
|
||||||
|
|
||||||
let done = database.set_update_callback("test", Box::new(update_fn));
|
database.set_update_callback(Box::new(update_fn));
|
||||||
assert!(done, "could not set the index update function");
|
|
||||||
|
|
||||||
let schema = {
|
let schema = {
|
||||||
let data = r#"
|
let data = r#"
|
||||||
@ -615,11 +625,12 @@ mod tests {
|
|||||||
let env = &database.env;
|
let env = &database.env;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
|
||||||
|
sender.send(update.update_id).unwrap()
|
||||||
|
};
|
||||||
let index = database.create_index("test").unwrap();
|
let index = database.create_index("test").unwrap();
|
||||||
|
|
||||||
let done = database.set_update_callback("test", Box::new(update_fn));
|
database.set_update_callback(Box::new(update_fn));
|
||||||
assert!(done, "could not set the index update function");
|
|
||||||
|
|
||||||
let schema = {
|
let schema = {
|
||||||
let data = r#"
|
let data = r#"
|
||||||
@ -692,11 +703,12 @@ mod tests {
|
|||||||
let env = &database.env;
|
let env = &database.env;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::sync_channel(100);
|
let (sender, receiver) = mpsc::sync_channel(100);
|
||||||
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
|
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
|
||||||
|
sender.send(update.update_id).unwrap()
|
||||||
|
};
|
||||||
let index = database.create_index("test").unwrap();
|
let index = database.create_index("test").unwrap();
|
||||||
|
|
||||||
let done = database.set_update_callback("test", Box::new(update_fn));
|
database.set_update_callback(Box::new(update_fn));
|
||||||
assert!(done, "could not set the index update function");
|
|
||||||
|
|
||||||
let schema = {
|
let schema = {
|
||||||
let data = r#"
|
let data = r#"
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
@ -35,7 +34,6 @@ pub struct DataInner {
|
|||||||
pub db_path: String,
|
pub db_path: String,
|
||||||
pub admin_token: Option<String>,
|
pub admin_token: Option<String>,
|
||||||
pub server_pid: Pid,
|
pub server_pid: Pid,
|
||||||
pub accept_updates: Arc<AtomicBool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DataInner {
|
impl DataInner {
|
||||||
@ -70,25 +68,6 @@ impl DataInner {
|
|||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn last_backup(&self, reader: &heed::RoTxn) -> MResult<Option<DateTime<Utc>>> {
|
|
||||||
match self
|
|
||||||
.db
|
|
||||||
.common_store()
|
|
||||||
.get::<Str, SerdeDatetime>(&reader, "last-backup")?
|
|
||||||
{
|
|
||||||
Some(datetime) => Ok(Some(datetime)),
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_last_backup(&self, writer: &mut heed::RwTxn) -> MResult<()> {
|
|
||||||
self.db
|
|
||||||
.common_store()
|
|
||||||
.put::<Str, SerdeDatetime>(writer, "last-backup", &Utc::now())?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn fields_frequency(
|
pub fn fields_frequency(
|
||||||
&self,
|
&self,
|
||||||
reader: &heed::RoTxn,
|
reader: &heed::RoTxn,
|
||||||
@ -143,14 +122,6 @@ impl DataInner {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop_accept_updates(&self) {
|
|
||||||
self.accept_updates.store(false, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn accept_updates(&self) -> bool {
|
|
||||||
self.accept_updates.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Data {
|
impl Data {
|
||||||
@ -160,30 +131,22 @@ impl Data {
|
|||||||
let server_pid = sysinfo::get_current_pid().unwrap();
|
let server_pid = sysinfo::get_current_pid().unwrap();
|
||||||
|
|
||||||
let db = Arc::new(Database::open_or_create(opt.database_path.clone()).unwrap());
|
let db = Arc::new(Database::open_or_create(opt.database_path.clone()).unwrap());
|
||||||
let accept_updates = Arc::new(AtomicBool::new(true));
|
|
||||||
|
|
||||||
let inner_data = DataInner {
|
let inner_data = DataInner {
|
||||||
db: db.clone(),
|
db: db.clone(),
|
||||||
db_path,
|
db_path,
|
||||||
admin_token,
|
admin_token,
|
||||||
server_pid,
|
server_pid,
|
||||||
accept_updates,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let data = Data {
|
let data = Data {
|
||||||
inner: Arc::new(inner_data),
|
inner: Arc::new(inner_data),
|
||||||
};
|
};
|
||||||
|
|
||||||
for index_name in db.indexes_names().unwrap() {
|
let callback_context = data.clone();
|
||||||
let callback_context = data.clone();
|
db.set_update_callback(Box::new(move |index_name, status| {
|
||||||
let callback_name = index_name.clone();
|
index_update_callback(&index_name, &callback_context, status);
|
||||||
db.set_update_callback(
|
}));
|
||||||
index_name,
|
|
||||||
Box::new(move |status| {
|
|
||||||
index_update_callback(&callback_name, &callback_context, status);
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
data
|
data
|
||||||
}
|
}
|
||||||
|
@ -8,4 +8,4 @@ pub mod models;
|
|||||||
pub mod option;
|
pub mod option;
|
||||||
pub mod routes;
|
pub mod routes;
|
||||||
|
|
||||||
use self::data::Data;
|
pub use self::data::Data;
|
||||||
|
@ -7,6 +7,7 @@ use tide_log::RequestLogger;
|
|||||||
use meilidb_http::data::Data;
|
use meilidb_http::data::Data;
|
||||||
use meilidb_http::option::Opt;
|
use meilidb_http::option::Opt;
|
||||||
use meilidb_http::routes;
|
use meilidb_http::routes;
|
||||||
|
use meilidb_http::routes::index::index_update_callback;
|
||||||
|
|
||||||
#[cfg(not(target_os = "macos"))]
|
#[cfg(not(target_os = "macos"))]
|
||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
@ -16,8 +17,13 @@ pub fn main() -> Result<(), MainError> {
|
|||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let opt = Opt::new();
|
let opt = Opt::new();
|
||||||
|
|
||||||
let data = Data::new(opt.clone());
|
let data = Data::new(opt.clone());
|
||||||
|
|
||||||
|
let data_cloned = data.clone();
|
||||||
|
data.db.set_update_callback(Box::new(move |name, status| {
|
||||||
|
index_update_callback(name, &data_cloned, status);
|
||||||
|
}));
|
||||||
|
|
||||||
let mut app = tide::App::with_state(data);
|
let mut app = tide::App::with_state(data);
|
||||||
|
|
||||||
app.middleware(
|
app.middleware(
|
||||||
|
@ -45,10 +45,6 @@ pub struct IndexUpdateResponse {
|
|||||||
pub async fn delete_document(ctx: Context<Data>) -> SResult<Response> {
|
pub async fn delete_document(ctx: Context<Data>) -> SResult<Response> {
|
||||||
ctx.is_allowed(DocumentsWrite)?;
|
ctx.is_allowed(DocumentsWrite)?;
|
||||||
|
|
||||||
if !ctx.state().accept_updates() {
|
|
||||||
return Err(ResponseError::Maintenance);
|
|
||||||
}
|
|
||||||
|
|
||||||
let index = ctx.index()?;
|
let index = ctx.index()?;
|
||||||
let identifier = ctx.identifier()?;
|
let identifier = ctx.identifier()?;
|
||||||
let document_id = meilidb_core::serde::compute_document_id(identifier.clone());
|
let document_id = meilidb_core::serde::compute_document_id(identifier.clone());
|
||||||
@ -154,9 +150,6 @@ fn infered_schema(document: &IndexMap<String, Value>) -> Option<meilidb_schema::
|
|||||||
async fn update_multiple_documents(mut ctx: Context<Data>, is_partial: bool) -> SResult<Response> {
|
async fn update_multiple_documents(mut ctx: Context<Data>, is_partial: bool) -> SResult<Response> {
|
||||||
ctx.is_allowed(DocumentsWrite)?;
|
ctx.is_allowed(DocumentsWrite)?;
|
||||||
|
|
||||||
if !ctx.state().accept_updates() {
|
|
||||||
return Err(ResponseError::Maintenance);
|
|
||||||
}
|
|
||||||
let data: Vec<IndexMap<String, Value>> =
|
let data: Vec<IndexMap<String, Value>> =
|
||||||
ctx.body_json().await.map_err(ResponseError::bad_request)?;
|
ctx.body_json().await.map_err(ResponseError::bad_request)?;
|
||||||
let index = ctx.index()?;
|
let index = ctx.index()?;
|
||||||
@ -211,9 +204,7 @@ pub async fn add_or_update_multiple_documents(ctx: Context<Data>) -> SResult<Res
|
|||||||
|
|
||||||
pub async fn delete_multiple_documents(mut ctx: Context<Data>) -> SResult<Response> {
|
pub async fn delete_multiple_documents(mut ctx: Context<Data>) -> SResult<Response> {
|
||||||
ctx.is_allowed(DocumentsWrite)?;
|
ctx.is_allowed(DocumentsWrite)?;
|
||||||
if !ctx.state().accept_updates() {
|
|
||||||
return Err(ResponseError::Maintenance);
|
|
||||||
}
|
|
||||||
let data: Vec<Value> = ctx.body_json().await.map_err(ResponseError::bad_request)?;
|
let data: Vec<Value> = ctx.body_json().await.map_err(ResponseError::bad_request)?;
|
||||||
let index = ctx.index()?;
|
let index = ctx.index()?;
|
||||||
|
|
||||||
@ -243,9 +234,7 @@ pub async fn delete_multiple_documents(mut ctx: Context<Data>) -> SResult<Respon
|
|||||||
|
|
||||||
pub async fn clear_all_documents(ctx: Context<Data>) -> SResult<Response> {
|
pub async fn clear_all_documents(ctx: Context<Data>) -> SResult<Response> {
|
||||||
ctx.is_allowed(DocumentsWrite)?;
|
ctx.is_allowed(DocumentsWrite)?;
|
||||||
if !ctx.state().accept_updates() {
|
|
||||||
return Err(ResponseError::Maintenance);
|
|
||||||
}
|
|
||||||
let index = ctx.index()?;
|
let index = ctx.index()?;
|
||||||
|
|
||||||
let env = &ctx.state().db.env;
|
let env = &ctx.state().db.env;
|
||||||
|
@ -69,15 +69,6 @@ pub async fn create_index(mut ctx: Context<Data>) -> SResult<Response> {
|
|||||||
Err(e) => return Err(ResponseError::create_index(e)),
|
Err(e) => return Err(ResponseError::create_index(e)),
|
||||||
};
|
};
|
||||||
|
|
||||||
let callback_context = ctx.state().clone();
|
|
||||||
let callback_name = index_name.clone();
|
|
||||||
db.set_update_callback(
|
|
||||||
&index_name,
|
|
||||||
Box::new(move |status| {
|
|
||||||
index_update_callback(&callback_name, &callback_context, status);
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
let env = &db.env;
|
let env = &db.env;
|
||||||
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
|
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user