implement document push

This commit is contained in:
mpostma 2021-02-26 18:11:43 +01:00
parent 6bcc302950
commit 658166c05e
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 58 additions and 274 deletions

View File

@ -1,15 +1,20 @@
use uuid::Uuid; use std::fs::{File, create_dir_all};
use std::path::{PathBuf, Path}; use std::path::{PathBuf, Path};
use chrono::Utc;
use tokio::sync::{mpsc, oneshot, RwLock};
use thiserror::Error;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use milli::Index;
use std::collections::hash_map::Entry; use chrono::Utc;
use std::fs::create_dir_all;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use milli::Index;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
use log::info;
use super::update_handler::UpdateHandler;
use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}, UpdateResult as UResult}; use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}, UpdateResult as UResult};
use crate::option::IndexerOpts;
pub type Result<T> = std::result::Result<T, IndexError>; pub type Result<T> = std::result::Result<T, IndexError>;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
@ -22,6 +27,7 @@ enum IndexMsg {
struct IndexActor<S> { struct IndexActor<S> {
inbox: mpsc::Receiver<IndexMsg>, inbox: mpsc::Receiver<IndexMsg>,
update_handler: Arc<UpdateHandler>,
store: S, store: S,
} }
@ -36,18 +42,22 @@ pub enum IndexError {
#[async_trait::async_trait] #[async_trait::async_trait]
trait IndexStore { trait IndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMetadata>; async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMetadata>;
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<Index>>;
} }
impl<S: IndexStore> IndexActor<S> { impl<S: IndexStore + Sync + Send> IndexActor<S> {
fn new(inbox: mpsc::Receiver<IndexMsg>, store: S) -> Self { fn new(inbox: mpsc::Receiver<IndexMsg>, store: S) -> Self {
Self { inbox, store } let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).unwrap();
let update_handler = Arc::new(update_handler);
Self { inbox, store, update_handler }
} }
async fn run(mut self) { async fn run(mut self) {
loop { loop {
match self.inbox.recv().await { match self.inbox.recv().await {
Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await, Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await,
Some(IndexMsg::Update { ret, meta, data }) => self.handle_update().await, Some(IndexMsg::Update { ret, meta, data }) => self.handle_update(meta, data, ret).await,
None => break, None => break,
} }
} }
@ -58,8 +68,14 @@ impl<S: IndexStore> IndexActor<S> {
let _ = ret.send(result); let _ = ret.send(result);
} }
async fn handle_update(&self) { async fn handle_update(&self, meta: Processing<UpdateMeta>, data: File, ret: oneshot::Sender<UpdateResult>) {
println!("processing update!!!"); info!("processing update");
let uuid = meta.index_uuid().clone();
let index = self.store.get_or_create(uuid).await.unwrap();
let update_handler = self.update_handler.clone();
let result = tokio::task::spawn_blocking(move || update_handler.handle_update(meta, data, index.as_ref())).await;
let result = result.unwrap();
let _ = ret.send(result);
} }
} }
@ -96,7 +112,7 @@ impl IndexActorHandle {
struct MapIndexStore { struct MapIndexStore {
root: PathBuf, root: PathBuf,
meta_store: AsyncMap<Uuid, IndexMetadata>, meta_store: AsyncMap<Uuid, IndexMetadata>,
index_store: AsyncMap<Uuid, Index>, index_store: AsyncMap<Uuid, Arc<Index>>,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -126,10 +142,26 @@ impl IndexStore for MapIndexStore {
Ok(index) Ok(index)
}).await.expect("thread died"); }).await.expect("thread died");
self.index_store.write().await.insert(meta.uuid.clone(), index?); self.index_store.write().await.insert(meta.uuid.clone(), Arc::new(index?));
Ok(meta) Ok(meta)
} }
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<Index>> {
match self.index_store.write().await.entry(uuid.clone()) {
Entry::Vacant(entry) => {
match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(_) => {
todo!()
}
Entry::Occupied(entry) => {
todo!()
}
}
}
Entry::Occupied(entry) => Ok(entry.get().clone()),
}
}
} }
impl MapIndexStore { impl MapIndexStore {

View File

@ -2,6 +2,7 @@ mod index_actor;
mod update_actor; mod update_actor;
mod uuid_resolver; mod uuid_resolver;
mod update_store; mod update_store;
mod update_handler;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use super::IndexController; use super::IndexController;
@ -10,7 +11,6 @@ use super::IndexMetadata;
use tokio::fs::File; use tokio::fs::File;
use super::UpdateMeta; use super::UpdateMeta;
pub struct ActorIndexController { pub struct ActorIndexController {
uuid_resolver: uuid_resolver::UuidResolverHandle, uuid_resolver: uuid_resolver::UuidResolverHandle,
index_handle: index_actor::IndexActorHandle, index_handle: index_actor::IndexActorHandle,

View File

@ -53,11 +53,11 @@ impl UpdateActor {
} }
} }
async fn handle_update(&self, _uuid: Uuid, meta: UpdateMeta, payload: Option<File>, ret: oneshot::Sender<Result<UpdateStatus>>) { async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, payload: Option<File>, ret: oneshot::Sender<Result<UpdateStatus>>) {
let mut buf = Vec::new(); let mut buf = Vec::new();
let mut payload = payload.unwrap(); let mut payload = payload.unwrap();
payload.read_to_end(&mut buf).await.unwrap(); payload.read_to_end(&mut buf).await.unwrap();
let result = self.store.register_update(meta, &buf).unwrap(); let result = self.store.register_update(meta, &buf, uuid).unwrap();
let _ = ret.send(Ok(UpdateStatus::Pending(result))); let _ = ret.send(Ok(UpdateStatus::Pending(result)));
} }
} }

