mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 03:55:07 +08:00
update queue refactor, first iteration
This commit is contained in:
parent
7c9eaaeadb
commit
55e1552957
5
Cargo.lock
generated
5
Cargo.lock
generated
@ -1142,6 +1142,7 @@ dependencies = [
|
|||||||
"lmdb-rkv-sys",
|
"lmdb-rkv-sys",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"page_size",
|
"page_size",
|
||||||
|
"serde",
|
||||||
"synchronoise",
|
"synchronoise",
|
||||||
"url",
|
"url",
|
||||||
"zerocopy",
|
"zerocopy",
|
||||||
@ -1527,6 +1528,7 @@ dependencies = [
|
|||||||
"actix-rt",
|
"actix-rt",
|
||||||
"actix-service",
|
"actix-service",
|
||||||
"actix-web",
|
"actix-web",
|
||||||
|
"anyhow",
|
||||||
"assert-json-diff",
|
"assert-json-diff",
|
||||||
"byte-unit",
|
"byte-unit",
|
||||||
"bytes 0.6.0",
|
"bytes 0.6.0",
|
||||||
@ -1535,6 +1537,8 @@ dependencies = [
|
|||||||
"env_logger 0.8.2",
|
"env_logger 0.8.2",
|
||||||
"flate2",
|
"flate2",
|
||||||
"futures",
|
"futures",
|
||||||
|
"grenad",
|
||||||
|
"heed",
|
||||||
"http",
|
"http",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"jemallocator",
|
"jemallocator",
|
||||||
@ -1545,6 +1549,7 @@ dependencies = [
|
|||||||
"mime",
|
"mime",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"rand 0.7.3",
|
"rand 0.7.3",
|
||||||
|
"rayon",
|
||||||
"regex",
|
"regex",
|
||||||
"rustls 0.18.1",
|
"rustls 0.18.1",
|
||||||
"sentry",
|
"sentry",
|
||||||
|
@ -18,21 +18,26 @@ actix-http = "2"
|
|||||||
actix-rt = "1"
|
actix-rt = "1"
|
||||||
actix-service = "1.0.6"
|
actix-service = "1.0.6"
|
||||||
actix-web = { version = "3.3.2", features = ["rustls"] }
|
actix-web = { version = "3.3.2", features = ["rustls"] }
|
||||||
|
anyhow = "1.0.36"
|
||||||
byte-unit = { version = "4.0.9", default-features = false, features = ["std"] }
|
byte-unit = { version = "4.0.9", default-features = false, features = ["std"] }
|
||||||
bytes = "0.6.0"
|
bytes = "0.6.0"
|
||||||
chrono = { version = "0.4.19", features = ["serde"] }
|
chrono = { version = "0.4.19", features = ["serde"] }
|
||||||
crossbeam-channel = "0.5.0"
|
crossbeam-channel = "0.5.0"
|
||||||
env_logger = "0.8.2"
|
env_logger = "0.8.2"
|
||||||
flate2 = "1.0.18"
|
flate2 = "1.0.19"
|
||||||
futures = "0.3.7"
|
futures = "0.3.7"
|
||||||
|
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
|
||||||
|
heed = "0.10.6"
|
||||||
http = "0.2.1"
|
http = "0.2.1"
|
||||||
indexmap = { version = "1.3.2", features = ["serde-1"] }
|
indexmap = { version = "1.3.2", features = ["serde-1"] }
|
||||||
log = "0.4.8"
|
log = "0.4.8"
|
||||||
main_error = "0.1.0"
|
main_error = "0.1.0"
|
||||||
|
meilisearch-error = { path = "../MeiliSearch/meilisearch-error" }
|
||||||
milli = { path = "../milli" }
|
milli = { path = "../milli" }
|
||||||
mime = "0.3.16"
|
mime = "0.3.16"
|
||||||
once_cell = "1.5.2"
|
once_cell = "1.5.2"
|
||||||
rand = "0.7.3"
|
rand = "0.7.3"
|
||||||
|
rayon = "1.5.0"
|
||||||
regex = "1.4.2"
|
regex = "1.4.2"
|
||||||
rustls = "0.18"
|
rustls = "0.18"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
@ -48,7 +53,6 @@ tokio = "*"
|
|||||||
ureq = { version = "1.5.1", default-features = false, features = ["tls"] }
|
ureq = { version = "1.5.1", default-features = false, features = ["tls"] }
|
||||||
walkdir = "2.3.1"
|
walkdir = "2.3.1"
|
||||||
whoami = "1.0.0"
|
whoami = "1.0.0"
|
||||||
meilisearch-error = { path = "../MeiliSearch/meilisearch-error" }
|
|
||||||
|
|
||||||
[dependencies.sentry]
|
[dependencies.sentry]
|
||||||
default-features = false
|
default-features = false
|
||||||
|
@ -11,13 +11,15 @@ use rustls::{
|
|||||||
};
|
};
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
|
||||||
|
use crate::updates::IndexerOpts;
|
||||||
|
|
||||||
const POSSIBLE_ENV: [&str; 2] = ["development", "production"];
|
const POSSIBLE_ENV: [&str; 2] = ["development", "production"];
|
||||||
|
|
||||||
#[derive(Debug, Clone, StructOpt)]
|
#[derive(Debug, Clone, StructOpt)]
|
||||||
pub struct Opt {
|
pub struct Opt {
|
||||||
/// The destination where the database must be created.
|
/// The destination where the database must be created.
|
||||||
#[structopt(long, env = "MEILI_DB_PATH", default_value = "./data.ms")]
|
#[structopt(long, env = "MEILI_DB_PATH", default_value = "./data.ms")]
|
||||||
pub db_path: String,
|
pub db_path: PathBuf,
|
||||||
|
|
||||||
/// The address on which the http server will listen.
|
/// The address on which the http server will listen.
|
||||||
#[structopt(long, env = "MEILI_HTTP_ADDR", default_value = "127.0.0.1:7700")]
|
#[structopt(long, env = "MEILI_HTTP_ADDR", default_value = "127.0.0.1:7700")]
|
||||||
@ -132,6 +134,9 @@ pub struct Opt {
|
|||||||
/// The batch size used in the importation process, the bigger it is the faster the dump is created.
|
/// The batch size used in the importation process, the bigger it is the faster the dump is created.
|
||||||
#[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")]
|
#[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")]
|
||||||
pub dump_batch_size: usize,
|
pub dump_batch_size: usize,
|
||||||
|
|
||||||
|
#[structopt(flatten)]
|
||||||
|
pub indexer_options: IndexerOpts,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Opt {
|
impl Opt {
|
||||||
|
@ -2,7 +2,22 @@ mod settings;
|
|||||||
|
|
||||||
pub use settings::{Settings, Facets};
|
pub use settings::{Settings, Facets};
|
||||||
|
|
||||||
|
use std::io;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use flate2::read::GzDecoder;
|
||||||
|
use grenad::CompressionType;
|
||||||
|
use byte_unit::Byte;
|
||||||
|
use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod, UpdateIndexingStep::*};
|
||||||
|
use milli::{UpdateStore, UpdateHandler as Handler, Index};
|
||||||
|
use rayon::ThreadPool;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use structopt::StructOpt;
|
||||||
|
|
||||||
|
use crate::option::Opt;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(tag = "type")]
|
#[serde(tag = "type")]
|
||||||
@ -13,5 +28,322 @@ enum UpdateMeta {
|
|||||||
Facets(Facets),
|
Facets(Facets),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct UpdateQueue;
|
#[serde(tag = "type")]
|
||||||
|
enum UpdateMetaProgress {
|
||||||
|
DocumentsAddition {
|
||||||
|
step: usize,
|
||||||
|
total_steps: usize,
|
||||||
|
current: usize,
|
||||||
|
total: Option<usize>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(tag = "type")]
|
||||||
|
enum UpdateStatus<M, P, N> {
|
||||||
|
Pending { update_id: u64, meta: M },
|
||||||
|
Progressing { update_id: u64, meta: P },
|
||||||
|
Processed { update_id: u64, meta: N },
|
||||||
|
Aborted { update_id: u64, meta: M },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct UpdateQueue {
|
||||||
|
inner: Arc<UpdateStore<UpdateMeta, String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, StructOpt)]
|
||||||
|
pub struct IndexerOpts {
|
||||||
|
/// The amount of documents to skip before printing
|
||||||
|
/// a log regarding the indexing advancement.
|
||||||
|
#[structopt(long, default_value = "100000")] // 100k
|
||||||
|
pub log_every_n: usize,
|
||||||
|
|
||||||
|
/// MTBL max number of chunks in bytes.
|
||||||
|
#[structopt(long)]
|
||||||
|
pub max_nb_chunks: Option<usize>,
|
||||||
|
|
||||||
|
/// The maximum amount of memory to use for the MTBL buffer. It is recommended
|
||||||
|
/// to use something like 80%-90% of the available memory.
|
||||||
|
///
|
||||||
|
/// It is automatically split by the number of jobs e.g. if you use 7 jobs
|
||||||
|
/// and 7 GB of max memory, each thread will use a maximum of 1 GB.
|
||||||
|
#[structopt(long, default_value = "7 GiB")]
|
||||||
|
pub max_memory: Byte,
|
||||||
|
|
||||||
|
/// Size of the linked hash map cache when indexing.
|
||||||
|
/// The bigger it is, the faster the indexing is but the more memory it takes.
|
||||||
|
#[structopt(long, default_value = "500")]
|
||||||
|
pub linked_hash_map_size: usize,
|
||||||
|
|
||||||
|
/// The name of the compression algorithm to use when compressing intermediate
|
||||||
|
/// chunks during indexing documents.
|
||||||
|
///
|
||||||
|
/// Choosing a fast algorithm will make the indexing faster but may consume more memory.
|
||||||
|
#[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
|
||||||
|
pub chunk_compression_type: CompressionType,
|
||||||
|
|
||||||
|
/// The level of compression of the chosen algorithm.
|
||||||
|
#[structopt(long, requires = "chunk-compression-type")]
|
||||||
|
pub chunk_compression_level: Option<u32>,
|
||||||
|
|
||||||
|
/// The number of bytes to remove from the begining of the chunks while reading/sorting
|
||||||
|
/// or merging them.
|
||||||
|
///
|
||||||
|
/// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`,
|
||||||
|
/// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set.
|
||||||
|
#[structopt(long, default_value = "4 GiB")]
|
||||||
|
pub chunk_fusing_shrink_size: Byte,
|
||||||
|
|
||||||
|
/// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2.
|
||||||
|
#[structopt(long)]
|
||||||
|
pub enable_chunk_fusing: bool,
|
||||||
|
|
||||||
|
/// Number of parallel jobs for indexing, defaults to # of CPUs.
|
||||||
|
#[structopt(long)]
|
||||||
|
pub indexing_jobs: Option<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
type UpdateSender = broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>;
|
||||||
|
|
||||||
|
struct UpdateHandler {
|
||||||
|
indexes: Arc<Index>,
|
||||||
|
max_nb_chunks: Option<usize>,
|
||||||
|
chunk_compression_level: Option<u32>,
|
||||||
|
thread_pool: ThreadPool,
|
||||||
|
log_frequency: usize,
|
||||||
|
max_memory: usize,
|
||||||
|
linked_hash_map_size: usize,
|
||||||
|
chunk_compression_type: CompressionType,
|
||||||
|
chunk_fusing_shrink_size: u64,
|
||||||
|
update_status_sender: UpdateSender,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpdateHandler {
|
||||||
|
fn new(
|
||||||
|
opt: &IndexerOpts,
|
||||||
|
indexes: Arc<Index>,
|
||||||
|
update_status_sender: UpdateSender,
|
||||||
|
) -> Result<Self> {
|
||||||
|
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(opt.indexing_jobs.unwrap_or(0))
|
||||||
|
.build()?;
|
||||||
|
Ok(Self {
|
||||||
|
indexes,
|
||||||
|
max_nb_chunks: opt.max_nb_chunks,
|
||||||
|
chunk_compression_level: opt.chunk_compression_level,
|
||||||
|
thread_pool,
|
||||||
|
log_frequency: opt.log_every_n,
|
||||||
|
max_memory: opt.max_memory.get_bytes() as usize,
|
||||||
|
linked_hash_map_size: opt.linked_hash_map_size,
|
||||||
|
chunk_compression_type: opt.chunk_compression_type,
|
||||||
|
chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
|
||||||
|
update_status_sender,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_buidler(&self, update_id: u64) -> UpdateBuilder {
|
||||||
|
// We prepare the update by using the update builder.
|
||||||
|
let mut update_builder = UpdateBuilder::new(update_id);
|
||||||
|
if let Some(max_nb_chunks) = self.max_nb_chunks {
|
||||||
|
update_builder.max_nb_chunks(max_nb_chunks);
|
||||||
|
}
|
||||||
|
if let Some(chunk_compression_level) = self.chunk_compression_level {
|
||||||
|
update_builder.chunk_compression_level(chunk_compression_level);
|
||||||
|
}
|
||||||
|
update_builder.thread_pool(&self.thread_pool);
|
||||||
|
update_builder.log_every_n(self.log_frequency);
|
||||||
|
update_builder.max_memory(self.max_memory);
|
||||||
|
update_builder.linked_hash_map_size(self.linked_hash_map_size);
|
||||||
|
update_builder.chunk_compression_type(self.chunk_compression_type);
|
||||||
|
update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
|
||||||
|
update_builder
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_documents(
|
||||||
|
&self,
|
||||||
|
format: String,
|
||||||
|
method: String,
|
||||||
|
content: &[u8],
|
||||||
|
update_builder: UpdateBuilder,
|
||||||
|
) -> Result<()> {
|
||||||
|
// We must use the write transaction of the update here.
|
||||||
|
let mut wtxn = self.indexes.write_txn()?;
|
||||||
|
let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes);
|
||||||
|
|
||||||
|
match format.as_str() {
|
||||||
|
"csv" => builder.update_format(UpdateFormat::Csv),
|
||||||
|
"json" => builder.update_format(UpdateFormat::Json),
|
||||||
|
"json-stream" => builder.update_format(UpdateFormat::JsonStream),
|
||||||
|
otherwise => panic!("invalid update format {:?}", otherwise),
|
||||||
|
};
|
||||||
|
|
||||||
|
match method.as_str() {
|
||||||
|
"replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
|
||||||
|
"update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
|
||||||
|
otherwise => panic!("invalid indexing method {:?}", otherwise),
|
||||||
|
};
|
||||||
|
|
||||||
|
let gzipped = true;
|
||||||
|
let reader = if gzipped {
|
||||||
|
Box::new(GzDecoder::new(content))
|
||||||
|
} else {
|
||||||
|
Box::new(content) as Box<dyn io::Read>
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = builder.execute(reader, |indexing_step, update_id| {
|
||||||
|
let (current, total) = match indexing_step {
|
||||||
|
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
|
||||||
|
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||||
|
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||||
|
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
|
||||||
|
};
|
||||||
|
let _ = self.update_status_sender.send(UpdateStatus::Progressing {
|
||||||
|
update_id,
|
||||||
|
meta: UpdateMetaProgress::DocumentsAddition {
|
||||||
|
step: indexing_step.step(),
|
||||||
|
total_steps: indexing_step.number_of_steps(),
|
||||||
|
current,
|
||||||
|
total,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => wtxn.commit().map_err(Into::into),
|
||||||
|
Err(e) => Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<()> {
|
||||||
|
// We must use the write transaction of the update here.
|
||||||
|
let mut wtxn = self.indexes.write_txn()?;
|
||||||
|
let builder = update_builder.clear_documents(&mut wtxn, &self.indexes);
|
||||||
|
|
||||||
|
match builder.execute() {
|
||||||
|
Ok(_count) => wtxn.commit().map_err(Into::into),
|
||||||
|
Err(e) => Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_settings(&self, settings: Settings, update_builder: UpdateBuilder) -> Result<()> {
|
||||||
|
// We must use the write transaction of the update here.
|
||||||
|
let mut wtxn = self.indexes.write_txn()?;
|
||||||
|
let mut builder = update_builder.settings(&mut wtxn, &self.indexes);
|
||||||
|
|
||||||
|
// We transpose the settings JSON struct into a real setting update.
|
||||||
|
if let Some(names) = settings.searchable_attributes {
|
||||||
|
match names {
|
||||||
|
Some(names) => builder.set_searchable_fields(names),
|
||||||
|
None => builder.reset_searchable_fields(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We transpose the settings JSON struct into a real setting update.
|
||||||
|
if let Some(names) = settings.displayed_attributes {
|
||||||
|
match names {
|
||||||
|
Some(names) => builder.set_displayed_fields(names),
|
||||||
|
None => builder.reset_displayed_fields(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We transpose the settings JSON struct into a real setting update.
|
||||||
|
if let Some(facet_types) = settings.faceted_attributes {
|
||||||
|
builder.set_faceted_fields(facet_types);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We transpose the settings JSON struct into a real setting update.
|
||||||
|
if let Some(criteria) = settings.criteria {
|
||||||
|
match criteria {
|
||||||
|
Some(criteria) => builder.set_criteria(criteria),
|
||||||
|
None => builder.reset_criteria(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = builder.execute(|indexing_step, update_id| {
|
||||||
|
let (current, total) = match indexing_step {
|
||||||
|
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
|
||||||
|
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||||
|
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||||
|
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
|
||||||
|
};
|
||||||
|
let _ = self.update_status_sender.send(UpdateStatus::Progressing {
|
||||||
|
update_id,
|
||||||
|
meta: UpdateMetaProgress::DocumentsAddition {
|
||||||
|
step: indexing_step.step(),
|
||||||
|
total_steps: indexing_step.number_of_steps(),
|
||||||
|
current,
|
||||||
|
total,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(_count) => wtxn.commit().map_err(Into::into),
|
||||||
|
Err(e) => Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_facets(&self, levels: Facets, update_builder: UpdateBuilder) -> Result<()> {
|
||||||
|
// We must use the write transaction of the update here.
|
||||||
|
let mut wtxn = self.indexes.write_txn()?;
|
||||||
|
let mut builder = update_builder.facets(&mut wtxn, &self.indexes);
|
||||||
|
if let Some(value) = levels.level_group_size {
|
||||||
|
builder.level_group_size(value);
|
||||||
|
}
|
||||||
|
if let Some(value) = levels.min_level_size {
|
||||||
|
builder.min_level_size(value);
|
||||||
|
}
|
||||||
|
match builder.execute() {
|
||||||
|
Ok(()) => wtxn.commit().map_err(Into::into),
|
||||||
|
Err(e) => Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Handler<UpdateMeta, String> for UpdateHandler {
|
||||||
|
fn handle_update(&mut self, update_id: u64, meta: UpdateMeta, content: &[u8]) -> heed::Result<String> {
|
||||||
|
use UpdateMeta::*;
|
||||||
|
|
||||||
|
let update_builder = self.update_buidler(update_id);
|
||||||
|
|
||||||
|
let result: anyhow::Result<()> = match meta {
|
||||||
|
DocumentsAddition { method, format } => {
|
||||||
|
self.update_documents(format, method, content, update_builder)
|
||||||
|
},
|
||||||
|
ClearDocuments => self.clear_documents(update_builder),
|
||||||
|
Settings(settings) => self.update_settings(settings, update_builder),
|
||||||
|
Facets(levels) => self.update_facets(levels, update_builder),
|
||||||
|
};
|
||||||
|
|
||||||
|
let meta = match result {
|
||||||
|
Ok(()) => format!("valid update content"),
|
||||||
|
Err(e) => format!("error while processing update content: {:?}", e),
|
||||||
|
};
|
||||||
|
|
||||||
|
let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
|
||||||
|
let _ = self.update_status_sender.send(processed);
|
||||||
|
|
||||||
|
Ok(meta)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpdateQueue {
|
||||||
|
pub fn new<P: AsRef<Path>>(
|
||||||
|
opt: Opt,
|
||||||
|
indexes: Arc<Index>,
|
||||||
|
) -> Result<Self> {
|
||||||
|
let (sender, _) = broadcast::channel(100);
|
||||||
|
let handler = UpdateHandler::new(&opt.indexer_options, indexes, sender)?;
|
||||||
|
let size = opt.max_udb_size.get_bytes() as usize;
|
||||||
|
let path = opt.db_path.join("updates.mdb");
|
||||||
|
let inner = UpdateStore::open(
|
||||||
|
Some(size),
|
||||||
|
path,
|
||||||
|
handler
|
||||||
|
)?;
|
||||||
|
Ok(Self { inner })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -20,24 +20,24 @@ pub struct Settings {
|
|||||||
deserialize_with = "deserialize_some",
|
deserialize_with = "deserialize_some",
|
||||||
skip_serializing_if = "Option::is_none",
|
skip_serializing_if = "Option::is_none",
|
||||||
)]
|
)]
|
||||||
displayed_attributes: Option<Option<Vec<String>>>,
|
pub displayed_attributes: Option<Option<Vec<String>>>,
|
||||||
|
|
||||||
#[serde(
|
#[serde(
|
||||||
default,
|
default,
|
||||||
deserialize_with = "deserialize_some",
|
deserialize_with = "deserialize_some",
|
||||||
skip_serializing_if = "Option::is_none",
|
skip_serializing_if = "Option::is_none",
|
||||||
)]
|
)]
|
||||||
searchable_attributes: Option<Option<Vec<String>>>,
|
pub searchable_attributes: Option<Option<Vec<String>>>,
|
||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
faceted_attributes: Option<HashMap<String, String>>,
|
pub faceted_attributes: Option<HashMap<String, String>>,
|
||||||
|
|
||||||
#[serde(
|
#[serde(
|
||||||
default,
|
default,
|
||||||
deserialize_with = "deserialize_some",
|
deserialize_with = "deserialize_some",
|
||||||
skip_serializing_if = "Option::is_none",
|
skip_serializing_if = "Option::is_none",
|
||||||
)]
|
)]
|
||||||
criteria: Option<Option<Vec<String>>>,
|
pub criteria: Option<Option<Vec<String>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -45,7 +45,7 @@ pub struct Settings {
|
|||||||
#[serde(deny_unknown_fields)]
|
#[serde(deny_unknown_fields)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct Facets {
|
pub struct Facets {
|
||||||
level_group_size: Option<NonZeroUsize>,
|
pub level_group_size: Option<NonZeroUsize>,
|
||||||
min_level_size: Option<NonZeroUsize>,
|
pub min_level_size: Option<NonZeroUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user