mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 12:05:05 +08:00
WIP
This commit is contained in:
parent
7864530589
commit
32dec62030
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -3661,6 +3661,7 @@ dependencies = [
|
||||
"time",
|
||||
"tokenizers",
|
||||
"tracing",
|
||||
"uell",
|
||||
"ureq",
|
||||
"url",
|
||||
"uuid",
|
||||
@ -5789,6 +5790,15 @@ version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
|
||||
|
||||
[[package]]
|
||||
name = "uell"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40de5982e28612e20330e77d81f1559b74f66caf3c7fc10b19ada4843f4b4fd7"
|
||||
dependencies = [
|
||||
"bumpalo",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unescaper"
|
||||
version = "0.1.5"
|
||||
|
@ -100,6 +100,7 @@ bumpalo = "3.16.0"
|
||||
thread_local = "1.1.8"
|
||||
allocator-api2 = "0.2.18"
|
||||
rustc-hash = "2.0.0"
|
||||
uell = "0.1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
mimalloc = { version = "0.1.43", default-features = false }
|
||||
|
@ -737,7 +737,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
||||
}
|
||||
|
||||
/// Converts the latitude and longitude back to an xyz GeoPoint.
|
||||
fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
|
||||
pub fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
|
||||
let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap();
|
||||
let (lng, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap();
|
||||
let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)];
|
||||
|
@ -54,7 +54,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
||||
DocumentChange::Deletion(deletion) => {
|
||||
let docid = deletion.docid();
|
||||
let content = deletion.current(
|
||||
&context.txn,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
&context.db_fields_ids_map,
|
||||
)?;
|
||||
@ -72,7 +72,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
||||
DocumentChange::Update(update) => {
|
||||
let docid = update.docid();
|
||||
let content =
|
||||
update.current(&context.txn, context.index, &context.db_fields_ids_map)?;
|
||||
update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||
for res in content.iter_top_level_fields() {
|
||||
let (f, _) = res?;
|
||||
let entry = document_extractor_data
|
||||
@ -92,9 +92,9 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
|
||||
}
|
||||
|
||||
let content =
|
||||
update.merged(&context.txn, context.index, &context.db_fields_ids_map)?;
|
||||
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||
let vector_content = update.merged_vectors(
|
||||
&context.txn,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
&context.db_fields_ids_map,
|
||||
&context.doc_alloc,
|
||||
|
@ -63,7 +63,7 @@ impl FacetedDocidsExtractor {
|
||||
document_change: DocumentChange,
|
||||
) -> Result<()> {
|
||||
let index = &context.index;
|
||||
let rtxn = &context.txn;
|
||||
let rtxn = &context.rtxn;
|
||||
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
|
||||
let mut cached_sorter = context.data.borrow_mut_or_yield();
|
||||
match document_change {
|
||||
|
100
crates/milli/src/update/new/extract/geo/mod.rs
Normal file
100
crates/milli/src/update/new/extract/geo/mod.rs
Normal file
@ -0,0 +1,100 @@
|
||||
use std::cell::RefCell;
|
||||
use std::f32::consts::PI;
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufWriter};
|
||||
use std::mem::{self, size_of};
|
||||
|
||||
use bincode::ErrorKind;
|
||||
use heed::RoTxn;
|
||||
use raw_collections::bbbul::BitPacker4x;
|
||||
use raw_collections::Bbbul;
|
||||
use uell::Uell;
|
||||
|
||||
use crate::update::new::document::Document;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
DocumentChangeContext, Extractor, FullySend, MostlySend,
|
||||
};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::{
|
||||
CboRoaringBitmapCodec, DocumentId, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index,
|
||||
InternalError, Result,
|
||||
};
|
||||
|
||||
pub struct GeoExtractor {
|
||||
grenad_parameters: GrenadParameters,
|
||||
// rtree: Option<rstar::RTree<GeoPoint>>,
|
||||
}
|
||||
|
||||
impl GeoExtractor {
|
||||
pub fn new(
|
||||
rtxn: &RoTxn,
|
||||
index: &Index,
|
||||
grenad_parameters: GrenadParameters,
|
||||
) -> Result<Option<Self>> {
|
||||
let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
|
||||
let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
|
||||
if is_sortable || is_filterable {
|
||||
// Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? }))
|
||||
Ok(Some(GeoExtractor { grenad_parameters }))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GeoExtractorData<'extractor> {
|
||||
/// The set of documents ids that were removed. If a document sees its geo
|
||||
/// point being updated, we first delete it and then insert it in the inserted.
|
||||
removed: Bbbul<'extractor, BitPacker4x>,
|
||||
/// The set of document ids associated to the two f64 geo points.
|
||||
inserted: Uell<'extractor, [u8; size_of::<DocumentId>() + 2 * size_of::<f64>()]>,
|
||||
/// TODO Do the doc
|
||||
spilled_removed: Option<BufWriter<File>>,
|
||||
/// TODO Do the doc
|
||||
spilled_inserted: Option<BufWriter<File>>,
|
||||
}
|
||||
|
||||
unsafe impl MostlySend for GeoExtractorData<'_> {}
|
||||
|
||||
impl<'extractor> Extractor<'extractor> for GeoExtractor {
|
||||
type Data = RefCell<GeoExtractorData<'extractor>>;
|
||||
|
||||
fn init_data<'doc>(
|
||||
&'doc self,
|
||||
extractor_alloc: &'extractor bumpalo::Bump,
|
||||
) -> Result<Self::Data> {
|
||||
Ok(RefCell::new(GeoExtractorData {
|
||||
inserted: Uell::new_in(extractor_alloc),
|
||||
removed: Bbbul::new_in(extractor_alloc),
|
||||
spilled: None,
|
||||
}))
|
||||
}
|
||||
|
||||
fn process<'doc>(
|
||||
&'doc self,
|
||||
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
|
||||
context: &'doc DocumentChangeContext<Self::Data>,
|
||||
) -> Result<()> {
|
||||
let rtxn = &context.rtxn;
|
||||
let index = context.index;
|
||||
let db_fields_ids_map = context.db_fields_ids_map;
|
||||
let mut data_ref = context.data.borrow_mut_or_yield();
|
||||
|
||||
for change in changes {
|
||||
match change? {
|
||||
DocumentChange::Deletion(deletion) => todo!(),
|
||||
DocumentChange::Update(update) => {
|
||||
let current = update.current(rtxn, index, db_fields_ids_map)?;
|
||||
let current_geo = current.geo_field()?;
|
||||
let updated_geo = update.updated().geo_field()?;
|
||||
// ...
|
||||
}
|
||||
DocumentChange::Insertion(insertion) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
mod cache;
|
||||
mod documents;
|
||||
mod faceted;
|
||||
mod geo;
|
||||
mod searchable;
|
||||
mod vectors;
|
||||
|
||||
@ -8,6 +9,7 @@ use bumpalo::Bump;
|
||||
pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
|
||||
pub use documents::*;
|
||||
pub use faceted::*;
|
||||
pub use geo::*;
|
||||
pub use searchable::*;
|
||||
pub use vectors::EmbeddingExtractor;
|
||||
|
||||
|
@ -326,7 +326,7 @@ impl WordDocidsExtractors {
|
||||
document_change: DocumentChange,
|
||||
) -> Result<()> {
|
||||
let index = &context.index;
|
||||
let rtxn = &context.txn;
|
||||
let rtxn = &context.rtxn;
|
||||
let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
|
||||
let cached_sorter = cached_sorter_ref.as_mut().unwrap();
|
||||
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
|
||||
|
@ -39,7 +39,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
|
||||
let doc_alloc = &context.doc_alloc;
|
||||
|
||||
let index = context.index;
|
||||
let rtxn = &context.txn;
|
||||
let rtxn = &context.rtxn;
|
||||
|
||||
let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc);
|
||||
let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc);
|
||||
|
@ -2,13 +2,13 @@ use std::cell::RefCell;
|
||||
|
||||
use bumpalo::collections::Vec as BVec;
|
||||
use bumpalo::Bump;
|
||||
use hashbrown::HashMap;
|
||||
use hashbrown::{DefaultHashBuilder, HashMap};
|
||||
|
||||
use super::cache::DelAddRoaringBitmap;
|
||||
use crate::error::FaultSource;
|
||||
use crate::prompt::Prompt;
|
||||
use crate::update::new::channel::EmbeddingSender;
|
||||
use crate::update::new::indexer::document_changes::{Extractor, MostlySend};
|
||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
|
||||
use crate::update::new::vector_document::VectorDocument;
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::vector::error::{
|
||||
@ -37,7 +37,7 @@ impl<'a> EmbeddingExtractor<'a> {
|
||||
}
|
||||
|
||||
pub struct EmbeddingExtractorData<'extractor>(
|
||||
pub HashMap<String, DelAddRoaringBitmap, hashbrown::DefaultHashBuilder, &'extractor Bump>,
|
||||
pub HashMap<String, DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>,
|
||||
);
|
||||
|
||||
unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
|
||||
@ -52,9 +52,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
||||
fn process<'doc>(
|
||||
&'doc self,
|
||||
changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
|
||||
context: &'doc crate::update::new::indexer::document_changes::DocumentChangeContext<
|
||||
Self::Data,
|
||||
>,
|
||||
context: &'doc DocumentChangeContext<Self::Data>,
|
||||
) -> crate::Result<()> {
|
||||
let embedders = self.embedders.inner_as_ref();
|
||||
let mut unused_vectors_distribution =
|
||||
@ -63,7 +61,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
||||
let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
|
||||
for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
|
||||
let embedder_id =
|
||||
context.index.embedder_category_id.get(&context.txn, embedder_name)?.ok_or_else(
|
||||
context.index.embedder_category_id.get(&context.rtxn, embedder_name)?.ok_or_else(
|
||||
|| InternalError::DatabaseMissingEntry {
|
||||
db_name: "embedder_category_id",
|
||||
key: None,
|
||||
@ -90,7 +88,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
||||
}
|
||||
DocumentChange::Update(update) => {
|
||||
let old_vectors = update.current_vectors(
|
||||
&context.txn,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
&context.doc_alloc,
|
||||
@ -130,7 +128,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
||||
} else if new_vectors.regenerate {
|
||||
let new_rendered = prompt.render_document(
|
||||
update.current(
|
||||
&context.txn,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
)?,
|
||||
@ -139,7 +137,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
||||
)?;
|
||||
let old_rendered = prompt.render_document(
|
||||
update.merged(
|
||||
&context.txn,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
)?,
|
||||
@ -157,7 +155,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
||||
} else if old_vectors.regenerate {
|
||||
let old_rendered = prompt.render_document(
|
||||
update.current(
|
||||
&context.txn,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
)?,
|
||||
@ -166,7 +164,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
|
||||
)?;
|
||||
let new_rendered = prompt.render_document(
|
||||
update.merged(
|
||||
&context.txn,
|
||||
&context.rtxn,
|
||||
context.index,
|
||||
context.db_fields_ids_map,
|
||||
)?,
|
||||
|
@ -197,7 +197,7 @@ pub struct DocumentChangeContext<
|
||||
/// inside of the DB.
|
||||
pub db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
/// A transaction providing data from the DB before all indexing operations
|
||||
pub txn: RoTxn<'indexer>,
|
||||
pub rtxn: RoTxn<'indexer>,
|
||||
|
||||
/// Global field id map that is up to date with the current state of the indexing process.
|
||||
///
|
||||
@ -255,7 +255,7 @@ impl<
|
||||
let txn = index.read_txn()?;
|
||||
Ok(DocumentChangeContext {
|
||||
index,
|
||||
txn,
|
||||
rtxn: txn,
|
||||
db_fields_ids_map,
|
||||
new_fields_ids_map: fields_ids_map,
|
||||
doc_alloc,
|
||||
|
@ -63,7 +63,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
|
||||
where
|
||||
'pl: 'doc, // the payload must survive the process calls
|
||||
{
|
||||
let current = context.index.document(&context.txn, *docid)?;
|
||||
let current = context.index.document(&context.rtxn, *docid)?;
|
||||
|
||||
let external_document_id = self.primary_key.extract_docid_from_db(
|
||||
current,
|
||||
|
@ -143,11 +143,8 @@ where
|
||||
let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
|
||||
|
||||
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
|
||||
|
||||
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
|
||||
|
||||
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
|
||||
|
||||
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
|
||||
let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
|
||||
let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
|
||||
|
@ -93,7 +93,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
|
||||
let DocumentChangeContext {
|
||||
index,
|
||||
db_fields_ids_map,
|
||||
txn,
|
||||
rtxn: txn,
|
||||
new_fields_ids_map,
|
||||
doc_alloc,
|
||||
..
|
||||
|
@ -1,6 +1,3 @@
|
||||
use std::io::{self};
|
||||
|
||||
use bincode::ErrorKind;
|
||||
use hashbrown::HashSet;
|
||||
use heed::types::Bytes;
|
||||
use heed::{Database, RoTxn};
|
||||
@ -11,59 +8,7 @@ use super::channel::*;
|
||||
use super::extract::{
|
||||
merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
|
||||
};
|
||||
use super::DocumentChange;
|
||||
use crate::{
|
||||
CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, InternalError,
|
||||
Result,
|
||||
};
|
||||
|
||||
pub struct GeoExtractor {
|
||||
rtree: Option<rstar::RTree<GeoPoint>>,
|
||||
}
|
||||
|
||||
impl GeoExtractor {
|
||||
pub fn new(rtxn: &RoTxn, index: &Index) -> Result<Option<Self>> {
|
||||
let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
|
||||
let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
|
||||
if is_sortable || is_filterable {
|
||||
Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? }))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn manage_change(
|
||||
&mut self,
|
||||
fidmap: &mut GlobalFieldsIdsMap,
|
||||
change: &DocumentChange,
|
||||
) -> Result<()> {
|
||||
match change {
|
||||
DocumentChange::Deletion(_) => todo!(),
|
||||
DocumentChange::Update(_) => todo!(),
|
||||
DocumentChange::Insertion(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize_rtree<W: io::Write>(self, writer: &mut W) -> Result<bool> {
|
||||
match self.rtree {
|
||||
Some(rtree) => {
|
||||
// TODO What should I do?
|
||||
bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e {
|
||||
ErrorKind::Io(e) => Error::IoError(e),
|
||||
ErrorKind::InvalidUtf8Encoding(_) => todo!(),
|
||||
ErrorKind::InvalidBoolEncoding(_) => todo!(),
|
||||
ErrorKind::InvalidCharEncoding => todo!(),
|
||||
ErrorKind::InvalidTagEncoding(_) => todo!(),
|
||||
ErrorKind::DeserializeAnyNotSupported => todo!(),
|
||||
ErrorKind::SizeLimit => todo!(),
|
||||
ErrorKind::SequenceMustHaveLength => todo!(),
|
||||
ErrorKind::Custom(_) => todo!(),
|
||||
})
|
||||
}
|
||||
None => Ok(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::{CboRoaringBitmapCodec, FieldId, Index, InternalError, Result};
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
|
||||
pub fn merge_and_send_docids<'extractor, MSP>(
|
||||
|
Loading…
Reference in New Issue
Block a user