mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 12:05:05 +08:00
Use zerocopy without bitpacking as a first step
This commit is contained in:
parent
a81f201fad
commit
3668627e03
59
src/main.rs
59
src/main.rs
@ -19,8 +19,9 @@ use rayon::prelude::*;
|
|||||||
use sdset::{SetOperation, SetBuf};
|
use sdset::{SetOperation, SetBuf};
|
||||||
use slice_group_by::StrGroupBy;
|
use slice_group_by::StrGroupBy;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
use zerocopy::{LayoutVerified, AsBytes};
|
||||||
|
|
||||||
use self::codec::CodecBitPacker4xSorted;
|
// use self::codec::CodecBitPacker4xSorted;
|
||||||
use self::bp_vec::BpVec;
|
use self::bp_vec::BpVec;
|
||||||
|
|
||||||
pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
|
pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
|
||||||
@ -44,7 +45,35 @@ struct Opt {
|
|||||||
files_to_index: Vec<PathBuf>,
|
files_to_index: Vec<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn union_bitpacked_postings_ids(
|
fn bytes_to_u32s(bytes: &[u8]) -> Vec<u32> {
|
||||||
|
fn aligned_to(bytes: &[u8], align: usize) -> bool {
|
||||||
|
(bytes as *const _ as *const () as usize) % align == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
match LayoutVerified::new_slice(bytes) {
|
||||||
|
Some(slice) => slice.to_vec(),
|
||||||
|
None => {
|
||||||
|
let len = bytes.len();
|
||||||
|
|
||||||
|
// ensure that it is the alignment that is wrong and the length is valid
|
||||||
|
assert!(len % 4 == 0, "length is {} and is not modulo 4", len);
|
||||||
|
assert!(!aligned_to(bytes, std::mem::align_of::<u32>()), "bytes are already aligned");
|
||||||
|
|
||||||
|
let elems = len / 4;
|
||||||
|
let mut vec = Vec::<u32>::with_capacity(elems);
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
let dst = vec.as_mut_ptr() as *mut u8;
|
||||||
|
std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len);
|
||||||
|
vec.set_len(elems);
|
||||||
|
}
|
||||||
|
|
||||||
|
vec
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn union_postings_ids(
|
||||||
_key: &[u8],
|
_key: &[u8],
|
||||||
old_value: Option<&[u8]>,
|
old_value: Option<&[u8]>,
|
||||||
operands: &mut rocksdb::MergeOperands,
|
operands: &mut rocksdb::MergeOperands,
|
||||||
@ -53,20 +82,21 @@ fn union_bitpacked_postings_ids(
|
|||||||
let mut sets_bufs = Vec::new();
|
let mut sets_bufs = Vec::new();
|
||||||
|
|
||||||
if let Some(old_value) = old_value {
|
if let Some(old_value) = old_value {
|
||||||
let old_value = CodecBitPacker4xSorted::bytes_decode(old_value).unwrap();
|
let old_value = bytes_to_u32s(old_value);
|
||||||
sets_bufs.push(SetBuf::new(old_value).unwrap());
|
sets_bufs.push(SetBuf::new_unchecked(old_value.to_vec()));
|
||||||
}
|
}
|
||||||
|
|
||||||
for operand in operands {
|
for operand in operands {
|
||||||
let new_value = CodecBitPacker4xSorted::bytes_decode(operand).unwrap();
|
let new_value = bytes_to_u32s(operand);
|
||||||
sets_bufs.push(SetBuf::new(new_value).unwrap());
|
sets_bufs.push(SetBuf::new_unchecked(new_value.to_vec()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let sets = sets_bufs.iter().map(|s| s.as_set()).collect();
|
let sets = sets_bufs.iter().map(|s| s.as_set()).collect();
|
||||||
let result = sdset::multi::Union::new(sets).into_set_buf();
|
let result: SetBuf<u32> = sdset::multi::Union::new(sets).into_set_buf();
|
||||||
let compressed = CodecBitPacker4xSorted::bytes_encode(&result).unwrap();
|
|
||||||
|
|
||||||
Some(compressed)
|
assert!(result.as_bytes().len() % 4 == 0);
|
||||||
|
|
||||||
|
Some(result.as_bytes().to_vec())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn union_words_fst(
|
fn union_words_fst(
|
||||||
@ -167,11 +197,7 @@ fn index_csv(
|
|||||||
// We compute and store the postings list into the DB.
|
// We compute and store the postings list into the DB.
|
||||||
for (word, new_ids) in new_postings_ids {
|
for (word, new_ids) in new_postings_ids {
|
||||||
let new_ids = SetBuf::from_dirty(new_ids.to_vec());
|
let new_ids = SetBuf::from_dirty(new_ids.to_vec());
|
||||||
let compressed = CodecBitPacker4xSorted::bytes_encode(&new_ids)
|
db.merge_cf(&postings_ids, word.as_bytes(), new_ids.as_bytes())?;
|
||||||
.context("error while compressing using CodecBitPacker4xSorted")?;
|
|
||||||
|
|
||||||
db.merge_cf(&postings_ids, word.as_bytes(), compressed)?;
|
|
||||||
|
|
||||||
new_words.insert(word);
|
new_words.insert(word);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -184,6 +210,7 @@ fn index_csv(
|
|||||||
db.merge_cf(&main, "words-fst", new_words_fst.as_fst().as_bytes())?;
|
db.merge_cf(&main, "words-fst", new_words_fst.as_fst().as_bytes())?;
|
||||||
|
|
||||||
eprintln!("Finished merging the words-fst");
|
eprintln!("Finished merging the words-fst");
|
||||||
|
eprintln!("Total number of documents seen is {}", ID_GENERATOR.load(Ordering::Relaxed));
|
||||||
|
|
||||||
Ok(number_of_documents)
|
Ok(number_of_documents)
|
||||||
}
|
}
|
||||||
@ -195,8 +222,8 @@ fn main() -> anyhow::Result<()> {
|
|||||||
opts.create_if_missing(true);
|
opts.create_if_missing(true);
|
||||||
opts.create_missing_column_families(true);
|
opts.create_missing_column_families(true);
|
||||||
// Setup the merge operators
|
// Setup the merge operators
|
||||||
opts.set_merge_operator("main", union_words_fst, Some(union_words_fst));
|
opts.set_merge_operator("main", union_words_fst, None); // Some(union_words_fst));
|
||||||
opts.set_merge_operator("postings-ids", union_bitpacked_postings_ids, Some(union_bitpacked_postings_ids));
|
opts.set_merge_operator("postings-ids", union_postings_ids, None); // Some(union_postings_ids));
|
||||||
|
|
||||||
let mut db = rocksdb::DB::open(&opts, &opt.database)?;
|
let mut db = rocksdb::DB::open(&opts, &opt.database)?;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user