mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 10:07:40 +08:00
Replace obkv with the temporary new version of it
This commit is contained in:
parent
27df9e6c73
commit
0c57cf7565
13
Cargo.lock
generated
13
Cargo.lock
generated
@ -3434,7 +3434,7 @@ dependencies = [
|
||||
"mimalloc",
|
||||
"mime",
|
||||
"num_cpus",
|
||||
"obkv 0.2.2",
|
||||
"obkv",
|
||||
"once_cell",
|
||||
"ordered-float",
|
||||
"parking_lot",
|
||||
@ -3601,8 +3601,7 @@ dependencies = [
|
||||
"memchr",
|
||||
"memmap2",
|
||||
"mimalloc",
|
||||
"obkv 0.2.2",
|
||||
"obkv 0.3.0",
|
||||
"obkv",
|
||||
"once_cell",
|
||||
"ordered-float",
|
||||
"rand",
|
||||
@ -3849,16 +3848,10 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "obkv"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a2e27bcfe835a379d32352112f6b8dbae2d99d16a5fff42abe6e5ba5386c1e5a"
|
||||
|
||||
[[package]]
|
||||
name = "obkv"
|
||||
version = "0.3.0"
|
||||
source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#5289a6658cd471f4212c1edc1a40b2a3c3d11fe0"
|
||||
source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#9c2900d106fa84e7079b288e7f7c366ec7cae948"
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
|
@ -57,7 +57,7 @@ meilisearch-types = { path = "../meilisearch-types" }
|
||||
mimalloc = { version = "0.1.43", default-features = false }
|
||||
mime = "0.3.17"
|
||||
num_cpus = "1.16.0"
|
||||
obkv = "0.2.2"
|
||||
obkv = { git = "https://github.com/kerollmops/obkv", branch = "unsized-kvreader" }
|
||||
once_cell = "1.19.0"
|
||||
ordered-float = "4.2.1"
|
||||
parking_lot = "0.12.3"
|
||||
|
@ -1247,7 +1247,7 @@ impl<'a> HitMaker<'a> {
|
||||
self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?;
|
||||
|
||||
// First generate a document with all the displayed fields
|
||||
let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, obkv)?;
|
||||
let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, &obkv)?;
|
||||
|
||||
let add_vectors_fid =
|
||||
self.vectors_fid.filter(|_fid| self.retrieve_vectors == RetrieveVectors::Retrieve);
|
||||
|
@ -12,6 +12,7 @@ readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
big_s = "1.0.2"
|
||||
bimap = { version = "0.6.3", features = ["serde"] }
|
||||
bincode = "1.3.3"
|
||||
bstr = "1.9.1"
|
||||
@ -44,8 +45,7 @@ levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] }
|
||||
lru = "0.12.3"
|
||||
memchr = "2.5.0"
|
||||
memmap2 = "0.9.4"
|
||||
obkv = "0.2.2"
|
||||
obkv2 = { package = "obkv", git = "https://github.com/kerollmops/obkv", branch = "unsized-kvreader" }
|
||||
obkv = { git = "https://github.com/kerollmops/obkv", branch = "unsized-kvreader" }
|
||||
once_cell = "1.19.0"
|
||||
ordered-float = "4.2.1"
|
||||
rayon = "1.10.0"
|
||||
@ -94,7 +94,6 @@ rayon-par-bridge = "0.1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
mimalloc = { version = "0.1.43", default-features = false }
|
||||
big_s = "1.0.2"
|
||||
insta = "1.39.0"
|
||||
maplit = "1.0.2"
|
||||
md5 = "0.7.0"
|
||||
|
@ -69,7 +69,7 @@ impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EnrichedDocument<'a> {
|
||||
pub document: KvReader<'a, FieldId>,
|
||||
pub document: &'a KvReader<FieldId>,
|
||||
pub document_id: DocumentId,
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ use crate::{FieldId, Object, Result};
|
||||
const DOCUMENTS_BATCH_INDEX_KEY: [u8; 8] = u64::MAX.to_be_bytes();
|
||||
|
||||
/// Helper function to convert an obkv reader into a JSON object.
|
||||
pub fn obkv_to_object(obkv: &KvReader<'_, FieldId>, index: &DocumentsBatchIndex) -> Result<Object> {
|
||||
pub fn obkv_to_object(obkv: &KvReader<FieldId>, index: &DocumentsBatchIndex) -> Result<Object> {
|
||||
obkv.iter()
|
||||
.map(|(field_id, value)| {
|
||||
let field_name = index
|
||||
@ -76,7 +76,7 @@ impl DocumentsBatchIndex {
|
||||
self.0.get_by_right(name).cloned()
|
||||
}
|
||||
|
||||
pub fn recreate_json(&self, document: &obkv::KvReaderU16<'_>) -> Result<Object> {
|
||||
pub fn recreate_json(&self, document: &obkv::KvReaderU16) -> Result<Object> {
|
||||
let mut map = Object::new();
|
||||
|
||||
for (k, v) in document.iter() {
|
||||
|
@ -52,7 +52,7 @@ impl<'a> PrimaryKey<'a> {
|
||||
|
||||
pub fn document_id(
|
||||
&self,
|
||||
document: &obkv::KvReader<'_, FieldId>,
|
||||
document: &obkv::KvReader<FieldId>,
|
||||
fields: &impl FieldIdMapper,
|
||||
) -> Result<StdResult<String, DocumentIdExtractionError>> {
|
||||
match self {
|
||||
|
@ -76,11 +76,9 @@ impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
|
||||
pub fn get(
|
||||
&mut self,
|
||||
offset: u32,
|
||||
) -> Result<Option<KvReader<'_, FieldId>>, DocumentsBatchCursorError> {
|
||||
) -> Result<Option<&KvReader<FieldId>>, DocumentsBatchCursorError> {
|
||||
match self.cursor.move_on_key_equal_to(offset.to_be_bytes())? {
|
||||
Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => {
|
||||
Ok(Some(KvReader::new(value)))
|
||||
}
|
||||
Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())),
|
||||
_otherwise => Ok(None),
|
||||
}
|
||||
}
|
||||
@ -89,11 +87,9 @@ impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
|
||||
/// `next_document` advance the document reader until all the documents have been read.
|
||||
pub fn next_document(
|
||||
&mut self,
|
||||
) -> Result<Option<KvReader<'_, FieldId>>, DocumentsBatchCursorError> {
|
||||
) -> Result<Option<&KvReader<FieldId>>, DocumentsBatchCursorError> {
|
||||
match self.cursor.move_on_next()? {
|
||||
Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => {
|
||||
Ok(Some(KvReader::new(value)))
|
||||
}
|
||||
Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())),
|
||||
_otherwise => Ok(None),
|
||||
}
|
||||
}
|
||||
|
@ -6,10 +6,10 @@ use obkv::{KvReaderU16, KvWriterU16};
|
||||
pub struct ObkvCodec;
|
||||
|
||||
impl<'a> heed::BytesDecode<'a> for ObkvCodec {
|
||||
type DItem = KvReaderU16<'a>;
|
||||
type DItem = &'a KvReaderU16;
|
||||
|
||||
fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
|
||||
Ok(KvReaderU16::new(bytes))
|
||||
Ok(KvReaderU16::from_slice(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -122,7 +122,7 @@ impl CboRoaringBitmapCodec {
|
||||
|
||||
/// Merges a DelAdd delta into a CboRoaringBitmap.
|
||||
pub fn merge_deladd_into<'a>(
|
||||
deladd: KvReaderDelAdd<'_>,
|
||||
deladd: &KvReaderDelAdd,
|
||||
previous: &[u8],
|
||||
buffer: &'a mut Vec<u8>,
|
||||
) -> io::Result<Option<&'a [u8]>> {
|
||||
|
@ -1252,7 +1252,7 @@ impl Index {
|
||||
/* documents */
|
||||
|
||||
/// Returns a document by using the document id.
|
||||
pub fn document<'t>(&self, rtxn: &'t RoTxn, id: DocumentId) -> Result<obkv::KvReaderU16<'t>> {
|
||||
pub fn document<'t>(&self, rtxn: &'t RoTxn, id: DocumentId) -> Result<&'t obkv::KvReaderU16> {
|
||||
self.documents
|
||||
.get(rtxn, &id)?
|
||||
.ok_or(UserError::UnknownInternalDocumentId { document_id: id })
|
||||
@ -1264,7 +1264,7 @@ impl Index {
|
||||
&'a self,
|
||||
rtxn: &'t RoTxn<'t>,
|
||||
ids: impl IntoIterator<Item = DocumentId> + 'a,
|
||||
) -> Result<impl Iterator<Item = Result<(DocumentId, obkv::KvReaderU16<'t>)>> + 'a> {
|
||||
) -> Result<impl Iterator<Item = Result<(DocumentId, &'t obkv::KvReaderU16)>> + 'a> {
|
||||
Ok(ids.into_iter().map(move |id| {
|
||||
let kv = self
|
||||
.documents
|
||||
@ -1279,7 +1279,7 @@ impl Index {
|
||||
&self,
|
||||
rtxn: &'t RoTxn<'t>,
|
||||
ids: impl IntoIterator<Item = DocumentId>,
|
||||
) -> Result<Vec<(DocumentId, obkv::KvReaderU16<'t>)>> {
|
||||
) -> Result<Vec<(DocumentId, &'t obkv::KvReaderU16)>> {
|
||||
self.iter_documents(rtxn, ids)?.collect()
|
||||
}
|
||||
|
||||
@ -1287,7 +1287,7 @@ impl Index {
|
||||
pub fn all_documents<'a, 't: 'a>(
|
||||
&'a self,
|
||||
rtxn: &'t RoTxn<'t>,
|
||||
) -> Result<impl Iterator<Item = Result<(DocumentId, obkv::KvReaderU16<'t>)>> + 'a> {
|
||||
) -> Result<impl Iterator<Item = Result<(DocumentId, &'t obkv::KvReaderU16)>> + 'a> {
|
||||
self.iter_documents(rtxn, self.documents_ids(rtxn)?)
|
||||
}
|
||||
|
||||
|
@ -214,7 +214,7 @@ pub fn bucketed_position(relative: u16) -> u16 {
|
||||
pub fn obkv_to_json(
|
||||
displayed_fields: &[FieldId],
|
||||
fields_ids_map: &FieldsIdsMap,
|
||||
obkv: obkv::KvReaderU16<'_>,
|
||||
obkv: &obkv::KvReaderU16,
|
||||
) -> Result<Object> {
|
||||
displayed_fields
|
||||
.iter()
|
||||
@ -232,10 +232,7 @@ pub fn obkv_to_json(
|
||||
}
|
||||
|
||||
/// Transform every field of a raw obkv store into a JSON Object.
|
||||
pub fn all_obkv_to_json(
|
||||
obkv: obkv::KvReaderU16<'_>,
|
||||
fields_ids_map: &FieldsIdsMap,
|
||||
) -> Result<Object> {
|
||||
pub fn all_obkv_to_json(obkv: &obkv::KvReaderU16, fields_ids_map: &FieldsIdsMap) -> Result<Object> {
|
||||
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
|
||||
obkv_to_json(all_keys.as_slice(), fields_ids_map, obkv)
|
||||
}
|
||||
|
@ -30,13 +30,13 @@ impl ParsedValue {
|
||||
|
||||
impl<'a> Document<'a> {
|
||||
pub fn new(
|
||||
data: obkv::KvReaderU16<'a>,
|
||||
data: &'a obkv::KvReaderU16,
|
||||
side: DelAdd,
|
||||
inverted_field_map: &'a FieldsIdsMap,
|
||||
) -> Self {
|
||||
let mut out_data = BTreeMap::new();
|
||||
for (fid, raw) in data {
|
||||
let obkv = KvReaderDelAdd::new(raw);
|
||||
let obkv = KvReaderDelAdd::from_slice(raw);
|
||||
let Some(raw) = obkv.get(side) else {
|
||||
continue;
|
||||
};
|
||||
|
@ -91,7 +91,7 @@ impl Prompt {
|
||||
|
||||
pub fn render(
|
||||
&self,
|
||||
document: obkv::KvReaderU16<'_>,
|
||||
document: &obkv::KvReaderU16,
|
||||
side: DelAdd,
|
||||
field_id_map: &FieldsIdsMap,
|
||||
) -> Result<String, RenderPromptError> {
|
||||
|
@ -1,7 +1,7 @@
|
||||
use obkv::Key;
|
||||
|
||||
pub type KvWriterDelAdd<W> = obkv::KvWriter<W, DelAdd>;
|
||||
pub type KvReaderDelAdd<'a> = obkv::KvReader<'a, DelAdd>;
|
||||
pub type KvReaderDelAdd = obkv::KvReader<DelAdd>;
|
||||
|
||||
/// DelAdd defines the new value to add in the database and old value to delete from the database.
|
||||
///
|
||||
@ -30,31 +30,13 @@ impl Key for DelAdd {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO remove this implementation
|
||||
impl obkv2::Key for DelAdd {
|
||||
const BYTES_SIZE: usize = std::mem::size_of::<DelAdd>();
|
||||
type BYTES = [u8; <Self as obkv2::Key>::BYTES_SIZE];
|
||||
|
||||
fn to_be_bytes(&self) -> Self::BYTES {
|
||||
u8::to_be_bytes(*self as u8)
|
||||
}
|
||||
|
||||
fn from_be_bytes(array: Self::BYTES) -> Self {
|
||||
match u8::from_be_bytes(array) {
|
||||
0 => Self::Deletion,
|
||||
1 => Self::Addition,
|
||||
otherwise => unreachable!("DelAdd has only 2 variants, unknown variant: {}", otherwise),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a Kv<K, Kv<DelAdd, value>> from Kv<K, value>
|
||||
///
|
||||
/// Deletion: put all the values under DelAdd::Deletion
|
||||
/// Addition: put all the values under DelAdd::Addition,
|
||||
/// DeletionAndAddition: put all the values under DelAdd::Deletion and DelAdd::Addition,
|
||||
pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
|
||||
reader: obkv::KvReader<'_, K>,
|
||||
reader: &obkv::KvReader<K>,
|
||||
operation: DelAddOperation,
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<(), std::io::Error> {
|
||||
@ -64,7 +46,7 @@ pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
|
||||
/// Akin to the [into_del_add_obkv] function but lets you
|
||||
/// conditionally define the `DelAdd` variant based on the obkv key.
|
||||
pub fn into_del_add_obkv_conditional_operation<K, F>(
|
||||
reader: obkv::KvReader<'_, K>,
|
||||
reader: &obkv::KvReader<K>,
|
||||
buffer: &mut Vec<u8>,
|
||||
operation: F,
|
||||
) -> std::io::Result<()>
|
||||
@ -104,8 +86,8 @@ pub enum DelAddOperation {
|
||||
/// putting each deletion obkv's keys under an DelAdd::Deletion
|
||||
/// and putting each addition obkv's keys under an DelAdd::Addition
|
||||
pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
|
||||
deletion: &obkv::KvReader<'_, K>,
|
||||
addition: &obkv::KvReader<'_, K>,
|
||||
deletion: &obkv::KvReader<K>,
|
||||
addition: &obkv::KvReader<K>,
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<(), std::io::Error> {
|
||||
use itertools::merge_join_by;
|
||||
@ -139,7 +121,7 @@ pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
|
||||
writer.finish()
|
||||
}
|
||||
|
||||
pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd<'_>) -> bool {
|
||||
pub fn is_noop_del_add_obkv(del_add: &KvReaderDelAdd) -> bool {
|
||||
del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition)
|
||||
}
|
||||
|
||||
@ -154,5 +136,5 @@ pub fn deladd_serialize_add_side<'a>(
|
||||
obkv: &'a [u8],
|
||||
_buffer: &mut Vec<u8>,
|
||||
) -> crate::Result<&'a [u8]> {
|
||||
Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default())
|
||||
Ok(KvReaderDelAdd::from_slice(obkv).get(DelAdd::Addition).unwrap_or_default())
|
||||
}
|
||||
|
@ -135,7 +135,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
|
||||
if !valid_lmdb_key(key) {
|
||||
continue;
|
||||
}
|
||||
let value = KvReaderDelAdd::new(value);
|
||||
let value = KvReaderDelAdd::from_slice(value);
|
||||
|
||||
// DB is empty, it is safe to ignore Del operations
|
||||
let Some(value) = value.get(DelAdd::Addition) else {
|
||||
@ -161,7 +161,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = KvReaderDelAdd::new(value);
|
||||
let value = KvReaderDelAdd::from_slice(value);
|
||||
|
||||
// the value is a CboRoaringBitmap, but I still need to prepend the
|
||||
// group size for level 0 (= 1) to it
|
||||
|
@ -109,7 +109,7 @@ impl FacetsUpdateIncremental {
|
||||
}
|
||||
current_field_id = Some(key.field_id);
|
||||
|
||||
let value = KvReader::new(value);
|
||||
let value = KvReader::from_slice(value);
|
||||
let docids_to_delete = value
|
||||
.get(DelAdd::Deletion)
|
||||
.map(CboRoaringBitmapCodec::bytes_decode)
|
||||
|
@ -187,7 +187,7 @@ fn index_facet_search(
|
||||
) -> Result<()> {
|
||||
let mut iter = normalized_delta_data.into_stream_merger_iter()?;
|
||||
while let Some((key_bytes, delta_bytes)) = iter.next()? {
|
||||
let deladd_reader = KvReaderDelAdd::new(delta_bytes);
|
||||
let deladd_reader = KvReaderDelAdd::from_slice(delta_bytes);
|
||||
|
||||
let database_set = index
|
||||
.facet_id_normalized_string_strings
|
||||
|
@ -145,7 +145,7 @@ pub fn enrich_documents_batch<R: Read + Seek>(
|
||||
#[tracing::instrument(level = "trace", skip(uuid_buffer, documents_batch_index, document)
|
||||
target = "indexing::documents")]
|
||||
fn fetch_or_generate_document_id(
|
||||
document: &obkv::KvReader<'_, FieldId>,
|
||||
document: &obkv::KvReader<FieldId>,
|
||||
documents_batch_index: &DocumentsBatchIndex,
|
||||
primary_key: PrimaryKey<'_>,
|
||||
autogenerate_docids: bool,
|
||||
|
@ -80,7 +80,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
||||
.try_into()
|
||||
.map(u32::from_be_bytes)
|
||||
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
|
||||
let obkv = KvReader::<FieldId>::new(value);
|
||||
let obkv = KvReader::<FieldId>::from_slice(value);
|
||||
|
||||
// if the searchable fields didn't change, skip the searchable indexing for this document.
|
||||
if !force_reindexing && !searchable_fields_changed(&obkv, settings_diff) {
|
||||
@ -126,13 +126,13 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
||||
// transforming two KV<FieldId, KV<u16, String>> into one KV<FieldId, KV<DelAdd, KV<u16, String>>>
|
||||
value_buffer.clear();
|
||||
del_add_from_two_obkvs(
|
||||
&KvReader::<FieldId>::new(del_obkv),
|
||||
&KvReader::<FieldId>::new(add_obkv),
|
||||
&KvReader::<FieldId>::from_slice(del_obkv),
|
||||
&KvReader::<FieldId>::from_slice(add_obkv),
|
||||
&mut value_buffer,
|
||||
)?;
|
||||
|
||||
// write each KV<DelAdd, KV<u16, String>> into the sorter, field by field.
|
||||
let obkv = KvReader::<FieldId>::new(&value_buffer);
|
||||
let obkv = KvReader::<FieldId>::from_slice(&value_buffer);
|
||||
for (field_id, value) in obkv.iter() {
|
||||
key_buffer.truncate(mem::size_of::<u32>());
|
||||
key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
||||
@ -146,13 +146,13 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
||||
|
||||
/// Check if any searchable fields of a document changed.
|
||||
fn searchable_fields_changed(
|
||||
obkv: &KvReader<'_, FieldId>,
|
||||
obkv: &KvReader<FieldId>,
|
||||
settings_diff: &InnerIndexSettingsDiff,
|
||||
) -> bool {
|
||||
let searchable_fields = &settings_diff.new.searchable_fields_ids;
|
||||
for (field_id, field_bytes) in obkv.iter() {
|
||||
if searchable_fields.contains(&field_id) {
|
||||
let del_add = KvReaderDelAdd::new(field_bytes);
|
||||
let del_add = KvReaderDelAdd::from_slice(field_bytes);
|
||||
match (del_add.get(DelAdd::Deletion), del_add.get(DelAdd::Addition)) {
|
||||
// if both fields are None, check the next field.
|
||||
(None, None) => (),
|
||||
@ -189,7 +189,7 @@ fn tokenizer_builder<'a>(
|
||||
|
||||
/// Extract words mapped with their positions of a document.
|
||||
fn tokens_from_document<'a>(
|
||||
obkv: &KvReader<'a, FieldId>,
|
||||
obkv: &'a KvReader<FieldId>,
|
||||
settings: &InnerIndexSettings,
|
||||
tokenizer: &Tokenizer<'_>,
|
||||
max_positions_per_attributes: u32,
|
||||
@ -202,7 +202,7 @@ fn tokens_from_document<'a>(
|
||||
// if field is searchable.
|
||||
if settings.searchable_fields_ids.contains(&field_id) {
|
||||
// extract deletion or addition only.
|
||||
if let Some(field_bytes) = KvReaderDelAdd::new(field_bytes).get(del_add) {
|
||||
if let Some(field_bytes) = KvReaderDelAdd::from_slice(field_bytes).get(del_add) {
|
||||
// parse json.
|
||||
let value =
|
||||
serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
|
||||
|
@ -45,7 +45,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||
|
||||
buffer.clear();
|
||||
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
||||
for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() {
|
||||
for (deladd_key, _) in KvReaderDelAdd::from_slice(deladd_obkv_bytes).iter() {
|
||||
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
|
||||
}
|
||||
obkv.finish()?;
|
||||
|
@ -75,7 +75,7 @@ fn extract_facet_string_docids_document_update<R: io::Read + io::Seek>(
|
||||
let mut buffer = Vec::new();
|
||||
let mut cursor = docid_fid_facet_string.into_cursor()?;
|
||||
while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? {
|
||||
let deladd_reader = KvReaderDelAdd::new(deladd_original_value_bytes);
|
||||
let deladd_reader = KvReaderDelAdd::from_slice(deladd_original_value_bytes);
|
||||
|
||||
let is_same_value = deladd_reader.get(DelAdd::Deletion).is_some()
|
||||
&& deladd_reader.get(DelAdd::Addition).is_some();
|
||||
@ -163,7 +163,7 @@ fn extract_facet_string_docids_settings<R: io::Read + io::Seek>(
|
||||
let mut buffer = Vec::new();
|
||||
let mut cursor = docid_fid_facet_string.into_cursor()?;
|
||||
while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? {
|
||||
let deladd_reader = KvReaderDelAdd::new(deladd_original_value_bytes);
|
||||
let deladd_reader = KvReaderDelAdd::from_slice(deladd_original_value_bytes);
|
||||
|
||||
let is_same_value = deladd_reader.get(DelAdd::Deletion).is_some()
|
||||
&& deladd_reader.get(DelAdd::Addition).is_some();
|
||||
|
@ -83,10 +83,10 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
if !settings_diff.settings_update_only || old_faceted_fids != new_faceted_fids {
|
||||
let mut cursor = obkv_documents.into_cursor()?;
|
||||
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
||||
let obkv = obkv::KvReader::new(value);
|
||||
let obkv = obkv::KvReader::from_slice(value);
|
||||
let get_document_json_value = move |field_id, side| {
|
||||
obkv.get(field_id)
|
||||
.map(KvReaderDelAdd::new)
|
||||
.map(KvReaderDelAdd::from_slice)
|
||||
.and_then(|kv| kv.get(side))
|
||||
.map(from_slice)
|
||||
.transpose()
|
||||
|
@ -45,19 +45,23 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
|
||||
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||
let document_id = u32::from_be_bytes(document_id_bytes);
|
||||
|
||||
let del_add_reader = KvReaderDelAdd::new(value);
|
||||
let del_add_reader = KvReaderDelAdd::from_slice(value);
|
||||
let deletion = del_add_reader
|
||||
// get deleted words
|
||||
.get(DelAdd::Deletion)
|
||||
// count deleted words
|
||||
.map(|deletion| KvReaderU16::new(deletion).iter().take(MAX_COUNTED_WORDS + 1).count())
|
||||
.map(|deletion| {
|
||||
KvReaderU16::from_slice(deletion).iter().take(MAX_COUNTED_WORDS + 1).count()
|
||||
})
|
||||
// keep the count if under or equal to MAX_COUNTED_WORDS
|
||||
.filter(|&word_count| word_count <= MAX_COUNTED_WORDS);
|
||||
let addition = del_add_reader
|
||||
// get added words
|
||||
.get(DelAdd::Addition)
|
||||
// count added words
|
||||
.map(|addition| KvReaderU16::new(addition).iter().take(MAX_COUNTED_WORDS + 1).count())
|
||||
.map(|addition| {
|
||||
KvReaderU16::from_slice(addition).iter().take(MAX_COUNTED_WORDS + 1).count()
|
||||
})
|
||||
// keep the count if under or equal to MAX_COUNTED_WORDS
|
||||
.filter(|&word_count| word_count <= MAX_COUNTED_WORDS);
|
||||
|
||||
|
@ -29,11 +29,11 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
|
||||
|
||||
let mut cursor = obkv_documents.into_cursor()?;
|
||||
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
||||
let obkv = obkv::KvReader::new(value);
|
||||
let obkv = obkv::KvReader::from_slice(value);
|
||||
// since we only need the primary key when we throw an error
|
||||
// we create this getter to lazily get it when needed
|
||||
let document_id = || -> Value {
|
||||
let reader = KvReaderDelAdd::new(obkv.get(primary_key_id).unwrap());
|
||||
let reader = KvReaderDelAdd::from_slice(obkv.get(primary_key_id).unwrap());
|
||||
let document_id =
|
||||
reader.get(DelAdd::Deletion).or(reader.get(DelAdd::Addition)).unwrap();
|
||||
serde_json::from_slice(document_id).unwrap()
|
||||
@ -68,15 +68,17 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
|
||||
|
||||
/// Extract the finite floats lat and lng from two bytes slices.
|
||||
fn extract_lat_lng(
|
||||
document: &obkv::KvReader<'_, FieldId>,
|
||||
document: &obkv::KvReader<FieldId>,
|
||||
settings: &InnerIndexSettings,
|
||||
deladd: DelAdd,
|
||||
document_id: impl Fn() -> Value,
|
||||
) -> Result<Option<[f64; 2]>> {
|
||||
match settings.geo_fields_ids {
|
||||
Some((lat_fid, lng_fid)) => {
|
||||
let lat = document.get(lat_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd));
|
||||
let lng = document.get(lng_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd));
|
||||
let lat =
|
||||
document.get(lat_fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd));
|
||||
let lng =
|
||||
document.get(lng_fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd));
|
||||
let (lat, lng) = match (lat, lng) {
|
||||
(Some(lat), Some(lng)) => (lat, lng),
|
||||
(Some(_), None) => {
|
||||
|
@ -307,7 +307,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
||||
debug_assert!(from_utf8(external_id_bytes).is_ok());
|
||||
let docid = DocumentId::from_be_bytes(docid_bytes);
|
||||
|
||||
let obkv = obkv::KvReader::new(value);
|
||||
let obkv = obkv::KvReader::from_slice(value);
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(docid_bytes.as_slice());
|
||||
|
||||
@ -475,7 +475,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
||||
#[allow(clippy::too_many_arguments)] // feel free to find efficient way to factor arguments
|
||||
fn extract_vector_document_diff(
|
||||
docid: DocumentId,
|
||||
obkv: obkv::KvReader<'_, FieldId>,
|
||||
obkv: &obkv::KvReader<FieldId>,
|
||||
prompt: &Prompt,
|
||||
(add_to_user_provided, remove_from_user_provided): (&mut RoaringBitmap, &mut RoaringBitmap),
|
||||
(old, new): (VectorState, VectorState),
|
||||
@ -517,7 +517,7 @@ fn extract_vector_document_diff(
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.map(|(_, deladd)| KvReaderDelAdd::from_slice(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
|
||||
if document_is_kept {
|
||||
@ -553,7 +553,7 @@ fn extract_vector_document_diff(
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.map(|(_, deladd)| KvReaderDelAdd::from_slice(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
if document_is_kept {
|
||||
if embedder_is_manual {
|
||||
@ -579,7 +579,7 @@ fn extract_vector_document_diff(
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.map(|(_, deladd)| KvReaderDelAdd::from_slice(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
if document_is_kept {
|
||||
// if the new version of documents has the vectors in the DB,
|
||||
@ -597,7 +597,7 @@ fn extract_vector_document_diff(
|
||||
}
|
||||
|
||||
fn regenerate_if_prompt_changed(
|
||||
obkv: obkv::KvReader<'_, FieldId>,
|
||||
obkv: &obkv::KvReader<FieldId>,
|
||||
(old_prompt, new_prompt): (&Prompt, &Prompt),
|
||||
(old_fields_ids_map, new_fields_ids_map): (&FieldsIdsMap, &FieldsIdsMap),
|
||||
) -> Result<VectorStateDelta> {
|
||||
@ -612,7 +612,7 @@ fn regenerate_if_prompt_changed(
|
||||
}
|
||||
|
||||
fn regenerate_prompt(
|
||||
obkv: obkv::KvReader<'_, FieldId>,
|
||||
obkv: &obkv::KvReader<FieldId>,
|
||||
prompt: &Prompt,
|
||||
new_fields_ids_map: &FieldsIdsMap,
|
||||
) -> Result<VectorStateDelta> {
|
||||
|
@ -58,17 +58,17 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
let document_id = u32::from_be_bytes(document_id_bytes);
|
||||
let fid = u16::from_be_bytes(fid_bytes);
|
||||
|
||||
let del_add_reader = KvReaderDelAdd::new(value);
|
||||
let del_add_reader = KvReaderDelAdd::from_slice(value);
|
||||
// extract all unique words to remove.
|
||||
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
|
||||
for (_pos, word) in KvReaderU16::new(deletion).iter() {
|
||||
for (_pos, word) in KvReaderU16::from_slice(deletion).iter() {
|
||||
del_words.insert(word.to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
// extract all unique additional words.
|
||||
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
|
||||
for (_pos, word) in KvReaderU16::new(addition).iter() {
|
||||
for (_pos, word) in KvReaderU16::from_slice(addition).iter() {
|
||||
add_words.insert(word.to_vec());
|
||||
}
|
||||
}
|
||||
@ -115,7 +115,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
// NOTE: replacing sorters by bitmap merging is less efficient, so, use sorters.
|
||||
while let Some((key, value)) = iter.next()? {
|
||||
// only keep the value if their is a change to apply in the DB.
|
||||
if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) {
|
||||
if !is_noop_del_add_obkv(KvReaderDelAdd::from_slice(value)) {
|
||||
word_fid_docids_writer.insert(key, value)?;
|
||||
}
|
||||
|
||||
@ -123,7 +123,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
.map_err(|_| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||
|
||||
// merge all deletions
|
||||
let obkv = KvReaderDelAdd::new(value);
|
||||
let obkv = KvReaderDelAdd::from_slice(value);
|
||||
if let Some(value) = obkv.get(DelAdd::Deletion) {
|
||||
let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid);
|
||||
buffer.clear();
|
||||
|
@ -92,8 +92,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
||||
}
|
||||
|
||||
// deletions
|
||||
if let Some(deletion) = KvReaderDelAdd::new(value).get(DelAdd::Deletion) {
|
||||
for (position, word) in KvReaderU16::new(deletion).iter() {
|
||||
if let Some(deletion) = KvReaderDelAdd::from_slice(value).get(DelAdd::Deletion) {
|
||||
for (position, word) in KvReaderU16::from_slice(deletion).iter() {
|
||||
// drain the proximity window until the head word is considered close to the word we are inserting.
|
||||
while del_word_positions.front().map_or(false, |(_w, p)| {
|
||||
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
|
||||
@ -125,8 +125,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
||||
}
|
||||
|
||||
// additions
|
||||
if let Some(addition) = KvReaderDelAdd::new(value).get(DelAdd::Addition) {
|
||||
for (position, word) in KvReaderU16::new(addition).iter() {
|
||||
if let Some(addition) = KvReaderDelAdd::from_slice(value).get(DelAdd::Addition) {
|
||||
for (position, word) in KvReaderU16::from_slice(addition).iter() {
|
||||
// drain the proximity window until the head word is considered close to the word we are inserting.
|
||||
while add_word_positions.front().map_or(false, |(_w, p)| {
|
||||
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
|
||||
|
@ -60,10 +60,10 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
||||
|
||||
current_document_id = Some(document_id);
|
||||
|
||||
let del_add_reader = KvReaderDelAdd::new(value);
|
||||
let del_add_reader = KvReaderDelAdd::from_slice(value);
|
||||
// extract all unique words to remove.
|
||||
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
|
||||
for (position, word_bytes) in KvReaderU16::new(deletion).iter() {
|
||||
for (position, word_bytes) in KvReaderU16::from_slice(deletion).iter() {
|
||||
let position = bucketed_position(position);
|
||||
del_word_positions.insert((position, word_bytes.to_vec()));
|
||||
}
|
||||
@ -71,7 +71,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
||||
|
||||
// extract all unique additional words.
|
||||
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
|
||||
for (position, word_bytes) in KvReaderU16::new(addition).iter() {
|
||||
for (position, word_bytes) in KvReaderU16::from_slice(addition).iter() {
|
||||
let position = bucketed_position(position);
|
||||
add_word_positions.insert((position, word_bytes.to_vec()));
|
||||
}
|
||||
|
@ -45,8 +45,8 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<
|
||||
}
|
||||
|
||||
pub fn merge_two_del_add_obkvs(
|
||||
base: obkv::KvReaderU16<'_>,
|
||||
update: obkv::KvReaderU16<'_>,
|
||||
base: &obkv::KvReaderU16,
|
||||
update: &obkv::KvReaderU16,
|
||||
merge_additions: bool,
|
||||
buffer: &mut Vec<u8>,
|
||||
) {
|
||||
@ -66,7 +66,7 @@ pub fn merge_two_del_add_obkvs(
|
||||
// If merge_additions is false, recreate an obkv keeping the deletions only.
|
||||
value_buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||
let base_reader = KvReaderDelAdd::new(v);
|
||||
let base_reader = KvReaderDelAdd::from_slice(v);
|
||||
|
||||
if let Some(deletion) = base_reader.get(DelAdd::Deletion) {
|
||||
value_writer.insert(DelAdd::Deletion, deletion).unwrap();
|
||||
@ -80,8 +80,8 @@ pub fn merge_two_del_add_obkvs(
|
||||
// merge deletions and additions.
|
||||
value_buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||
let base_reader = KvReaderDelAdd::new(base);
|
||||
let update_reader = KvReaderDelAdd::new(update);
|
||||
let base_reader = KvReaderDelAdd::from_slice(base);
|
||||
let update_reader = KvReaderDelAdd::from_slice(update);
|
||||
|
||||
// keep newest deletion.
|
||||
if let Some(deletion) = update_reader
|
||||
@ -131,8 +131,8 @@ fn inner_merge_del_add_obkvs<'a>(
|
||||
break;
|
||||
}
|
||||
|
||||
let newest = obkv::KvReader::new(&acc);
|
||||
let oldest = obkv::KvReader::new(¤t[1..]);
|
||||
let newest = obkv::KvReader::from_slice(&acc);
|
||||
let oldest = obkv::KvReader::from_slice(¤t[1..]);
|
||||
merge_two_del_add_obkvs(oldest, newest, merge_additions, &mut buffer);
|
||||
|
||||
// we want the result of the merge into our accumulator.
|
||||
@ -187,7 +187,7 @@ pub fn merge_deladd_cbo_roaring_bitmaps<'a>(
|
||||
let mut del_bitmaps_bytes = Vec::new();
|
||||
let mut add_bitmaps_bytes = Vec::new();
|
||||
for value in values {
|
||||
let obkv = KvReaderDelAdd::new(value);
|
||||
let obkv = KvReaderDelAdd::from_slice(value);
|
||||
if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
|
||||
del_bitmaps_bytes.push(bitmap_bytes);
|
||||
}
|
||||
@ -217,7 +217,7 @@ pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>(
|
||||
buffer: &'a mut Vec<u8>,
|
||||
) -> Result<Option<&'a [u8]>> {
|
||||
Ok(CboRoaringBitmapCodec::merge_deladd_into(
|
||||
KvReaderDelAdd::new(deladd_obkv),
|
||||
KvReaderDelAdd::from_slice(deladd_obkv),
|
||||
previous,
|
||||
buffer,
|
||||
)?)
|
||||
@ -236,7 +236,7 @@ pub fn merge_deladd_btreeset_string<'a>(
|
||||
let mut del_set = BTreeSet::new();
|
||||
let mut add_set = BTreeSet::new();
|
||||
for value in values {
|
||||
let obkv = KvReaderDelAdd::new(value);
|
||||
let obkv = KvReaderDelAdd::from_slice(value);
|
||||
if let Some(bytes) = obkv.get(DelAdd::Deletion) {
|
||||
let set = serde_json::from_slice::<BTreeSet<String>>(bytes).unwrap();
|
||||
for value in set {
|
||||
|
@ -31,14 +31,14 @@ impl<'t> ImmutableObkvs<'t> {
|
||||
}
|
||||
|
||||
/// Returns the OBKVs identified by the given ID.
|
||||
pub fn obkv(&self, docid: DocumentId) -> heed::Result<Option<KvReaderU16<'t>>> {
|
||||
pub fn obkv(&self, docid: DocumentId) -> heed::Result<Option<&'t KvReaderU16>> {
|
||||
match self
|
||||
.ids
|
||||
.rank(docid)
|
||||
.checked_sub(1)
|
||||
.and_then(|offset| self.slices.get(offset as usize))
|
||||
{
|
||||
Some(bytes) => Ok(Some(KvReaderU16::new(bytes))),
|
||||
Some(&bytes) => Ok(Some(bytes.into())),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
@ -278,13 +278,13 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
document_sorter_value_buffer.clear();
|
||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(base_obkv),
|
||||
KvReaderU16::from_slice(base_obkv),
|
||||
deladd_operation,
|
||||
&mut document_sorter_value_buffer,
|
||||
)?;
|
||||
self.original_sorter
|
||||
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
|
||||
let base_obkv = KvReader::new(base_obkv);
|
||||
let base_obkv = KvReader::from_slice(base_obkv);
|
||||
if let Some(flattened_obkv) =
|
||||
Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)?
|
||||
{
|
||||
@ -292,7 +292,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
document_sorter_value_buffer.clear();
|
||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&flattened_obkv),
|
||||
KvReaderU16::from_slice(&flattened_obkv),
|
||||
deladd_operation,
|
||||
&mut document_sorter_value_buffer,
|
||||
)?;
|
||||
@ -311,7 +311,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
document_sorter_value_buffer.clear();
|
||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&obkv_buffer),
|
||||
KvReaderU16::from_slice(&obkv_buffer),
|
||||
DelAddOperation::Addition,
|
||||
&mut document_sorter_value_buffer,
|
||||
)?;
|
||||
@ -319,14 +319,14 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
self.original_sorter
|
||||
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
|
||||
|
||||
let flattened_obkv = KvReader::new(&obkv_buffer);
|
||||
let flattened_obkv = KvReader::from_slice(&obkv_buffer);
|
||||
if let Some(obkv) =
|
||||
Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)?
|
||||
{
|
||||
document_sorter_value_buffer.clear();
|
||||
document_sorter_value_buffer.push(Operation::Addition as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&obkv),
|
||||
KvReaderU16::from_slice(&obkv),
|
||||
DelAddOperation::Addition,
|
||||
&mut document_sorter_value_buffer,
|
||||
)?
|
||||
@ -519,14 +519,14 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
document_sorter_value_buffer.clear();
|
||||
document_sorter_value_buffer.push(Operation::Deletion as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(base_obkv),
|
||||
KvReaderU16::from_slice(base_obkv),
|
||||
DelAddOperation::Deletion,
|
||||
document_sorter_value_buffer,
|
||||
)?;
|
||||
self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
|
||||
|
||||
// flatten it and push it as to delete in the flattened_sorter
|
||||
let flattened_obkv = KvReader::new(base_obkv);
|
||||
let flattened_obkv = KvReader::from_slice(base_obkv);
|
||||
if let Some(obkv) =
|
||||
Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)?
|
||||
{
|
||||
@ -534,7 +534,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
document_sorter_value_buffer.clear();
|
||||
document_sorter_value_buffer.push(Operation::Deletion as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&obkv),
|
||||
KvReaderU16::from_slice(&obkv),
|
||||
DelAddOperation::Deletion,
|
||||
document_sorter_value_buffer,
|
||||
)?;
|
||||
@ -552,7 +552,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
target = "indexing::transform"
|
||||
)]
|
||||
fn flatten_from_fields_ids_map(
|
||||
obkv: &KvReader<'_, FieldId>,
|
||||
obkv: &KvReader<FieldId>,
|
||||
fields_ids_map: &mut FieldsIdsMap,
|
||||
) -> Result<Option<Vec<u8>>> {
|
||||
if obkv
|
||||
@ -720,10 +720,10 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
total_documents: self.documents_count,
|
||||
});
|
||||
|
||||
for (key, value) in KvReader::new(val) {
|
||||
let reader = KvReaderDelAdd::new(value);
|
||||
for (key, value) in KvReader::from_slice(val) {
|
||||
let reader = KvReaderDelAdd::from_slice(value);
|
||||
match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
|
||||
(None, None) => {}
|
||||
(None, None) => (),
|
||||
(None, Some(_)) => {
|
||||
// New field
|
||||
let name = self.fields_ids_map.name(key).ok_or(
|
||||
@ -837,7 +837,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
/// then fill the provided buffers with delta documents using KvWritterDelAdd.
|
||||
#[allow(clippy::too_many_arguments)] // need the vectors + fid, feel free to create a struct xo xo
|
||||
fn rebind_existing_document(
|
||||
old_obkv: KvReader<'_, FieldId>,
|
||||
old_obkv: &KvReader<FieldId>,
|
||||
settings_diff: &InnerIndexSettingsDiff,
|
||||
modified_faceted_fields: &HashSet<String>,
|
||||
mut injected_vectors: serde_json::Map<String, serde_json::Value>,
|
||||
@ -925,7 +925,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
}
|
||||
|
||||
let data = obkv_writer.into_inner()?;
|
||||
let obkv = KvReader::<FieldId>::new(&data);
|
||||
let obkv = KvReader::<FieldId>::from_slice(&data);
|
||||
|
||||
if let Some(original_obkv_buffer) = original_obkv_buffer {
|
||||
original_obkv_buffer.clear();
|
||||
@ -936,7 +936,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
// take the non-flattened version if flatten_from_fields_ids_map returns None.
|
||||
let mut fields_ids_map = settings_diff.new.fields_ids_map.clone();
|
||||
let flattened = Self::flatten_from_fields_ids_map(&obkv, &mut fields_ids_map)?;
|
||||
let flattened = flattened.as_deref().map_or(obkv, KvReader::new);
|
||||
let flattened = flattened.as_deref().map_or(obkv, KvReader::from_slice);
|
||||
|
||||
flattened_obkv_buffer.clear();
|
||||
into_del_add_obkv_conditional_operation(flattened, flattened_obkv_buffer, |id| {
|
||||
@ -1173,21 +1173,21 @@ mod test {
|
||||
kv_writer.insert(0_u8, [0]).unwrap();
|
||||
let buffer = kv_writer.into_inner().unwrap();
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&buffer),
|
||||
KvReaderU16::from_slice(&buffer),
|
||||
DelAddOperation::Addition,
|
||||
&mut additive_doc_0,
|
||||
)
|
||||
.unwrap();
|
||||
additive_doc_0.insert(0, Operation::Addition as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&buffer),
|
||||
KvReaderU16::from_slice(&buffer),
|
||||
DelAddOperation::Deletion,
|
||||
&mut deletive_doc_0,
|
||||
)
|
||||
.unwrap();
|
||||
deletive_doc_0.insert(0, Operation::Deletion as u8);
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&buffer),
|
||||
KvReaderU16::from_slice(&buffer),
|
||||
DelAddOperation::DeletionAndAddition,
|
||||
&mut del_add_doc_0,
|
||||
)
|
||||
@ -1199,7 +1199,7 @@ mod test {
|
||||
kv_writer.insert(1_u8, [1]).unwrap();
|
||||
let buffer = kv_writer.into_inner().unwrap();
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&buffer),
|
||||
KvReaderU16::from_slice(&buffer),
|
||||
DelAddOperation::Addition,
|
||||
&mut additive_doc_1,
|
||||
)
|
||||
@ -1212,7 +1212,7 @@ mod test {
|
||||
kv_writer.insert(1_u8, [1]).unwrap();
|
||||
let buffer = kv_writer.into_inner().unwrap();
|
||||
into_del_add_obkv(
|
||||
KvReaderU16::new(&buffer),
|
||||
KvReaderU16::from_slice(&buffer),
|
||||
DelAddOperation::Addition,
|
||||
&mut additive_doc_0_1,
|
||||
)
|
||||
|
@ -162,7 +162,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
||||
let mut vectors_buffer = Vec::new();
|
||||
while let Some((key, reader)) = iter.next()? {
|
||||
let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
|
||||
let reader: KvReader<'_, FieldId> = KvReader::new(reader);
|
||||
let reader: &KvReader<FieldId> = reader.into();
|
||||
|
||||
let (document_id_bytes, external_id_bytes) = try_split_array_at(key)
|
||||
.ok_or(SerializationError::Decoding { db_name: Some(DOCUMENTS) })?;
|
||||
@ -170,7 +170,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
||||
let external_id = std::str::from_utf8(external_id_bytes)?;
|
||||
|
||||
for (field_id, value) in reader.iter() {
|
||||
let del_add_reader = KvReaderDelAdd::new(value);
|
||||
let del_add_reader = KvReaderDelAdd::from_slice(value);
|
||||
|
||||
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
|
||||
let addition = if vectors_fid == Some(field_id) {
|
||||
@ -529,7 +529,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
||||
index.field_id_docid_facet_f64s.remap_types::<Bytes, Bytes>();
|
||||
let mut iter = merger.into_stream_merger_iter()?;
|
||||
while let Some((key, value)) = iter.next()? {
|
||||
let reader = KvReaderDelAdd::new(value);
|
||||
let reader = KvReaderDelAdd::from_slice(value);
|
||||
if valid_lmdb_key(key) {
|
||||
match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
|
||||
(None, None) => {}
|
||||
@ -563,7 +563,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
||||
index.field_id_docid_facet_strings.remap_types::<Bytes, Bytes>();
|
||||
let mut iter = merger.into_stream_merger_iter()?;
|
||||
while let Some((key, value)) = iter.next()? {
|
||||
let reader = KvReaderDelAdd::new(value);
|
||||
let reader = KvReaderDelAdd::from_slice(value);
|
||||
if valid_lmdb_key(key) {
|
||||
match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
|
||||
(None, None) => {}
|
||||
@ -600,7 +600,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
||||
// convert the key back to a u32 (4 bytes)
|
||||
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
|
||||
|
||||
let deladd_obkv = KvReaderDelAdd::new(value);
|
||||
let deladd_obkv = KvReaderDelAdd::from_slice(value);
|
||||
if let Some(value) = deladd_obkv.get(DelAdd::Deletion) {
|
||||
let geopoint = extract_geo_point(value, docid);
|
||||
rtree.remove(&geopoint);
|
||||
@ -723,7 +723,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
||||
let (left, _index) = try_split_array_at(key).unwrap();
|
||||
let docid = DocumentId::from_be_bytes(left);
|
||||
|
||||
let vector_deladd_obkv = KvReaderDelAdd::new(value);
|
||||
let vector_deladd_obkv = KvReaderDelAdd::from_slice(value);
|
||||
if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) {
|
||||
let vector: Vec<f32> = pod_collect_to_vec(value);
|
||||
|
||||
@ -852,7 +852,7 @@ where
|
||||
if valid_lmdb_key(key) {
|
||||
let (proximity_to_insert, word1, word2) =
|
||||
U8StrStrCodec::bytes_decode(key).map_err(heed::Error::Decoding)?;
|
||||
let data_to_insert = match KvReaderDelAdd::new(value).get(DelAdd::Addition) {
|
||||
let data_to_insert = match KvReaderDelAdd::from_slice(value).get(DelAdd::Addition) {
|
||||
Some(value) => {
|
||||
CboRoaringBitmapCodec::bytes_decode(value).map_err(heed::Error::Decoding)?
|
||||
}
|
||||
|
@ -1,4 +1,3 @@
|
||||
use core::slice::SlicePattern;
|
||||
use std::fs::File;
|
||||
|
||||
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
|
||||
@ -44,11 +43,11 @@ impl KeyValueEntry {
|
||||
}
|
||||
|
||||
pub fn key(&self) -> &[u8] {
|
||||
&self.data.as_slice()[..self.key_length]
|
||||
&self.data.as_ref()[..self.key_length]
|
||||
}
|
||||
|
||||
pub fn value(&self) -> &[u8] {
|
||||
&self.data.as_slice()[self.key_length..]
|
||||
&self.data.as_ref()[self.key_length..]
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
use heed::RoTxn;
|
||||
use obkv2::KvReader;
|
||||
use obkv::KvReader;
|
||||
|
||||
use super::indexer::KvReaderFieldId;
|
||||
use crate::{DocumentId, FieldId};
|
||||
|
@ -4,7 +4,8 @@ mod channel;
|
||||
mod items_pool;
|
||||
mod merge;
|
||||
|
||||
mod global_fields_ids_map;
|
||||
/// TODO remove this
|
||||
// mod global_fields_ids_map;
|
||||
|
||||
pub type StdResult<T, E> = std::result::Result<T, E>;
|
||||
|
||||
@ -27,8 +28,7 @@ mod indexer {
|
||||
|
||||
use super::channel::{
|
||||
extractors_merger_channels, merger_writer_channels, EntryOperation,
|
||||
ExtractorsMergerChannels, MergerReceiver, MergerSender, MergerWriterChannels,
|
||||
WriterOperation,
|
||||
ExtractorsMergerChannels, MergerReceiver, MergerSender, WriterOperation,
|
||||
};
|
||||
use super::document_change::{Deletion, DocumentChange, Insertion, Update};
|
||||
use super::items_pool::ItemsPool;
|
||||
@ -44,10 +44,10 @@ mod indexer {
|
||||
Result, UserError,
|
||||
};
|
||||
|
||||
pub type KvReaderFieldId = obkv2::KvReader<FieldId>;
|
||||
pub type KvReaderDelAdd = obkv2::KvReader<DelAdd>;
|
||||
pub type KvWriterFieldId<W> = obkv2::KvWriter<W, FieldId>;
|
||||
pub type KvWriterDelAdd<W> = obkv2::KvWriter<W, DelAdd>;
|
||||
pub type KvReaderFieldId = obkv::KvReader<FieldId>;
|
||||
pub type KvReaderDelAdd = obkv::KvReader<DelAdd>;
|
||||
pub type KvWriterFieldId<W> = obkv::KvWriter<W, FieldId>;
|
||||
pub type KvWriterDelAdd<W> = obkv::KvWriter<W, DelAdd>;
|
||||
|
||||
pub struct DocumentOperationIndexer {
|
||||
operations: Vec<Payload>,
|
||||
@ -105,7 +105,7 @@ mod indexer {
|
||||
rtxn: &'a RoTxn,
|
||||
mut fields_ids_map: FieldsIdsMap,
|
||||
primary_key: &'a PrimaryKey<'a>,
|
||||
) -> Result<impl ParallelIterator<Item = DocumentChange> + 'a> {
|
||||
) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'a> {
|
||||
let documents_ids = index.documents_ids(rtxn)?;
|
||||
let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
|
||||
let mut docids_version_offsets = HashMap::<String, _>::new();
|
||||
@ -198,7 +198,7 @@ mod indexer {
|
||||
}
|
||||
|
||||
let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
|
||||
docids_version_offsets.into_par_iter().map_with(
|
||||
Ok(docids_version_offsets.into_par_iter().map_with(
|
||||
items,
|
||||
|context_pool, (external_docid, (internal_docid, operations))| {
|
||||
context_pool.with(|rtxn| match self.method {
|
||||
@ -221,58 +221,7 @@ mod indexer {
|
||||
),
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
Ok(vec![].into_par_iter())
|
||||
|
||||
// let mut file_count: usize = 0;
|
||||
// for result in WalkDir::new(update_files_path)
|
||||
// // TODO handle errors
|
||||
// .sort_by_key(|entry| entry.metadata().unwrap().created().unwrap())
|
||||
// {
|
||||
// let entry = result?;
|
||||
// if !entry.file_type().is_file() {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// let file = File::open(entry.path())
|
||||
// .with_context(|| format!("While opening {}", entry.path().display()))?;
|
||||
// let content = unsafe {
|
||||
// Mmap::map(&file)
|
||||
// .map(Arc::new)
|
||||
// .with_context(|| format!("While memory mapping {}", entry.path().display()))?
|
||||
// };
|
||||
|
||||
// let reader =
|
||||
// crate::documents::DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
|
||||
// let (mut batch_cursor, batch_index) = reader.into_cursor_and_fields_index();
|
||||
// batch_index.iter().for_each(|(_, name)| {
|
||||
// fields_ids_map.insert(name);
|
||||
// });
|
||||
// let mut offset: u32 = 0;
|
||||
// while let Some(document) = batch_cursor.next_document()? {
|
||||
// let primary_key = batch_index.id(primary_key).unwrap();
|
||||
// let document_id = document.get(primary_key).unwrap();
|
||||
// let document_id = std::str::from_utf8(document_id).unwrap();
|
||||
|
||||
// let document_offset = DocumentOffset { content: content.clone(), offset };
|
||||
// match docids_version_offsets.get_mut(document_id) {
|
||||
// None => {
|
||||
// let docid = match maindb.external_documents_ids.get(rtxn, document_id)? {
|
||||
// Some(docid) => docid,
|
||||
// None => sequential_docids.next().context("no more available docids")?,
|
||||
// };
|
||||
// docids_version_offsets
|
||||
// .insert(document_id.into(), (docid, smallvec![document_offset]));
|
||||
// }
|
||||
// Some((_, offsets)) => offsets.push(document_offset),
|
||||
// }
|
||||
// offset += 1;
|
||||
// p.inc(1);
|
||||
// }
|
||||
|
||||
// file_count += 1;
|
||||
// }
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -109,14 +109,13 @@ impl ParsedVectorsDiff {
|
||||
pub fn new(
|
||||
docid: DocumentId,
|
||||
embedders_configs: &[IndexEmbeddingConfig],
|
||||
documents_diff: KvReader<'_, FieldId>,
|
||||
documents_diff: &KvReader<FieldId>,
|
||||
old_vectors_fid: Option<FieldId>,
|
||||
new_vectors_fid: Option<FieldId>,
|
||||
) -> Result<Self, Error> {
|
||||
let mut old = match old_vectors_fid
|
||||
.and_then(|vectors_fid| documents_diff.get(vectors_fid))
|
||||
.map(KvReaderDelAdd::new)
|
||||
.map(|obkv| to_vector_map(obkv, DelAdd::Deletion))
|
||||
.map(|bytes| to_vector_map(bytes.into(), DelAdd::Deletion))
|
||||
.transpose()
|
||||
{
|
||||
Ok(del) => del,
|
||||
@ -143,8 +142,7 @@ impl ParsedVectorsDiff {
|
||||
let Some(bytes) = documents_diff.get(new_vectors_fid) else {
|
||||
break 'new VectorsState::NoVectorsFieldInDocument;
|
||||
};
|
||||
let obkv = KvReaderDelAdd::new(bytes);
|
||||
match to_vector_map(obkv, DelAdd::Addition)? {
|
||||
match to_vector_map(bytes.into(), DelAdd::Addition)? {
|
||||
Some(new) => VectorsState::Vectors(new),
|
||||
None => VectorsState::NoVectorsFieldInDocument,
|
||||
}
|
||||
@ -239,7 +237,7 @@ impl Error {
|
||||
}
|
||||
|
||||
fn to_vector_map(
|
||||
obkv: KvReaderDelAdd<'_>,
|
||||
obkv: &KvReaderDelAdd,
|
||||
side: DelAdd,
|
||||
) -> Result<Option<BTreeMap<String, Vectors>>, Error> {
|
||||
Ok(if let Some(value) = obkv.get(side) {
|
||||
|
Loading…
Reference in New Issue
Block a user