View File

@ -1,255 +0,0 @@
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use anyhow::Result;
use flate2::read::GzDecoder;
use grenad::CompressionType;
use log::info;
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use milli::Index;
use rayon::ThreadPool;
use super::update_store::HandleUpdate;
use crate::index_controller::updates::{Failed, Processed, Processing};
use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult};
use crate::option::IndexerOpts;
pub struct UpdateHandler {
index: Arc<Index>,
max_nb_chunks: Option<usize>,
chunk_compression_level: Option<u32>,
thread_pool: Arc<ThreadPool>,
log_frequency: usize,
max_memory: usize,
linked_hash_map_size: usize,
chunk_compression_type: CompressionType,
chunk_fusing_shrink_size: u64,
}
impl UpdateHandler {
pub fn new(
opt: &IndexerOpts,
index: Arc<Index>,
thread_pool: Arc<ThreadPool>,
) -> anyhow::Result<Self> {
Ok(Self {
index,
max_nb_chunks: opt.max_nb_chunks,
chunk_compression_level: opt.chunk_compression_level,
thread_pool,
log_frequency: opt.log_every_n,
max_memory: opt.max_memory.get_bytes() as usize,
linked_hash_map_size: opt.linked_hash_map_size,
chunk_compression_type: opt.chunk_compression_type,
chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
})
}
fn update_buidler(&self, update_id: u64) -> UpdateBuilder {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = self.max_nb_chunks {
update_builder.max_nb_chunks(max_nb_chunks);
}
if let Some(chunk_compression_level) = self.chunk_compression_level {
update_builder.chunk_compression_level(chunk_compression_level);
}
update_builder.thread_pool(&self.thread_pool);
update_builder.log_every_n(self.log_frequency);
update_builder.max_memory(self.max_memory);
update_builder.linked_hash_map_size(self.linked_hash_map_size);
update_builder.chunk_compression_type(self.chunk_compression_type);
update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
update_builder
}
fn update_documents(
&self,
format: UpdateFormat,
method: IndexDocumentsMethod,
content: &[u8],
update_builder: UpdateBuilder,
primary_key: Option<&str>,
) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here.
let mut wtxn = self.index.write_txn()?;
// Set the primary key if not set already, ignore if already set.
match (self.index.primary_key(&wtxn)?, primary_key) {
(None, Some(ref primary_key)) => {
self.index.put_primary_key(&mut wtxn, primary_key)?;
}
_ => (),
}
let mut builder = update_builder.index_documents(&mut wtxn, &self.index);
builder.update_format(format);
builder.index_documents_method(method);
let gzipped = true;
let reader = if gzipped && !content.is_empty() {
Box::new(GzDecoder::new(content))
} else {
Box::new(content) as Box<dyn io::Read>
};
let result = builder.execute(reader, |indexing_step, update_id| {
info!("update {}: {:?}", update_id, indexing_step)
});
match result {
Ok(addition_result) => wtxn
.commit()
.and(Ok(UpdateResult::DocumentsAddition(addition_result)))
.map_err(Into::into),
Err(e) => Err(e.into()),
}
}
fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here.
let mut wtxn = self.index.write_txn()?;
let builder = update_builder.clear_documents(&mut wtxn, &self.index);
match builder.execute() {
Ok(_count) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e.into()),
}
}
fn update_settings(
&self,
settings: &Settings,
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here.
let mut wtxn = self.index.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, &self.index);
// We transpose the settings JSON struct into a real setting update.
if let Some(ref names) = settings.searchable_attributes {
match names {
Some(names) => builder.set_searchable_fields(names.clone()),
None => builder.reset_searchable_fields(),
}
}
// We transpose the settings JSON struct into a real setting update.
if let Some(ref names) = settings.displayed_attributes {
match names {
Some(names) => builder.set_displayed_fields(names.clone()),
None => builder.reset_displayed_fields(),
}
}
// We transpose the settings JSON struct into a real setting update.
if let Some(ref facet_types) = settings.faceted_attributes {
let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new());
builder.set_faceted_fields(facet_types);
}
// We transpose the settings JSON struct into a real setting update.
if let Some(ref criteria) = settings.criteria {
match criteria {
Some(criteria) => builder.set_criteria(criteria.clone()),
None => builder.reset_criteria(),
}
}
let result = builder
.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
match result {
Ok(()) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e.into()),
}
}
fn update_facets(
&self,
levels: &Facets,
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here.
let mut wtxn = self.index.write_txn()?;
let mut builder = update_builder.facets(&mut wtxn, &self.index);
if let Some(value) = levels.level_group_size {
builder.level_group_size(value);
}
if let Some(value) = levels.min_level_size {
builder.min_level_size(value);
}
match builder.execute() {
Ok(()) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e.into()),
}
}
fn delete_documents(
&self,
document_ids: &[u8],
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
let ids: Vec<String> = serde_json::from_slice(document_ids)?;
let mut txn = self.index.write_txn()?;
let mut builder = update_builder.delete_documents(&mut txn, &self.index)?;
// We ignore unexisting document ids
ids.iter().for_each(|id| { builder.delete_external_id(id); });
match builder.execute() {
Ok(deleted) => txn
.commit()
.and(Ok(UpdateResult::DocumentDeletion { deleted }))
.map_err(Into::into),
Err(e) => Err(e.into())
}
}
}
impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
fn handle_update(
&mut self,
meta: Processing<UpdateMeta>,
content: &[u8],
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
use UpdateMeta::*;
let update_id = meta.id();
let update_builder = self.update_buidler(update_id);
let result = match meta.meta() {
DocumentsAddition {
method,
format,
primary_key,
} => self.update_documents(
*format,
*method,
content,
update_builder,
primary_key.as_deref(),
),
ClearDocuments => self.clear_documents(update_builder),
DeleteDocuments => self.delete_documents(content, update_builder),
Settings(settings) => self.update_settings(settings, update_builder),
Facets(levels) => self.update_facets(levels, update_builder),
};
match result {
Ok(result) => Ok(meta.process(result)),
Err(e) => Err(meta.fail(e.to_string())),
}
}
}

View File

@ -1,5 +1,6 @@
use chrono::{Utc, DateTime}; use chrono::{Utc, DateTime};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use uuid::Uuid;
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] #[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@ -7,14 +8,16 @@ pub struct Pending<M> {
pub update_id: u64, pub update_id: u64,
pub meta: M, pub meta: M,
pub enqueued_at: DateTime<Utc>, pub enqueued_at: DateTime<Utc>,
pub index_uuid: Uuid,
} }
impl<M> Pending<M> { impl<M> Pending<M> {
pub fn new(meta: M, update_id: u64) -> Self { pub fn new(meta: M, update_id: u64, index_uuid: Uuid) -> Self {
Self { Self {
enqueued_at: Utc::now(), enqueued_at: Utc::now(),
meta, meta,
update_id, update_id,
index_uuid,
} }
} }
@ -73,6 +76,10 @@ impl<M> Processing<M> {
self.from.meta() self.from.meta()
} }
pub fn index_uuid(&self) -> &Uuid {
&self.from.index_uuid
}
pub fn process<N>(self, meta: N) -> Processed<M, N> { pub fn process<N>(self, meta: N) -> Processed<M, N> {
Processed { Processed {
success: meta, success: meta,