2021-02-27 17:19:05 +08:00
|
|
|
use std::fs::File;
|
|
|
|
|
2021-03-16 01:11:10 +08:00
|
|
|
use crate::index::Index;
|
2021-02-27 17:19:05 +08:00
|
|
|
use anyhow::Result;
|
|
|
|
use grenad::CompressionType;
|
2021-03-04 18:56:32 +08:00
|
|
|
use milli::update::UpdateBuilder;
|
2021-02-27 17:19:05 +08:00
|
|
|
use rayon::ThreadPool;
|
|
|
|
|
2021-03-16 01:11:10 +08:00
|
|
|
use crate::index::UpdateResult;
|
2021-02-27 17:19:05 +08:00
|
|
|
use crate::index_controller::updates::{Failed, Processed, Processing};
|
2021-03-04 18:56:32 +08:00
|
|
|
use crate::index_controller::UpdateMeta;
|
2021-02-27 17:19:05 +08:00
|
|
|
use crate::option::IndexerOpts;
|
|
|
|
|
|
|
|
pub struct UpdateHandler {
|
|
|
|
max_nb_chunks: Option<usize>,
|
|
|
|
chunk_compression_level: Option<u32>,
|
|
|
|
thread_pool: ThreadPool,
|
|
|
|
log_frequency: usize,
|
|
|
|
max_memory: usize,
|
|
|
|
linked_hash_map_size: usize,
|
|
|
|
chunk_compression_type: CompressionType,
|
|
|
|
chunk_fusing_shrink_size: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl UpdateHandler {
|
2021-03-16 01:11:10 +08:00
|
|
|
pub fn new(opt: &IndexerOpts) -> anyhow::Result<Self> {
|
2021-02-27 17:19:05 +08:00
|
|
|
let thread_pool = rayon::ThreadPoolBuilder::new()
|
|
|
|
.num_threads(opt.indexing_jobs.unwrap_or(0))
|
|
|
|
.build()?;
|
|
|
|
Ok(Self {
|
|
|
|
max_nb_chunks: opt.max_nb_chunks,
|
|
|
|
chunk_compression_level: opt.chunk_compression_level,
|
|
|
|
thread_pool,
|
|
|
|
log_frequency: opt.log_every_n,
|
|
|
|
max_memory: opt.max_memory.get_bytes() as usize,
|
|
|
|
linked_hash_map_size: opt.linked_hash_map_size,
|
|
|
|
chunk_compression_type: opt.chunk_compression_type,
|
|
|
|
chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
fn update_buidler(&self, update_id: u64) -> UpdateBuilder {
|
|
|
|
// We prepare the update by using the update builder.
|
|
|
|
let mut update_builder = UpdateBuilder::new(update_id);
|
|
|
|
if let Some(max_nb_chunks) = self.max_nb_chunks {
|
|
|
|
update_builder.max_nb_chunks(max_nb_chunks);
|
|
|
|
}
|
|
|
|
if let Some(chunk_compression_level) = self.chunk_compression_level {
|
|
|
|
update_builder.chunk_compression_level(chunk_compression_level);
|
|
|
|
}
|
|
|
|
update_builder.thread_pool(&self.thread_pool);
|
|
|
|
update_builder.log_every_n(self.log_frequency);
|
|
|
|
update_builder.max_memory(self.max_memory);
|
|
|
|
update_builder.linked_hash_map_size(self.linked_hash_map_size);
|
|
|
|
update_builder.chunk_compression_type(self.chunk_compression_type);
|
|
|
|
update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
|
|
|
|
update_builder
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn handle_update(
|
|
|
|
&self,
|
|
|
|
meta: Processing<UpdateMeta>,
|
|
|
|
content: File,
|
2021-03-04 18:23:41 +08:00
|
|
|
index: Index,
|
2021-02-27 17:19:05 +08:00
|
|
|
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
|
|
|
|
use UpdateMeta::*;
|
|
|
|
|
|
|
|
let update_id = meta.id();
|
|
|
|
|
|
|
|
let update_builder = self.update_buidler(update_id);
|
|
|
|
|
|
|
|
let result = match meta.meta() {
|
|
|
|
DocumentsAddition {
|
|
|
|
method,
|
|
|
|
format,
|
|
|
|
primary_key,
|
2021-03-04 18:56:32 +08:00
|
|
|
} => index.update_documents(
|
2021-02-27 17:19:05 +08:00
|
|
|
*format,
|
|
|
|
*method,
|
|
|
|
content,
|
|
|
|
update_builder,
|
|
|
|
primary_key.as_deref(),
|
|
|
|
),
|
2021-03-04 18:56:32 +08:00
|
|
|
ClearDocuments => index.clear_documents(update_builder),
|
|
|
|
DeleteDocuments => index.delete_documents(content, update_builder),
|
|
|
|
Settings(settings) => index.update_settings(settings, update_builder),
|
|
|
|
Facets(levels) => index.update_facets(levels, update_builder),
|
2021-02-27 17:19:05 +08:00
|
|
|
};
|
|
|
|
|
|
|
|
match result {
|
|
|
|
Ok(result) => Ok(meta.process(result)),
|
|
|
|
Err(e) => Err(meta.fail(e.to_string())),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|